ehratm APIs
ungrib.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 # *******************************************************
4 # * International Data Centre *
5 # * Comprehensive Nuclear Test Ban Treaty Organization *
6 # * Vienna *
7 # * Austria *
8 # *
9 # * Don Morton (DM) *
10 # * Boreal Scientific Computing *
11 # * Fairbanks, Alaska USA *
12 # *******************************************************
13 
14 
15 
16 #-----------------------------------------------------------------------------
17 
18 #import argparse
19 import datetime
20 import logging
21 import os
22 import uuid
23 
24 import f90nml
25 
26 import ehratm.defaults
27 import ehratm.mylogger
28 
29 import nwpservice.wps.namelistwps
30 import nwpservice.wps.ungrib
31 
33 
35 
36 
class UngribWorkflow(object):

    '''Class for managing preparation and execution of ungrib components

    The methods rely on the module-level LOGGER and DEFAULTS objects and on
    the nwpservice.wps modules imported at the top of this file.
    '''

    # Subtypes accepted by gribmet_verify_gfs_ctbto()
    _GFS_CTBTO_SUBTYPES = ('1.0', '0.5', '0.5.fv3')

    # GRIB types this class knows how to locate on disk
    _VALID_GRIB_TYPES = ('gfs_ctbto', 'ecmwf_sfc', 'ecmwf_ml')

    # grib_type -> metdatatype string handed to nwpservice.wps.ungrib.Ungrib
    _METDATATYPE_BY_GRIB_TYPE = {
        'gfs_ctbto': 'gfs',
        'ecmwf_ml': 'ecmwfml',
        'ecmwf_sfc': 'ecmwfsfc',
    }

    def __init__(self, log_level=None):

        '''Initialises the class

        Parameters
        ----------
        log_level : int or str
            (Optional) Logging level for this workflow.  Any falsy value
            (including None) selects DEFAULTS.log_level()
        '''
        if log_level:
            self._logging_level = log_level
        else:
            self._logging_level = DEFAULTS.log_level()
        LOGGER.setLevel(self._logging_level)

    # ------------------------------------------------------------------
    # Private helpers shared by the verify / flatlinks methods
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_time_str(time_str):

        '''Converts a YYYYMMDDHH[mmss] string to a datetime

        Missing minutes/seconds are padded with zeroes before parsing.
        '''
        return datetime.datetime.strptime(time_str.ljust(14, '0'),
                                          '%Y%m%d%H%M%S')

    @staticmethod
    def _positive_step(hour_intvl):

        '''Returns hour_intvl as a timedelta, rejecting non-positive values

        A zero or negative interval would make the time loops below spin
        forever, so it is refused up front.  A non-numeric value (e.g.
        None) raises TypeError from the timedelta constructor.
        '''
        step = datetime.timedelta(hours=hour_intvl)
        if step <= datetime.timedelta(0):
            raise ValueError('hour_intvl must be positive: %s' % hour_intvl)
        return step

    @staticmethod
    def _grib_relpath(grib_type, grib_subtype, time_dt):

        '''Returns (path relative to the GRIB root dir, file name)

        Layouts:
            gfs_ctbto : YYYY/MM/DD/<subtype>/GDYYMMDDHH
            ecmwf_ml  : YYYY/MM/DD/EAYYYYMMDDHH.ml
            ecmwf_sfc : YYYY/MM/DD/EAYYYYMMDDHH.sfc

        The layouts are deliberately spelled out per type, rather than
        derived from one another, so each can be changed independently.
        '''
        year = '%04d' % time_dt.year
        month = '%02d' % time_dt.month
        day = '%02d' % time_dt.day
        hour = '%02d' % time_dt.hour

        if grib_type == 'gfs_ctbto':
            fname = 'GD' + year[2:4] + month + day + hour
            parts = (year, month, day, '%s' % grib_subtype, fname)
        elif grib_type == 'ecmwf_ml':
            fname = 'EA' + year + month + day + hour + '.ml'
            parts = (year, month, day, fname)
        elif grib_type == 'ecmwf_sfc':
            fname = 'EA' + year + month + day + hour + '.sfc'
            parts = (year, month, day, fname)
        else:
            raise ValueError('Unsupported grib_type: %s' % grib_type)

        return os.path.join(*parts), fname

    @staticmethod
    def _verify_grib_files(grib_type, rootdir, subtype,
                           wrfstart_time_str, wrfstop_time_str, hour_intvl):

        '''Checks that every expected GRIB file between start and stop exists

        Returns a dict with keys 'verified_correct' (bool) and
        'missing_files' (list of full paths that were not found).
        '''
        step = UngribWorkflow._positive_step(hour_intvl)
        wrfstart_dt = UngribWorkflow._parse_time_str(wrfstart_time_str)
        wrfstop_dt = UngribWorkflow._parse_time_str(wrfstop_time_str)

        # We start with assumption that all will verify, and a single
        # miss will set this to False
        return_dict = {'verified_correct': True, 'missing_files': []}

        curr_time_dt = wrfstart_dt
        while curr_time_dt <= wrfstop_dt:
            LOGGER.debug('Verifying: %s', curr_time_dt)

            rel_path, _ = UngribWorkflow._grib_relpath(grib_type, subtype,
                                                       curr_time_dt)
            metpath = os.path.join(rootdir, rel_path)
            LOGGER.debug('Checking for: %s', metpath)
            if not os.path.exists(metpath):
                return_dict['verified_correct'] = False
                LOGGER.warning('Unable to find metpath: %s', metpath)
                return_dict['missing_files'].append(metpath)

            curr_time_dt += step

        return return_dict

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    def gribmet_verify_gfs_ctbto(
            rootdir=None,
            subtype=None,
            wrfstart_time_str=None,
            wrfstop_time_str=None,
            hour_intvl=None
    ):

        '''Verifies that all expected grib files of type gfs_ctbto are
        accessible

        Files are expected at <rootdir>/YYYY/MM/DD/<subtype>/GDYYMMDDHH

        Parameters
        ----------
        rootdir : str
            Full path to dir containing all expected grib files
        subtype : str
            Specific class of gfs_ctbto file (0.5, 1.0, 0.5.fv3)
        wrfstart_time_str : str
            Start time of WRF simulation in YYYYMMDDHH[mmss] format
        wrfstop_time_str : str
            Stop time of WRF simulation in YYYYMMDDHH[mmss] format
        hour_intvl : int
            Expected interval (in hours) between GRIB files

        Returns
        -------
        return_dict : dict
            Dictionary with 'verified_correct' (bool) and 'missing_files'
            (list of missing paths) keys

        Raises
        ------
        ValueError
            If subtype is not a supported one, or hour_intvl is not positive
        '''

        # Ensure valid subtype
        if subtype not in UngribWorkflow._GFS_CTBTO_SUBTYPES:
            raise ValueError('Invalid subtype: %s' % subtype)

        return UngribWorkflow._verify_grib_files(
            'gfs_ctbto', rootdir, subtype,
            wrfstart_time_str, wrfstop_time_str, hour_intvl)

    @staticmethod
    def gribmet_verify_ecmwf_ml(
            rootdir=None,
            wrfstart_time_str=None,
            wrfstop_time_str=None,
            hour_intvl=None
    ):

        '''Verifies that all expected grib files of type ecmwf_ml are
        accessible

        Files are expected at <rootdir>/YYYY/MM/DD/EAYYYYMMDDHH.ml

        Parameters
        ----------
        rootdir : str
            Full path to dir containing all expected grib files
        wrfstart_time_str : str
            Start time of WRF simulation in YYYYMMDDHH[mmss] format
        wrfstop_time_str : str
            Stop time of WRF simulation in YYYYMMDDHH[mmss] format
        hour_intvl : int
            Expected interval (in hours) between GRIB files

        Returns
        -------
        return_dict : dict
            Dictionary with 'verified_correct' (bool) and 'missing_files'
            (list of missing paths) keys

        Raises
        ------
        ValueError
            If hour_intvl is not positive
        '''

        return UngribWorkflow._verify_grib_files(
            'ecmwf_ml', rootdir, None,
            wrfstart_time_str, wrfstop_time_str, hour_intvl)

    @staticmethod
    def gribmet_verify_ecmwf_sfc(
            rootdir=None,
            wrfstart_time_str=None,
            wrfstop_time_str=None,
            hour_intvl=None
    ):

        '''Verifies that all expected grib files of type ecmwf_sfc are
        accessible

        Files are expected at <rootdir>/YYYY/MM/DD/EAYYYYMMDDHH.sfc

        Parameters
        ----------
        rootdir : str
            Full path to dir containing all expected grib files
        wrfstart_time_str : str
            Start time of WRF simulation in YYYYMMDDHH[mmss] format
        wrfstop_time_str : str
            Stop time of WRF simulation in YYYYMMDDHH[mmss] format
        hour_intvl : int
            Expected interval (in hours) between GRIB files

        Returns
        -------
        return_dict : dict
            Dictionary with 'verified_correct' (bool) and 'missing_files'
            (list of missing paths) keys

        Raises
        ------
        ValueError
            If hour_intvl is not positive
        '''

        return UngribWorkflow._verify_grib_files(
            'ecmwf_sfc', rootdir, None,
            wrfstart_time_str, wrfstop_time_str, hour_intvl)

    def run_single_ungrib(self,
                          wpswrf_distro_path=None,
                          working_scratch_rootdir=None,
                          start_time_dt=None, stop_time_dt=None,
                          hours_intvl=None,
                          ungrib_prefix='FILE',
                          vtable_custom_path=None,
                          grib_type=None, grib_subtype=None,
                          grib_rootdir=None,
                          ungrib_output_stagedir=None):

        '''Set up and run, via nwpservice module, instance of ungrib

        Parameters
        ----------
        wpswrf_distro_path : str
            Full path to the WPS/WRF distribution to be used.  This is assumed
            to have been installed in a way that is compatible with the
            nwpservice module
        working_scratch_rootdir : str
            Full path to a directory (assumed to have already been created),
            to be used as scratch space for setting up and running this
            ungrib instance
        start_time_dt : datetime
            Start time of GRIB retrieval
        stop_time_dt : datetime
            Stop time of GRIB retrieval
        hours_intvl : int
            Interval (in hours) between GRIB files
        ungrib_prefix : str
            (Optional, defaults to 'FILE') String to use for the ungrib
            files prefix
        vtable_custom_path : str
            (Optional) Path to a custom vtable to use for this instance.
            Only applied to the ecmwf_ml and ecmwf_sfc types; gfs_ctbto
            always runs without a user-defined vtable
        grib_type : str
            Type of grib file: 'gfs_ctbto', 'ecmwf_ml' or 'ecmwf_sfc'
        grib_subtype : str
            Subtype of grib file (only used in the gfs_ctbto path layout)
        grib_rootdir : str
            Full path to dir containing the GRIB files
        ungrib_output_stagedir : str
            (Optional) Full path to dir where ungribbed files will be staged.
            If not specified, no staging will be done.  If specified, we
            assume the dir already exists.

        Returns
        -------
        return_dict : dict
            Dictionary with keys 'ungrib_output_manifest' (whatever the
            nwpservice Ungrib.run() call returned), 'staging_dir' (the
            staging dir, or None when staging did not succeed) and
            'hours_intvl'

        Raises
        ------
        ValueError
            If grib_type is not supported
        OSError
            If the temporary met data dir cannot be created, or a link to
            a GRIB file cannot be made
        '''

        LOGGER.debug('Start _run_single_ungrib()')

        # Reject unsupported types before anything is created on disk
        if grib_type not in self._VALID_GRIB_TYPES:
            raise ValueError('grib_type not yet supported here: %s' % grib_type)
        metdatatype = self._METDATATYPE_BY_GRIB_TYPE[grib_type]

        # GFS uses the stock vtable; only the ECMWF types take a custom one
        if grib_type == 'gfs_ctbto':
            vtable_userdef = None
        else:
            vtable_userdef = vtable_custom_path

        # Create name for tempdir for the met data and create dir
        metdata_tmp_dir = os.path.join(working_scratch_rootdir,
                                       'tmpmetdata-' + grib_type)
        LOGGER.debug('metdata_tmp_dir: %s', metdata_tmp_dir)
        try:
            os.mkdir(metdata_tmp_dir, 0o755)
        except OSError as err:
            # Keep the underlying reason (exists, permissions, ...) attached
            raise OSError('mkdir failed: %s' % metdata_tmp_dir) from err

        # Create name for namelist.wps
        namelist_wps_path = os.path.join(working_scratch_rootdir,
                                         'namelist.wps_' + grib_type)
        LOGGER.debug('namelist_wps_path: %s', namelist_wps_path)

        # Generate flat set of links to met files
        self.gribfile_flatlinks(grib_type=grib_type,
                                grib_subtype=grib_subtype,
                                grib_rootdir=grib_rootdir,
                                start_time_dt=start_time_dt,
                                end_time_dt=stop_time_dt,
                                hour_intvl=hours_intvl,
                                newdir=metdata_tmp_dir)
        LOGGER.debug('links to grib files: %s', os.listdir(metdata_tmp_dir))

        # Create namelist.wps
        # The share section
        start_time_str = start_time_dt.strftime('%Y%m%d%H%M%S')
        end_time_str = stop_time_dt.strftime('%Y%m%d%H%M%S')
        interval_secs = int(hours_intvl*3600)

        # The full section_data_dict
        section_data_dict = {
            'share': {
                'max_dom': 1,
                'start_date_list': [start_time_str],
                'end_date_list': [end_time_str],
                'interval_seconds': interval_secs
            },
            'ungrib': {
                'out_format': 'WPS',
                'prefix': ungrib_prefix
            }
        }

        namelist_writer = nwpservice.wps.namelistwps.NamelistWpsWriter(
            destpath=namelist_wps_path,
            section_data_dict=section_data_dict,
            log_level=self._logging_level
        )
        LOGGER.debug('Writing namelist: %s', namelist_wps_path)
        namelist_writer.write()

        # Init ungrib component, run it, stage the output
        domainpath = os.path.join(working_scratch_rootdir,
                                  'ungrib_rundir_' + grib_type)
        ungrib_obj = nwpservice.wps.ungrib.Ungrib(
            wpswrf_distro_path=wpswrf_distro_path,
            wpswrf_rundir=domainpath,
            namelist_wps=namelist_wps_path,
            metdatadir=metdata_tmp_dir,
            metdatatype=metdatatype,
            vtable_userdef=vtable_userdef,
            output_dir=ungrib_output_stagedir,
            log_level=self._logging_level
        )

        ungrib_obj.setup()
        output_manifest = ungrib_obj.run()
        LOGGER.debug('output_manifest: %s', output_manifest)

        # If this wasn't successful (even if an output staging dir was
        # never specified), it should return False
        stage_success = ungrib_obj.stage_output(auxfiles=False)
        staging_dir = ungrib_output_stagedir if stage_success else None

        return_dict = {
            'ungrib_output_manifest': output_manifest,
            'staging_dir': staging_dir,
            'hours_intvl': hours_intvl
        }

        LOGGER.debug('return_dict: %s', return_dict)
        return return_dict

    def gribfile_flatlinks(self,
                           grib_type=None,
                           grib_subtype=None,
                           grib_rootdir=None,
                           start_time_dt=None, end_time_dt=None,
                           hour_intvl=None,
                           newdir=None
                           ):

        '''Sets up a dir of all expected GRIB files in a flat space

        Ungrib ultimately needs to make links to all of the GRIB files so that
        the links are all in the same directory.  This sets up a dir beforehand
        that gets them all (via links) into a single dir, simplifying the
        setup of ungrib down the road

        The existence of the link targets is not checked here; use the
        gribmet_verify_*() methods for that.

        Parameters
        ----------
        grib_type : str
            Type of grib file: 'gfs_ctbto', 'ecmwf_sfc' or 'ecmwf_ml'
        grib_subtype : str
            Subtype of grib file (only used in the gfs_ctbto path layout)
        grib_rootdir : str
            Full path to dir containing the GRIB files
        start_time_dt : datetime
            Start time of GRIB retrieval
        end_time_dt : datetime
            Stop time of GRIB retrieval (inclusive)
        hour_intvl : int
            Interval (in hours) between GRIB files
        newdir : str
            Full path to the dir containing the links in "flat file space"
            Dir is assumed to already exist

        Returns
        -------
        Nothing : None

        Raises
        ------
        ValueError
            If grib_type is invalid, either dir is not a directory, or
            hour_intvl is not positive
        OSError
            If a symbolic link cannot be created (e.g. it already exists)
        '''

        if grib_type not in self._VALID_GRIB_TYPES:
            raise ValueError('Invalid grib_type: %s' % grib_type)

        if not os.path.isdir(grib_rootdir):
            raise ValueError('grib_rootdir not a directory: %s' % grib_rootdir)

        if not os.path.isdir(newdir):
            raise ValueError('newdir not a directory: %s' % newdir)

        step = self._positive_step(hour_intvl)

        # Iterate through expected times
        curr_time_dt = start_time_dt
        while curr_time_dt <= end_time_dt:
            LOGGER.debug('curr_time_dt: %s', curr_time_dt)

            rel_path, link_name = self._grib_relpath(grib_type, grib_subtype,
                                                     curr_time_dt)
            full_path = os.path.join(grib_rootdir, rel_path)
            LOGGER.debug('full_path: %s', full_path)

            link_path = os.path.join(newdir, link_name)
            LOGGER.debug('link_path: %s', link_path)

            # Make the link; a failure here must not go unnoticed, since
            # ungrib would otherwise run on an incomplete set of inputs
            try:
                os.symlink(full_path, link_path)
            except OSError as err:
                raise OSError("Failed to make link: %s" % link_path) from err

            curr_time_dt += step
