Demo of Full Simulation Usage
Following on from Demo of Single-Component Usage, there are a couple of workflow driver examples in the tests/demos/ directory of the nwpservice package. The full_wpswrf_workflow_driver.py code demonstrates a basic example of the WPS preprocessing and WRF execution. The full_workflow_driver_test_anne_eu.py code, discussed in this section, is a more ambitious demo, based on one of Anne’s case studies. It starts with ERA input data, which comes in the form of surface and 3D data files, each of which needs to be ungribbed. The ungribbed data then requires additional processing because ECMWF data comes on model levels, whereas WRF needs to work with pressure levels.
For convenience, the code, although long, is appended to the end of this document.
Like the component demonstrations, the code starts with a set of hard-coded system- and test-specific values. In this case, the code has been tested for this documentation in the EHRATM_VM environment, and the paths for the CTBTO_DEVLAN environment need to be edited. A number of variables are specific to this test case and, because of its complexity, the code won’t work for some of the simpler cases (try using full_wpswrf_workflow_driver.py for something more basic, without the ERA inputs and their resulting additional processing requirements).
There are plenty of things that can go wrong in the execution of this demo code but, in order to keep things simple, there is almost no error handling. Reliable production code would likely be at least two to three times longer, with extensive code for checking returned values and reacting to run-time exceptions. As described in the component demonstrations, each call to a component’s run() method returns a manifest of the output files produced, and it is up to the programmer to add code that ensures the expected files are produced.
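As a minimal sketch of the kind of check the programmer might add, the following helper looks in the staged output directory for files matching a list of expected name patterns. It deliberately does not inspect the manifest returned by run(), since the manifest format is not described here; the function name missing_outputs and the example patterns are illustrative assumptions, not part of nwpservice.

```python
import glob
import os


def missing_outputs(output_dir, expected_patterns):
    """Return the patterns that match no file in output_dir.

    Hypothetical helper (not part of nwpservice): each entry in
    expected_patterns is a glob pattern such as 'met_em.d01.*.nc'.
    """
    missing = []
    for pattern in expected_patterns:
        # glob.glob returns an empty list when nothing matches
        if not glob.glob(os.path.join(output_dir, pattern)):
            missing.append(pattern)
    return missing
```

A driver could call this after each stage, for example with ['geo_em.d01.nc'] after geogrid, and stop the workflow if the returned list is not empty.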
The general execution of full_workflow_driver_test_anne_eu.py is as follows:
Create a root directory system for the various run directories (one for each component invocation) and staged output files.
Create an ungrib object for the model level ERA met data, set it up, and run it, staging the output files in outputdir_root_path. Note the use of a custom VTable for the ERA model level data.
Create an ungrib object for the surface level ERA met data, set it up, and run it, staging the output files in outputdir_root_path. Note the use of a custom VTable for the ERA surface data.
Using the ungribbed data from the previous stages, create an object for creating ungribbed files based on pressure levels, set it up, run it, and stage the output.
Based on specifications in a provided namelist, generate geogrid files that contain static fields such as topography, landuse, etc.
Using the geogrid files, as well as the previously generated collection of ungribbed files, create a metgrid object for mapping everything to the domain of interest and writing the results to a set of metgrid output files.
Using the metgrid output files generated in the previous step, run the real component to package up the data into files of initial conditions and lateral boundary conditions.
Using the initial and lateral boundary conditions generated in the previous step, run the WRF simulation.
Using the WRF output files generated in the previous step, run Flexpart WRF.
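Every stage above follows the same pattern of setup, run, and staging of output. A minimal sketch of that pattern is shown below. The run_stage helper and the FakeComponent class are hypothetical and exist only for illustration; FakeComponent stands in for a real nwpservice component so that the example is self-contained, and its list-valued manifest is an assumption.

```python
import logging

LOGGER = logging.getLogger(__name__)


def run_stage(name, component):
    """Call setup(), run() and stage_output() on one component, in order.

    Hypothetical helper: returns whatever run() returns (the manifest).
    """
    LOGGER.info('Starting stage: %s', name)
    component.setup()
    manifest = component.run()
    component.stage_output(auxfiles=True)
    return manifest


class FakeComponent:
    """Stand-in for a real component, for illustration only."""

    def __init__(self):
        self.calls = []

    def setup(self):
        self.calls.append('setup')

    def run(self):
        self.calls.append('run')
        return ['fake_output_file']  # assumed manifest shape

    def stage_output(self, auxfiles=False):
        self.calls.append('stage_output')
```

With real components, the object passed to run_stage would be constructed exactly as in the code listing at the end of this document.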
Upon successful completion of all of the above (again, there is plenty that can go wrong, and this program does not attempt to check for any of this), the output run directory will look as follows:
$ tree -L 2 -F ~/tmp/full_workflow_test_20230813.021753
/home/ctbtuser/tmp/full_workflow_test_20230813.021753
├── calcecmwfp_rundir/
│ ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
│ ├── WPS/
│ └── WRF/
├── flexwrf_rundir/
│ └── flexwrf_rundir/
├── geogrid_rundir/
│ ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
│ ├── WPS/
│ └── WRF/
├── metgrid_rundir/
│ ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
│ ├── WPS/
│ └── WRF/
├── real_rundir/
│ ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
│ ├── WPS/
│ └── WRF/
├── ungrib_ml_rundirl/
│ ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
│ ├── WPS/
│ └── WRF/
├── ungrib_sfc_rundir/
│ ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
│ ├── WPS/
│ └── WRF/
└── wrf_rundir/
  ├── GEOG_DATA -> /scratch/WRFV4.3-Distribution/GEOG_DATA/
  ├── WPS/
  └── WRF/
30 directories, 0 files
Note that each component has its own run directory and, just as with the component demos, each one of these is a fully functional virtual run environment. A developer or user can navigate into any of these directories, which (assuming all went well) is already set up with links to the needed input files, and execute, debug, or experiment independently of the other components.
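Because each run directory relies on links to its input files, one quick sanity check before experimenting in a run directory is to look for links whose targets no longer exist. The following is a minimal sketch using only the Python standard library; the function name dangling_links is an illustrative assumption and is not part of nwpservice.

```python
import os


def dangling_links(rundir):
    """Return sorted paths of symbolic links under rundir with missing targets.

    Hypothetical helper: os.walk does not descend into linked directories
    by default, so large linked trees such as GEOG_DATA are not traversed.
    """
    broken = []
    for dirpath, dirnames, filenames in os.walk(rundir):
        for name in dirnames + filenames:
            path = os.path.join(dirpath, name)
            # os.path.exists follows the link, so it is False for a dangling link
            if os.path.islink(path) and not os.path.exists(path):
                broken.append(path)
    return sorted(broken)
```

An empty result suggests that the links in the run directory still resolve; it says nothing about whether the linked files themselves are valid.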
The program has staged the output files to a common directory:
$ ls full_workflow_output_20230813.021753
dates PRES:2014-01-24_06
ecmwf_coeffs rsl.error.0000
flxout_d01_20140124_010000.nc rsl.error.0001
geo_em.d01.nc rsl.out.0000
geogrid.log.0000 rsl.out.0001
geogrid.log.0001 standalone_wrfout
header_d01.nc SURF:2014-01-24_00
met_em.d01.2014-01-24_00:00:00.nc SURF:2014-01-24_03
met_em.d01.2014-01-24_03:00:00.nc SURF:2014-01-24_06
met_em.d01.2014-01-24_06:00:00.nc ungrib.log
metgrid.log.0000 wrfbdy_d01
metgrid.log.0001 wrfinput_d01
MODLVL:2014-01-24_00 wrfout_d01_2014-01-24_00:00:00
MODLVL:2014-01-24_03 wrfout_d01_2014-01-24_01:00:00
MODLVL:2014-01-24_06 wrfout_d01_2014-01-24_02:00:00
namelist.input wrfout_d01_2014-01-24_03:00:00
namelist.wps wrfout_d01_2014-01-24_04:00:00
PRES:2014-01-24_00 wrfout_d01_2014-01-24_05:00:00
PRES:2014-01-24_03 wrfout_d01_2014-01-24_06:00:00
The preprocessing stages should execute somewhat quickly, but the actual WRF component may take 5-10 minutes (depending on how many MPI tasks you can assign - see the variable settings at the top of the program), and the Flexpart WRF component might take up to five minutes.
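To see where the time actually goes on a given system, each stage can be wrapped in a small timer. This is a minimal sketch using only the standard library; the timed_stage name and the timings dictionary are illustrative assumptions and do not appear in the driver.

```python
import contextlib
import time


@contextlib.contextmanager
def timed_stage(name, timings):
    """Record the wall-clock duration (in seconds) of a block in timings[name]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the wrapped block raises an exception
        timings[name] = time.perf_counter() - start
```

In the driver, a block such as "with timed_stage('wrf', timings):" could enclose the setup(), run(), and stage_output() calls of one component, and the timings dictionary could be printed at the end of main().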
Code listing of full_workflow_driver_test_anne_eu.py
#!/usr/bin/env python3
import datetime
import glob
import logging
import os
import shutil
import subprocess
import sys
import nwpservice.wps.ungrib as ungrib
import nwpservice.wps.calcecmwfp as calcecmwfp
import nwpservice.wps.geogrid as geogrid
import nwpservice.wps.metgrid as metgrid
import nwpservice.wrf.real as real
import nwpservice.wrf.wrf as wrf
import nwpservice.flexwrf.flexwrf as flexwrf
"""
Demonstration of typical WPS/WRF/FlexWRF workflow using ECMWF
input GRIB files for WPS.
Error checking is left out of this so that we can focus on the basic
steps
"""
## Uncomment exactly one of the following
SYSTEM_NAME = 'EHRATM_VM'
#SYSTEM_NAME = 'CTBTO_DEVLAN'
#### These values are generally system-specific
if SYSTEM_NAME == 'EHRATM_VM':
    USER_ROOT = '/home/ctbtuser'
    REPOSITORY_PATH = os.path.join(USER_ROOT, 'git/high-res-atm')  # Local git repo
    WPSWRF_COMPONENTS_PYTHONPATH = os.path.join(REPOSITORY_PATH,
                                                'packages/nwpservice/src')
    TMP_ROOT_DIR = os.path.join(USER_ROOT, 'tmp')  # Dir for temp files
    OUTPUT_ROOT_DIR = os.path.join(USER_ROOT, 'tmp')  # Dir for output products
    WPSWRF_DISTRO = '/scratch/WRFV4.3-Distribution'  # Dir of WPS/WRF distro
    FLEXWRF_DISTRO = '/scratch/FLEXWRFv3.3-gnu-Distirbution-2022-02-03'  # Dir of FlexWRF distro
    MPIRUN = '/usr/lib64/openmpi/bin/mpirun'  # mpirun executable
elif SYSTEM_NAME == 'CTBTO_DEVLAN':
    USER_ROOT = '/dvlscratch/ATM/morton'
    REPOSITORY_PATH = os.path.join(USER_ROOT, 'git/high-res-atm')  # Local git repo
    WPSWRF_COMPONENTS_PYTHONPATH = os.path.join(REPOSITORY_PATH,
                                                'packages/nwpservice/src')
    TMP_ROOT_DIR = os.path.join(USER_ROOT, 'tmp')  # Dir for temp files
    OUTPUT_ROOT_DIR = os.path.join(USER_ROOT, 'tmp')  # Dir for output products
    WPSWRF_DISTRO = '/scratch/WRFV4.3-Distribution'  # Dir of WPS/WRF distro
    FLEXWRF_DISTRO = '/scratch/FLEXWRFv3.3-gnu-Distirbution-2022-02-03'  # Dir of FlexWRF distro
    MPIRUN = '/usr/lib64/openmpi/bin/mpirun'  # mpirun executable
else:
    print('SYSTEM_NAME not supported: %s' % SYSTEM_NAME)
    sys.exit()
# Testcase files, hardcoded for this single test
TESTCASE_DATA_DIR = os.path.join(REPOSITORY_PATH,
                                 'packages/nwpservice/tests/testcase_data',
                                 'ecmwf_anne_2014_eu')

# Namelists for the individual WPS and WRF components
TESTCASE_NAMELIST_WPS_GEOGRID = os.path.join(TESTCASE_DATA_DIR,
                                             'WPS/namelist.wps.geogrid')
TESTCASE_NAMELIST_WPS_UNGRIB = os.path.join(TESTCASE_DATA_DIR,
                                            'WPS/namelist.wps.ungrib')
TESTCASE_NAMELIST_WPS_METGRID = os.path.join(TESTCASE_DATA_DIR,
                                             'WPS/namelist.wps.metgrid')
TESTCASE_NAMELIST_INPUT = os.path.join(TESTCASE_DATA_DIR, 'WRF/namelist.input')

# ERA input data: model level and surface GRIB files
TESTCASE_MODELLEVEL_METDATA_DIR = os.path.join(TESTCASE_DATA_DIR, 'metdata',
                                               'metdata_ml')
TESTCASE_SURFACE_METDATA_DIR = os.path.join(TESTCASE_DATA_DIR, 'metdata',
                                            'metdata_sfc')

# Namelists and output prefixes for the two ungrib runs
TESTCASE_NAMELIST_WPS_ML = os.path.join(TESTCASE_DATA_DIR,
                                        'WPS/namelist.wps.ungrib_ml')
TESTCASE_UNGRIB_PREFIX_ML = 'MODLVL'
TESTCASE_NAMELIST_WPS_SFC = os.path.join(TESTCASE_DATA_DIR,
                                         'WPS/namelist.wps.ungrib_sfc')
TESTCASE_UNGRIB_PREFIX_SFC = 'SURF'
TESTCASE_NAMELIST_WPS_CALCECMWFP = os.path.join(TESTCASE_DATA_DIR,
                                                'WPS/namelist.wps.calcecmwfp')

# Custom tables and coefficients needed for the ECMWF data
TESTCASE_VTABLE_ML = os.path.join(TESTCASE_DATA_DIR,
                                  'WPS/Vtable.ECMWF.ml')
TESTCASE_VTABLE_SFC = os.path.join(TESTCASE_DATA_DIR,
                                   'WPS/Vtable.ECMWF.sfc')
TESTCASE_ECMWF_COEFFS = os.path.join(TESTCASE_DATA_DIR,
                                     'WPS/ecmwf_coeffs')
TESTCASE_METGRIDTBL_USERDEF = os.path.join(TESTCASE_DATA_DIR,
                                           'WPS/METGRID.TBL')

# Flexpart WRF input
TESTCASE_FLEXP_INPUT_TXT = os.path.join(TESTCASE_DATA_DIR,
                                        'FLEXWRF/flxp_input.txt')
NUM_MPI_TASKS_WPS = 2
NUM_MPI_TASKS_WRF = 2
# If set to True, the run directories will be retained
NO_CLEANUP = True
def main():
    print('This is the full workflow driver test')

    # Create a root run and output directory for the various components
    utc_timestamp = datetime.datetime.utcnow()
    time_str = utc_timestamp.strftime('%Y%m%d.%H%M%S')
    rundir_root_name = "full_workflow_test_" + time_str
    rundir_root_path = os.path.join(TMP_ROOT_DIR, rundir_root_name)
    os.mkdir(rundir_root_path, 0o755)
    print('Created rundir root dir: %s' % rundir_root_path)
    outputdir_root_name = "full_workflow_output_" + time_str
    outputdir_root_path = os.path.join(TMP_ROOT_DIR, outputdir_root_name)
    os.mkdir(outputdir_root_path, 0o755)
    print('Created outputdir root dir: %s' % outputdir_root_path)

    # ungrib ecmwf model level
    rundir = os.path.join(rundir_root_path, 'ungrib_ml_rundirl')
    my_ungrib = ungrib.Ungrib(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        namelist_wps=TESTCASE_NAMELIST_WPS_ML,
        vtable_userdef=TESTCASE_VTABLE_ML,
        metdatadir=TESTCASE_MODELLEVEL_METDATA_DIR,
        metdatatype='ecmwfml',
        output_dir=outputdir_root_path,
        log_level=logging.DEBUG
    )
    my_ungrib.setup()
    my_ungrib.run()
    my_ungrib.stage_output(auxfiles=True)

    # ungrib ecmwf surface level
    rundir = os.path.join(rundir_root_path, 'ungrib_sfc_rundir')
    my_ungrib = ungrib.Ungrib(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        namelist_wps=TESTCASE_NAMELIST_WPS_SFC,
        vtable_userdef=TESTCASE_VTABLE_SFC,
        metdatadir=TESTCASE_SURFACE_METDATA_DIR,
        metdatatype='ecmwfsfc',
        output_dir=outputdir_root_path,
        log_level=logging.DEBUG
    )
    my_ungrib.setup()
    my_ungrib.run()
    my_ungrib.stage_output(auxfiles=True)

    # Use calc_ecmwf_p.exe to create file of pressure level data
    rundir = os.path.join(rundir_root_path, 'calcecmwfp_rundir')
    ungribbed_data_dict = {
        TESTCASE_UNGRIB_PREFIX_ML: outputdir_root_path,
        TESTCASE_UNGRIB_PREFIX_SFC: outputdir_root_path
    }
    my_calcecmwfp = calcecmwfp.CalcEcmwfPlevels(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        ungribbed_data=ungribbed_data_dict,
        ecmwf_coeffs_path=TESTCASE_ECMWF_COEFFS,
        namelist_wps=TESTCASE_NAMELIST_WPS_CALCECMWFP,
        output_dir=outputdir_root_path,
        log_level=logging.DEBUG
    )
    my_calcecmwfp.setup()
    my_calcecmwfp.run()
    my_calcecmwfp.stage_output(auxfiles=True)

    # This structure is needed so that we can pass multiple ungribbed
    # types to metgrid. We always expect the output prefix for calcecmwfp
    # to be PRES
    ungribbed_data = {
        TESTCASE_UNGRIB_PREFIX_ML: outputdir_root_path,
        TESTCASE_UNGRIB_PREFIX_SFC: outputdir_root_path,
        'PRES': outputdir_root_path
    }

    # geogrid
    rundir = os.path.join(rundir_root_path, 'geogrid_rundir')
    my_geogrid = geogrid.Geogrid(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        namelist_wps=TESTCASE_NAMELIST_WPS_GEOGRID,
        output_dir=outputdir_root_path,
        numpes=NUM_MPI_TASKS_WPS,
        mpirun_path=MPIRUN,
        log_level=logging.DEBUG
    )
    my_geogrid.setup()
    my_geogrid.run()
    my_geogrid.stage_output(auxfiles=True)

    # metgrid
    rundir = os.path.join(rundir_root_path, 'metgrid_rundir')
    my_metgrid = metgrid.Metgrid(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        namelist_wps=TESTCASE_NAMELIST_WPS_METGRID,
        metgridtbl_userdef=TESTCASE_METGRIDTBL_USERDEF,
        ungribbed_data=ungribbed_data,
        geogriddatadir=outputdir_root_path,
        output_dir=outputdir_root_path,
        numpes=NUM_MPI_TASKS_WPS,
        mpirun_path=MPIRUN,
        log_level=logging.DEBUG
    )
    my_metgrid.setup()
    my_metgrid.run()
    my_metgrid.stage_output(auxfiles=True)

    # real
    rundir = os.path.join(rundir_root_path, 'real_rundir')
    my_real = real.Real(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        namelist_input=TESTCASE_NAMELIST_INPUT,
        metgriddatadir=outputdir_root_path,
        output_dir=outputdir_root_path,
        numpes=NUM_MPI_TASKS_WRF,
        mpirun_path=MPIRUN,
        log_level=logging.DEBUG
    )
    my_real.setup()
    my_real.run()
    my_real.stage_output(auxfiles=True)

    # wrf
    rundir = os.path.join(rundir_root_path, 'wrf_rundir')
    my_wrf = wrf.Wrf(
        wpswrf_distro_path=WPSWRF_DISTRO,
        wpswrf_rundir=rundir,
        namelist_input=TESTCASE_NAMELIST_INPUT,
        realdatadir=outputdir_root_path,
        output_dir=outputdir_root_path,
        numpes=NUM_MPI_TASKS_WRF,
        mpirun_path=MPIRUN,
        log_level=logging.DEBUG
    )
    my_wrf.setup()
    my_wrf.run()
    my_wrf.stage_output(auxfiles=True)

    # flexwrf
    # TEMPORARY UGLY HACK
    # The current implementation assumes that the wrfout files
    # are in a directory all by themselves. It's not a good
    # assumption, but for now that's how it is, so I need to
    # create a new dir, then, in the new dir, make links to the
    # wrfout files so they're in that dir all by themselves
    standalone_wrfout_dir = os.path.join(outputdir_root_path,
                                         'standalone_wrfout')
    os.mkdir(standalone_wrfout_dir, 0o755)
    wrfout_files = [os.path.basename(f) for f in
                    glob.glob(outputdir_root_path + '/wrfout_d0*')]
    for f in wrfout_files:
        os.symlink(os.path.join(outputdir_root_path, f),
                   os.path.join(standalone_wrfout_dir, f))
    ###############################

    metfilespath_list = [standalone_wrfout_dir]
    rundir = os.path.join(rundir_root_path, 'flexwrf_rundir')
    my_flexwrf = flexwrf.FlexWrf(
        flexwrf_distro_path=FLEXWRF_DISTRO,
        flexwrf_rundir=rundir,
        flexp_input_txt=TESTCASE_FLEXP_INPUT_TXT,
        metfilespath_list=metfilespath_list,
        output_dir=outputdir_root_path,
        log_level=logging.DEBUG
    )
    my_flexwrf.setup()
    my_flexwrf.run()
    my_flexwrf.stage_output(auxfiles=True)

    if NO_CLEANUP:
        print('Retaining run directories: %s' % rundir_root_path)
    print('Output files in: %s' % outputdir_root_path)


if __name__ == "__main__":
    main()
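The temporary workaround in the listing, which places links to the wrfout files in a directory of their own, could be factored into a small reusable helper. The following is a minimal sketch under that assumption; the function name link_matching_files and its parameters are hypothetical and are not part of nwpservice.

```python
import glob
import os


def link_matching_files(src_dir, dest_dir, pattern='wrfout_d0*'):
    """Create links in dest_dir to every file in src_dir matching pattern.

    Hypothetical helper: returns the sorted list of link paths. Links that
    already exist are left untouched, so the call can be repeated safely.
    """
    os.makedirs(dest_dir, exist_ok=True)
    linked = []
    for src in sorted(glob.glob(os.path.join(src_dir, pattern))):
        dest = os.path.join(dest_dir, os.path.basename(src))
        if not os.path.lexists(dest):
            # Use an absolute target so the link resolves from any directory
            os.symlink(os.path.abspath(src), dest)
        linked.append(dest)
    return linked
```

In the driver, the returned directory would then be passed to FlexWrf in metfilespath_list, exactly as standalone_wrfout_dir is in the listing.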