Source code for solarforecastarbiter.reference_forecasts.main

"""
Make benchmark irradiance and power forecasts.

The functions in this module use the
:py:mod:`solarforecastarbiter.datamodel` objects.
"""
from collections import namedtuple
import itertools
import json
import logging
import re


import pandas as pd


from solarforecastarbiter import datamodel, pvmodel
from solarforecastarbiter.io import api
from solarforecastarbiter.io.fetch import nwp as fetch_nwp
from solarforecastarbiter.reference_forecasts import persistence, models, utils


logger = logging.getLogger(__name__)


[docs]def run_nwp(forecast, model, run_time, issue_time): """ Calculate benchmark irradiance and power forecasts for a Forecast. Forecasts may be run operationally or retrospectively. For operational forecasts, *run_time* is typically set to now. For retrospective forecasts, *run_time* is the time by which the forecast should be run so that it could have been be delivered for the *issue_time*. Forecasts will only use data with timestamps before *run_time*. Parameters ---------- forecast : datamodel.Forecast The metadata of the desired forecast. model : function NWP model loading and processing function. See :py:mod:`solarforecastarbiter.reference_forecasts.models` for options. run_time : pd.Timestamp Run time of the forecast. issue_time : pd.Timestamp Issue time of the forecast run. Returns ------- ghi : pd.Series dni : pd.Series dhi : pd.Series air_temperature : pd.Series wind_speed : pd.Series ac_power : pd.Series Examples -------- The following code would return hourly average forecasts derived from the subhourly HRRR model. >>> from solarforecastarbiter import datamodel >>> from solarforecastarbiter.reference_forecasts import models >>> init_time = pd.Timestamp('20190328T1200Z') >>> start = pd.Timestamp('20190328T1300Z') # typical available time >>> end = pd.Timestamp('20190329T1300Z') # 24 hour forecast >>> modeling_parameters = datamodel.FixedTiltModelingParameters( ... ac_capacity=10, dc_capacity=15, ... temperature_coefficient=-0.004, dc_loss_factor=0, ... ac_loss_factor=0) >>> power_plant = datamodel.SolarPowerPlant( ... name='Test plant', latitude=32.2, longitude=-110.9, ... elevation=715, timezone='America/Phoenix', ... modeling_parameters = modeling_parameters) >>> ghi, dni, dhi, temp_air, wind_speed, ac_power = run( ... power_plant, models.hrrr_subhourly_to_hourly_mean, ... init_time, start, end) """ fetch_metadata = fetch_nwp.model_map[models.get_nwp_model(model)] init_time = utils.get_init_time(run_time, fetch_metadata) start, end = utils.get_forecast_start_end(forecast, issue_time) site = forecast.site logger.info( 'Calculating forecast for model %s starting at %s from %s to %s', model, init_time, start, end) *forecasts, resampler, solar_position_calculator = model( site.latitude, site.longitude, site.elevation, init_time, start, end) if isinstance(site, datamodel.SolarPowerPlant): solar_position = solar_position_calculator() ac_power = pvmodel.irradiance_to_power( site.modeling_parameters, solar_position['apparent_zenith'], solar_position['azimuth'], *forecasts) else: ac_power = None # resample data after power calculation resampled = list(map(resampler, (*forecasts, ac_power))) nwpoutput = namedtuple( 'NWPOutput', ['ghi', 'dni', 'dhi', 'air_temperature', 'wind_speed', 'ac_power']) return nwpoutput(*resampled)
[docs]def run_persistence(session, observation, forecast, run_time, issue_time, index=False): """ Run a persistence *forecast* for an *observation*. For intraday forecasts, the *index* argument controls if the forecast is constructed using persistence of the measured values (*index = False*) or persistence using clear sky index or AC power index. For day ahead forecasts, only persistence of measured values (*index = False*) is supported. Forecasts may be run operationally or retrospectively. For operational forecasts, *run_time* is typically set to now. For retrospective forecasts, *run_time* is the time by which the forecast should be run so that it could have been be delivered for the *issue_time*. Forecasts will only use data with timestamps before *run_time*. The persistence *window* is the time over which the persistence quantity (irradiance, power, clear sky index, or power index) is averaged. The persistence window is automatically determined from the *forecast* attributes: * Intraday persistence forecasts: *window = run_time - forecast.run_length*. No longer than 1 hour. * Day ahead forecasts: *window = forecast.interval_length*. Users that would like more flexibility may use the lower-level functions in :py:mod:`solarforecastarbiter.reference_forecasts.persistence`. Parameters ---------- session : api.Session The session object to use to request data from the SolarForecastArbiter API. observation : datamodel.Observation The metadata of the observation to be used to create the forecast. forecast : datamodel.Forecast The metadata of the desired forecast. run_time : pd.Timestamp Run time of the forecast. issue_time : pd.Timestamp Issue time of the forecast run. index : bool, default False If False, use persistence of observed value. If True, use persistence of clear sky or AC power index. Returns ------- forecast : pd.Series Forecast conforms to the metadata specified by the *forecast* argument. Raises ------ ValueError If forecast and issue_time are incompatible. ValueError If persistence window < observation.interval_length. ValueError If forecast.run_length = 1 day and forecast period is not midnight to midnight. ValueError If forecast.run_length = 1 day and index=True. ValueError If instantaneous forecast and instantaneous observation interval lengths do not match. ValueError If average observations are used to make instantaneous forecast. """ forecast_start, forecast_end = utils.get_forecast_start_end( forecast, issue_time) intraday = utils._is_intraday(forecast) if not intraday: # raise ValueError if not intraday and not midnight to midnight utils._check_midnight_to_midnight(forecast_start, forecast_end) data_start, data_end = utils.get_data_start_end( observation, forecast, run_time) def load_data(observation, data_start, data_end): df = session.get_observation_values(observation.observation_id, data_start, data_end, observation.interval_label) df = df.tz_convert(observation.site.timezone) return df['value'] if intraday and index: fx = persistence.persistence_scalar_index( observation, data_start, data_end, forecast_start, forecast_end, forecast.interval_length, forecast.interval_label, load_data) elif intraday and not index: fx = persistence.persistence_scalar( observation, data_start, data_end, forecast_start, forecast_end, forecast.interval_length, forecast.interval_label, load_data) elif not intraday and not index: fx = persistence.persistence_interval( observation, data_start, data_end, forecast_start, forecast.interval_length, forecast.interval_label, load_data) else: raise ValueError( 'index=True not supported for forecasts with run_length >= 1day') return fx
def all_equal(iterable): "Returns True if all the elements are equal to each other" g = itertools.groupby(iterable) return next(g, True) and not next(g, False) def _verify_nwp_forecasts_compatible(fx_group): """Verify that all the forecasts grouped by piggyback_on are compatible """ errors = [] if not len(fx_group.model.unique()) == 1: errors.append('model') for var in ('issue_time_of_day', 'lead_time_to_start', 'interval_length', 'run_length', 'interval_label', 'interval_value_type', 'site'): if not all_equal(getattr(fx, var) for fx in fx_group.forecast): errors.append(var) return errors def _is_reference_forecast(extra_params_string): match = re.search('is_reference_forecast(["\\s\\:]*)true', extra_params_string, re.I) return match is not None
[docs]def find_reference_nwp_forecasts(forecasts, run_time=None): """ Sort through all *forecasts* to find those that should be generated by the Arbiter from NWP models. The forecast must have a *model* key in *extra_parameters* (formatted as a JSON string). If *piggyback_on* is also defined in *extra_parameters*, it should be the forecast_id of another forecast that has the same parameters, including site, except the variable. Parameters ---------- forecasts : list of datamodel.Forecasts The forecasts that should be filtered to find references. run_time : pandas.Timestamp or None, default None The run_time of that forecast generation is taking place. If not None, the next issue time for each forecast is added to the output. Returns ------- pandas.DataFrame NWP reference forecasts with index of forecast_id and columns (forecast, piggyback_on, model, next_issue_time). """ df_vals = [] for fx in forecasts: # more explicit than filter() if not _is_reference_forecast(fx.extra_parameters): logger.debug('Forecast %s is not labeled as a reference forecast', fx.forecast_id) continue try: extra_parameters = json.loads(fx.extra_parameters) except json.JSONDecodeError: logger.warning( 'Failed to decode extra_parameters for %s: %s as JSON', fx.name, fx.forecast_id) continue try: model = extra_parameters['model'] except KeyError: logger.error( 'Forecast, %s: %s, has no model. Cannot make forecast.', fx.name, fx.forecast_id) continue if run_time is not None: next_issue_time = utils.get_next_issue_time(fx, run_time) else: next_issue_time = None piggyback_on = extra_parameters.get('piggyback_on', fx.forecast_id) df_vals.append((fx.forecast_id, fx, piggyback_on, model, next_issue_time)) forecast_df = pd.DataFrame( df_vals, columns=['forecast_id', 'forecast', 'piggyback_on', 'model', 'next_issue_time'] ).set_index('forecast_id') return forecast_df
[docs]def process_nwp_forecast_groups(session, run_time, forecast_df): """ Groups NWP forecasts based on piggyback_on, calculates the forecast as appropriate for *run_time*, and uploads the values to the API. Parameters ---------- session : io.api.APISession API session for uploading forecast values run_time : pandas.Timestamp Run time of the forecast. Also used along with the forecast metadata to determine the issue_time of the forecast. forecast_df : pandas.DataFrame Dataframe of the forecast objects as procduced by :py:func:`solarforecastarbiter.reference_forecasts.main.find_reference_nwp_forecasts``. """ # NOQA for run_for, group in forecast_df.groupby('piggyback_on'): logger.info('Computing forecasts for group %s', run_for) errors = _verify_nwp_forecasts_compatible(group) if errors: logger.error( 'Not all forecasts compatible in group with %s. ' 'The following parameters may differ: %s', run_for, errors) continue try: key_fx = group.loc[run_for].forecast except KeyError: logger.error('Forecast, %s, that others are piggybacking on not ' 'found', run_for) continue model = getattr(models, group.loc[run_for].model) issue_time = group.loc[run_for].next_issue_time if issue_time is None: issue_time = utils.get_next_issue_time(key_fx, run_time) try: nwp_result = run_nwp(key_fx, model, run_time, issue_time) except FileNotFoundError as e: logger.error('Could not process group of %s, %s', run_for, str(e)) continue for fx_id, fx in group['forecast'].iteritems(): fx_vals = getattr(nwp_result, fx.variable) if fx_vals is None: logger.warning('No forecast produced for %s in group with %s', fx_id, run_for) continue logger.info('Posting values %s for %s:%s issued at %s', len(fx_vals), fx.name, fx_id, issue_time) session.post_forecast_values(fx_id, fx_vals)
[docs]def make_latest_nwp_forecasts(token, run_time, issue_buffer, base_url=None): """ Make all reference NWP forecasts for *run_time* that are within *issue_buffer* of the next issue time for the forecast. For example, this function may run in a cronjob every five minutes with *run_time* set to now. By setting *issue_buffer* to '5min', only forecasts that should be issued in the next five minutes will be generated on each run. Parameters ---------- token : str Access token for the API run_time : pandas.Timestamp Run time of the forecast generation issue_buffer : pandas.Timedelta Maximum time between *run_time* and the next initialization time of each forecast that will be updated base_url : str or None, default None Alternate base_url of the API """ session = api.APISession(token, base_url=base_url) forecasts = session.list_forecasts() forecast_df = find_reference_nwp_forecasts(forecasts, run_time) execute_for = forecast_df[ forecast_df.next_issue_time <= run_time + issue_buffer] if execute_for.empty: logger.info('No forecasts to be made at %s', run_time) return return process_nwp_forecast_groups(session, run_time, execute_for)