ehratm.wps.ungrib.UngribWorkflow.gribmet_verify_ecmwf_sfc
def gribmet_verify_ecmwf_sfc(rootdir=None, wrfstart_time_str=None, wrfstop_time_str=None, hour_intvl=None)
Definition: ungrib.py:223
ehratm.defaults
Definition: defaults.py:1
ehratm.wps.ungrib.UngribWorkflow.run_single_ungrib
def run_single_ungrib(self, wpswrf_distro_path=None, working_scratch_rootdir=None, start_time_dt=None, stop_time_dt=None, hours_intvl=None, ungrib_prefix='FILE', vtable_custom_path=None, grib_type=None, grib_subtype=None, grib_rootdir=None, ungrib_output_stagedir=None)
Definition: ungrib.py:301
ehratm.wps.ungrib.UngribWorkflow.__init__
def __init__(self, log_level=None)
Definition: ungrib.py:44
ehratm.wps.ungrib.UngribWorkflow._logging_level
_logging_level
Definition: ungrib.py:49
ehratm.wps.ungrib.UngribWorkflow
Definition: ungrib.py:37
ehratm.wps.ungrib.UngribWorkflow.gribmet_verify_gfs_ctbto
def gribmet_verify_gfs_ctbto(rootdir=None, subtype=None, wrfstart_time_str=None, wrfstop_time_str=None, hour_intvl=None)
Definition: ungrib.py:57
ehratm.wps.ungrib.UngribWorkflow.gribfile_flatlinks
def gribfile_flatlinks(self, grib_type=None, grib_subtype=None, grib_rootdir=None, start_time_dt=None, end_time_dt=None, hour_intvl=None, newdir=None)
Definition: ungrib.py:475
ehratm.defaults.Defaults
Definition: defaults.py:9
ehratm.wps.ungrib.UngribWorkflow.gribmet_verify_ecmwf_ml
def gribmet_verify_ecmwf_ml(rootdir=None, wrfstart_time_str=None, wrfstop_time_str=None, hour_intvl=None)
Definition: ungrib.py:144
ehratm.mylogger.getlogger
def getlogger()
Definition: mylogger.py:11
ehratm.mylogger
Definition: mylogger.py:1