From f1edea4ac42f03e1dfb75292be97030ae4fa8549 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 16:10:45 -0400 Subject: [PATCH 01/29] modify annual -> yearly washu -> randall --- Dockerfile | 6 +++--- README.md | 10 +++++----- Snakefile | 10 +++++----- conf/datapaths/cannon_datapaths.yaml | 16 ++++++++-------- conf/datapaths/datapaths.yaml | 4 ++-- notes/eda_input.ipynb | 4 ++-- src/aggregate_pm25.py | 10 +++++----- src/download_pm25.py | 2 +- 8 files changed, 31 insertions(+), 31 deletions(-) diff --git a/Dockerfile b/Dockerfile index c871f80..d69b3fd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,8 +13,8 @@ RUN mamba env update -n base -f requirements.yaml #&& mamba clean -a # Create paths to data placeholders -RUN python utils/create_dir_paths.py datapaths.input.satellite_pm25.annual=null datapaths.input.satellite_pm25.monthly=null +RUN python utils/create_dir_paths.py datapaths.input.satellite_pm25.yearly=null datapaths.input.satellite_pm25.monthly=null -# snakemake --configfile conf/config.yaml --cores 4 -C temporal_freq=annual +# snakemake --configfile conf/config.yaml --cores 4 -C temporal_freq=yearly ENTRYPOINT ["snakemake", "--configfile", "conf/config.yaml"] -CMD ["--cores", "4", "-C", "polygon_name=county", "temporal_freq=annual"] +CMD ["--cores", "4", "-C", "polygon_name=county", "temporal_freq=yearly"] diff --git a/README.md b/README.md index cb3a85f..83af885 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# pm25_washu_raster2polygon +# pm25_randall_raster2polygon Code to produce spatial aggregations of pm25 estimates as generated by the [Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datasets/surface-pm2-5/). The spatial aggregation are performed for satellite pm25 from grid/raster (NetCDF) to polygons (shp). @@ -10,7 +10,7 @@ The [Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datase The version [V5.GL.04](https://sites.wustl.edu/acag/datasets/surface-pm2-5/#V5.GL.04) consists of mean PM2.5 (ug/m3) available at: -* Temporal frequency: Annual and monthly +* Temporal frequency: yearly and monthly * Grid resolutions: (0.1° × 0.1°) and (0.01° × 0.01°) * Geographic regions: North America, Europe, Asia, and Global @@ -47,7 +47,7 @@ The configuration structure withing the `/conf` folder allow you to modify the i * aggregate pm25: `src/aggregate_pm25.py` The key parameters are: -* `temporal_freq` which determines whether the original annual or monthly pm25 files will be aggregated. The options are: `annual` and `monthly`. +* `temporal_freq` which determines whether the original yearly or monthly pm25 files will be aggregated. The options are: `yearly` and `monthly`. * `polygon_name` which determines into which polygons the pm25 grid will the aggregated. The options are: `zcta` and `county`. --- @@ -98,7 +98,7 @@ python src/aggregate_pm25.py or run the pipeline: ```bash -snakemake --cores 4 -C polygon_name=county temporal_freq=annual +snakemake --cores 4 -C polygon_name=county temporal_freq=yearly ``` Modify `cores`, `polygon_name` and `temporal_freq` as you find convenient. @@ -115,7 +115,7 @@ mkdir /satellite_pm25_raster2polygon ```bash docker pull nsaph/satellite_pm25_raster2polygon -docker run -v :/app/data/input/satellite_pm25/annual /satellite_pm25_raster2polygon/:/app/data/output/satellite_pm25_raster2polygon nsaph/satellite_pm25_raster2polygon +docker run -v :/app/data/input/satellite_pm25/yearly /satellite_pm25_raster2polygon/:/app/data/output/satellite_pm25_raster2polygon nsaph/satellite_pm25_raster2polygon ``` If you are interested in storing the input raw and intermediate data run diff --git a/Snakefile b/Snakefile index df4aa44..5324af8 100644 --- a/Snakefile +++ b/Snakefile @@ -25,13 +25,13 @@ shapefiles_cfg = hydra_cfg.shapefiles shapefile_years_list = list(shapefiles_cfg[polygon_name].keys()) months_list = "01" if temporal_freq == 'yearly' else [str(i).zfill(2) for i in range(1, 12 + 1)] -years_list = list(range(1998, 2022 + 1)) +years_list = list(range(1998, 2023 + 1)) # == Define rules == rule all: input: expand( - f"data/output/pm25__washu/{polygon_name}_{temporal_freq}/pm25__washu__{polygon_name}_{temporal_freq}__" + + f"data/output/pm25__randall/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + ("{year}.parquet" if temporal_freq == 'yearly' else "{year}_{month}.parquet"), year=years_list, month=months_list @@ -47,7 +47,7 @@ rule download_shapefiles: rule download_satellite_pm25: output: expand( - f"data/input/pm25__washu__raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + + f"data/input/pm25__randall__raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + ("{year}01-{year}12.nc" if temporal_freq == 'yearly' else "{year}{month}-{year}{month}.nc"), year=years_list, month=months_list) @@ -64,14 +64,14 @@ rule aggregate_pm25: input: get_shapefile_input, expand( - f"data/input/pm25__washu__raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + + f"data/input/pm25__randall__raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + ("{{year}}01-{{year}}12.nc" if temporal_freq == 'yearly' else "{{year}}{month}-{{year}}{month}.nc"), month=months_list ) output: expand( - f"data/output/pm25__washu/{polygon_name}_{temporal_freq}/pm25__washu__{polygon_name}_{temporal_freq}__" + + f"data/output/pm25__randall/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + ("{{year}}.parquet" if temporal_freq == 'yearly' else "{{year}}_{month}.parquet"), month=months_list # we only want to expand months_list and keep year as wildcard ) diff --git a/conf/datapaths/cannon_datapaths.yaml b/conf/datapaths/cannon_datapaths.yaml index c41a3fe..454d1a1 100644 --- a/conf/datapaths/cannon_datapaths.yaml +++ b/conf/datapaths/cannon_datapaths.yaml @@ -1,13 +1,13 @@ # if files are stored within the local copy of the repository, then use null: input: - pm25__washu__raw: - yearly: /n/netscratch/dominici_lab/Lab/pm25__washu__raw/yearly/ #/n/dominici_lab/lab/lego/environmnetal/pm25__washu/raw/annual - monthly: /n/netscratch/dominici_lab/Lab/pm25__washu__raw/monthly/ #/n/dominici_lab/lab/lego/environmnetal/pm25__washu/raw/monthly + pm25__randall__raw: + yearly: /n/dominici_lab/lab/lego/environmnetal/pm25__randall/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly + monthly: /n/dominici_lab/lab/lego/environmnetal/pm25__randall/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly shapefiles: null output: - pm25__washu: - zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__washu/zcta_yearly - zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__washu/zcta_monthly - county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__washu/county_yearly - county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__washu/county_monthly + pm25__randall: + zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/zcta_yearly + zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/zcta_monthly + county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/county_yearly + county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/county_monthly diff --git a/conf/datapaths/datapaths.yaml b/conf/datapaths/datapaths.yaml index e9d1be7..3963f44 100644 --- a/conf/datapaths/datapaths.yaml +++ b/conf/datapaths/datapaths.yaml @@ -1,12 +1,12 @@ # if files are stored within the local copy of the repository, then use null: input: - pm25__washu__raw: + pm25__randall__raw: yearly: null monthly: null shapefiles: null output: - pm25__washu: + pm25__randall: zcta_yearly: null zcta_monthly: null county_yearly: null diff --git a/notes/eda_input.ipynb b/notes/eda_input.ipynb index 9bb877c..e83212f 100644 --- a/notes/eda_input.ipynb +++ b/notes/eda_input.ipynb @@ -19,7 +19,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -38,7 +38,7 @@ ], "source": [ "# Open the netCDF file\n", - "file_path = \"../data/input/satellite_pm25/annual/V5GL04.HybridPM25c_0p10.NorthAmerica.202201-202212.nc\"\n", + "file_path = \"../data/input/satellite_pm25/yearly/V5GL04.HybridPM25c_0p10.NorthAmerica.202201-202212.nc\"\n", "dataset = netCDF4.Dataset(file_path)\n", "\n", "# Print the global attributes\n", diff --git a/src/aggregate_pm25.py b/src/aggregate_pm25.py index 21a7dd1..d1ee472 100644 --- a/src/aggregate_pm25.py +++ b/src/aggregate_pm25.py @@ -62,7 +62,7 @@ def main(cfg): # load the first file to obtain the affine transform/boundaries LOGGER.info("Mapping polygons to raster cells.") - ds = xarray.open_dataset(f"data/input/pm25__washu__raw/{cfg.temporal_freq}/{filenames[0]}") + ds = xarray.open_dataset(f"data/input/pm25__randall__raw/{cfg.temporal_freq}/{filenames[0]}") layer = getattr(ds, cfg.satellite_pm25.layer) # obtain affine transform/boundaries @@ -90,7 +90,7 @@ def main(cfg): if i > 0: # reload the file only if it is different from the first one - ds = xarray.open_dataset(f"data/input/pm25__washu__raw/{cfg.temporal_freq}/{filename}") + ds = xarray.open_dataset(f"data/input/pm25__randall__raw/{cfg.temporal_freq}/{filename}") layer = getattr(ds, cfg.satellite_pm25.layer) # === obtain stats quickly using precomputed mapping @@ -111,15 +111,15 @@ def main(cfg): # == save output file if cfg.temporal_freq == "yearly": # ignore month since len(filenames) == 1 - output_filename = f"pm25__washu__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}.parquet" + output_filename = f"pm25__randall__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}.parquet" elif cfg.temporal_freq == "monthly": # use month in filename since len(filenames) = 12 month = f"{i + 1:02d}" df["month"] = month - output_filename = f"pm25__washu__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}_{month}.parquet" + output_filename = f"pm25__randall__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}_{month}.parquet" - output_path = f"data/output/pm25__washu/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" + output_path = f"data/output/pm25__randall/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" df.to_parquet(output_path) # plot aggregation map using geopandas diff --git a/src/download_pm25.py b/src/download_pm25.py index a063c41..cbb5ab6 100644 --- a/src/download_pm25.py +++ b/src/download_pm25.py @@ -25,7 +25,7 @@ def main(cfg): # == setup chrome driver # Expand the tilde to the user's home directory - download_dir = f"data/input/pm25__washu__raw/" + download_dir = f"data/input/pm25__randall__raw/" download_dir = os.path.abspath(download_dir) download_zip = f"{download_dir}/{cfg.satellite_pm25[cfg.temporal_freq].zipname}.zip" src_dir = f"{download_dir}/{cfg.satellite_pm25[cfg.temporal_freq].zipname}" From 486da5950187ae05c6bb50de7c43d7a844f8c75f Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 16:14:29 -0400 Subject: [PATCH 02/29] updated env --- requirements.yaml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/requirements.yaml b/requirements.yaml index 6f0ee4d..596e80a 100644 --- a/requirements.yaml +++ b/requirements.yaml @@ -1,4 +1,4 @@ -name: satellite_pm25_raster2polygon +name: pm25_randall channels: - conda-forge - defaults @@ -19,6 +19,3 @@ dependencies: - selenium==4.29.0 - chromedriver-binary==135.0.7030.0.0 - tqdm==4.67.1 - - torch==2.6.0 - - torchaudio==2.6.0 - - torchvision==0.21.0 From a5f0cb192d1f90de1545c8548c935b20762a4d61 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 16:14:53 -0400 Subject: [PATCH 03/29] renamed file --- requirements.yaml => environment.yaml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename requirements.yaml => environment.yaml (100%) diff --git a/requirements.yaml b/environment.yaml similarity index 100% rename from requirements.yaml rename to environment.yaml From cd7a5cd0b005654022ce113cdf83ddd24e2ffff9 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 16:15:45 -0400 Subject: [PATCH 04/29] fixed typo --- conf/datapaths/cannon_datapaths.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/datapaths/cannon_datapaths.yaml b/conf/datapaths/cannon_datapaths.yaml index 454d1a1..36a510e 100644 --- a/conf/datapaths/cannon_datapaths.yaml +++ b/conf/datapaths/cannon_datapaths.yaml @@ -1,8 +1,8 @@ # if files are stored within the local copy of the repository, then use null: input: pm25__randall__raw: - yearly: /n/dominici_lab/lab/lego/environmnetal/pm25__randall/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly - monthly: /n/dominici_lab/lab/lego/environmnetal/pm25__randall/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly + yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly + monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly shapefiles: null output: From cc423d471f9c209cd721cc42980c61b334a85aa8 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 17:52:44 -0400 Subject: [PATCH 05/29] distinguished between v5 versions --- ...V5GL04.HybridPM25c_0p10.NorthAmerica.yaml} | 0 ...5GL0502.HybridPM25c_0p05.NorthAmerica.yaml | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+) rename conf/satellite_pm25/{us_pm25.yaml => V5GL04.HybridPM25c_0p10.NorthAmerica.yaml} (100%) create mode 100644 conf/satellite_pm25/V5GL0502.HybridPM25c_0p05.NorthAmerica.yaml diff --git a/conf/satellite_pm25/us_pm25.yaml b/conf/satellite_pm25/V5GL04.HybridPM25c_0p10.NorthAmerica.yaml similarity index 100% rename from conf/satellite_pm25/us_pm25.yaml rename to conf/satellite_pm25/V5GL04.HybridPM25c_0p10.NorthAmerica.yaml diff --git a/conf/satellite_pm25/V5GL0502.HybridPM25c_0p05.NorthAmerica.yaml b/conf/satellite_pm25/V5GL0502.HybridPM25c_0p05.NorthAmerica.yaml new file mode 100644 index 0000000..94ab872 --- /dev/null +++ b/conf/satellite_pm25/V5GL0502.HybridPM25c_0p05.NorthAmerica.yaml @@ -0,0 +1,19 @@ +yearly: + url: https://wustl.app.box.com/v/ACAG-V5GL0502-GWRPM25c0p05/folder/293383209520 + + zipname: Annual + + file_prefix: "V5GL0502.HybridPM25c_0p05.NorthAmerica" + #file name convention is V5GL0502.HybridPM25c_0p05.NorthAmerica.yyyymm-yyyymm.nc + +monthly: + url: https://wustl.app.box.com/v/ACAG-V5GL0502-GWRPM25c0p05/folder/293385030318 + + zipname: Monthly + + file_prefix: "V5GL0502.HybridPM25c_0p05.NorthAmerica" + #file name convention is V5GL0502.HybridPM25c_0p05.NorthAmerica.yyyymm-yyyymm.nc + +layer: "GWRPM25" #geographic weighted regression PM2.5 +latitude_layer: "lat" +longitude_layer: "lon" From b670581503347d5436f9294518cc84de55d864fa Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 17:52:58 -0400 Subject: [PATCH 06/29] updated create_dir_paths --- utils/create_dir_paths.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/utils/create_dir_paths.py b/utils/create_dir_paths.py index 696390d..56d8b47 100644 --- a/utils/create_dir_paths.py +++ b/utils/create_dir_paths.py @@ -5,14 +5,27 @@ LOGGER = logging.getLogger(__name__) +def init_folder(datapath="data", folder_cfg=None): + folder_dict = folder_cfg.dirs + if not os.path.exists(datapath): + LOGGER.info(f"Error: {datapath} does not exists.") + return + + # appending name of geography to root datapath + if folder_cfg.name is not None: + datapath = os.path.join(datapath, folder_cfg.name) + os.makedirs(datapath, exist_ok=True) + + create_subfolders_and_links(datapath=datapath, folder_dict=folder_dict) def create_subfolders_and_links(datapath="data", folder_dict=None): """ Recursively create subfolders and symbolic links. """ if not os.path.exists(datapath): - LOGGER.info(f"Error: {datapath} does not exists.") + LOGGER.info(f"Error: {datapath} does not exist.") return + if isinstance(folder_dict, DictConfig): for path, subfolder_dict in folder_dict.items(): sub_datapath = os.path.join(datapath, path) @@ -50,7 +63,7 @@ def create_subfolders_and_links(datapath="data", folder_dict=None): @hydra.main(config_path="../conf", config_name="config", version_base=None) def main(cfg): """Create data subfolders and symbolic links as indicated in config file.""" - create_subfolders_and_links(folder_dict=cfg.datapaths) + init_folder(folder_cfg=cfg.datapaths) if __name__ == "__main__": - main() + main() \ No newline at end of file From b9f02986727128f1610c28ed42d6c05637342e8c Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 17:57:24 -0400 Subject: [PATCH 07/29] utilized folder_cfg.base_path --- utils/create_dir_paths.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/utils/create_dir_paths.py b/utils/create_dir_paths.py index 56d8b47..a3d0d13 100644 --- a/utils/create_dir_paths.py +++ b/utils/create_dir_paths.py @@ -5,17 +5,21 @@ LOGGER = logging.getLogger(__name__) -def init_folder(datapath="data", folder_cfg=None): +def init_folder(folder_cfg=None): folder_dict = folder_cfg.dirs - if not os.path.exists(datapath): - LOGGER.info(f"Error: {datapath} does not exists.") - return - # appending name of geography to root datapath - if folder_cfg.name is not None: - datapath = os.path.join(datapath, folder_cfg.name) + # defines a base path for the data + datapath = folder_cfg.base_path + if datapath is None: + datapath = "data" + # check if datapath exists, if not create it + if os.path.exists(datapath): + LOGGER.info(f"Base path {datapath} already exists") + else: + LOGGER.info(f"Creating base path {datapath}") os.makedirs(datapath, exist_ok=True) + # create subfolders and symbolic links create_subfolders_and_links(datapath=datapath, folder_dict=folder_dict) def create_subfolders_and_links(datapath="data", folder_dict=None): From f52603646ab5f45fa94164bdcc3ce22472e42a26 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 18:05:42 -0400 Subject: [PATCH 08/29] modify structure --- conf/datapaths/cannon_datapaths.yaml | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/conf/datapaths/cannon_datapaths.yaml b/conf/datapaths/cannon_datapaths.yaml index 36a510e..45845f8 100644 --- a/conf/datapaths/cannon_datapaths.yaml +++ b/conf/datapaths/cannon_datapaths.yaml @@ -1,13 +1,14 @@ -# if files are stored within the local copy of the repository, then use null: -input: - pm25__randall__raw: - yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly - monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly - shapefiles: null +base_path: data/V5GL -output: - pm25__randall: - zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/zcta_yearly - zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/zcta_monthly - county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/county_yearly - county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/county_monthly +dirs: + input: + raw: + yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly + monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly + shapefiles: null + + output: + zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/zcta_yearly + zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/zcta_monthly + county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/county_yearly + county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/county_monthly From 9d7a5ac986b436cd7098d73bd1fb824ea2152483 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 19:37:41 -0400 Subject: [PATCH 09/29] moved file --- utils/create_dir_paths.py => src/create_datapaths.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename utils/create_dir_paths.py => src/create_datapaths.py (100%) diff --git a/utils/create_dir_paths.py b/src/create_datapaths.py similarity index 100% rename from utils/create_dir_paths.py rename to src/create_datapaths.py From 2a75e31095b46568e515d60087ced837b3b22742 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 19:38:10 -0400 Subject: [PATCH 10/29] updated confs for v5 and v6 --- conf/config.yaml | 2 +- ...{cannon_datapaths.yaml => cannon_v5gl.yaml} | 0 conf/datapaths/cannon_v6gl.yaml | 14 ++++++++++++++ conf/datapaths/datapaths.yaml | 17 +++++++++-------- .../V6GL02.04.CNNPM25.0p10.NA.yaml | 18 ++++++++++++++++++ 5 files changed, 42 insertions(+), 9 deletions(-) rename conf/datapaths/{cannon_datapaths.yaml => cannon_v5gl.yaml} (100%) create mode 100644 conf/datapaths/cannon_v6gl.yaml create mode 100644 conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml diff --git a/conf/config.yaml b/conf/config.yaml index c3c4483..ad6c759 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -2,7 +2,7 @@ defaults: - _self_ - datapaths: cannon_datapaths - shapefiles: shapefiles - - satellite_pm25: us_pm25 + - satellite_pm25: V5GL0502.HybridPM25c_0p05.NorthAmerica # == aggregation args temporal_freq: yearly # yearly, monthly to be matched with cfg.satellite_pm25 diff --git a/conf/datapaths/cannon_datapaths.yaml b/conf/datapaths/cannon_v5gl.yaml similarity index 100% rename from conf/datapaths/cannon_datapaths.yaml rename to conf/datapaths/cannon_v5gl.yaml diff --git a/conf/datapaths/cannon_v6gl.yaml b/conf/datapaths/cannon_v6gl.yaml new file mode 100644 index 0000000..50fcbf7 --- /dev/null +++ b/conf/datapaths/cannon_v6gl.yaml @@ -0,0 +1,14 @@ +base_path: data/V6GL + +dirs: + input: + raw: + yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly + monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly + shapefiles: null + + output: + zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/zcta_yearly + zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/zcta_monthly + county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/county_yearly + county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/county_monthly diff --git a/conf/datapaths/datapaths.yaml b/conf/datapaths/datapaths.yaml index 3963f44..3250093 100644 --- a/conf/datapaths/datapaths.yaml +++ b/conf/datapaths/datapaths.yaml @@ -1,12 +1,13 @@ -# if files are stored within the local copy of the repository, then use null: -input: - pm25__randall__raw: - yearly: null - monthly: null - shapefiles: null +base_path: data/V6GL -output: - pm25__randall: +dirs: + input: + raw: + yearly: null + monthly: null + shapefiles: null + + output: zcta_yearly: null zcta_monthly: null county_yearly: null diff --git a/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml b/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml new file mode 100644 index 0000000..97cb511 --- /dev/null +++ b/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml @@ -0,0 +1,18 @@ +yearly: + url: https://wustl.app.box.com/s/s7eiaxytjr9w1z7glat45cesitcemprv/folder/327763225614 + + zipname: Annual + + file_prefix: "V6GL02.04.CNNPM25.0p10.NA" + #file name convention is V6GL02.04.CNNPM25.0p10.NA.yyyymm-yyyymm.nc + +monthly: + url: https://wustl.app.box.com/s/s7eiaxytjr9w1z7glat45cesitcemprv/folder/327764742544 + zipname: Monthly + + file_prefix: "V6GL02.04.CNNPM25.0p10.NA" + #file name convention is V6GL02.04.CNNPM25.0p10.NA.yyyymm-yyyymm.nc + +layer: "GWRPM25" #geographic weighted regression PM2.5 +latitude_layer: "lat" +longitude_layer: "lon" From 26d017c521ad1465040bc2cb06c85b9c98461fe4 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 20:26:21 -0400 Subject: [PATCH 11/29] tested basepaths --- conf/config.yaml | 4 ++-- src/download_shapefile.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/conf/config.yaml b/conf/config.yaml index ad6c759..3c502cd 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -1,6 +1,6 @@ defaults: - _self_ - - datapaths: cannon_datapaths + - datapaths: cannon_v5gl - shapefiles: shapefiles - satellite_pm25: V5GL0502.HybridPM25c_0p05.NorthAmerica @@ -9,7 +9,7 @@ temporal_freq: yearly # yearly, monthly to be matched with cfg.satellite_pm25 year: 2020 # == shapefile download args -polygon_name: zcta # zcta, county to be matched with cfg.shapefiles +polygon_name: county # zcta, county to be matched with cfg.shapefiles shapefile_year: 2020 #to be matched with cfg.shapefiles show_progress: false diff --git a/src/download_shapefile.py b/src/download_shapefile.py index 492c9ca..0ea0059 100644 --- a/src/download_shapefile.py +++ b/src/download_shapefile.py @@ -8,7 +8,7 @@ def main(cfg): url = cfg.shapefiles[cfg.polygon_name][cfg.shapefile_year].url - tgt = f"data/input/shapefiles/shapefile_{cfg.polygon_name}_{cfg.shapefile_year}" + tgt = f"{cfg.datapaths.base_path}/input/shapefiles/shapefile_{cfg.polygon_name}_{cfg.shapefile_year}" tgtdir = os.path.dirname(tgt) tgtfile = os.path.basename(tgt) From 3e2c4baf7c5051230ed5b095b032c4ce3317bd08 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 20:26:42 -0400 Subject: [PATCH 12/29] modified paths --- notes/eda_input.ipynb | 12 ++++++------ notes/eda_output.ipynb | 18 +++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/notes/eda_input.ipynb b/notes/eda_input.ipynb index e83212f..55a1ae2 100644 --- a/notes/eda_input.ipynb +++ b/notes/eda_input.ipynb @@ -38,7 +38,7 @@ ], "source": [ "# Open the netCDF file\n", - "file_path = \"../data/input/satellite_pm25/yearly/V5GL04.HybridPM25c_0p10.NorthAmerica.202201-202212.nc\"\n", + "file_path = f\"../{cfg.datapaths.base_path}/input/satellite_pm25/yearly/V5GL04.HybridPM25c_0p10.NorthAmerica.202201-202212.nc\"\n", "dataset = netCDF4.Dataset(file_path)\n", "\n", "# Print the global attributes\n", @@ -183,7 +183,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -210,7 +210,7 @@ "import matplotlib.pyplot as plt\n", "\n", "# Open the netCDF file\n", - "file_path = \"data/V5GL04.HybridPM25.NorthAmerica.202201-202212.nc\"\n", + "file_path = f\"{cfg.datapaths.base_path}/V5GL04.HybridPM25.NorthAmerica.202201-202212.nc\"\n", "dataset = netCDF4.Dataset(file_path)\n", "\n", "# Get the latitude and longitude variables\n", @@ -260,7 +260,7 @@ }, { "cell_type": "code", - "execution_count": 45, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -280,12 +280,12 @@ "import matplotlib.pyplot as plt\n", "\n", "# Read the CSV file\n", - "pm25_data = pd.read_csv('data/county_pm25.csv')\n", + "pm25_data = pd.read_csv(f'{cfg.datapaths.base_path}/county_pm25.csv')\n", "# Convert GEOID to string using trailing zeros\n", "pm25_data['GEOID'] = pm25_data['GEOID'].astype(str).str.zfill(5)\n", "\n", "# Read the shapefile\n", - "shapefile = gpd.read_file('data/shapefile_cb_county_2015/shapefile.shp')\n", + "shapefile = gpd.read_file(f\"{cfg.datapaths.base_path}/shapefile_cb_county_2015/shapefile.shp\")\n", "\n", "# Merge the data\n", "merged_data = shapefile.merge(pm25_data, on='GEOID', how='left')\n", diff --git a/notes/eda_output.ipynb b/notes/eda_output.ipynb index 9ab63ca..dcc5f7e 100644 --- a/notes/eda_output.ipynb +++ b/notes/eda_output.ipynb @@ -19,7 +19,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -47,7 +47,7 @@ ], "source": [ "# Open the netCDF file\n", - "file_path = \"data/V5GL04.HybridPM25.NorthAmerica.202201-202212.nc\"\n", + "file_path = f\"{cfg.datapaths.base_path}/input/satellite_pm25/yearly/V5GL04.HybridPM25c_0p10.NorthAmerica.202201-202212.nc\"\n", "dataset = netCDF4.Dataset(file_path)\n", "\n", "# Print the global attributes\n", @@ -192,7 +192,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -219,7 +219,7 @@ "import matplotlib.pyplot as plt\n", "\n", "# Open the netCDF file\n", - "file_path = \"data/V5GL04.HybridPM25.NorthAmerica.202201-202212.nc\"\n", + "file_path = f\"{cfg.datapaths.base_path}/input/satellite_pm25/yearly/V5GL04.HybridPM25c_0p10.NorthAmerica.202201-202212.nc\"\n", "dataset = netCDF4.Dataset(file_path)\n", "\n", "# Get the latitude and longitude variables\n", @@ -269,7 +269,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -279,7 +279,7 @@ "import pyarrow.parquet as pq\n", "\n", "# Read parquet file with pm25 at county level for 2015\n", - "pm25_data = pq.read_table(\"data/output/satellite_pm25_raster2polygon/monthly/satellite_pm25_zcta_2015_01.parquet\").to_pandas()" + "pm25_data = pq.read_table(f\"{cfg.datapaths.base_path}/datapaths.base_path}/datapaths.base_path}/datapaths.base_path}/output/satellite_pm25_raster2polygon/monthly/satellite_pm25_zcta_2015_01.parquet\").to_pandas()" ] }, { @@ -368,7 +368,7 @@ }, { "cell_type": "code", - "execution_count": 45, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -388,12 +388,12 @@ "import matplotlib.pyplot as plt\n", "\n", "# Read the CSV file\n", - "pm25_data = pd.read_csv('data/county_pm25.csv')\n", + "pm25_data = pd.read_csv(f'{cfg.datapaths.base_path}/county_pm25.csv')\n", "# Convert GEOID to string using trailing zeros\n", "pm25_data['GEOID'] = pm25_data['GEOID'].astype(str).str.zfill(5)\n", "\n", "# Read the shapefile\n", - "shapefile = gpd.read_file('data/shapefile_cb_county_2015/shapefile.shp')\n", + "shapefile = gpd.read_file('{cfg.datapaths.base_path}/datapaths.base_path}/datapaths.base_path}/shapefile_cb_county_2015/shapefile.shp')\n", "\n", "# Merge the data\n", "merged_data = shapefile.merge(pm25_data, on='GEOID', how='left')\n", From 2c02695f2c872cf1d1b9af6e406a693130cfea97 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 20:46:06 -0400 Subject: [PATCH 13/29] tested basepaths --- src/download_pm25.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/download_pm25.py b/src/download_pm25.py index cbb5ab6..2466aec 100644 --- a/src/download_pm25.py +++ b/src/download_pm25.py @@ -25,7 +25,7 @@ def main(cfg): # == setup chrome driver # Expand the tilde to the user's home directory - download_dir = f"data/input/pm25__randall__raw/" + download_dir = f"{cfg.datapaths.base_path}/input/raw/" download_dir = os.path.abspath(download_dir) download_zip = f"{download_dir}/{cfg.satellite_pm25[cfg.temporal_freq].zipname}.zip" src_dir = f"{download_dir}/{cfg.satellite_pm25[cfg.temporal_freq].zipname}" From 7d318ccf20d8c948bb21580bc21ae67c729b8082 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 22:54:04 -0400 Subject: [PATCH 14/29] tested base paths --- src/aggregate_pm25.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/aggregate_pm25.py b/src/aggregate_pm25.py index d1ee472..cb320b5 100644 --- a/src/aggregate_pm25.py +++ b/src/aggregate_pm25.py @@ -39,7 +39,7 @@ def main(cfg): #use previously available shapefile shapefile_year = available_shapefile_year(cfg.year, shapefile_years_list) - shape_path = f'data/input/shapefiles/shapefile_{cfg.polygon_name}_{shapefile_year}/shapefile.shp' + shape_path = f'{cfg.datapaths.base_path}/input/shapefiles/shapefile_{cfg.polygon_name}_{shapefile_year}/shapefile.shp' polygon = gpd.read_file(shape_path) polygon_ids = polygon[cfg.shapefiles[cfg.polygon_name][shapefile_year].idvar].values @@ -62,7 +62,7 @@ def main(cfg): # load the first file to obtain the affine transform/boundaries LOGGER.info("Mapping polygons to raster cells.") - ds = xarray.open_dataset(f"data/input/pm25__randall__raw/{cfg.temporal_freq}/{filenames[0]}") + ds = xarray.open_dataset(f"{cfg.datapaths.base_path}/input/raw/{cfg.temporal_freq}/{filenames[0]}") layer = getattr(ds, cfg.satellite_pm25.layer) # obtain affine transform/boundaries @@ -90,7 +90,7 @@ def main(cfg): if i > 0: # reload the file only if it is different from the first one - ds = xarray.open_dataset(f"data/input/pm25__randall__raw/{cfg.temporal_freq}/{filename}") + ds = xarray.open_dataset(f"{cfg.datapaths.base_path}/input/raw/{cfg.temporal_freq}/{filename}") layer = getattr(ds, cfg.satellite_pm25.layer) # === obtain stats quickly using precomputed mapping @@ -119,7 +119,7 @@ def main(cfg): df["month"] = month output_filename = f"pm25__randall__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}_{month}.parquet" - output_path = f"data/output/pm25__randall/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" + output_path = f"{cfg.datapaths.base_path}/output/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" df.to_parquet(output_path) # plot aggregation map using geopandas From 453fa67cf2554e44f7d37fb70e5e0a436ab2ded6 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 23:00:45 -0400 Subject: [PATCH 15/29] tested base paths --- Snakefile | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Snakefile b/Snakefile index 5324af8..b6e63a1 100644 --- a/Snakefile +++ b/Snakefile @@ -17,10 +17,10 @@ temporal_freq = config['temporal_freq'] polygon_name = config['polygon_name'] with initialize(version_base=None, config_path="conf"): - hydra_cfg = compose(config_name="config", overrides=[f"temporal_freq={temporal_freq}", f"polygon_name={polygon_name}"]) + cfg = compose(config_name="config", overrides=[f"temporal_freq={temporal_freq}", f"polygon_name={polygon_name}"]) -satellite_pm25_cfg = hydra_cfg.satellite_pm25 -shapefiles_cfg = hydra_cfg.shapefiles +satellite_pm25_cfg = cfg.satellite_pm25 +shapefiles_cfg = cfg.shapefiles shapefile_years_list = list(shapefiles_cfg[polygon_name].keys()) @@ -31,7 +31,7 @@ years_list = list(range(1998, 2023 + 1)) rule all: input: expand( - f"data/output/pm25__randall/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + + f"{cfg.datapaths.base_path}/output/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + ("{year}.parquet" if temporal_freq == 'yearly' else "{year}_{month}.parquet"), year=years_list, month=months_list @@ -40,14 +40,14 @@ rule all: # remove and use symlink to the us census geoboundaries rule download_shapefiles: output: - f"data/input/shapefiles/shapefile_{polygon_name}_" + "{shapefile_year}/shapefile.shp" + f"{cfg.datapaths.base_path}/input/shapefiles/shapefile_{polygon_name}_" + "{shapefile_year}/shapefile.shp" shell: f"python src/download_shapefile.py polygon_name={polygon_name} " + "shapefile_year={wildcards.shapefile_year}" rule download_satellite_pm25: output: expand( - f"data/input/pm25__randall__raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + + f"{cfg.datapaths.base_path}/input/raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + ("{year}01-{year}12.nc" if temporal_freq == 'yearly' else "{year}{month}-{year}{month}.nc"), year=years_list, month=months_list) @@ -58,20 +58,20 @@ rule download_satellite_pm25: def get_shapefile_input(wildcards): shapefile_year = available_shapefile_year(int(wildcards.year), shapefile_years_list) - return f"data/input/shapefiles/shapefile_{polygon_name}_{shapefile_year}/shapefile.shp" + return f"{cfg.datapaths.base_path}/input/shapefiles/shapefile_{polygon_name}_{shapefile_year}/shapefile.shp" rule aggregate_pm25: input: get_shapefile_input, expand( - f"data/input/pm25__randall__raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + + f"{cfg.datapaths.base_path}/input/raw/{temporal_freq}/{satellite_pm25_cfg[temporal_freq]['file_prefix']}." + ("{{year}}01-{{year}}12.nc" if temporal_freq == 'yearly' else "{{year}}{month}-{{year}}{month}.nc"), month=months_list ) output: expand( - f"data/output/pm25__randall/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + + f"{cfg.datapaths.base_path}/output/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + ("{{year}}.parquet" if temporal_freq == 'yearly' else "{{year}}_{month}.parquet"), month=months_list # we only want to expand months_list and keep year as wildcard ) From b05e2ee3e0f7ce0103ff288002919048786ec580 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 23:04:05 -0400 Subject: [PATCH 16/29] renamed files --- {fasrc_jobs => jobs}/README.md | 0 {fasrc_jobs => jobs}/county_monthly.sbatch | 0 {fasrc_jobs => jobs}/zcta_monthly.sbatch | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename {fasrc_jobs => jobs}/README.md (100%) rename {fasrc_jobs => jobs}/county_monthly.sbatch (100%) rename {fasrc_jobs => jobs}/zcta_monthly.sbatch (100%) diff --git a/fasrc_jobs/README.md b/jobs/README.md similarity index 100% rename from fasrc_jobs/README.md rename to jobs/README.md diff --git a/fasrc_jobs/county_monthly.sbatch b/jobs/county_monthly.sbatch similarity index 100% rename from fasrc_jobs/county_monthly.sbatch rename to jobs/county_monthly.sbatch diff --git a/fasrc_jobs/zcta_monthly.sbatch b/jobs/zcta_monthly.sbatch similarity index 100% rename from fasrc_jobs/zcta_monthly.sbatch rename to jobs/zcta_monthly.sbatch From 1c36ecb7f5a337a7b862204c90b01d9ed5e05388 Mon Sep 17 00:00:00 2001 From: audiracmichelle Date: Sat, 13 Sep 2025 23:31:53 -0400 Subject: [PATCH 17/29] updated jobs --- jobs/county_monthly.sbatch | 4 +++- jobs/v5gl.sbatch | 13 +++++++++++++ jobs/zcta_monthly.sbatch | 5 ++++- jobs/zcta_yearly.sbatch | 10 ++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 jobs/v5gl.sbatch create mode 100644 jobs/zcta_yearly.sbatch diff --git a/jobs/county_monthly.sbatch b/jobs/county_monthly.sbatch index 2f47486..4e28318 100644 --- a/jobs/county_monthly.sbatch +++ b/jobs/county_monthly.sbatch @@ -5,4 +5,6 @@ #SBATCH --mem 96GB # memory #SBATCH -t 0-02:00 # time (D-HH:MM) -singularity exec $HOME/singularity_images/satellite_pm25_raster2polygon_latest.sif snakemake --cores 16 -C polygon_name=county temporal_freq=monthly +#singularity exec $HOME/singularity_images/satellite_pm25_raster2polygon_latest.sif snakemake --cores 16 -C polygon_name=county temporal_freq=monthly + +snakemake --cores 16 -C polygon_name=county temporal_freq=monthly diff --git a/jobs/v5gl.sbatch b/jobs/v5gl.sbatch new file mode 100644 index 0000000..8bb89cc --- /dev/null +++ b/jobs/v5gl.sbatch @@ -0,0 +1,13 @@ +#!/bin/bash +# +#SBATCH -p serial_requeue # partition (queue) +#SBATCH -c 48 # number of cores +#SBATCH --mem 184GB # memory +#SBATCH -t 0-12:00 # time (D-HH:MM) + +#singularity exec $HOME/singularity_images/satellite_pm25_raster2polygon_latest.sif snakemake --cores 16 -C polygon_name=county temporal_freq=monthly + +snakemake --cores 24 -C polygon_name=county temporal_freq=yearly +snakemake --cores 24 -C polygon_name=county temporal_freq=monthly +snakemake --cores 24 -C polygon_name=zcta temporal_freq=yearly +snakemake --cores 24 -C polygon_name=zcta temporal_freq=monthly diff --git a/jobs/zcta_monthly.sbatch b/jobs/zcta_monthly.sbatch index 38c04a4..2f1d953 100644 --- a/jobs/zcta_monthly.sbatch +++ b/jobs/zcta_monthly.sbatch @@ -5,4 +5,7 @@ #SBATCH --mem 96GB # memory #SBATCH -t 0-01:00 # time (D-HH:MM) -singularity exec $HOME/singularity_images/satellite_pm25_raster2polygon_latest.sif snakemake --cores 16 -C polygon_name=zcta temporal_freq=monthly +#singularity exec $HOME/singularity_images/satellite_pm25_raster2polygon_latest.sif snakemake --cores 16 -C polygon_name=zcta temporal_freq=monthly + +snakemake --cores 32 -C polygon_name=zcta temporal_freq=yearly + diff --git a/jobs/zcta_yearly.sbatch b/jobs/zcta_yearly.sbatch new file mode 100644 index 0000000..9d94155 --- /dev/null +++ b/jobs/zcta_yearly.sbatch @@ -0,0 +1,10 @@ +#!/bin/bash +# +#SBATCH -p shared # partition (queue) +#SBATCH -c 32 # number of cores +#SBATCH --mem 96GB # memory +#SBATCH -t 0-01:00 # time (D-HH:MM) + +#singularity exec $HOME/singularity_images/satellite_pm25_raster2polygon_latest.sif snakemake --cores 16 -C polygon_name=zcta temporal_freq=monthly + +snakemake --cores 32 -C polygon_name=zcta temporal_freq=yearly From 20e9a3b7da40a6425c2bb4989274ca9658b57d74 Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 6 Nov 2025 14:09:36 -0500 Subject: [PATCH 18/29] Trying to flatten monthly download directory so that there aren't any subfolders --- src/download_pm25.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/download_pm25.py b/src/download_pm25.py index 2466aec..2fc0fa8 100644 --- a/src/download_pm25.py +++ b/src/download_pm25.py @@ -78,16 +78,20 @@ def main(cfg): with zipfile.ZipFile(download_zip, "r") as zip_ref: zip_ref.extractall(download_dir) - # Move all files from the src_dir to dest_dir + # Move all files from the src_dir to dest_dir, flattening the directory structure os.makedirs(dest_dir, exist_ok=True) - for file in os.listdir(src_dir): - shutil.move(os.path.join(src_dir, file), dest_dir) - - # Remove the zip file and the empty folder + for root, dirs, files in os.walk(src_dir): + for file in files: + src_file = os.path.join(root, file) + dest_file = os.path.join(dest_dir, file) + shutil.move(src_file, dest_file) + logger.info(f"Moved {file} to {dest_dir}") + + # Remove the zip file and the source folder (including any empty subfolders) os.remove(download_zip) - os.rmdir(src_dir) + shutil.rmtree(src_dir) - # Remove anyfile starging with Unconfirmed (this might be a Chrome bug/artifact) + # Remove any file starting with Unconfirmed (this might be a Chrome bug/artifact) for file in os.listdir(download_dir): if file.startswith("Unconfirmed"): os.remove(os.path.join(download_dir, file)) From 2c49563d900cb780531a106f4daed9e14abffc5d Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 6 Nov 2025 14:36:21 -0500 Subject: [PATCH 19/29] layer name for V6GL is "PM25" not "GWRPM25" --- conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml b/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml index 97cb511..926f845 100644 --- a/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml +++ b/conf/satellite_pm25/V6GL02.04.CNNPM25.0p10.NA.yaml @@ -13,6 +13,6 @@ monthly: file_prefix: "V6GL02.04.CNNPM25.0p10.NA" #file name convention is V6GL02.04.CNNPM25.0p10.NA.yyyymm-yyyymm.nc -layer: "GWRPM25" #geographic weighted regression PM2.5 +layer: "PM25" #CNN PM2.5 latitude_layer: "lat" longitude_layer: "lon" From 4f1ae9d2bd7694f3cf3ea6e1d1a270995b70bcd6 Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 6 Nov 2025 15:49:37 -0500 Subject: [PATCH 20/29] starter script to concatenate monthly files into one yearly file --- src/concat_monthly.py | 186 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 src/concat_monthly.py diff --git a/src/concat_monthly.py b/src/concat_monthly.py new file mode 100644 index 0000000..1f9eb90 --- /dev/null +++ b/src/concat_monthly.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +""" +Consolidate monthly PM2.5 aggregation files into yearly files. + +This script takes the monthly parquet files output from the aggregation pipeline +and combines all 12 months for each year into a single yearly file. +After consolidation, moves monthly files to an intermediate subdirectory. + +Usage: + python src/consolidate_monthly_to_yearly.py + + # With overrides: + python src/consolidate_monthly_to_yearly.py polygon_name=zcta +""" + +import pandas as pd +import os +import hydra +import logging +import re +import shutil +from pathlib import Path + +logging.basicConfig(level=logging.INFO, format='[%(asctime)s][%(levelname)s] - %(message)s') +LOGGER = logging.getLogger(__name__) + + +def discover_available_years(input_dir, polygon_name): + """ + Discover all years that have monthly data available. + + Args: + input_dir: Directory containing monthly parquet files + polygon_name: Name of polygon type (county, zcta, etc.) + + Returns: + Set of years (integers) that have at least one monthly file + """ + years = set() + # Pattern: pm25__randall__county_monthly__2020_01.parquet + pattern = re.compile(rf"pm25__randall__{polygon_name}_monthly__(\d{{4}})_\d{{2}}\.parquet") + + if not os.path.exists(input_dir): + LOGGER.warning(f"Input directory does not exist: {input_dir}") + return years + + for filename in os.listdir(input_dir): + match = pattern.match(filename) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(years) + + +def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): + """ + Consolidate all monthly files for a given year into a single yearly file. + Output goes to the same directory as input. + + Args: + input_dir: Directory containing monthly parquet files (also used for output) + year: Year to consolidate + polygon_name: Name of polygon type (county, zcta, etc.) + input_freq: Frequency of input files (monthly) + + Returns: + Tuple of (success: bool, monthly_files: list) - list of files that were consolidated + """ + monthly_files = [] + + # Pattern: pm25__randall__county_monthly__2020_01.parquet + # Collect all monthly files for this year + for month in range(1, 13): + month_str = str(month).zfill(2) + filename = f"pm25__randall__{polygon_name}_{input_freq}__{year}_{month_str}.parquet" + filepath = os.path.join(input_dir, filename) + + if os.path.exists(filepath): + monthly_files.append(filepath) + else: + LOGGER.warning(f"Missing file: {filepath}") + + if not monthly_files: + LOGGER.error(f"No monthly files found for year {year}") + return False, [] + + if len(monthly_files) != 12: + LOGGER.warning(f"Only found {len(monthly_files)}/12 months for year {year}") + + # Read and concatenate all monthly files + LOGGER.info(f"Reading {len(monthly_files)} monthly files for year {year}") + dfs = [pd.read_parquet(f) for f in monthly_files] + yearly_df = pd.concat(dfs, ignore_index=False) + + # Sort by polygon ID and month for consistent ordering + if 'month' in yearly_df.columns: + yearly_df = yearly_df.sort_values(['month']) + + # Save yearly file in the same directory + # Pattern: pm25__randall__county_monthly__2020.parquet (note: keeping "monthly" in parent dir name) + output_file = os.path.join(input_dir, f"pm25__randall__{polygon_name}_monthly__{year}.parquet") + LOGGER.info(f"Saving consolidated file: {output_file} ({len(yearly_df)} rows)") + yearly_df.to_parquet(output_file) + + return True, monthly_files + + +def move_to_intermediate(monthly_files, input_dir): + """ + Move monthly files to an intermediate subdirectory. + + Args: + monthly_files: List of monthly file paths to move + input_dir: Base directory containing the files + """ + intermediate_dir = os.path.join(input_dir, "intermediate") + os.makedirs(intermediate_dir, exist_ok=True) + + LOGGER.info(f"Moving {len(monthly_files)} monthly files to {intermediate_dir}") + + for filepath in monthly_files: + filename = os.path.basename(filepath) + dest_path = os.path.join(intermediate_dir, filename) + shutil.move(filepath, dest_path) + LOGGER.debug(f"Moved {filename} to intermediate/") + + LOGGER.info(f"Successfully moved {len(monthly_files)} files to intermediate/") + + + +@hydra.main(config_path="../conf", config_name="config", version_base=None) +def main(cfg): + """ + Consolidate monthly PM2.5 files into yearly files using Hydra configuration. + Automatically discovers all available years and processes them. + After consolidation, moves monthly files to intermediate subdirectory. + """ + + polygon_name = cfg.polygon_name + + # Build path using datapaths configuration - output to same dir as input + monthly_dir = f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly" + + LOGGER.info(f"Polygon name: {polygon_name}") + LOGGER.info(f"Monthly directory: {monthly_dir}") + + # Discover available years + available_years = discover_available_years(monthly_dir, polygon_name) + + if not available_years: + LOGGER.error(f"No monthly files found in {monthly_dir}") + return + + LOGGER.info(f"Found {len(available_years)} years with monthly data: {min(available_years)}-{max(available_years)}") + + success_count = 0 + fail_count = 0 + all_monthly_files = [] + + # Process each year + for year in available_years: + try: + success, monthly_files = consolidate_year(monthly_dir, year, polygon_name) + if success: + success_count += 1 + all_monthly_files.extend(monthly_files) + else: + fail_count += 1 + except Exception as e: + LOGGER.error(f"Error processing year {year}: {e}") + fail_count += 1 + + LOGGER.info(f"Consolidation complete: {success_count} years successful, {fail_count} failed") + + # Move all monthly files to intermediate directory + if all_monthly_files: + LOGGER.info(f"Moving {len(all_monthly_files)} monthly files to intermediate/") + move_to_intermediate(all_monthly_files, monthly_dir) + else: + LOGGER.warning("No monthly files to move to intermediate directory") + + +if __name__ == "__main__": + main() + From 3036cada4473ca44be36bcbfa671583ffc7f3ee4 Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 6 Nov 2025 15:56:36 -0500 Subject: [PATCH 21/29] Updating snakefile to include monthly file concatenation --- Snakefile | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Snakefile b/Snakefile index b6e63a1..615bb9e 100644 --- a/Snakefile +++ b/Snakefile @@ -83,3 +83,18 @@ rule aggregate_pm25: ("year={wildcards.year}" if temporal_freq == 'yearly' else "year={wildcards.year}") + " &> {log}" ) + +rule concat_monthly: + input: + expand( + f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{{year}}_{month}.parquet", + month=[str(i).zfill(2) for i in range(1, 12 + 1)] + ) + output: + yearly_file=f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{{year}}.parquet", + intermediate_dir=directory(f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/intermediate") + log: + f"logs/concat_monthly_{polygon_name}_{{year}}.log" + shell: + f"PYTHONPATH=. python src/concat_monthly.py polygon_name={polygon_name} &> {{log}}" + From 877d8ca4a97f328062d9e3bdcb6f308f2e426731 Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Wed, 12 Nov 2025 15:54:16 -0500 Subject: [PATCH 22/29] Updating concatenation script to only run for a single year, updating rule all to reflect output structure of concat_monthly.py --- Snakefile | 26 ++++++++++--- src/concat_monthly.py | 91 ++++++++++--------------------------------- 2 files changed, 40 insertions(+), 77 deletions(-) diff --git a/Snakefile b/Snakefile index 615bb9e..90e7cc0 100644 --- a/Snakefile +++ b/Snakefile @@ -25,7 +25,7 @@ shapefiles_cfg = cfg.shapefiles shapefile_years_list = list(shapefiles_cfg[polygon_name].keys()) months_list = "01" if temporal_freq == 'yearly' else [str(i).zfill(2) for i in range(1, 12 + 1)] -years_list = list(range(1998, 2023 + 1)) +years_list = list(range(2022, 2023 + 1)) # == Define rules == rule all: @@ -35,6 +35,16 @@ rule all: ("{year}.parquet" if temporal_freq == 'yearly' else "{year}_{month}.parquet"), year=years_list, month=months_list + ) if temporal_freq == 'yearly' else ( + expand( + cfg.datapaths.base_path + "/output/" + polygon_name + "_monthly/pm25__randall__" + polygon_name + "_monthly__{year}_{month}.parquet", + year=years_list, + month=[str(i).zfill(2) for i in range(1, 12 + 1)] + ) + + expand( + cfg.datapaths.base_path + "/output/" + polygon_name + "_monthly/pm25__randall__" + polygon_name + "_monthly__{year}.parquet", + year=years_list + ) ) # remove and use symlink to the us census geoboundaries @@ -85,16 +95,20 @@ rule aggregate_pm25: ) rule concat_monthly: + # This rule is only needed when temporal_freq is 'monthly' to create yearly files + # Combines monthly parquet files into a single yearly parquet file for each year. input: - expand( - f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{{year}}_{month}.parquet", + lambda wildcards: expand( + cfg.datapaths.base_path + "/output/" + polygon_name + "_monthly/pm25__randall__" + polygon_name + "_monthly__" + wildcards.year + "_{month}.parquet", month=[str(i).zfill(2) for i in range(1, 12 + 1)] ) output: - yearly_file=f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{{year}}.parquet", - intermediate_dir=directory(f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/intermediate") + yearly_file=f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{{year}}.parquet" log: f"logs/concat_monthly_{polygon_name}_{{year}}.log" shell: - f"PYTHONPATH=. python src/concat_monthly.py polygon_name={polygon_name} &> {{log}}" + f"PYTHONPATH=. python src/concat_monthly.py polygon_name={polygon_name} year={{wildcards.year}} &> {{log}}" + + + diff --git a/src/concat_monthly.py b/src/concat_monthly.py index 1f9eb90..8e2d6d7 100644 --- a/src/concat_monthly.py +++ b/src/concat_monthly.py @@ -7,17 +7,16 @@ After consolidation, moves monthly files to an intermediate subdirectory. Usage: - python src/consolidate_monthly_to_yearly.py + python src/concat_monthly.py year=2020 # With overrides: - python src/consolidate_monthly_to_yearly.py polygon_name=zcta + python src/concat_monthly.py polygon_name=zcta year=2020 """ import pandas as pd import os import hydra import logging -import re import shutil from pathlib import Path @@ -25,34 +24,6 @@ LOGGER = logging.getLogger(__name__) -def discover_available_years(input_dir, polygon_name): - """ - Discover all years that have monthly data available. - - Args: - input_dir: Directory containing monthly parquet files - polygon_name: Name of polygon type (county, zcta, etc.) - - Returns: - Set of years (integers) that have at least one monthly file - """ - years = set() - # Pattern: pm25__randall__county_monthly__2020_01.parquet - pattern = re.compile(rf"pm25__randall__{polygon_name}_monthly__(\d{{4}})_\d{{2}}\.parquet") - - if not os.path.exists(input_dir): - LOGGER.warning(f"Input directory does not exist: {input_dir}") - return years - - for filename in os.listdir(input_dir): - match = pattern.match(filename) - if match: - year = int(match.group(1)) - years.add(year) - - return sorted(years) - - def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): """ Consolidate all monthly files for a given year into a single yearly file. @@ -65,7 +36,7 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): input_freq: Frequency of input files (monthly) Returns: - Tuple of (success: bool, monthly_files: list) - list of files that were consolidated + List of monthly files that were consolidated """ monthly_files = [] @@ -83,7 +54,7 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): if not monthly_files: LOGGER.error(f"No monthly files found for year {year}") - return False, [] + raise FileNotFoundError(f"No monthly files found for year {year}") if len(monthly_files) != 12: LOGGER.warning(f"Only found {len(monthly_files)}/12 months for year {year}") @@ -103,7 +74,8 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): LOGGER.info(f"Saving consolidated file: {output_file} ({len(yearly_df)} rows)") yearly_df.to_parquet(output_file) - return True, monthly_files + return monthly_files + def move_to_intermediate(monthly_files, input_dir): @@ -132,55 +104,32 @@ def move_to_intermediate(monthly_files, input_dir): @hydra.main(config_path="../conf", config_name="config", version_base=None) def main(cfg): """ - Consolidate monthly PM2.5 files into yearly files using Hydra configuration. - Automatically discovers all available years and processes them. + Consolidate monthly PM2.5 files for a specific year into a yearly file using Hydra configuration. After consolidation, moves monthly files to intermediate subdirectory. """ polygon_name = cfg.polygon_name + year = cfg.year # Build path using datapaths configuration - output to same dir as input monthly_dir = f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly" LOGGER.info(f"Polygon name: {polygon_name}") + LOGGER.info(f"Year: {year}") LOGGER.info(f"Monthly directory: {monthly_dir}") - # Discover available years - available_years = discover_available_years(monthly_dir, polygon_name) - - if not available_years: - LOGGER.error(f"No monthly files found in {monthly_dir}") - return - - LOGGER.info(f"Found {len(available_years)} years with monthly data: {min(available_years)}-{max(available_years)}") - - success_count = 0 - fail_count = 0 - all_monthly_files = [] - - # Process each year - for year in available_years: - try: - success, monthly_files = consolidate_year(monthly_dir, year, polygon_name) - if success: - success_count += 1 - all_monthly_files.extend(monthly_files) - else: - fail_count += 1 - except Exception as e: - LOGGER.error(f"Error processing year {year}: {e}") - fail_count += 1 - - LOGGER.info(f"Consolidation complete: {success_count} years successful, {fail_count} failed") - - # Move all monthly files to intermediate directory - if all_monthly_files: - LOGGER.info(f"Moving {len(all_monthly_files)} monthly files to intermediate/") - move_to_intermediate(all_monthly_files, monthly_dir) - else: - LOGGER.warning("No monthly files to move to intermediate directory") + # Process the specified year + try: + monthly_files = consolidate_year(monthly_dir, year, polygon_name) + LOGGER.info(f"Successfully consolidated {len(monthly_files)} monthly files for year {year}") + + # Move monthly files to intermediate directory + move_to_intermediate(monthly_files, monthly_dir) + + except Exception as e: + LOGGER.error(f"Error processing year {year}: {e}") + raise if __name__ == "__main__": main() - From d110a26b8d124561f8cf920c7a43867f782c4b2f Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 13 Nov 2025 07:44:25 -0500 Subject: [PATCH 23/29] output monthly aggregations to intermediate directory --- src/aggregate_pm25.py | 4 ++- src/concat_monthly.py | 60 +++++++++++++------------------------------ 2 files changed, 21 insertions(+), 43 deletions(-) diff --git a/src/aggregate_pm25.py b/src/aggregate_pm25.py index cb320b5..b2fe378 100644 --- a/src/aggregate_pm25.py +++ b/src/aggregate_pm25.py @@ -112,14 +112,16 @@ def main(cfg): if cfg.temporal_freq == "yearly": # ignore month since len(filenames) == 1 output_filename = f"pm25__randall__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}.parquet" + output_path = f"{cfg.datapaths.base_path}/output/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" elif cfg.temporal_freq == "monthly": # use month in filename since len(filenames) = 12 month = f"{i + 1:02d}" df["month"] = month output_filename = f"pm25__randall__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}_{month}.parquet" + # Save monthly outputs to intermediate folder + output_path = f"{cfg.datapaths.base_path}/output/{cfg.polygon_name}_{cfg.temporal_freq}/intermediate/{output_filename}" - output_path = f"{cfg.datapaths.base_path}/output/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" df.to_parquet(output_path) # plot aggregation map using geopandas diff --git a/src/concat_monthly.py b/src/concat_monthly.py index 8e2d6d7..d499298 100644 --- a/src/concat_monthly.py +++ b/src/concat_monthly.py @@ -2,9 +2,8 @@ """ Consolidate monthly PM2.5 aggregation files into yearly files. -This script takes the monthly parquet files output from the aggregation pipeline -and combines all 12 months for each year into a single yearly file. -After consolidation, moves monthly files to an intermediate subdirectory. +This script takes the monthly parquet files from the intermediate directory +and combines all 12 months for each year into a single yearly file in the output directory. Usage: python src/concat_monthly.py year=2020 @@ -17,20 +16,20 @@ import os import hydra import logging -import shutil from pathlib import Path logging.basicConfig(level=logging.INFO, format='[%(asctime)s][%(levelname)s] - %(message)s') LOGGER = logging.getLogger(__name__) -def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): +def consolidate_year(intermediate_dir, output_dir, year, polygon_name, input_freq="monthly"): """ Consolidate all monthly files for a given year into a single yearly file. - Output goes to the same directory as input. + Reads from intermediate directory, outputs to main output directory. Args: - input_dir: Directory containing monthly parquet files (also used for output) + intermediate_dir: Directory containing monthly parquet files + output_dir: Directory to save yearly parquet file year: Year to consolidate polygon_name: Name of polygon type (county, zcta, etc.) input_freq: Frequency of input files (monthly) @@ -41,11 +40,11 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): monthly_files = [] # Pattern: pm25__randall__county_monthly__2020_01.parquet - # Collect all monthly files for this year + # Collect all monthly files for this year from intermediate directory for month in range(1, 13): month_str = str(month).zfill(2) filename = f"pm25__randall__{polygon_name}_{input_freq}__{year}_{month_str}.parquet" - filepath = os.path.join(input_dir, filename) + filepath = os.path.join(intermediate_dir, filename) if os.path.exists(filepath): monthly_files.append(filepath) @@ -53,7 +52,7 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): LOGGER.warning(f"Missing file: {filepath}") if not monthly_files: - LOGGER.error(f"No monthly files found for year {year}") + LOGGER.error(f"No monthly files found for year {year} in {intermediate_dir}") raise FileNotFoundError(f"No monthly files found for year {year}") if len(monthly_files) != 12: @@ -68,9 +67,9 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): if 'month' in yearly_df.columns: yearly_df = yearly_df.sort_values(['month']) - # Save yearly file in the same directory - # Pattern: pm25__randall__county_monthly__2020.parquet (note: keeping "monthly" in parent dir name) - output_file = os.path.join(input_dir, f"pm25__randall__{polygon_name}_monthly__{year}.parquet") + # Save yearly file to output directory (not intermediate) + # Pattern: pm25__randall__county_monthly__2020.parquet + output_file = os.path.join(output_dir, f"pm25__randall__{polygon_name}_monthly__{year}.parquet") LOGGER.info(f"Saving consolidated file: {output_file} ({len(yearly_df)} rows)") yearly_df.to_parquet(output_file) @@ -78,54 +77,31 @@ def consolidate_year(input_dir, year, polygon_name, input_freq="monthly"): -def move_to_intermediate(monthly_files, input_dir): - """ - Move monthly files to an intermediate subdirectory. - - Args: - monthly_files: List of monthly file paths to move - input_dir: Base directory containing the files - """ - intermediate_dir = os.path.join(input_dir, "intermediate") - os.makedirs(intermediate_dir, exist_ok=True) - - LOGGER.info(f"Moving {len(monthly_files)} monthly files to {intermediate_dir}") - - for filepath in monthly_files: - filename = os.path.basename(filepath) - dest_path = os.path.join(intermediate_dir, filename) - shutil.move(filepath, dest_path) - LOGGER.debug(f"Moved {filename} to intermediate/") - - LOGGER.info(f"Successfully moved {len(monthly_files)} files to intermediate/") - - @hydra.main(config_path="../conf", config_name="config", version_base=None) def main(cfg): """ Consolidate monthly PM2.5 files for a specific year into a yearly file using Hydra configuration. - After consolidation, moves monthly files to intermediate subdirectory. + Reads from intermediate directory, outputs to main output directory. """ polygon_name = cfg.polygon_name year = cfg.year - # Build path using datapaths configuration - output to same dir as input + # Build paths using datapaths configuration monthly_dir = f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly" + intermediate_dir = f"{monthly_dir}/intermediate" LOGGER.info(f"Polygon name: {polygon_name}") LOGGER.info(f"Year: {year}") - LOGGER.info(f"Monthly directory: {monthly_dir}") + LOGGER.info(f"Input directory (intermediate): {intermediate_dir}") + LOGGER.info(f"Output directory: {monthly_dir}") # Process the specified year try: - monthly_files = consolidate_year(monthly_dir, year, polygon_name) + monthly_files = consolidate_year(intermediate_dir, monthly_dir, year, polygon_name) LOGGER.info(f"Successfully consolidated {len(monthly_files)} monthly files for year {year}") - # Move monthly files to intermediate directory - move_to_intermediate(monthly_files, monthly_dir) - except Exception as e: LOGGER.error(f"Error processing year {year}: {e}") raise From 4c371b3b149890f4d520ce4c8c89dc3a277938f2 Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 13 Nov 2025 08:52:00 -0500 Subject: [PATCH 24/29] updating where intermediate monthly aggregations are stored --- conf/datapaths/cannon_v5gl.yaml | 4 ++++ conf/datapaths/cannon_v6gl.yaml | 4 +++- conf/datapaths/datapaths.yaml | 3 +++ conf/shapefiles/shapefiles.yaml | 3 +++ src/aggregate_pm25.py | 2 +- src/concat_monthly.py | 4 ++-- 6 files changed, 16 insertions(+), 4 deletions(-) diff --git a/conf/datapaths/cannon_v5gl.yaml b/conf/datapaths/cannon_v5gl.yaml index 45845f8..e0c8490 100644 --- a/conf/datapaths/cannon_v5gl.yaml +++ b/conf/datapaths/cannon_v5gl.yaml @@ -6,6 +6,10 @@ dirs: yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly shapefiles: null + + intermediate: + zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/intermediate/zcta_monthly + county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/intermediate/county_monthly output: zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/zcta_yearly diff --git a/conf/datapaths/cannon_v6gl.yaml b/conf/datapaths/cannon_v6gl.yaml index 50fcbf7..aeb34d7 100644 --- a/conf/datapaths/cannon_v6gl.yaml +++ b/conf/datapaths/cannon_v6gl.yaml @@ -6,7 +6,9 @@ dirs: yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly shapefiles: null - + intermediate: + zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/intermediate/zcta_monthly + county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/intermediate/county_monthly output: zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/zcta_yearly zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/zcta_monthly diff --git a/conf/datapaths/datapaths.yaml b/conf/datapaths/datapaths.yaml index 3250093..b732073 100644 --- a/conf/datapaths/datapaths.yaml +++ b/conf/datapaths/datapaths.yaml @@ -7,6 +7,9 @@ dirs: monthly: null shapefiles: null + intermediate: + zcta_monthly: null + county_monthly: null output: zcta_yearly: null zcta_monthly: null diff --git a/conf/shapefiles/shapefiles.yaml b/conf/shapefiles/shapefiles.yaml index 420e8b4..4d9e4f6 100644 --- a/conf/shapefiles/shapefiles.yaml +++ b/conf/shapefiles/shapefiles.yaml @@ -42,6 +42,9 @@ county: # County shapefiles (cartographic boundaries) https://www.census.gov/pro 2022: url: https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_county_500k.zip idvar: GEOID + 2023: + url: https://www2.census.gov/geo/tiger/GENZ2023/shp/cb_2023_us_county_500k.zip + idvar: GEOID zcta: # ZCTA shapefiles (cartographic boudaries) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html 2000: diff --git a/src/aggregate_pm25.py b/src/aggregate_pm25.py index b2fe378..2c179bb 100644 --- a/src/aggregate_pm25.py +++ b/src/aggregate_pm25.py @@ -120,7 +120,7 @@ def main(cfg): df["month"] = month output_filename = f"pm25__randall__{cfg.polygon_name}_{cfg.temporal_freq}__{cfg.year}_{month}.parquet" # Save monthly outputs to intermediate folder - output_path = f"{cfg.datapaths.base_path}/output/{cfg.polygon_name}_{cfg.temporal_freq}/intermediate/{output_filename}" + output_path = f"{cfg.datapaths.base_path}/intermediate/{cfg.polygon_name}_{cfg.temporal_freq}/{output_filename}" df.to_parquet(output_path) diff --git a/src/concat_monthly.py b/src/concat_monthly.py index d499298..873b7cd 100644 --- a/src/concat_monthly.py +++ b/src/concat_monthly.py @@ -67,7 +67,7 @@ def consolidate_year(intermediate_dir, output_dir, year, polygon_name, input_fre if 'month' in yearly_df.columns: yearly_df = yearly_df.sort_values(['month']) - # Save yearly file to output directory (not intermediate) + # Save yearly file to output directory # Pattern: pm25__randall__county_monthly__2020.parquet output_file = os.path.join(output_dir, f"pm25__randall__{polygon_name}_monthly__{year}.parquet") LOGGER.info(f"Saving consolidated file: {output_file} ({len(yearly_df)} rows)") @@ -90,7 +90,7 @@ def main(cfg): # Build paths using datapaths configuration monthly_dir = f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly" - intermediate_dir = f"{monthly_dir}/intermediate" + intermediate_dir = f"{cfg.datapaths.base_path}/intermediate/{polygon_name}_monthly" LOGGER.info(f"Polygon name: {polygon_name}") LOGGER.info(f"Year: {year}") From 5fb24d04c8ebcce74e18e5c914d56d40fb34288e Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 13 Nov 2025 09:43:43 -0500 Subject: [PATCH 25/29] Updating snakefile inputs and outputs for intermediate directory --- Snakefile | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/Snakefile b/Snakefile index 90e7cc0..55971b5 100644 --- a/Snakefile +++ b/Snakefile @@ -32,19 +32,11 @@ rule all: input: expand( f"{cfg.datapaths.base_path}/output/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + - ("{year}.parquet" if temporal_freq == 'yearly' else "{year}_{month}.parquet"), - year=years_list, - month=months_list - ) if temporal_freq == 'yearly' else ( - expand( - cfg.datapaths.base_path + "/output/" + polygon_name + "_monthly/pm25__randall__" + polygon_name + "_monthly__{year}_{month}.parquet", - year=years_list, - month=[str(i).zfill(2) for i in range(1, 12 + 1)] - ) + - expand( - cfg.datapaths.base_path + "/output/" + polygon_name + "_monthly/pm25__randall__" + polygon_name + "_monthly__{year}.parquet", - year=years_list - ) + "{year}.parquet", + year=years_list + ) if temporal_freq == 'yearly' else expand( + f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__" + "{year}.parquet", + year=years_list ) # remove and use symlink to the us census geoboundaries @@ -80,11 +72,10 @@ rule aggregate_pm25: ) output: - expand( - f"{cfg.datapaths.base_path}/output/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__" + - ("{{year}}.parquet" if temporal_freq == 'yearly' else "{{year}}_{month}.parquet"), - month=months_list # we only want to expand months_list and keep year as wildcard - ) + [f"{cfg.datapaths.base_path}/output/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__{{year}}.parquet"] if temporal_freq == 'yearly' else [ + f"{cfg.datapaths.base_path}/intermediate/{polygon_name}_{temporal_freq}/pm25__randall__{polygon_name}_{temporal_freq}__{{year}}_{month}.parquet" + for month in months_list + ] log: f"logs/satellite_pm25_{polygon_name}_{{year}}.log" shell: @@ -96,18 +87,18 @@ rule aggregate_pm25: rule concat_monthly: # This rule is only needed when temporal_freq is 'monthly' to create yearly files - # Combines monthly parquet files into a single yearly parquet file for each year. + # Combines monthly parquet files from intermediate directory into a single yearly parquet file input: lambda wildcards: expand( - cfg.datapaths.base_path + "/output/" + polygon_name + "_monthly/pm25__randall__" + polygon_name + "_monthly__" + wildcards.year + "_{month}.parquet", + f"{cfg.datapaths.base_path}/intermediate/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__" + wildcards.year + "_{month}.parquet", month=[str(i).zfill(2) for i in range(1, 12 + 1)] ) output: - yearly_file=f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{{year}}.parquet" + yearly_file=f"{cfg.datapaths.base_path}/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__" + "{year}.parquet" log: - f"logs/concat_monthly_{polygon_name}_{{year}}.log" + f"logs/concat_monthly_{polygon_name}_" + "{year}.log" shell: - f"PYTHONPATH=. python src/concat_monthly.py polygon_name={polygon_name} year={{wildcards.year}} &> {{log}}" + f"PYTHONPATH=. python src/concat_monthly.py polygon_name={polygon_name} " + "year={wildcards.year} &> {log}" From 99df6e73ac159f3384a66b0c3abb223395a23eea Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 20 Nov 2025 16:49:55 -0500 Subject: [PATCH 26/29] Updating pipeline to work with lego shapefiles instead of shapefile download --- Snakefile | 10 +-- conf/shapefiles/shapefiles.yaml | 118 +++++++++++++++----------------- src/aggregate_pm25.py | 9 ++- 3 files changed, 66 insertions(+), 71 deletions(-) diff --git a/Snakefile b/Snakefile index 55971b5..94fdb01 100644 --- a/Snakefile +++ b/Snakefile @@ -22,10 +22,10 @@ with initialize(version_base=None, config_path="conf"): satellite_pm25_cfg = cfg.satellite_pm25 shapefiles_cfg = cfg.shapefiles -shapefile_years_list = list(shapefiles_cfg[polygon_name].keys()) +shapefile_years_list = shapefiles_cfg[polygon_name].years months_list = "01" if temporal_freq == 'yearly' else [str(i).zfill(2) for i in range(1, 12 + 1)] -years_list = list(range(2022, 2023 + 1)) +years_list = list(range(1998, 2023 + 1)) # == Define rules == rule all: @@ -42,7 +42,7 @@ rule all: # remove and use symlink to the us census geoboundaries rule download_shapefiles: output: - f"{cfg.datapaths.base_path}/input/shapefiles/shapefile_{polygon_name}_" + "{shapefile_year}/shapefile.shp" + f"{cfg.datapaths.base_path}/input/shapefiles/{shapefiles_cfg[polygon_name].prefix}" + "{shapefile_year}/{shapefiles_cfg[polygon_name].prefix}" + "{shapefile_year}.shp" shell: f"python src/download_shapefile.py polygon_name={polygon_name} " + "shapefile_year={wildcards.shapefile_year}" @@ -60,7 +60,9 @@ rule download_satellite_pm25: def get_shapefile_input(wildcards): shapefile_year = available_shapefile_year(int(wildcards.year), shapefile_years_list) - return f"{cfg.datapaths.base_path}/input/shapefiles/shapefile_{polygon_name}_{shapefile_year}/shapefile.shp" + shapefile_prefix = shapefiles_cfg[polygon_name].prefix + shapefile_name = f"{shapefile_prefix}{shapefile_year}" + return f"{cfg.datapaths.base_path}/input/shapefiles/{polygon_name}_yearly/{shapefile_name}/{shapefile_name}.shp" rule aggregate_pm25: input: diff --git a/conf/shapefiles/shapefiles.yaml b/conf/shapefiles/shapefiles.yaml index 4d9e4f6..e8ba035 100644 --- a/conf/shapefiles/shapefiles.yaml +++ b/conf/shapefiles/shapefiles.yaml @@ -1,69 +1,59 @@ -# cfg.shapefile_year and cfg.polygon_name are used to match the shapefile to dowload +# Shapefile metadata configuration following gridmet_raster2polygon pattern +# Each polygon type contains: +# - years: list of available shapefile years +# - idvar: ID column name in the shapefile +# - prefix: shapefile directory/file naming prefix +# - url_map (optional): URLs for downloading shapefiles by year +# Expected shapefile path structure: {prefix}{year}/{prefix}{year}.shp census_tract: - 2020: - url: https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip - idvar: GEOID - 2021: - url: https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_tract_500k.zip - idvar: GEOID - 2022: - url: https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_tract_500k.zip - idvar: GEOID + years: + - 2020 + - 2021 + - 2022 + idvar: "GEOID" + prefix: "cb_us_tract_" + url_map: + 2020: "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip" + 2021: "https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_tract_500k.zip" + 2022: "https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_tract_500k.zip" county: # County shapefiles (cartographic boundaries) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html - 2013: - url: https://www2.census.gov/geo/tiger/GENZ2013/cb_2013_us_county_500k.zip # readme https://www2.census.gov/geo/tiger/GENZ2013/2013_file_name_def.pdf - idvar: GEOID - 2014: - url: https://www2.census.gov/geo/tiger/GENZ2014/shp/cb_2014_us_county_500k.zip - idvar: GEOID - 2015: - url: https://www2.census.gov/geo/tiger/GENZ2015/shp/cb_2015_us_county_500k.zip - idvar: GEOID - 2016: - url: https://www2.census.gov/geo/tiger/GENZ2016/shp/cb_2016_us_county_500k.zip - idvar: GEOID - 2017: - url: https://www2.census.gov/geo/tiger/GENZ2017/shp/cb_2017_us_county_500k.zip - idvar: GEOID - 2018: - url: https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip - idvar: GEOID - 2019: - url: https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_county_500k.zip - idvar: GEOID - 2020: - url: https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip - idvar: GEOID - 2021: - url: https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_county_500k.zip - idvar: GEOID - 2022: - url: https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_county_500k.zip - idvar: GEOID - 2023: - url: https://www2.census.gov/geo/tiger/GENZ2023/shp/cb_2023_us_county_500k.zip - idvar: GEOID + years: + - 2013 + - 2014 + - 2015 + - 2016 + - 2017 + - 2018 + - 2019 + - 2020 + - 2021 + - 2022 + - 2023 + idvar: "county" + prefix: "us_shapefile__census__county_yearly__" + url_map: + 2013: "https://www2.census.gov/geo/tiger/GENZ2013/cb_2013_us_county_500k.zip" + 2014: "https://www2.census.gov/geo/tiger/GENZ2014/shp/cb_2014_us_county_500k.zip" + 2015: "https://www2.census.gov/geo/tiger/GENZ2015/shp/cb_2015_us_county_500k.zip" + 2016: "https://www2.census.gov/geo/tiger/GENZ2016/shp/cb_2016_us_county_500k.zip" + 2017: "https://www2.census.gov/geo/tiger/GENZ2017/shp/cb_2017_us_county_500k.zip" + 2018: "https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip" + 2019: "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_county_500k.zip" + 2020: "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip" + 2021: "https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_county_500k.zip" + 2022: "https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_county_500k.zip" + 2023: "https://www2.census.gov/geo/tiger/GENZ2023/shp/cb_2023_us_county_500k.zip" -zcta: # ZCTA shapefiles (cartographic boudaries) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html - 2000: - url: https://www2.census.gov/geo/tiger/GENZ2010/gz_2010_us_860_00_500k.zip # readme https://www2.census.gov/geo/tiger/GENZ2010/ReadMe.pdf - idvar: ZCTA5 - 2010: - url: https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_zcta510_500k.zip # latest ZCTA5CE10 available - idvar: ZCTA5CE10 - 2020: - url: https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip # latest ZCTA5CE20 available (to date) - idvar: ZCTA5CE20 - -# zcta: # ZCTA shapefiles (tiger line) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html -# 2000: -# url: https://www2.census.gov/geo/tiger/TIGER2010/ZCTA5/2000/tl_2010_us_zcta500.zip # latest ZCTA5CE00 available -# idvar: ZCTA5CE00 -# 2010: -# url: https://www2.census.gov/geo/tiger/TIGER2020/ZCTA5/tl_2020_us_zcta510.zip # latest ZCTA5CE10 available -# idvar: ZCTA5CE10 -# 2020: -# url: https://www2.census.gov/geo/tiger/TIGER2022/ZCTA520/tl_2022_us_zcta520.zip # latest ZCTA5CE20 available (to date) -# idvar: ZCTA5CE20 +zcta: # ZCTA shapefiles (cartographic boundaries) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html + years: + - 2000 + - 2010 + - 2020 + idvar: "zcta" + prefix: "us_shapefile__census__zcta_yearly__" + url_map: + 2000: "https://www2.census.gov/geo/tiger/GENZ2010/gz_2010_us_860_00_500k.zip" + 2010: "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_zcta510_500k.zip" + 2020: "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip" diff --git a/src/aggregate_pm25.py b/src/aggregate_pm25.py index 2c179bb..0d722af 100644 --- a/src/aggregate_pm25.py +++ b/src/aggregate_pm25.py @@ -35,13 +35,16 @@ def main(cfg): # == load shapefile LOGGER.info("Loading shapefile.") - shapefile_years_list = list(cfg.shapefiles[cfg.polygon_name].keys()) + shapefile_years_list = cfg.shapefiles[cfg.polygon_name].years #use previously available shapefile shapefile_year = available_shapefile_year(cfg.year, shapefile_years_list) + shapefile_prefix = cfg.shapefiles[cfg.polygon_name].prefix + shapefile_name = f"{shapefile_prefix}{shapefile_year}" - shape_path = f'{cfg.datapaths.base_path}/input/shapefiles/shapefile_{cfg.polygon_name}_{shapefile_year}/shapefile.shp' + shape_path = f'{cfg.datapaths.base_path}/input/shapefiles/{cfg.polygon_name}_yearly/{shapefile_name}/{shapefile_name}.shp' + LOGGER.info(f"Loading shapefile from: {shape_path}") polygon = gpd.read_file(shape_path) - polygon_ids = polygon[cfg.shapefiles[cfg.polygon_name][shapefile_year].idvar].values + polygon_ids = polygon[cfg.shapefiles[cfg.polygon_name].idvar].values # == filenames to be aggregated if cfg.temporal_freq == "yearly": From 76bc6f24f462783a68b8fb41358750e0acd0e8e6 Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Thu, 20 Nov 2025 19:51:13 -0500 Subject: [PATCH 27/29] update readme to reflect changes to pipeline --- README.md | 103 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 93 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 83af885..27a05aa 100644 --- a/README.md +++ b/README.md @@ -29,26 +29,89 @@ Aaron van Donkelaar, Melanie S. Hammer, Liam Bindle, Michael Brauer, Jeffery R. # Codebook -## Dataset Columns: +## Dataset Output Structure -* county aggregations: +The pipeline produces parquet files with PM2.5 aggregations at the polygon level. -* zcta aggregations: +### Yearly aggregations (`temporal_freq=yearly`): + +Output path: `data/V5GL/output/{polygon_name}_yearly/pm25__randall__{polygon_name}_yearly__{year}.parquet` + +Columns: +* `{polygon_id}` (index): County FIPS code, ZCTA code, or Census Tract GEOID depending on polygon type +* `pm25` (float64): Mean PM2.5 concentration (µg/m³) for the year +* `year` (int64): Year of observation + +### Monthly aggregations (`temporal_freq=monthly`): + +Output path: `data/V5GL/output/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{year}.parquet` + +Columns: +* `{polygon_id}` (index): County FIPS code, ZCTA code, or Census Tract GEOID depending on polygon type +* `pm25` (float64): Mean PM2.5 concentration (µg/m³) for the month +* `year` (int64): Year of observation +* `month` (object/int): Month of observation + +### Polygon ID Variables + +The index column name varies by polygon type and is defined in `conf/shapefiles/shapefiles.yaml`: +* **County**: `county` (5-digit FIPS code, e.g., "01001" for Autauga County, Alabama) +* **ZCTA**: `zcta` (5-digit ZIP Code Tabulation Area, e.g., "00601") +* **Census Tract**: `GEOID` (11-digit census tract identifier) + +### Intermediate Files (Monthly only) + +During monthly processing, intermediate files are created: +* Path: `data/V5GL/intermediate/{polygon_name}_monthly/pm25__randall__{polygon_name}_monthly__{year}_{month}.parquet` +* These are concatenated into yearly files in the output directory --- # Configuration files -The configuration structure withing the `/conf` folder allow you to modify the input parameters for the following steps: +The configuration structure within the `/conf` folder allows you to modify the input parameters for the following steps: -* create directory paths: `utils/create_dir_paths.py` +* create directory paths: `src/create_datapaths.py` * download pm25: `src/download_pm25.py` * download shapefiles: `src/download_shapefile.py` * aggregate pm25: `src/aggregate_pm25.py` +* concatenate monthly files: `src/concat_monthly.py` The key parameters are: * `temporal_freq` which determines whether the original yearly or monthly pm25 files will be aggregated. The options are: `yearly` and `monthly`. -* `polygon_name` which determines into which polygons the pm25 grid will the aggregated. The options are: `zcta` and `county`. +* `polygon_name` which determines into which polygons the pm25 grid will be aggregated. The options are: `zcta`, `county`, and `census_tract`. + +## Configuration Structure + +The configuration system uses Hydra with the following structure: + +* `/conf/config.yaml` - Main configuration file with default settings +* `/conf/datapaths/` - Data path configurations for different environments: + * `cannon_v5gl.yaml` - Paths for V5GL data on Cannon cluster + * `cannon_v6gl.yaml` - Paths for V6GL data on Cannon cluster + * `datapaths.yaml` - Template configuration +* `/conf/shapefiles/shapefiles.yaml` - Shapefile metadata including: + * Available years for each polygon type + * ID column names (`idvar`) + * File naming prefixes + * Download URLs (optional, via `url_map`) +* `/conf/satellite_pm25/` - PM2.5 dataset configurations for different versions +* `/conf/snakemake.yaml` - Default parameters for Snakemake workflow + +## Shapefile Configuration + +Shapefiles can be obtained in two ways: + +1. **Symlinks to existing Lab shapefiles** (recommended for Cannon cluster): + ```bash + python src/create_datapaths.py + ``` + This creates symbolic links from the project's `data/` directory to the Lab's existing shapefile repository at `/n/dominici_lab/lab/lego/geoboundaries/`. + +2. **Direct download** from Census Bureau (optional): + Shapefiles can be downloaded automatically if URLs are configured in `conf/shapefiles/shapefiles.yaml`. The download script will use the `url_map` for the specified year. + +The pipeline uses backward compatibility for shapefiles - if PM2.5 data is from a year without an exact shapefile match, it automatically selects the most recent prior shapefile year available. --- @@ -75,17 +138,37 @@ mamba activate ## Input and output paths -Run +The pipeline requires setting up directory paths and symbolic links to data sources. Run: + +```bash +python src/create_datapaths.py +``` + +This script: +* Creates the base directory structure under `data/V5GL/` (or `data/V6GL/` depending on configuration) +* Creates symbolic links to: + * PM2.5 raw data at `/n/dominici_lab/lab/lego/environmental/pm25__randall/` + * Shapefiles at `/n/dominici_lab/lab/lego/geoboundaries/us_geoboundaries__census/us_shapefile__census/` + * Output directories for aggregated results + +To use a different configuration, specify the datapaths config file: ```bash -python utils/create_dir_paths.py +python src/create_datapaths.py datapaths=cannon_v6gl ``` ## Pipeline -You can run the pipeline steps manually or run the snakemake pipeline described in the Snakefile. +The pipeline consists of four main steps: + +1. **Download/link shapefiles**: Obtain or link to US Census shapefiles (counties, ZCTAs, or census tracts) +2. **Download PM2.5 data**: Download satellite PM2.5 NetCDF files from Washington University +3. **Aggregate PM2.5**: Perform spatial aggregation from raster grid to polygons +4. **Concatenate monthly files** (monthly frequency only): Combine monthly parquet files into yearly files + +You can run the pipeline steps manually or use the Snakemake workflow. -**run pipeline steps manually** +### Run pipeline steps manually ```bash python src/download_shapefile.py From 12b41297a6e581c107b4155612bf0f264249bddd Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Fri, 21 Nov 2025 08:41:23 -0500 Subject: [PATCH 28/29] remove shapefile download entirely --- .gitignore | 3 +++ Snakefile | 7 ------- conf/datapaths/cannon_v5gl.yaml | 4 +++- conf/datapaths/cannon_v6gl.yaml | 6 ++++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 484f99f..29de853 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ logs/ ## slurm ---- **/slurm* +## data files ---- +data/ + ## Python ---- # Byte-compiled / optimized / DLL files diff --git a/Snakefile b/Snakefile index 94fdb01..f5ac6d6 100644 --- a/Snakefile +++ b/Snakefile @@ -39,13 +39,6 @@ rule all: year=years_list ) -# remove and use symlink to the us census geoboundaries -rule download_shapefiles: - output: - f"{cfg.datapaths.base_path}/input/shapefiles/{shapefiles_cfg[polygon_name].prefix}" + "{shapefile_year}/{shapefiles_cfg[polygon_name].prefix}" + "{shapefile_year}.shp" - shell: - f"python src/download_shapefile.py polygon_name={polygon_name} " + "shapefile_year={wildcards.shapefile_year}" - rule download_satellite_pm25: output: expand( diff --git a/conf/datapaths/cannon_v5gl.yaml b/conf/datapaths/cannon_v5gl.yaml index e0c8490..3df4114 100644 --- a/conf/datapaths/cannon_v5gl.yaml +++ b/conf/datapaths/cannon_v5gl.yaml @@ -5,7 +5,9 @@ dirs: raw: yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly - shapefiles: null + shapefiles: + county_yearly: /n/dominici_lab/lab/lego/geoboundaries/us_geoboundaries__census/us_shapefile__census/county_yearly + zcta_yearly: /n/dominici_lab/lab/lego/geoboundaries/us_geoboundaries__census/us_shapefile__census/zcta_yearly intermediate: zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V5GL/intermediate/zcta_monthly diff --git a/conf/datapaths/cannon_v6gl.yaml b/conf/datapaths/cannon_v6gl.yaml index aeb34d7..313913c 100644 --- a/conf/datapaths/cannon_v6gl.yaml +++ b/conf/datapaths/cannon_v6gl.yaml @@ -5,7 +5,9 @@ dirs: raw: yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/raw/yearly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/yearly monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/raw/monthly #/n/netscratch/dominici_lab/Lab/pm25__randall__raw/monthly - shapefiles: null + shapefiles: + county_yearly: /n/dominici_lab/lab/lego/geoboundaries/us_geoboundaries__census/us_shapefile__census/county_yearly + zcta_yearly: /n/dominici_lab/lab/lego/geoboundaries/us_geoboundaries__census/us_shapefile__census/zcta_yearly intermediate: zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/intermediate/zcta_monthly county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/intermediate/county_monthly @@ -13,4 +15,4 @@ dirs: zcta_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/zcta_yearly zcta_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/zcta_monthly county_yearly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/county_yearly - county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/county_monthly + county_monthly: /n/dominici_lab/lab/lego/environmental/pm25__randall/V6GL/county_monthly \ No newline at end of file From 5799233af4d1f8929c264a21042993484abd06af Mon Sep 17 00:00:00 2001 From: shreyanalluri Date: Fri, 21 Nov 2025 09:42:34 -0500 Subject: [PATCH 29/29] remove shapefile download --- conf/satellite_pm25/global_pm25.yaml | 14 ---------- conf/shapefiles/shapefiles.yaml | 29 +++------------------ src/download_shapefile.py | 38 ---------------------------- 3 files changed, 4 insertions(+), 77 deletions(-) delete mode 100644 conf/satellite_pm25/global_pm25.yaml delete mode 100644 src/download_shapefile.py diff --git a/conf/satellite_pm25/global_pm25.yaml b/conf/satellite_pm25/global_pm25.yaml deleted file mode 100644 index 617e89a..0000000 --- a/conf/satellite_pm25/global_pm25.yaml +++ /dev/null @@ -1,14 +0,0 @@ -yearly: #ACAG-V5GL04-GWRPM25 has lower resolution, ACAG-V5GL04-GWRPM25c0p10 has higher resolution - url: https://wustl.app.box.com/v/ACAG-V5GL04-GWRPM25c0p10/folder/237172480358 - - zipname: Annual - - file_prefix: "V5GL04.HybridPM25c_0p10.Global" - #file name convention is V5GL04.HybridPM25.NorthAmerica.yyyymm-yyyymm.nc - # e.g. V5GL04.HybridPM25.NorthAmerica.201801-201812.nc is the file for 2018 and NorthAmerica - # "V5GL04.HybridPM25.NorthAmerica" #lower resolution - # "V5GL04.HybridPM25c_0p10.NorthAmerica" #higher resolution - -layer: "GWRPM25" #geographic weighted regression PM2.5 -latitude_layer: "lat" -longitude_layer: "lon" \ No newline at end of file diff --git a/conf/shapefiles/shapefiles.yaml b/conf/shapefiles/shapefiles.yaml index e8ba035..42bcf4f 100644 --- a/conf/shapefiles/shapefiles.yaml +++ b/conf/shapefiles/shapefiles.yaml @@ -1,10 +1,8 @@ -# Shapefile metadata configuration following gridmet_raster2polygon pattern +# Shapefile metadata configuration # Each polygon type contains: # - years: list of available shapefile years # - idvar: ID column name in the shapefile # - prefix: shapefile directory/file naming prefix -# - url_map (optional): URLs for downloading shapefiles by year -# Expected shapefile path structure: {prefix}{year}/{prefix}{year}.shp census_tract: years: @@ -13,11 +11,7 @@ census_tract: - 2022 idvar: "GEOID" prefix: "cb_us_tract_" - url_map: - 2020: "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_tract_500k.zip" - 2021: "https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_tract_500k.zip" - 2022: "https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_tract_500k.zip" - + county: # County shapefiles (cartographic boundaries) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html years: - 2013 @@ -33,19 +27,7 @@ county: # County shapefiles (cartographic boundaries) https://www.census.gov/pro - 2023 idvar: "county" prefix: "us_shapefile__census__county_yearly__" - url_map: - 2013: "https://www2.census.gov/geo/tiger/GENZ2013/cb_2013_us_county_500k.zip" - 2014: "https://www2.census.gov/geo/tiger/GENZ2014/shp/cb_2014_us_county_500k.zip" - 2015: "https://www2.census.gov/geo/tiger/GENZ2015/shp/cb_2015_us_county_500k.zip" - 2016: "https://www2.census.gov/geo/tiger/GENZ2016/shp/cb_2016_us_county_500k.zip" - 2017: "https://www2.census.gov/geo/tiger/GENZ2017/shp/cb_2017_us_county_500k.zip" - 2018: "https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip" - 2019: "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_county_500k.zip" - 2020: "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_county_500k.zip" - 2021: "https://www2.census.gov/geo/tiger/GENZ2021/shp/cb_2021_us_county_500k.zip" - 2022: "https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_county_500k.zip" - 2023: "https://www2.census.gov/geo/tiger/GENZ2023/shp/cb_2023_us_county_500k.zip" - + zcta: # ZCTA shapefiles (cartographic boundaries) https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html years: - 2000 @@ -53,7 +35,4 @@ zcta: # ZCTA shapefiles (cartographic boundaries) https://www.census.gov/program - 2020 idvar: "zcta" prefix: "us_shapefile__census__zcta_yearly__" - url_map: - 2000: "https://www2.census.gov/geo/tiger/GENZ2010/gz_2010_us_860_00_500k.zip" - 2010: "https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_zcta510_500k.zip" - 2020: "https://www2.census.gov/geo/tiger/GENZ2020/shp/cb_2020_us_zcta520_500k.zip" + \ No newline at end of file diff --git a/src/download_shapefile.py b/src/download_shapefile.py deleted file mode 100644 index 0ea0059..0000000 --- a/src/download_shapefile.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -import os -import zipfile -import hydra -import wget - -@hydra.main(config_path="../conf", config_name="config", version_base=None) -def main(cfg): - url = cfg.shapefiles[cfg.polygon_name][cfg.shapefile_year].url - - tgt = f"{cfg.datapaths.base_path}/input/shapefiles/shapefile_{cfg.polygon_name}_{cfg.shapefile_year}" - - tgtdir = os.path.dirname(tgt) - tgtfile = os.path.basename(tgt) - - tgt = f"{tgtdir}/{tgtfile}" - logging.info(f"Downloading {url}") - wget.download(url, f"{tgt}.zip") - logging.info("Done.") - - # unzip with unzip library - with zipfile.ZipFile(f"{tgt}.zip", "r") as zip_ref: - zip_ref.extractall(tgt) - logging.info(f"Unzipped {tgt} with files:\n {os.listdir(tgt)}") - - # remove dirty zip file - os.remove(f"{tgt}.zip") - logging.info(f"Removed {tgt}.zip") - - logging.info(f"Rename files to shapefile.*") - files = os.listdir(tgt) - for f in files: - _, ext = os.path.splitext(f) - os.rename(f"{tgt}/{f}", f"{tgt}/shapefile{ext}") - logging.info(f"Done.") - -if __name__ == "__main__": - main()