ehratm APIs
flexwrf.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # *******************************************************
4 # * International Data Centre *
5 # * Comprehensive Nuclear Test Ban Treaty Organization *
6 # * Vienna *
7 # * Austria *
8 # *
9 # * Don Morton (DM) *
10 # * Boreal Scientific Computing *
11 # * Fairbanks, Alaska USA *
12 # *******************************************************
13 
14 
15 
16 #-----------------------------------------------------------------------------
17 
18 #import argparse
19 import datetime
20 import logging
21 import os
22 import uuid
23 
24 import f90nml
25 
26 import ehratm.defaults
27 import ehratm.mylogger
28 
29 import nwpservice.flexwrf.flexwrf
30 
32 
34 
35 
class FlexwrfWorkflow(object):
    '''Class for managing preparation and execution of flexwrf components

    An instance validates its inputs in __init__() and then drives a single
    flexwrf run, through the nwpservice module, in run_flexwrf().
    '''

    def __init__(self,
                 flexwrf_distro_path=None,
                 working_rootdir=None,
                 start_time_dt=None,
                 stop_time_dt=None,
                 outgrid_defn_dict=None,
                 flexwrf_specs_dict=None,
                 wrfout_hours_intvl=None,
                 wrfout_num_nests=None,
                 wrfout_data_dict_list=None,
                 num_mpi_tasks=None,
                 num_openmp_threads=None,
                 log_level=None,
                 bypass_flexp_input=None,
                 ):
        '''Check and initialise the class

        Parameters
        ----------
        flexwrf_distro_path : str
            (Optional) Full path to the FlexWRF distribution to be used.
            This is assumed to have been installed in a way that is
            compatible with the nwpservice module.  If arg is not present,
            uses a default value
        working_rootdir : str
            (Optional) Full path to a directory (assumed to have already
            been created), to be used as scratch space for setting up and
            running this flexwrf instance.  If arg is not present, uses a
            default value
        start_time_dt : datetime
            Time of first wrfout file(s), start time of flexwrf sim
        stop_time_dt : datetime
            Time of last wrfout file(s), end time of flexwrf sim
        outgrid_defn_dict : dict
            Dict of outgrid definitions (of use if generating flexwrf
            namelist).  Currently accepted but not used.
        flexwrf_specs_dict : dict
            For future use when the user input file will be generated by
            this routine (for now, it's "bypassed" with a complete and
            correct flexp_input).  Currently accepted but not used.
        wrfout_hours_intvl : int
            Number of expected hours between wrfout files.  It is not
            validated here, and it is only retained when
            bypass_flexp_input is not used.
        wrfout_num_nests : int
            Number of nests represented by wrfout files.  Required, even
            when using bypass_flexp_input
        wrfout_data_dict_list : list
            Necessary information on the wrfout files.  We expect a
            list of dicts, one for each nest, with entries {'nest' : n,
            'path' : <path>, 'filelist' : [list of wrfout files]}.  At
            this level, it's OK if the path is the same for different nests
            (e.g. they might all be in the same wrf rundir).  Required.
        num_mpi_tasks : int
            (Optional) Number of MPI tasks to use for running this instance
            of flexwrf.  If arg is not present, non-MPI execution is
            assumed.
        num_openmp_threads : int
            (Optional) Number of OpenMP threads to use for running this
            instance of flexwrf.  If arg is not present, non-OpenMP
            execution is assumed.
        log_level : int
            (Optional) Python logging level (e.g. logging.INFO).  If arg is
            not present, uses a default value
        bypass_flexp_input : str
            This is an assumed-correct flexwrf namelist to be used for
            execution of flexwrf.  Its presence bypasses the normal process
            of creating a namelist, and puts complete trust in the
            correctness of the provided namelist.  If this argument
            is present, then any values for start_time_dt, stop_time_dt
            and wrfout_hours_intvl are ignored.  This is meant primarily
            for devtest operations, and probably shouldn't be used for
            normal workflows.

        Raises
        ------
        FileNotFoundError
            If bypass_flexp_input, flexwrf_distro_path, working_rootdir,
            any listed wrfout file, or the mpirun executable (when MPI is
            requested) cannot be found / is not executable.
        ValueError
            If wrfout_num_nests or wrfout_data_dict_list is missing or
            inconsistent, or if num_mpi_tasks / num_openmp_threads is
            outside the allowed range.

        Example
        -------
        TODO: Change this for assumption of single dir

        These are examples of the wrfout_data_dict_list

        ::

            wrfout_data_dict_list = [
                {
                    'nest' : 1,
                    'path' : '/path/to/dir/with/wrfout_d01/files',
                    'filelist' : ['wrfout_d01_2014-01-24_00:00:00', ...]
                },
                {
                    'nest' : 2,
                    'path' : '/path/to/dir/with/wrfout_d02/files',
                    'filelist' : ['wrfout_d02_2014-01-24_00:00:00', ...]
                }
            ]
        '''

        if log_level:
            self._logging_level = log_level
        else:
            self._logging_level = DEFAULTS.log_level()
        LOGGER.setLevel(self._logging_level)

        # Now I can use the logger
        LOGGER.debug('started')

        # Do some checks (need to expand on these over the years)
        # Essentially, would be good to do a controlled bail here if there's
        # a problem as opposed to doing it further down

        # Check bypass_flexp_input arg.  If present, verify good
        # path.  And, if present, some of the following checks will be
        # ignored.  The attribute is always defined (None when unused) so
        # that later code can test it safely.
        self._bypass_flexp_input = None
        if bypass_flexp_input:
            if not os.path.isfile(bypass_flexp_input):
                raise FileNotFoundError('bypass_flexp_input not found: %s' %
                                        bypass_flexp_input)
            self._bypass_flexp_input = bypass_flexp_input
        LOGGER.debug('bypass_flexp_input: %s' % self._bypass_flexp_input)

        # Check for path to FlexWRF distribution, and try to use default
        # if it wasn't passed in as an arg
        if not flexwrf_distro_path:
            flexwrf_distro_path = DEFAULTS.flexwrf_distro_path()
        self._flexwrf_distro_path = flexwrf_distro_path
        LOGGER.debug('flexwrf_distro_path: %s' % self._flexwrf_distro_path)
        if not os.path.isdir(self._flexwrf_distro_path):
            raise FileNotFoundError('flexwrf_distro_path not found: %s' %
                                    self._flexwrf_distro_path)

        # Check for the working root dir, and try to use a default
        # value if it wasn't passed in as an arg
        if not working_rootdir:
            working_rootdir = DEFAULTS.working_scratch_rootdir()
        self._working_rootdir = working_rootdir
        if not os.path.isdir(self._working_rootdir):
            raise FileNotFoundError('working_rootdir not found: %s' %
                                    self._working_rootdir)
        LOGGER.debug('working_rootdir: %s' % self._working_rootdir)

        # Basic test of wrfout_num_nests
        if not wrfout_num_nests:
            raise ValueError("Missing wrfout_num_nests arg")
        if wrfout_num_nests <= 0:
            raise ValueError("wrfout_num_nests: %d" % wrfout_num_nests)
        self._wrfout_num_nests = wrfout_num_nests
        LOGGER.debug('wrfout_num_nests: %d' % self._wrfout_num_nests)

        # Check that wrfout_data_dict_list is present and has the correct
        # number of expected nests
        if wrfout_data_dict_list is None:
            raise ValueError("Missing wrfout_data_dict_list arg")
        if len(wrfout_data_dict_list) != wrfout_num_nests:
            raise ValueError(
                'wrfout_data_dict_list unexpected number of nests: %d' %
                len(wrfout_data_dict_list))

        # Check that the files listed in wrfout_data_dict_list are
        # all accessible
        for wrfout_nest_dict in wrfout_data_dict_list:
            wrfout_dir = wrfout_nest_dict['path']
            wrfout_files = wrfout_nest_dict['filelist']

            for fname in wrfout_files:
                fpath = os.path.join(wrfout_dir, fname)
                if not os.path.isfile(fpath):
                    raise FileNotFoundError('fpath: %s' % fpath)

        # Some stuff to add later
        #
        # If we're not using bypass_flexp_input, make sure we have
        # dicts for various parameters, OUTGRID, etc.
        #
        # Consider checking for wrfout_hours_intvl?  I'm not sure yet.

        # Use specified number of MPI tasks, or default to 0, which will
        # imply non-MPI execution
        self._mpirunpath = None     # Just give it a namespace for now
        if num_mpi_tasks:
            # Ensure valid number, otherwise raise exception.  If the value
            # looks good, check that the default mpirun executable is
            # present and executable.
            if not 1 <= num_mpi_tasks <= DEFAULTS.max_mpi_tasks():
                raise ValueError('Bad num_mpi_tasks value: %d' %
                                 num_mpi_tasks)

            mpirunpath = DEFAULTS.mpirun_path()
            if not (os.path.isfile(mpirunpath) and
                    os.access(mpirunpath, os.X_OK)):
                raise FileNotFoundError('mpirun not executable: %s' %
                                        mpirunpath)
            self._num_mpi_tasks = num_mpi_tasks
            self._mpirunpath = mpirunpath
        else:
            # This default value of 0 will denote non-MPI execution when
            # invoking the flexwrf service
            self._num_mpi_tasks = 0
        LOGGER.debug('num_mpi_tasks: %d' % self._num_mpi_tasks)

        if num_openmp_threads:
            # Ensure valid number, otherwise raise exception.
            if not 1 <= num_openmp_threads <= DEFAULTS.max_openmp_threads():
                raise ValueError('Bad num_openmp_threads value: %d' %
                                 num_openmp_threads)
            self._num_openmp_threads = num_openmp_threads
        else:
            # This default value of 0 will denote non-OpenMP execution when
            # invoking the flexwrf service
            self._num_openmp_threads = 0
        LOGGER.debug('num_openmp_threads: %d' % self._num_openmp_threads)

        # If we got through all this, save the time info for the
        # run_flexwrf() method
        if self._bypass_flexp_input:
            # We won't use these if bypass_flexp_input is being used, so
            # set them to None so that other code won't mistakenly try to
            # use them
            self._start_time_dt = None
            self._stop_time_dt = None
            self._wrfout_hours_intvl = None
        else:
            self._start_time_dt = start_time_dt
            self._stop_time_dt = stop_time_dt
            self._wrfout_hours_intvl = wrfout_hours_intvl

        self._wrfout_data_dict_list = wrfout_data_dict_list

    def run_flexwrf(self, flexwrf_output_stagedir=None):
        '''Set up and run, via nwpservice module, instance of flexwrf

        Parameters
        ----------
        flexwrf_output_stagedir : str
            (Optional) Full path to dir where flexwrf output files will be
            staged.  If not specified, no staging will be done.  If
            specified, we assume the dir already exists.

        Returns
        -------
        return_dict : dict
            Dictionary with manifest of the flexwrf output files and the
            location (if applicable) of staged files.  Keys are
            'flexwrf_output_manifest', 'staging_dir' and 'hours_intvl'.

        Raises
        ------
        FileNotFoundError
            If the bypass flexp_input file is no longer present.
        NotImplementedError
            If no bypass_flexp_input was supplied (generation of
            flexp_input is not yet supported).
        ValueError
            If wrfout_data_dict_list has no entry for one of the nests
            1..wrfout_num_nests.
        '''

        LOGGER.debug('Starting run_flexwrf()...')

        # If we're using the bypass_flexp_input, then there's
        # just a little prep necessary.  Otherwise, we will need to
        # create the flexp_input based on a number of parameters, including
        # the domain_defn and the dates/times, etc.
        if not self._bypass_flexp_input:
            # This is where eventually we'll add in creation of flexp_input
            raise NotImplementedError(
                'flexp_input creation not yet supported')

        # Re-test for its presence, as it may have gone away since
        # __init__() checked it
        if not os.path.isfile(self._bypass_flexp_input):
            raise FileNotFoundError(self._bypass_flexp_input)
        flexp_input_path = self._bypass_flexp_input

        # Create nwpservice object, then setup and run
        domainpath = os.path.join(self._working_rootdir, 'flexwrf_rundir')
        LOGGER.debug('domainpath: %s' % domainpath)

        # The current specifications for metfiles_path arg to the nwpservice
        # flexwrf is that it is a list of paths, one for each wrfout nest.
        # Every wrfout file in that path will end up being included in
        # the flexwrf setup, whether it's needed or not.  So, in this case,
        # since file accessibility was already checked in __init__(), we
        # will, for each nest, create a wrfout_d0* dir under the working
        # root dir, make symlinks to each of the wrfout files in that nest
        # (using wrfout_data_dict_list entries for source), then add the
        # path of this newly created dir to our metfiles_path list
        metfiles_path_list = []
        for nest_num in range(1, self._wrfout_num_nests + 1):
            # Although list position "should" correspond to the nest
            # number, I'm going to not make that assumption, and instead
            # search for the entry whose 'nest' value matches
            nest_idx = next(
                (idx for idx, entry in enumerate(self._wrfout_data_dict_list)
                 if entry.get('nest') == nest_num),
                None)
            if nest_idx is None:
                raise ValueError('Failed to find nest index: %d' % nest_num)
            LOGGER.debug('nest_idx: %d' % nest_idx)

            # Now we know we have the correct idx for this nest, so extract
            # the path and file names
            nest_entry = self._wrfout_data_dict_list[nest_idx]
            nest_path = nest_entry['path']
            nest_files = nest_entry['filelist']
            LOGGER.debug('nest_path: %s' % nest_path)
            LOGGER.debug('nest_files: %s' % nest_files)

            # Create the wrfout_d0* dir, and within it, make links to the
            # file names just extracted
            dirname = 'wrfout_d%02d' % nest_num
            dest_path = os.path.join(self._working_rootdir, dirname)
            os.mkdir(dest_path, 0o755)
            for wrfout_file in nest_files:
                src_fpath = os.path.join(nest_path, wrfout_file)
                dest_fpath = os.path.join(dest_path, wrfout_file)
                os.symlink(src_fpath, dest_fpath)

            metfiles_path_list.append(dest_path)

        nwpflexwrf_obj = nwpservice.flexwrf.flexwrf.FlexWrf(
            flexwrf_distro_path=self._flexwrf_distro_path,
            flexwrf_rundir=domainpath,
            flexp_input_txt=flexp_input_path,
            metfilespath_list=metfiles_path_list,
            output_dir=flexwrf_output_stagedir,
            numpes=self._num_mpi_tasks,
            mpirun_path=self._mpirunpath,
            log_level=self._logging_level
        )

        nwpflexwrf_obj.setup()
        output_manifest = nwpflexwrf_obj.run()
        LOGGER.debug('output_manifest: %s' % output_manifest)

        # If this wasn't successful (even if an output staging dir was
        # never specified), it should return false
        stage_success = nwpflexwrf_obj.stage_output(auxfiles=True)
        staging_dir = flexwrf_output_stagedir if stage_success else None

        return_dict = {
            'flexwrf_output_manifest': output_manifest,
            'staging_dir': staging_dir,
            'hours_intvl': self._wrfout_hours_intvl,
        }
        LOGGER.debug('return_dict: %s' % return_dict)

        return return_dict
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._num_mpi_tasks
_num_mpi_tasks
Definition: flexwrf.py:222
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._wrfout_data_dict_list
_wrfout_data_dict_list
Definition: flexwrf.py:266
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._wrfout_num_nests
_wrfout_num_nests
Definition: flexwrf.py:178
ehratm.defaults
Definition: defaults.py:1
ehratm.flexwrf.flexwrf.FlexwrfWorkflow.__init__
def __init__(self, flexwrf_distro_path=None, working_rootdir=None, start_time_dt=None, stop_time_dt=None, outgrid_defn_dict=None, flexwrf_specs_dict=None, wrfout_hours_intvl=None, wrfout_num_nests=None, wrfout_data_dict_list=None, num_mpi_tasks=None, num_openmp_threads=None, log_level=None, bypass_flexp_input=None)
Definition: flexwrf.py:44
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._logging_level
_logging_level
Definition: flexwrf.py:127
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._hours_intvl
_hours_intvl
Definition: flexwrf.py:264
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._bypass_flexp_input
_bypass_flexp_input
Definition: flexwrf.py:143
ehratm.defaults.Defaults
Definition: defaults.py:9
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._flexwrf_distro_path
_flexwrf_distro_path
Definition: flexwrf.py:157
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._start_time_dt
_start_time_dt
Definition: flexwrf.py:257
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._num_openmp_threads
_num_openmp_threads
Definition: flexwrf.py:246
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._mpirunpath
_mpirunpath
Definition: flexwrf.py:213
ehratm.mylogger.getlogger
def getlogger()
Definition: mylogger.py:11
ehratm.mylogger
Definition: mylogger.py:1
ehratm.flexwrf.flexwrf.FlexwrfWorkflow
Definition: flexwrf.py:36
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._working_rootdir
_working_rootdir
Definition: flexwrf.py:167
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._stop_time_dt
_stop_time_dt
Definition: flexwrf.py:258
ehratm.flexwrf.flexwrf.FlexwrfWorkflow._wrfout_hours_intvl
_wrfout_hours_intvl
Definition: flexwrf.py:259
ehratm.flexwrf.flexwrf.FlexwrfWorkflow.run_flexwrf
def run_flexwrf(self, flexwrf_output_stagedir=None)
Definition: flexwrf.py:283