ehratm APIs
metgrid.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # *******************************************************
4 # * International Data Centre *
5 # * Comprehensive Nuclear Test Ban Treaty Organization *
6 # * Vienna *
7 # * Austria *
8 # *
9 # * Don Morton (DM) *
10 # * Boreal Scientific Computing *
11 # * Fairbanks, Alaska USA *
12 # *******************************************************
13 
14 
15 
16 #-----------------------------------------------------------------------------
17 
18 #import argparse
19 import datetime
20 import logging
21 import os
22 import uuid
23 
24 import f90nml
25 
26 import ehratm.defaults
27 import ehratm.mylogger
28 
29 import nwpservice.wps.namelistwps
30 import nwpservice.wps.metgrid
31 
33 
35 
36 
class MetgridWorkflow(object):

    '''Class for managing preparation and execution of metgrid components

    The constructor validates its arguments and checks that the geogrid
    and ungribbed input files it will need are present on disk.
    run_metgrid() then writes a namelist.wps and drives the
    nwpservice.wps.metgrid.Metgrid object through setup, run and output
    staging.
    '''

    def __init__(self,
                 wpswrf_distro_path=None,
                 working_rootdir=None,
                 start_time_dt=None,
                 stop_time_dt=None,
                 hours_intvl=None,
                 num_nests=None,
                 ungribbed_data_dict=None,
                 geogrid_data_dict=None,
                 custom_metgrid_tbl_path=None,
                 num_mpi_tasks=None,
                 log_level=None
                 ):

        '''Check and initialise the class


        Parameters
        ----------
        wpswrf_distro_path : str
            (Optional) Full path to the WPS/WRF distribution to be used.
            This is assumed to have been installed in a way that is
            compatible with the nwpservice module.  If arg is not present,
            uses a default value
        working_rootdir : str
            (Optional) Full path to a directory (assumed to have already
            been created), to be used as scratch space for setting up and
            running this metgrid instance.  If arg is not present, uses a
            default value
        start_time_dt : datetime
            Time of first ungribbed file(s)
        stop_time_dt : datetime
            Time of last ungribbed file(s)
        hours_intvl : int
            Interval (in hours) between ungribbed files.  Must be greater
            than zero
        num_nests : int
            Number of domain nests represented in geogrid data, and in
            the metgrid data that will be produced
        ungribbed_data_dict : dict
            Necessary information on the ungribbed files.  We expect dict
            with entries {'FG_NAME' : <path>, ...}.  Note that the
            'FG_NAME' is not expected to adhere to any kind of workflow
            convention.  This method simply uses what it's given.  For each
            entry, and for each time from start_time_dt to stop_time_dt in
            steps of hours_intvl, a file named <FG_NAME>:<YYYY-MM-DD_HH>
            must exist in <path>
        geogrid_data_dict : dict
            Necessary information on the geogrid files.  We expect dict
            with entries
            {'path_to_files' : <path>, 'files_list' : [geogrid files]}.
            This implies that the geogrid files are all in the same src
            directory.  The number of files must equal num_nests
        custom_metgrid_tbl_path : str
            (Optional) Path to a METGRID.TBL to be used for this run.  If
            arg is not present, then we assume that no custom file is
            needed.
        num_mpi_tasks : int
            (Optional) Number of MPI tasks to use for running this instance
            of metgrid.  If arg is not present, non-MPI execution is
            assumed.
        log_level : int
            (Optional) Python logging level (e.g. logging.INFO).  If arg is
            not present, uses a default value

        Raises
        ------
        FileNotFoundError
            If wpswrf_distro_path, working_rootdir, custom_metgrid_tbl_path,
            any expected geogrid or ungribbed file, or (when num_mpi_tasks
            is given) the default mpirun executable is not found
        ValueError
            If a required arg is missing, or num_nests, hours_intvl,
            num_mpi_tasks or the number of geogrid files is invalid


        Example
        -------
        These are examples of the geogrid_data_dict and ungribbed_data_dict

        ::

            geogrid_data_dict = {
                'path_to_files' : '/path/to/dir/with/geogrid/files',
                'files_list' : ['geo_em.d01.nc', 'geo_em.d02.nc']
            }

            ungribbed_data_dict = {
                'ECMWF_ML' : '/path/to/dir/with/ecmwf_ml/ungribbed_files',
                'ECMWF_SFC' : '/path/to/dir/with/ecmwf_sfc/ungribbed_files',
                'ECMWF_PRES' : '/path/to/dir/with/pres/ungribbed_files'
            }
        '''

        if log_level:
            self._logging_level = log_level
        else:
            self._logging_level = DEFAULTS.log_level()
        LOGGER.setLevel(self._logging_level)

        # Now I can use the logger
        LOGGER.debug('started')

        # Do some checks (need to expand on these over the years)
        # Essentially, would be good to do a controlled bail here if there's
        # a problem as opposed to doing it further down

        if not wpswrf_distro_path:
            wpswrf_distro_path = DEFAULTS.wpswrf_distro_path()
        self._wpswrf_distro_path = wpswrf_distro_path
        LOGGER.debug('wpswrf_distro_path: %s', self._wpswrf_distro_path)
        if not os.path.isdir(self._wpswrf_distro_path):
            raise FileNotFoundError('wpswrf_distro_path not found: %s' %
                                    self._wpswrf_distro_path)

        if not working_rootdir:
            working_rootdir = DEFAULTS.working_scratch_rootdir()
        self._working_rootdir = working_rootdir
        if not os.path.isdir(self._working_rootdir):
            raise FileNotFoundError('working_rootdir not found: %s' %
                                    self._working_rootdir)
        LOGGER.debug('working_rootdir: %s', self._working_rootdir)

        # Basic test of num_nests
        if not num_nests:
            raise ValueError("Missing num_nests arg")
        if num_nests <= 0:
            raise ValueError("num_nests: %d" % num_nests)
        self._num_nests = num_nests

        # The time window and both data dicts default to None in the
        # signature but are required below.  Bail in a controlled way
        # rather than with a TypeError/AttributeError further down.
        if start_time_dt is None:
            raise ValueError("Missing start_time_dt arg")
        if stop_time_dt is None:
            raise ValueError("Missing stop_time_dt arg")
        if geogrid_data_dict is None:
            raise ValueError("Missing geogrid_data_dict arg")
        if ungribbed_data_dict is None:
            raise ValueError("Missing ungribbed_data_dict arg")

        # A zero or negative interval would never advance the loop over
        # expected ungribbed files, so it must be rejected up front.
        if not hours_intvl or hours_intvl <= 0:
            raise ValueError("Bad hours_intvl value: %s" % hours_intvl)

        # If custom_metgrid_tbl_path is an arg, then ensure the path is
        # valid.  If not defined, then leave it that way.  We will assume
        # that a calling process makes the decision about whether the
        # custom version is used or not (typically dependent on whether
        # we're using ECMWF ERA data)
        if custom_metgrid_tbl_path:
            if not os.path.isfile(custom_metgrid_tbl_path):
                raise FileNotFoundError(
                    'Path not found: %s' % custom_metgrid_tbl_path)
            self._custom_metgrid_tbl_path = custom_metgrid_tbl_path
        else:
            self._custom_metgrid_tbl_path = None

        # Check that expected geogrid files are available - one file per
        # nest, all in the same directory
        geogrid_dir = geogrid_data_dict['path_to_files']
        geogrid_files = geogrid_data_dict['files_list']
        if len(geogrid_files) != self._num_nests:
            raise ValueError(
                'Wrong number geogrid files: %d' % len(geogrid_files))
        for fname in geogrid_files:
            fpath = os.path.join(geogrid_dir, fname)
            if not os.path.isfile(fpath):
                raise FileNotFoundError('fpath: %s' % fpath)

        # Check that expected ungribbed files are available.  For every
        # FG_NAME there must be a file <FG_NAME>:<YYYY-MM-DD_HH> at each
        # time step of the requested window
        time_step = datetime.timedelta(hours=hours_intvl)
        for fg_key, ug_path in ungribbed_data_dict.items():
            LOGGER.debug('ug_path: %s', ug_path)
            curr_dt = start_time_dt
            while curr_dt <= stop_time_dt:
                LOGGER.debug('curr_dt: %s', curr_dt)
                curr_timestr = curr_dt.strftime('%Y-%m-%d_%H')
                LOGGER.debug('curr_timestr: %s', curr_timestr)
                fname = fg_key + ':' + curr_timestr
                expected_path = os.path.join(ug_path, fname)
                if not os.path.isfile(expected_path):
                    raise FileNotFoundError(
                        'expected_path: %s' % expected_path)
                curr_dt += time_step

        # Use specified number of MPI tasks, or default to 0, which will
        # imply non-MPI execution
        self._mpirunpath = None    # Just give it a namespace for now
        if num_mpi_tasks:
            # Ensure valid number, otherwise raise exception.  If the value
            # looks good, check that the default mpirun executable is
            # present and executable
            if not 1 <= num_mpi_tasks <= DEFAULTS.max_mpi_tasks():
                raise ValueError(
                    'Bad num_mpi_tasks value: %d' % num_mpi_tasks)
            mpirunpath = DEFAULTS.mpirun_path()
            if not (os.path.isfile(mpirunpath)
                    and os.access(mpirunpath, os.X_OK)):
                raise FileNotFoundError('mpirun not executable: %s' %
                                        mpirunpath)
            self._num_mpi_tasks = num_mpi_tasks
            self._mpirunpath = mpirunpath
        else:
            # This default value of 0 will denote non-MPI execution when
            # invoking the metgrid service
            self._num_mpi_tasks = 0
        LOGGER.debug('num_mpi_tasks: %d', self._num_mpi_tasks)

        # If we got through all this, save the time info for the
        # run_metgrid() method
        self._start_time_dt = start_time_dt
        self._stop_time_dt = stop_time_dt
        self._hours_intvl = hours_intvl

        self._ungribbed_data_dict = ungribbed_data_dict
        self._geogrid_data_dict = geogrid_data_dict

    def run_metgrid(self, metgrid_output_stagedir=None):

        '''Set up and run, via nwpservice module, instance of metgrid

        Writes namelist.wps into the working root dir, creates the
        nwpservice Metgrid object with 'metgrid_rundir' under the working
        root dir as its run directory, then calls its setup(), run() and
        stage_output() methods in that order.

        Parameters
        ----------
        metgrid_output_stagedir : str
            (Optional) Full path to dir where metgrid files will be staged.
            If not specified, no staging will be done.  If specified, we
            assume the dir already exists.

        Returns
        -------
        return_dict : dict
            Dictionary with entries 'metgrid_output_manifest' (whatever the
            nwpservice run() call returned), 'staging_dir' (the staging dir
            if staging reported success, otherwise None) and 'hours_intvl'
        '''

        LOGGER.debug('Starting run_metgrid()...')

        # Create namelist.wps
        namelist_wps_path = os.path.join(self._working_rootdir,
                                         'namelist.wps')
        LOGGER.debug('namelist_wps_path: %s', namelist_wps_path)

        # The share section
        # NOTE(review): the compact YYYYmmddHHMMSS form is presumably what
        # NamelistWpsWriter expects - confirm against nwpservice
        start_time_str = self._start_time_dt.strftime('%Y%m%d%H%M%S')
        end_time_str = self._stop_time_dt.strftime('%Y%m%d%H%M%S')
        interval_secs = int(self._hours_intvl*3600)

        # The list of fg_name for metgrid group is simply the keys of the
        # ungribbed data dict
        fg_name_list = list(self._ungribbed_data_dict)

        # The full section_data_dict - note that start_date_list and
        # end_date_list have to have a "num_nests" number of elements
        # which we assume are identical.  metgrid may not complain, but
        # may not create all nests if we neglect this
        section_data_dict = {
            'share' : {
                'max_dom' : self._num_nests,
                'start_date_list' : [start_time_str]*self._num_nests,
                'end_date_list' : [end_time_str]*self._num_nests,
                'interval_seconds' : interval_secs
            },
            'metgrid' : {
                'fg_name' : fg_name_list
            }
        }

        namelist_writer = nwpservice.wps.namelistwps.NamelistWpsWriter(
            destpath=namelist_wps_path,
            section_data_dict=section_data_dict
        )
        LOGGER.debug('Writing namelist: %s', namelist_wps_path)
        namelist_writer.write()

        # Create nwpservice object, then setup and run
        domainpath = os.path.join(self._working_rootdir, 'metgrid_rundir')
        LOGGER.debug('domainpath: %s', domainpath)

        nwpmetgrid_obj = nwpservice.wps.metgrid.Metgrid(
            wpswrf_distro_path=self._wpswrf_distro_path,
            wpswrf_rundir=domainpath,
            ungribbed_data=self._ungribbed_data_dict,
            geogriddatadir=self._geogrid_data_dict['path_to_files'],
            metgridtbl_userdef=self._custom_metgrid_tbl_path,
            namelist_wps=namelist_wps_path,
            output_dir=metgrid_output_stagedir,
            numpes=self._num_mpi_tasks,
            mpirun_path=self._mpirunpath,
            log_level=self._logging_level
        )

        nwpmetgrid_obj.setup()
        output_manifest = nwpmetgrid_obj.run()
        LOGGER.debug('output_manifest: %s', output_manifest)

        # If this wasn't successful (even if an output staging dir was
        # never specified), it should return false
        stage_success = nwpmetgrid_obj.stage_output(auxfiles=True)
        staging_dir = metgrid_output_stagedir if stage_success else None

        return_dict = {
            'metgrid_output_manifest' : output_manifest,
            'staging_dir' : staging_dir,
            'hours_intvl' : self._hours_intvl
        }
        LOGGER.debug('return_dict: %s', return_dict)

        return return_dict
