Source code for solarforecastarbiter.io.fetch.nwp

"""
Fetch NWP files from NCEP NOMADS for select variables. Should primarily be used
as a CLI program.


The script fetches grib2 files from NOMADS as they become available using
g2sub, uses wgrib2 to convert the grib files to netCDF (and to add wind
speed), and optimizes the netCDF files for our expected access pattern of
reading a time series at a single location.

Each supported model has an associated dictionary of parameters that are
passed in the HTTP request. Other keys in the dictionaries are used
to set the timing of the HTTP requests:
- update_freq: the time between full model runs
- valid_hr_gen: a callable that, given the initialization hour, returns the
  valid forecast hours that the model produces
- time_between_fcst_hrs: the time in seconds that it typically takes to
  produce the grib2 file for the next forecasted hour
- delay_to_first_forecast: the time after the nominal initialization time
  at which the first forecast from the model is typically available
- avg_max_run_length: the average time between the availability of the first
  forecast file of a run and the last possible forecast file

Many of these parameters are inferred from
https://www.nco.ncep.noaa.gov/pmb/nwprod/prodstat/


This script uses features of asyncio that are likely not available on Windows.
"""
import asyncio
from itertools import chain
import logging
import os
from pathlib import Path
import re
import shutil
import stat
import subprocess
import sys
import tempfile


import aiohttp
import pandas as pd
import xarray as xr


from solarforecastarbiter.io.fetch import (
    make_session, run_in_executor, abort_all_on_exception)

from solarforecastarbiter.io.nwp import DOMAIN


logger = logging.getLogger(__name__)


CHECK_URL = 'https://nomads.ncep.noaa.gov/pub/data/nccf/com/{}/prod'
BASE_URL = 'https://nomads.ncep.noaa.gov/cgi-bin/'


GFS_0P25_1HR = {'endpoint': 'filter_gfs_0p25_1hr.pl',
                'file': 'gfs.t{init_hr:02d}z.pgrb2.0p25.f{valid_hr:03d}',
                # /atmos added to dir with March 22, 2021, 1200Z GFS upgrade
                'dir': '/gfs.{init_date}/{init_hr}/atmos',
                'lev_2_m_above_ground': 'on',
                'lev_10_m_above_ground': 'on',
                'lev_entire_atmosphere': 'on',
                'lev_surface': 'on',
                'var_DSWRF': 'on',
                'var_TCDC': 'on',
                'var_TMP': 'on',
                'var_UGRD': 'on',
                'var_VGRD': 'on',
                'update_freq': '6h',
                'valid_hr_gen': lambda x: chain(range(120), range(120, 240, 3),
                                                range(240, 385, 12)),
                'time_between_fcst_hrs': 60,
                'delay_to_first_forecast': '200min',
                'avg_max_run_length': '100min',
                'filename': 'gfs_0p25.nc'}


NAM_CONUS = {'endpoint': 'filter_nam.pl',
             'file': 'nam.t{init_hr:02d}z.awphys{valid_hr:02d}.tm00.grib2',
             'dir': '/nam.{init_date}',
             'lev_2_m_above_ground': 'on',
             'lev_10_m_above_ground': 'on',
             'lev_entire_atmosphere_\\(considered_as_a_single_layer\\)': 'on',
             'lev_surface': 'on',
             'var_DSWRF': 'on',
             'var_TCDC': 'on',
             'var_TMP': 'on',
             'var_UGRD': 'on',
             'var_VGRD': 'on',
             'update_freq': '6h',
             'valid_hr_gen': lambda x: chain(range(36), range(36, 85, 3)),
             'time_between_fcst_hrs': 60,
             'delay_to_first_forecast': '90min',
             'avg_max_run_length': '80min',
             'filename': 'nam_12km.nc'}


# should be able to use RANGE requests and get data directly from grib files
# like https://www.cpc.ncep.noaa.gov/products/wesley/fast_downloading_grib.html
# so we can get DSWRF for RAP
RAP = {'endpoint': 'filter_rap.pl',
       'file': 'rap.t{init_hr:02d}z.awp130pgrbf{valid_hr:02d}.grib2',
       'dir': '/rap.{init_date}',
       'lev_2_m_above_ground': 'on',
       'lev_10_m_above_ground': 'on',
       'lev_entire_atmosphere': 'on',
       'lev_surface': 'on',
       'var_TCDC': 'on',
       'var_TMP': 'on',
       'var_UGRD': 'on',
       'var_VGRD': 'on',
       'update_freq': '1h',
       'valid_hr_gen': (
           lambda x: range(40) if x in (3, 9, 15, 21) else range(22)),
       'time_between_fcst_hrs': 60,
       'delay_to_first_forecast': '50min',
       'avg_max_run_length': '30min',
       'filename': 'rap.nc'}


HRRR_HOURLY = {
    'endpoint': 'filter_hrrr_2d.pl',
    'file': 'hrrr.t{init_hr:02d}z.wrfsfcf{valid_hr:02d}.grib2',
    'dir': '/hrrr.{init_date}/conus',
    'lev_2_m_above_ground': 'on',
    'lev_10_m_above_ground': 'on',
    'lev_entire_atmosphere': 'on',
    'lev_surface': 'on',
    'var_DSWRF': 'on',
    'var_VBDSF': 'on',
    'var_VDDSF': 'on',
    'var_TCDC': 'on',
    'var_TMP': 'on',
    'var_UGRD': 'on',
    'var_VGRD': 'on',
    'update_freq': '1h',
    'valid_hr_gen': (
        lambda x: range(37) if x in (0, 6, 12, 18) else range(19)),
    'time_between_fcst_hrs': 120,
    'delay_to_first_forecast': '45min',
    'avg_max_run_length': '70min',
    'filename': 'hrrr_hourly.nc'}


HRRR_SUBHOURLY = {
    'endpoint': 'filter_hrrr_sub.pl',
    'file': 'hrrr.t{init_hr:02d}z.wrfsubhf{valid_hr:02d}.grib2',
    'dir': '/hrrr.{init_date}/conus',
    'lev_2_m_above_ground': 'on',
    'lev_10_m_above_ground': 'on',
    'lev_entire_atmosphere': 'on',
    'lev_surface': 'on',
    'var_DSWRF': 'on',
    'var_VBDSF': 'on',
    'var_VDDSF': 'on',
    'var_TMP': 'on',
    'var_WIND': 'on',
    'update_freq': '1h',
    'valid_hr_gen': (lambda x: range(19)),
    'time_between_fcst_hrs': 120,
    'delay_to_first_forecast': '45min',
    'avg_max_run_length': '50min',
    'filename': 'hrrr_subhourly.nc'}


# each GEFS stat_or_member is treated separately
# really makes use of async capabilities
GEFS_0P50_RAW = {'endpoint': 'filter_gefs_atmos_0p50a.pl',
                 'file': 'ge{stat_or_member}.t{init_hr:02d}z.pgrb2a.0p50.f{valid_hr:03d}',  # NOQA
                 'dir': '/gefs.{init_date}/{init_hr}/atmos/pgrb2ap5',
                 'lev_2_m_above_ground': 'on',
                 'lev_10_m_above_ground': 'on',
                 'lev_entire_atmosphere': 'on',
                 'lev_surface': 'on',
                 'var_DSWRF': 'on',
                 'var_TCDC': 'on',
                 'var_TMP': 'on',
                 'var_UGRD': 'on',
                 'var_VGRD': 'on',
                 'update_freq': '6h',
                 'valid_hr_gen': lambda x: chain(range(0, 240, 3),
                                                 range(240, 385, 6)),
                 'time_between_fcst_hrs': 60,
                 'delay_to_first_forecast': '280min',
                 'avg_max_run_length': '60min',
                 'filename': 'gefs_{stat_or_member}.nc',
                 'members': (['avg', 'c00', 'spr'] +
                             [f'p{r:02d}' for r in range(1, 21)]),
                 'check_url_name': 'gens'}


EXTRA_KEYS = ['update_freq', 'valid_hr_gen', 'time_between_fcst_hrs',
              'delay_to_first_forecast', 'avg_max_run_length', 'filename',
              'check_url_name', 'member']

model_map = {'gfs_0p25': GFS_0P25_1HR, 'nam_12km': NAM_CONUS,
             'rap': RAP, 'hrrr_hourly': HRRR_HOURLY,
             'hrrr_subhourly': HRRR_SUBHOURLY,
             'gefs': GEFS_0P50_RAW}
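
# Illustrative sketch, not part of the fetch pipeline and never called by
# it: how the per-model configuration above is interpreted. 'valid_hr_gen'
# is called with the initialization hour and yields the forecast hours the
# model produces, while the timing strings are parsed with pandas
# Timedelta. The init time below is an arbitrary example value.
def _example_model_timing():
    model = model_map['rap']
    init_time = pd.Timestamp('2021-06-01T03:00Z')
    # RAP runs out to 39 hours at 03/09/15/21Z, otherwise to 21 hours
    valid_hours = list(model['valid_hr_gen'](init_time.hour))
    # the first grib2 file is expected on NOMADS roughly 50 minutes after
    # the nominal initialization time, i.e. around 03:50Z here
    likely_ready = init_time + pd.Timedelta(model['delay_to_first_forecast'])
    return valid_hours, likely_ready
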

NC_TBL = """
TMP:surface:ignore
TMP:2 m above ground:t2m
UGRD:10 m above ground:ignore
VGRD:10 m above ground:ignore
TCDC:entire atmosphere:tcdc
TCDC:entire atmosphere (considered as a single layer):tcdc
DSWRF:surface:dswrf
VBDSF:surface:vbdsf
VDDSF:surface:vddsf
WIND:10 m above ground:si10
"""

COMPRESSION = {'zlib': True, 'complevel': 1, 'shuffle': True,
               'fletcher32': True}
DEFAULT_ENCODING = {
    # stores the time steps, not an actual time
    'time': {'dtype': 'int16'},
    'latitude': {'dtype': 'float32', 'least_significant_digit': 3},
    'longitude': {'dtype': 'float32', 'least_significant_digit': 3}
}
LEAST_SIGNIFICANT_DIGITS = {
    't2m': 2,
    'tcdc': 1,
    'si10': 2,
    'dswrf': 1,
    'vbdsf': 1,
    'vddsf': 1
}
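
# Illustrative sketch, never called anywhere: the per-variable netCDF
# encoding that _optimize_netcdf (defined below) assembles from
# DEFAULT_ENCODING, LEAST_SIGNIFICANT_DIGITS, and COMPRESSION. The chunk
# sizes are example values: the full time dimension in one chunk (37 steps
# for an extended HRRR hourly run) and 50-point chunks along lat/lon.
def _example_encoding_for_t2m():
    chunksizes = [37, 50, 50]
    encoding = dict(DEFAULT_ENCODING)
    encoding['t2m'] = {
        'dtype': 'float32',
        'least_significant_digit': LEAST_SIGNIFICANT_DIGITS['t2m'],
        'chunksizes': chunksizes,
        **COMPRESSION}
    return encoding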


async def get_with_retries(get_func, *args, retries=5, **kwargs):
    """
    Call get_func and retry if the request fails

    Parameters
    ----------
    get_func : function
        Function that performs an aiohttp call to be retried
    retries : int
        Number of retries before raising the error
    *args
        Passed to get_func
    **kwargs
        Passed to get_func

    Returns
    -------
    Result of get_func

    Raises
    ------
    aiohttp.ClientResponseError
        When get_func fails after retrying retries times
    """
    retried = 0
    while True:
        try:
            res = await get_func(*args, **kwargs)
        except aiohttp.ClientResponseError as e:
            logger.warning('Request to %s failed with code %s, retrying',
                           e.request_info.url, e.status)
            retried += 1
            if retried >= retries:
                raise
        except aiohttp.ClientError:
            logger.warning('Request failed in connection, retrying')
            retried += 1
            if retried >= retries:
                raise
        else:
            return res
        await asyncio.sleep(60)


def _simple_model(model):
    return model['dir'].split('.')[0][1:]


@abort_all_on_exception
async def get_available_dirs(session, model):
    """Get the available date/date+init_hr directories"""
    simple_model = _simple_model(model)
    is_init_date = 'init_date' in model['dir']
    model_url = BASE_URL + model['endpoint']

    async def _get(model_url):
        async with session.get(model_url, raise_for_status=True) as r:
            return await r.text()

    page = await get_with_retries(_get, model_url)
    if is_init_date:
        list_avail_days = set(
            re.findall(simple_model + '\\.([0-9]{8})', page))
    else:
        list_avail_days = set(
            re.findall(simple_model + '\\.([0-9]{10})', page))
    return list_avail_days


def _process_params(model, init_time):
    """Generator to get the parameters for fetching forecasts for a given
    model at a given init_time"""
    params = model.copy()
    params.update(DOMAIN)
    valid_hr_gen = params['valid_hr_gen'](init_time.hour)
    for p in EXTRA_KEYS:
        if p in params:
            del params[p]
    params['dir'] = params['dir'].format(
        init_date=init_time.strftime('%Y%m%d'),
        init_dt=init_time.strftime('%Y%m%d%H'),
        init_hr=init_time.strftime('%H'))
    for i in valid_hr_gen:
        newp = params.copy()
        newp['file'] = newp['file'].format(
            init_hr=init_time.hour, valid_hr=i)
        yield newp


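# Illustrative sketch, an example helper that the pipeline never calls:
# expanding the NAM configuration for a single init time. Each dict
# yielded by _process_params holds the query parameters for one grib2
# file; the first one here requests nam.t06z.awphys00.tm00.grib2 from
# /nam.20210601.
def _example_nam_request_params():
    init_time = pd.Timestamp('2021-06-01T06:00Z')
    return next(_process_params(NAM_CONUS, init_time))

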
async def check_next_inittime(session, init_time, model):
    """Check if data from the next model initialization time is available"""
    next_inittime = init_time + pd.Timedelta(model['update_freq'])
    simple_model = _simple_model(model)
    next_init_url = (
        CHECK_URL.format(model.get('check_url_name', simple_model))
        + model['dir'].format(
            init_date=next_inittime.strftime('%Y%m%d'),
            init_dt=next_inittime.strftime('%Y%m%d%H'),
            init_hr=next_inittime.strftime('%H'))
        + '/' + model['file'].format(init_hr=next_inittime.hour, valid_hr=0))
    try:
        async with session.head(next_init_url) as r:
            if r.status == 200:
                logger.warning(
                    'Skipping to next init time at %s for %s %s',
                    next_inittime, simple_model, model.get('member', ''))
                return True
            else:
                return False
    except aiohttp.ClientOSError:
        return False


def get_filename(basepath, init_time, params):
    filename = (
        basepath / init_time.strftime('%Y/%m/%d/%H') / params['file'])
    if not filename.suffix == '.grib2':
        filename = filename.with_suffix(filename.suffix + '.grib2')
    return filename


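# Illustrative sketch, an example helper that the pipeline never calls:
# the on-disk layout produced by get_filename. With an arbitrary example
# base path of /data/nam_12km, the first file of the 2021-06-01 06Z NAM
# run is saved as
# /data/nam_12km/2021/06/01/06/nam.t06z.awphys00.tm00.grib2.
def _example_grib_path():
    init_time = pd.Timestamp('2021-06-01T06:00Z')
    params = next(_process_params(NAM_CONUS, init_time))
    return get_filename(Path('/data/nam_12km'), init_time, params)

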
async def files_to_retrieve(session, model, modelpath, init_time):
    """Generator to return the parameters of the available files for
    download"""
    possible_params = _process_params(model, init_time)
    simple_model = _simple_model(model)
    first_file_modified_at = None
    for next_params in possible_params:
        logger.debug('Checking if file is available for %s', next_params)
        filename = get_filename(modelpath, init_time, next_params)
        if filename.exists():
            yield next_params
            continue
        next_model_url = (
            CHECK_URL.format(model.get('check_url_name', simple_model))
            + next_params['dir'] + '/' + next_params['file'])
        while True:  # is the next file ready?
            try:
                logger.debug('Calling HEAD %s', next_model_url)
                async with session.head(
                        next_model_url, raise_for_status=True) as r:
                    if first_file_modified_at is None:
                        first_file_modified_at = pd.Timestamp(
                            r.headers['Last-Modified'])
                        logger.debug('First file was available at %s %s',
                                     first_file_modified_at,
                                     model.get('member', ''))
                logger.debug('HEAD returned %s', next_model_url)
            except aiohttp.ClientResponseError as e:
                if e.status == 404:  # Not found
                    logger.debug(
                        'Next file not ready yet for %s at %s %s\n%s %s',
                        simple_model, init_time, model.get('member', ''),
                        e.status, e.message)
                else:
                    logger.error(
                        'Error checking if next file is ready %s\n'
                        '%s %s', model.get('member', ''), e.status,
                        e.message)
            except aiohttp.ClientError as e:
                logger.warning('Error in checking for next file %s %s',
                               model.get('member', ''), str(e))
            except Exception:
                logger.exception(
                    'Uncaught exception when checking for next file %s',
                    model.get('member', ''))
            else:
                logger.debug('%s/%s is ready for download',
                             next_params['dir'], next_params['file'])
                yield next_params
                break

            # if the current time is after 'avg_max_run_length' after the
            # first forecast was available, check if forecasts from the
            # next model run are available and if so, move on to that run
            if (
                    first_file_modified_at is not None
                    and pd.Timestamp.utcnow() > first_file_modified_at
                    + pd.Timedelta(model['avg_max_run_length'])
            ):
                nextrun_available = await check_next_inittime(
                    session, init_time, model)
                if nextrun_available:
                    return
            await asyncio.sleep(model['time_between_fcst_hrs'])


async def _get_file(session, url, params, tmpfile, chunksize):
    timeout = aiohttp.ClientTimeout(total=660, connect=60, sock_read=600)
    async with session.get(url, params=params, raise_for_status=True,
                           timeout=timeout) as r:
        with open(tmpfile, 'wb') as f:
            async for chunk in r.content.iter_chunked(chunksize * 1024):
                f.write(chunk)


@abort_all_on_exception
async def fetch_grib_files(session, params, basepath, init_time, chunksize):
    """
    Fetch the grib file referenced by params and save to the appropriate
    folder under basepath. Retrieves the files in chunks.

    Parameters
    ----------
    session : aiohttp.ClientSession
        The HTTP session to use to request the file
    params : dict
        Parameters to include in the GET query to params['endpoint']
    basepath : Path
        Path to the base directory where files will be saved. New
        directories under basepath of the form
        basepath / year / month / day / hour will be created as necessary.
    init_time : datetime
        Initialization time of the model we're trying to fetch
    chunksize : int
        Chunksize in KB to fetch and save at once

    Returns
    -------
    filename : Path
        Path of the successfully saved file

    Raises
    ------
    aiohttp.ClientResponseError
        When the HTTP request fails/returns a status code >= 400
    """
    endpoint = params.pop('endpoint')
    url = BASE_URL + endpoint
    filename = get_filename(basepath, init_time, params)
    if filename.exists():
        return filename
    if not filename.parent.is_dir():
        filename.parent.mkdir(parents=True)
    logger.info('Getting file %s', filename)
    tmpfile = filename.with_name('.tmp_' + filename.name)
    await get_with_retries(_get_file, session, url, params, tmpfile,
                           chunksize)
    tmpfile.rename(filename)
    logger.debug('Successfully saved %s', filename)
    return filename


@abort_all_on_exception
async def process_grib_to_netcdf(folder, model):
    logger.info('Converting GRIB files to NetCDF with wgrib2 %s',
                model.get('member', ''))
    _handle, nctmp = tempfile.mkstemp()
    os.close(_handle)
    nctmp = Path(nctmp)
    # possible that this holds up processing on file io
    # so run in separate process
    grib_prefix = model['file'].split('.')[0]
    # True when WIND is not requested directly and wind speed must be
    # derived from UGRD/VGRD
    wind_in_model = 'var_WIND' not in model
    try:
        await run_in_executor(_process_grib, folder, nctmp, grib_prefix,
                              wind_in_model)
    except Exception:
        nctmp.unlink()
        raise
    return nctmp


def _process_grib(folder, nctmp, grib_prefix, wind_in_model):
    if wind_in_model:
        # need to add wind speed to the grib files
        for grbfile in folder.glob(f'{grib_prefix}*.grib2'):
            path = str(grbfile.resolve())
            try:
                subprocess.run(
                    f'wgrib2 {path} -wind_speed - -match "(UGRD|VGRD)" | '
                    f'wgrib2 - -append -grib_out {path}',
                    shell=True, check=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                logger.error('Error converting wind in file %s\n%s',
                             grbfile, e.stderr)
                raise OSError

    if 'subhourly' in str(folder):
        # for hrrr subhourly, assume TMP and VDDSF have no average while
        # the other variables do
        fmt = "-match 'ave|TMP|VDDSF'"
    else:
        fmt = ''

    with tempfile.NamedTemporaryFile(mode='w') as tmp_nc_tbl:
        tmp_nc_tbl.write(NC_TBL)
        tmp_nc_tbl.flush()
        try:
            subprocess.run(
                f'cat {str(folder)}/{grib_prefix}*.grib2 | '
                f'wgrib2 - -nc4 -nc_table {tmp_nc_tbl.name} {fmt} '
                f'-append -netcdf {str(nctmp)}',
                shell=True, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            logger.error('Error converting grib files %s*.grib2 to '
                         'netCDF\n%s', grib_prefix, e.stderr)
            raise OSError
    return nctmp


def _optimize_netcdf(nctmpfile, out_path):
    """Optimizes the netcdf file for accessing by time slice."""
    ds = xr.open_dataset(nctmpfile, engine='netcdf4',
                         backend_kwargs={'mode': 'r'})
    # time is likely unlimited
    if 'unlimited_dims' in ds.encoding:
        ds.encoding = {}

    chunksizes = []
    for dim, size in ds.dims.items():
        if dim == 'time':
            chunksizes.append(size)
        else:
            chunksizes.append(50)

    encoding = DEFAULT_ENCODING.copy()
    encoding.update(
        {key: {'dtype': 'float32',
               'least_significant_digit': LEAST_SIGNIFICANT_DIGITS[key],
               'chunksizes': chunksizes,
               **COMPRESSION}
         for key in ds.keys()})
    ds.to_netcdf(out_path, format='NETCDF4', mode='w',
                 unlimited_dims=None, encoding=encoding)
    ds.close()


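# Illustrative sketch, an example helper that the pipeline never calls:
# the access pattern the chunking above is optimized for, namely reading
# the full forecast time series at a single grid point. The file path and
# grid indices are arbitrary example values.
def _example_read_time_series():
    with xr.open_dataset('/data/nam_12km/2021/06/01/06/nam_12km.nc') as ds:
        # the whole time dimension sits in one chunk, so this slice only
        # touches a few small chunks on disk
        return ds['t2m'].isel(latitude=100, longitude=200).load()

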
async def optimize_netcdf(nctmpfile, final_path):
    """Compress the netcdf file and adjust the chunking for fast
    time-series access"""
    logger.info('Optimizing NetCDF file to save at %s', final_path)
    parent = Path(final_path.parent)
    if not parent.is_dir():
        parent.mkdir(parents=True)
    _handle, tmp_path = tempfile.mkstemp(dir=parent)
    os.close(_handle)
    tmp_path = Path(tmp_path)
    # possible that this leaks memory, so run in separate process
    # that is restarted after a number of jobs
    try:
        await run_in_executor(_optimize_netcdf, nctmpfile, tmp_path)
    except Exception:
        tmp_path.unlink()
        raise
    else:
        tmp_path.rename(final_path)
        final_path.chmod(stat.S_IRGRP | stat.S_IRUSR | stat.S_IROTH
                         | stat.S_IWUSR)
        logger.info('Done optimizing NetCDF at %s', final_path)
    finally:
        nctmpfile.unlink()


async def sleep_until_inittime(inittime, model):
    # don't bother requesting a file until it might be ready
    now = pd.Timestamp.utcnow()
    likely_ready_time = inittime + pd.Timedelta(
        model['delay_to_first_forecast'])
    if likely_ready_time > now:
        seconds = (likely_ready_time - now).total_seconds()
        logger.info('Sleeping %0.1fs for next model run %s', seconds,
                    model.get('member', ''))
        await asyncio.sleep(seconds)


async def startup_find_next_runtime(model_path, session, model):
    """Find the next model run to get based on what is available on NOMADS
    and what .nc files are present locally"""
    dirs = await get_available_dirs(session, model)
    no_file = []
    first = pd.Timestamp('20000101T0000Z')
    max_time = first
    for dir_ in dirs:
        if len(dir_) == 8:
            path = model_path / dir_[:4] / dir_[4:6] / dir_[6:8]
            for hr in range(0, 24, int(model['update_freq'].strip('h'))):
                hrpath = path / f'{hr:02d}'
                hrtime = pd.Timestamp(f'{dir_[:8]}T{hr:02d}00Z')
                if not (hrpath / model['filename']).exists():
                    no_file.append(hrtime)
                else:
                    max_time = max(max_time, hrtime)
        else:
            hrpath = (model_path / dir_[:4] / dir_[4:6] / dir_[6:8]
                      / dir_[8:10])
            hrtime = pd.Timestamp(f'{dir_[:8]}T{dir_[8:10]}00Z')
            if not (hrpath / model['filename']).exists():
                no_file.append(hrtime)
            else:
                max_time = max(max_time, hrtime)

    if len(no_file) == 0:
        if max_time > first:
            inittime = max_time + pd.Timedelta(model['update_freq'])
        else:
            # No available dirs?
            raise ValueError(
                'Failed to find next available model from NOMADS %s'
                % model.get('member', ''))
    else:
        inittime = min(no_file)
    await sleep_until_inittime(inittime, model)
    return inittime


async def next_run_time(inittime, modelpath, model):
    inittime += pd.Timedelta(model['update_freq'])
    # check if nc file exists for this inittime
    if (
            modelpath / inittime.strftime('%Y/%m/%d/%H') / model['filename']
    ).exists():
        return await next_run_time(inittime, modelpath, model)
    await sleep_until_inittime(inittime, model)
    return inittime


async def _run_loop(session, model, modelpath, chunksize, once, use_tmp):
    inittime = await startup_find_next_runtime(modelpath, session, model)
    while True:
        fetch_tasks = set()
        finalpath = (modelpath / inittime.strftime('%Y/%m/%d/%H')
                     / model['filename'])
        if use_tmp:
            _tmpdir = tempfile.TemporaryDirectory()
            gribdir = Path(_tmpdir.name)
        else:
            gribdir = modelpath
        async for params in files_to_retrieve(session, model, gribdir,
                                              inittime):
            logger.debug('Processing parameters %s', params)
            fetch_tasks.add(asyncio.create_task(
                fetch_grib_files(session, params, gribdir, inittime,
                                 chunksize)))
        files = await asyncio.gather(*fetch_tasks)
        if len(files) != 0:  # otherwise skip to the next inittime
            path_to_files = files[0].parent
            try:
                nctmpfile = await process_grib_to_netcdf(
                    path_to_files, model)
                await optimize_netcdf(nctmpfile, finalpath)
            except Exception:
                raise
            if use_tmp:
                _tmpdir.cleanup()
            else:
                # remove grib files
                for f in files:
                    f.unlink()
        if once:
            break
        else:
            logger.info('Moving on to next model run')
            inittime = await next_run_time(inittime, modelpath, model)


async def run(basepath, model_name, chunksize, once=False, use_tmp=False):
    session = make_session()
    modelpath = basepath / model_name
    if model_name != 'gefs':
        model = model_map[model_name]
        await _run_loop(session, model, modelpath, chunksize, once, use_tmp)
    else:
        base_model = model_map[model_name].copy()
        members = base_model.pop('members')
        member_loops = set()
        for member in members:
            model = base_model.copy()
            model['member'] = member
            model['file'] = model['file'].replace('{stat_or_member}', member)
            model['filename'] = model['filename'].format(
                stat_or_member=member)
            member_loops.add(asyncio.create_task(
                _run_loop(session, model, modelpath, chunksize, once,
                          use_tmp)))
        await asyncio.wait(member_loops)
    await session.close()


async def optimize_only(path_to_files, model_name):
    model = model_map[model_name]
    nctmpfile = await process_grib_to_netcdf(path_to_files, model)
    try:
        await optimize_netcdf(
            nctmpfile, path_to_files / f'{model_name}.nc')
    except Exception:
        raise
    else:
        # remove grib files
        for f in path_to_files.glob(
                f'{model["file"].split(".")[0]}*.grib2'):
            f.unlink()


def check_wgrib2():
    if shutil.which('wgrib2') is None:
        logger.error('wgrib2 was not found in PATH and is required')
        sys.exit(1)
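

# Illustrative sketch, not part of the module: running a single
# fetch/convert cycle for the HRRR hourly product from a script instead of
# the CLI. The base path and chunk size are arbitrary example values.
def _example_single_fetch():
    check_wgrib2()
    asyncio.run(run(Path('/data'), 'hrrr_hourly', chunksize=128, once=True))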