ehratm.wps.metgrid.MetgridWorkflow._num_nests
_num_nests
Definition: metgrid.py:146
ehratm.defaults
Definition: defaults.py:1
ehratm.wps.metgrid.MetgridWorkflow
Definition: metgrid.py:37
ehratm.wps.metgrid.MetgridWorkflow._start_time_dt
_start_time_dt
Definition: metgrid.py:215
ehratm.wps.metgrid.MetgridWorkflow._mpirunpath
_mpirunpath
Definition: metgrid.py:190
ehratm.wps.metgrid.MetgridWorkflow._stop_time_dt
_stop_time_dt
Definition: metgrid.py:216
ehratm.wps.metgrid.MetgridWorkflow._wpswrf_distro_path
_wpswrf_distro_path
Definition: metgrid.py:126
ehratm.wps.metgrid.MetgridWorkflow._num_mpi_tasks
_num_mpi_tasks
Definition: metgrid.py:199
ehratm.wps.metgrid.MetgridWorkflow._custom_metgrid_tbl_path
_custom_metgrid_tbl_path
Definition: metgrid.py:157
ehratm.defaults.Defaults
Definition: defaults.py:9
ehratm.wps.metgrid.MetgridWorkflow.__init__
def __init__(self, wpswrf_distro_path=None, working_rootdir=None, start_time_dt=None, stop_time_dt=None, hours_intvl=None, num_nests=None, ungribbed_data_dict=None, geogrid_data_dict=None, custom_metgrid_tbl_path=None, num_mpi_tasks=None, log_level=None)
Definition: metgrid.py:45
ehratm.wps.metgrid.MetgridWorkflow._hours_intvl
_hours_intvl
Definition: metgrid.py:217
ehratm.wps.metgrid.MetgridWorkflow._geogrid_data_dict
_geogrid_data_dict
Definition: metgrid.py:220
ehratm.wps.metgrid.MetgridWorkflow._working_rootdir
_working_rootdir
Definition: metgrid.py:134
ehratm.wps.metgrid.MetgridWorkflow.run_metgrid
def run_metgrid(self, metgrid_output_stagedir=None)
Definition: metgrid.py:234
ehratm.wps.metgrid.MetgridWorkflow._ungribbed_data_dict
_ungribbed_data_dict
Definition: metgrid.py:219
ehratm.mylogger.getlogger
def getlogger()
Definition: mylogger.py:11
ehratm.mylogger
Definition: mylogger.py:1
ehratm.wps.metgrid.MetgridWorkflow._logging_level
_logging_level
Definition: metgrid.py:111