"""
Make benchmark irradiance and power forecasts.
The functions in this module use the
:py:mod:`solarforecastarbiter.datamodel` objects.
"""
from collections import namedtuple, defaultdict
import itertools
import json
import logging
import re
import pandas as pd
from solarforecastarbiter import datamodel, pvmodel
from solarforecastarbiter.utils import generate_continuous_chunks
from solarforecastarbiter.io import api
from solarforecastarbiter.io.utils import adjust_timeseries_for_interval_label
from solarforecastarbiter.reference_forecasts import persistence, models, utils
logger = logging.getLogger(__name__)
[docs]def run_nwp(forecast, model, run_time, issue_time):
"""
Calculate benchmark irradiance and power forecasts for a Forecast or
ProbabilisticForecast.
Forecasts may be run operationally or retrospectively. For
operational forecasts, *run_time* is typically set to now. For
retrospective forecasts, *run_time* is the time by which the
forecast should be run so that it could have been be delivered for
the *issue_time*. Forecasts will only use data with timestamps
before *run_time*.
Parameters
----------
forecast : datamodel.Forecast or datamodel.ProbabilisticForecast
The metadata of the desired forecast.
model : function
NWP model loading and processing function.
See :py:mod:`solarforecastarbiter.reference_forecasts.models`
for options.
run_time : pd.Timestamp
Run time of the forecast.
issue_time : pd.Timestamp
Issue time of the forecast run.
Returns
-------
ghi : pd.Series or pd.DataFrame
dni : pd.Series or pd.DataFrame
dhi : pd.Series or pd.DataFrame
air_temperature : pd.Series or pd.DataFrame
wind_speed : pd.Series or pd.DataFrame
ac_power : pd.Series or pd.DataFrame
Series are returned for deterministic forecasts, DataFrames are
returned for probabilisic forecasts.
Examples
--------
The following code would return hourly average forecasts derived
from the subhourly HRRR model.
.. testsetup::
import datetime
from solarforecastarbiter import datamodel
from solarforecastarbiter.reference_forecasts import models
from solarforecastarbiter.reference_forecasts.main import *
>>> run_time = pd.Timestamp('20190515T0200Z')
>>> issue_time = pd.Timestamp('20190515T0000Z')
>>> modeling_parameters = datamodel.FixedTiltModelingParameters(
... surface_tilt=30, surface_azimuth=180,
... ac_capacity=10, dc_capacity=15,
... temperature_coefficient=-0.4, dc_loss_factor=0,
... ac_loss_factor=0)
>>> power_plant = datamodel.SolarPowerPlant(
... name='Test plant', latitude=32.2, longitude=-110.9,
... elevation=715, timezone='America/Phoenix',
... modeling_parameters=modeling_parameters)
>>> forecast = datamodel.Forecast(
... name='Test plant fx',
... site=power_plant,
... variable='ac_power',
... interval_label='ending',
... interval_value_type='interval_mean',
... interval_length='1h',
... issue_time_of_day=datetime.time(hour=0),
... run_length=pd.Timedelta('24h'),
... lead_time_to_start=pd.Timedelta('0h'))
>>> ghi, dni, dhi, temp_air, wind_speed, ac_power = run_nwp(
... forecast, models.hrrr_subhourly_to_hourly_mean,
... run_time, issue_time)
"""
# bury import in the function so that io.fetch.nwp libraries (aoihttp, etc)
# are restricted to this function. might be better to extract the relevant
# model metadata into io.nwp, but that would be more invasive.
from solarforecastarbiter.io.fetch import nwp as fetch_nwp
fetch_metadata = fetch_nwp.model_map[models.get_nwp_model(model)]
# absolute date and time for model run most recently available
# as of run_time
init_time = utils.get_init_time(run_time, fetch_metadata)
# absolute start and end times. interval_label still controls
# inclusive/exclusive
start, end = utils.get_forecast_start_end(forecast, issue_time)
site = forecast.site
logger.info(
'Calculating forecast for model %s starting at %s from %s to %s',
model, init_time, start, end)
# model will account for interval_label
*forecasts, resampler, solar_position_calculator = model(
site.latitude, site.longitude, site.elevation,
init_time, start, end, forecast.interval_label)
if isinstance(site, datamodel.SolarPowerPlant):
solar_position = solar_position_calculator()
if isinstance(forecasts[0], pd.DataFrame):
# must iterate over columns because pvmodel.irradiance_to_power
# calls operations that do not properly broadcast Series along
# a DataFrame time index. pvlib.irradiance.haydavies operation
# (AI = dni_ens / dni_extra) is the known culprit, though there
# may be more.
ac_power = {}
for col in forecasts[0].columns:
member_fx = [fx.get(col) for fx in forecasts]
member_ac_power = pvmodel.irradiance_to_power(
site.modeling_parameters,
solar_position['apparent_zenith'],
solar_position['azimuth'],
*member_fx)
ac_power[col] = member_ac_power
ac_power = pd.DataFrame(ac_power)
else:
ac_power = pvmodel.irradiance_to_power(
site.modeling_parameters, solar_position['apparent_zenith'],
solar_position['azimuth'], *forecasts)
else:
ac_power = None
# resample data after power calculation
resampled = list(map(resampler, (*forecasts, ac_power)))
nwpoutput = namedtuple(
'NWPOutput', ['ghi', 'dni', 'dhi', 'air_temperature', 'wind_speed',
'ac_power'])
return nwpoutput(*resampled)
def _default_load_data(session):
def load_data(observation, data_start, data_end):
df = session.get_observation_values(observation.observation_id,
data_start, data_end,
observation.interval_label)
df = df.tz_convert(observation.site.timezone)
return df['value']
return load_data
[docs]def run_persistence(session, observation, forecast, run_time, issue_time,
index=False, load_data=None):
"""
Run a persistence *forecast* for an *observation*.
For intraday forecasts, the *index* argument controls if the
forecast is constructed using persistence of the measured values
(*index = False*) or persistence using clear sky index or AC power
index.
For day ahead forecasts, only persistence of measured values
(*index = False*) is supported.
Forecasts may be run operationally or retrospectively. For
operational forecasts, *run_time* is typically set to now. For
retrospective forecasts, *run_time* is the time by which the
forecast should be run so that it could have been be delivered for
the *issue_time*. Forecasts will only use data with timestamps
before *run_time*.
The persistence *window* is the time over which the persistence
quantity (irradiance, power, clear sky index, or power index) is
averaged. The persistence window is automatically determined
from the *forecast* attributes:
- Intraday persistence forecasts:
+ ``window = forecast.run_length``. No longer than 1 hour.
- Day ahead forecasts (all but net load) and week ahead forecasts (net
load only):
+ ``window = forecast.interval_length``.
Users that would like more flexibility may use the lower-level
functions in
:py:mod:`solarforecastarbiter.reference_forecasts.persistence`.
Parameters
----------
session : api.Session
The session object to use to request data from the
SolarForecastArbiter API.
observation : datamodel.Observation
The metadata of the observation to be used to create the
forecast.
forecast : datamodel.Forecast
The metadata of the desired forecast.
run_time : pd.Timestamp
Run time of the forecast.
issue_time : pd.Timestamp
Issue time of the forecast run.
index : bool, default False
If False, use persistence of observed value. If True, use
persistence of clear sky or AC power index.
load_data : function
Function to load the observation data 'value' series given
(observation, data_start, data_end) arguments. Typically,
calls `session.get_observation_values` and selects the 'value'
column. May also have data preloaded to then slice from
data_start to data_end.
Returns
-------
forecast : pd.Series
Forecast conforms to the metadata specified by the *forecast*
argument.
Raises
------
ValueError
If forecast and issue_time are incompatible.
ValueError
If data is required from after run_time.
ValueError
If persistence window < observation.interval_length.
ValueError
If forecast.run_length => 1 day and index=True.
ValueError
If instantaneous forecast and instantaneous observation interval
lengths do not match.
ValueError
If average observations are used to make instantaneous forecast.
Notes
-----
For non-intraday net load forecasts, this function will use a weekahead
persistence due to the fact that net load exhibits stronger correlation
week-to-week than day-to-day. For example, the net load on a Monday tends
to look more similar to the previous Monday that it does to the previous
day (Sunday).
For probabilistic forecasts, this function will always use the
persistence ensemble time of day method. 30 days of data is pulled.
"""
utils.check_persistence_compatibility(observation, forecast, index)
forecast_start, forecast_end = utils.get_forecast_start_end(
forecast, issue_time, False)
intraday = utils._is_intraday(forecast)
if load_data is None:
load_data = _default_load_data(session)
data_start, data_end = utils.get_data_start_end(
observation, forecast, run_time, issue_time)
if data_end > run_time:
raise ValueError(
'Persistence forecast requires data from after run_time')
if isinstance(forecast, datamodel.ProbabilisticForecast):
cvs = [f.constant_value for f in forecast.constant_values]
fx = persistence.persistence_probabilistic_timeofday(
observation, data_start, data_end, forecast_start, forecast_end,
forecast.interval_length, forecast.interval_label, load_data,
forecast.axis, cvs)
elif intraday and index:
fx = persistence.persistence_scalar_index(
observation, data_start, data_end, forecast_start, forecast_end,
forecast.interval_length, forecast.interval_label, load_data)
elif intraday and not index:
fx = persistence.persistence_scalar(
observation, data_start, data_end, forecast_start, forecast_end,
forecast.interval_length, forecast.interval_label, load_data)
elif not intraday and not index:
fx = persistence.persistence_interval(
observation, data_start, data_end, forecast_start,
forecast.interval_length, forecast.interval_label, load_data)
else: # pragma: no cover
raise ValueError(
'index=True not supported for forecasts with run_length >= 1day')
return fx
def all_equal(iterable):
"Returns True if all the elements are equal to each other"
g = itertools.groupby(iterable)
return next(g, True) and not next(g, False)
def _verify_nwp_forecasts_compatible(fx_group):
"""Verify that all the forecasts grouped by piggyback_on are compatible
"""
errors = []
if not len(fx_group.model.unique()) == 1:
errors.append('model')
for var in ('issue_time_of_day', 'lead_time_to_start', 'interval_length',
'run_length', 'interval_label', 'interval_value_type',
'site'):
if not all_equal(getattr(fx, var) for fx in fx_group.forecast):
errors.append(var)
return errors
def _is_reference_forecast(extra_params_string):
match = re.search(r'is_reference_forecast(["\s\:]*)true',
extra_params_string, re.I)
return match is not None
[docs]def find_reference_nwp_forecasts(forecasts, run_time=None):
"""
Sort through all *forecasts* to find those that should be generated
by the Arbiter from NWP models. The forecast must have a *model* key
in *extra_parameters* (formatted as a JSON string). If *piggyback_on*
is also defined in *extra_parameters*, it should be the forecast_id
of another forecast that has the same parameters, including site,
except the variable.
Parameters
----------
forecasts : list of datamodel.Forecasts
The forecasts that should be filtered to find references.
run_time : pandas.Timestamp or None, default None
The run_time of that forecast generation is taking place. If not
None, the next issue time for each forecast is added to the output.
Returns
-------
pandas.DataFrame
NWP reference forecasts with index of forecast_id and columns
(forecast, piggyback_on, model, next_issue_time).
"""
df_vals = []
for fx in forecasts:
# more explicit than filter()
if not _is_reference_forecast(fx.extra_parameters):
logger.debug('Forecast %s is not labeled as a reference forecast',
fx.forecast_id)
continue
try:
extra_parameters = json.loads(fx.extra_parameters)
except json.JSONDecodeError:
logger.warning(
'Failed to decode extra_parameters for %s: %s as JSON',
fx.name, fx.forecast_id)
continue
try:
model = extra_parameters['model']
except KeyError:
logger.error(
'Forecast, %s: %s, has no model. Cannot make forecast.',
fx.name, fx.forecast_id)
continue
if run_time is not None:
next_issue_time = utils.get_next_issue_time(fx, run_time)
else:
next_issue_time = None
piggyback_on = extra_parameters.get('piggyback_on', fx.forecast_id)
df_vals.append((fx.forecast_id, fx, piggyback_on, model,
next_issue_time))
forecast_df = pd.DataFrame(
df_vals, columns=['forecast_id', 'forecast', 'piggyback_on', 'model',
'next_issue_time']
).set_index('forecast_id')
return forecast_df
def _post_forecast_values(session, fx, fx_vals, model_str):
if isinstance(fx, datamodel.ProbabilisticForecast):
if not model_str.startswith('gefs'):
raise ValueError(
'Can only process probabilisic forecast from GEFS')
if not isinstance(fx_vals, pd.DataFrame) or len(fx_vals.columns) != 21:
raise TypeError(
'Could not post probabilistic forecast values: '
'forecast values in unknown format')
# adjust columns to be constant values
cv_df = fx_vals.rename(columns={i: i * 5.0 for i in range(22)})
for cv_fx in fx.constant_values:
# will raise a KeyError if no match
cv_vals = cv_df[cv_fx.constant_value]
logger.debug('Posting %s values to %s', len(cv_vals),
cv_fx.forecast_id)
session.post_probabilistic_forecast_constant_value_values(
cv_fx.forecast_id, cv_vals
)
else:
session.post_forecast_values(fx.forecast_id, fx_vals)
[docs]def process_nwp_forecast_groups(session, run_time, forecast_df):
"""
Groups NWP forecasts based on piggyback_on, calculates the forecast as
appropriate for *run_time*, and uploads the values to the API.
Parameters
----------
session : io.api.APISession
API session for uploading forecast values
run_time : pandas.Timestamp
Run time of the forecast. Also used along with the forecast metadata
to determine the issue_time of the forecast.
forecast_df : pandas.DataFrame
Dataframe of the forecast objects as procduced by
:py:func:`solarforecastarbiter.reference_forecasts.main.find_reference_nwp_forecasts`.
""" # NOQA
for run_for, group in forecast_df.groupby('piggyback_on'):
_process_single_group(session, run_for, group, run_time)
def _process_single_group(session, run_for, group, run_time):
logger.info('Computing forecasts for group %s at %s', run_for, run_time)
errors = _verify_nwp_forecasts_compatible(group)
if errors:
logger.error(
'Not all forecasts compatible in group with %s. '
'The following parameters may differ: %s', run_for, errors)
return
try:
key_fx = group.loc[run_for].forecast
except KeyError:
logger.error('Forecast, %s, that others are piggybacking on not '
'found', run_for)
return
model_str = group.loc[run_for].model
model = getattr(models, model_str)
issue_time = group.loc[run_for].next_issue_time
if issue_time is None:
issue_time = utils.get_next_issue_time(key_fx, run_time)
try:
nwp_result = run_nwp(key_fx, model, run_time, issue_time)
except FileNotFoundError as e:
logger.error('Could not process group of %s, %s', run_for, str(e))
return
for fx_id, fx in group['forecast'].iteritems():
fx_vals = getattr(nwp_result, fx.variable)
if fx_vals is None:
logger.warning('No forecast produced for %s in group with %s',
fx_id, run_for)
continue
logger.info('Posting values %s for %s:%s issued at %s',
len(fx_vals), fx.name, fx_id, issue_time)
_post_forecast_values(session, fx, fx_vals, model_str)
[docs]def make_latest_nwp_forecasts(token, run_time, issue_buffer, base_url=None):
"""
Make all reference NWP forecasts for *run_time* that are within
*issue_buffer* of the next issue time for the forecast. For example,
this function may run in a cronjob every five minutes with *run_time*
set to now. By setting *issue_buffer* to '5min', only forecasts that
should be issued in the next five minutes will be generated on each
run. Only forecasts that belong to the same provider/organization
of the token user will be updated.
Parameters
----------
token : str
Access token for the API
run_time : pandas.Timestamp
Run time of the forecast generation
issue_buffer : pandas.Timedelta
Maximum time between *run_time* and the next initialization time of
each forecast that will be updated
base_url : str or None, default None
Alternate base_url of the API
"""
session, forecast_df = _get_nwp_forecast_df(token, run_time, base_url)
execute_for = forecast_df[
forecast_df.next_issue_time <= run_time + issue_buffer]
if execute_for.empty:
logger.info('No forecasts to be made at %s', run_time)
return
process_nwp_forecast_groups(session, run_time, execute_for)
def _get_nwp_forecast_df(token, run_time, base_url):
session = api.APISession(token, base_url=base_url)
user_info = session.get_user_info()
forecasts = session.list_forecasts()
forecasts += session.list_probabilistic_forecasts()
forecasts = [fx for fx in forecasts
if fx.provider == user_info['organization']]
forecast_df = find_reference_nwp_forecasts(forecasts, run_time)
return session, forecast_df
def _nwp_issue_time_generator(fx, gap_start, gap_end):
# max_run_time is the forecast issue time that will generate forecast
# that end before gap_end
max_run_time = utils.find_next_issue_time_from_last_forecast(
fx, gap_end - pd.Timedelta('1ns'))
# next_issue_time is the forecast issue time that will generate forecast
# values at gap_start
next_issue_time = utils.find_next_issue_time_from_last_forecast(
fx, gap_start)
while next_issue_time < max_run_time:
yield next_issue_time
next_issue_time = utils.get_next_issue_time(
fx, next_issue_time + pd.Timedelta('1ns'))
def _find_group_gaps(session, forecasts, start, end):
times = set()
for forecast in forecasts:
gaps = session.get_value_gaps(forecast, start, end)
for gap in gaps:
times |= set(_nwp_issue_time_generator(
forecast, gap[0], gap[1]))
return sorted(times)
[docs]def fill_nwp_forecast_gaps(token, start, end, base_url=None):
"""
Make all reference NWP forecasts that are missing from *start* to *end*.
Only forecasts that belong to the same provider/organization
of the token user will be updated.
Parameters
----------
token : str
Access token for the API
start : pandas.Timestamp
Start of the period to check and fill forecast gaps
end : pandas.Timestamp
End of the period to check and fill forecast gaps
base_url : str or None, default None
Alternate base_url of the API
"""
session, forecast_df = _get_nwp_forecast_df(token, None, base_url)
# go through each group separately
for run_for, group in forecast_df.groupby('piggyback_on'):
issue_times = _find_group_gaps(session, group.forecast.to_list(),
start, end)
group = group.copy()
for issue_time in issue_times:
group.loc[:, 'next_issue_time'] = issue_time
_process_single_group(session, run_for, group, issue_time)
def _is_reference_persistence_forecast(extra_params_string):
match = re.search(r'is_reference_persistence_forecast(["\s\:]*)true',
extra_params_string, re.I)
return match is not None
def _ref_persistence_check(fx, observation_dict, user_info, session):
if not _is_reference_persistence_forecast(fx.extra_parameters):
logger.debug(
'Forecast %s is not labeled as a reference '
'persistence forecast', fx.forecast_id)
return
if not fx.provider == user_info['organization']:
logger.debug(
"Forecast %s is not in user's organization",
fx.forecast_id)
return
try:
extra_parameters = json.loads(fx.extra_parameters)
except json.JSONDecodeError:
logger.warning(
'Failed to decode extra_parameters for %s: %s as JSON',
fx.name, fx.forecast_id)
return
try:
observation_id = extra_parameters['observation_id']
except KeyError:
logger.error(
'Forecast, %s: %s, has no observation_id to base forecasts'
' off of. Cannot make persistence forecast.',
fx.name, fx.forecast_id)
return
if observation_id not in observation_dict:
logger.error(
'Observation %s not in set of given observations.'
' Cannot generate persistence forecast for %s: %s.',
observation_id, fx.name, fx.forecast_id)
return
observation = observation_dict[observation_id]
index = extra_parameters.get('index_persistence', False)
obs_mint, obs_maxt = session.get_observation_time_range(observation_id)
if pd.isna(obs_maxt): # no observations to use anyway
logger.info(
'No observation values to use for %s: %s from observation %s',
fx.name, fx.forecast_id, observation_id)
return
return observation, index, obs_mint, obs_maxt
def generate_reference_persistence_forecast_parameters(
session, forecasts, observations, max_run_time):
"""Sort through all *forecasts* to find those that should be generated
by the Arbiter from persisting Observation values. The forecast
must have ``'is_reference_persistence_forecast': true`` and an
observation_id in Forecast.extra_parameters (formatted as a JSON
string). A boolean value for "index_persistence" in
Forecast.extra_parameters controls whether the persistence
forecast should be made adjusting for clear-sky/AC power index or
not.
Parameters
----------
session : solarforecastarbiter.io.api.APISession
forecasts : list of datamodel.Forecasts
The forecasts that should be filtered to find references.
observations : list of datamodel.Observations
Observations that will are available to use to fetch values
and make persistence forecasts.
max_run_time : pandas.Timestamp
The maximum run time/issue time for any forecasts. Usually now.
Returns
-------
generator of (Forecast, Observation, index, data_start, issue_times)
"""
user_info = session.get_user_info()
observation_dict = {obs.observation_id: obs for obs in observations}
out = namedtuple(
'PersistenceParameters',
['forecast', 'observation', 'index', 'data_start',
'issue_times', 'adjusted_max_run_time'])
for fx in forecasts:
obs_ind_mint_maxt = _ref_persistence_check(
fx, observation_dict, user_info, session)
if obs_ind_mint_maxt is None:
continue
observation, index, obs_mint, obs_maxt = obs_ind_mint_maxt
# probably split this out to generate issues times for only gaps vs
# latest
if isinstance(fx, datamodel.ProbabilisticForecast):
fx_mint, fx_maxt = \
session.get_probabilistic_forecast_constant_value_time_range(
fx.constant_values[0].forecast_id)
else:
fx_mint, fx_maxt = session.get_forecast_time_range(fx.forecast_id)
# find the next issue time for the forecast based on the last value
# in the forecast series
if pd.isna(fx_maxt):
# if there is no forecast yet, go back a bit from the last
# observation. Don't use the start of observations, since it
# could really stress the workers if we have a few years of
# data before deciding to make a persistence fx
next_issue_time = utils.get_next_issue_time(
fx, obs_maxt - fx.run_length)
else:
next_issue_time = utils.find_next_issue_time_from_last_forecast(
fx, fx_maxt)
data_start, _ = utils.get_data_start_end(
observation, fx, next_issue_time, next_issue_time)
adjusted_max_run_time = utils._limit_persistence_run_time(
data_start,
max_run_time,
fx
)
issue_times = tuple(_issue_time_generator(
observation, fx, obs_mint, obs_maxt,
next_issue_time, adjusted_max_run_time))
if len(issue_times) == 0:
continue
yield out(fx, observation, index, data_start, issue_times,
adjusted_max_run_time)
def _issue_time_generator(observation, fx, obs_mint, obs_maxt, next_issue_time,
max_run_time):
# now find all the run times that can be made based on the
# last observation timestamp
while next_issue_time <= max_run_time:
data_start, data_end = utils.get_data_start_end(
observation, fx, next_issue_time, next_issue_time)
if data_end > obs_maxt:
break
if data_start > obs_mint:
yield next_issue_time
next_issue_time = utils.get_next_issue_time(
fx, next_issue_time + pd.Timedelta('1ns'))
def _preload_load_data(session, obs, data_start, data_end):
"""Fetch all the data required at once and slice as appropriate.
Much more efficient when generating many persistence forecasts from
the same observation.
"""
obs_data = session.get_observation_values(
obs.observation_id, data_start, data_end
).tz_convert(obs.site.timezone)['value']
def load_data(observation, data_start, data_end):
data = obs_data.loc[data_start:data_end]
return adjust_timeseries_for_interval_label(
data, observation.interval_label, data_start, data_end)
return load_data
[docs]def make_latest_persistence_forecasts(token, max_run_time, base_url=None):
"""Make all reference persistence forecasts that need to be made up to
*max_run_time*.
Parameters
----------
token : str
Access token for the API
max_run_time : pandas.Timestamp
Last possible run time of the forecast generation
base_url : str or None, default None
Alternate base_url of the API
"""
session = api.APISession(token, base_url=base_url)
forecasts = session.list_forecasts()
observations = session.list_observations()
params = generate_reference_persistence_forecast_parameters(
session, forecasts, observations, max_run_time)
for fx, obs, index, data_start, issue_times, fx_max_run_time in params:
_pers_loop(session, fx, obs, index, data_start, fx_max_run_time,
issue_times)
def _pers_loop(session, fx, obs, index, data_start, data_end, issue_times):
load_data = _preload_load_data(session, obs, data_start, data_end)
out = defaultdict(list)
logger.info('Making persistence forecast for %s:%s from %s to %s',
fx.name, fx.forecast_id, issue_times[0], issue_times[-1])
for issue_time in issue_times:
run_time = issue_time
try:
fx_out = run_persistence(
session, obs, fx, run_time, issue_time,
index=index, load_data=load_data)
except ValueError as e:
logger.error('Unable to generate persistence forecast: %s', e)
else:
if hasattr(fx, 'constant_values'):
cv_ids = [f.forecast_id for f in fx.constant_values]
for id_, fx_ser in zip(cv_ids, fx_out):
out[id_].append(fx_ser)
else:
out[fx.forecast_id].append(fx_out)
for id_, serlist in out.items():
if len(serlist) > 0:
ser = pd.concat(serlist)
for cser in generate_continuous_chunks(ser, fx.interval_length):
if type(fx) == datamodel.Forecast:
session.post_forecast_values(id_, cser)
else:
session.post_probabilistic_forecast_constant_value_values(
id_, cser)
[docs]def make_latest_probabilistic_persistence_forecasts(
token, max_run_time, base_url=None):
"""Make all reference probabilistic persistence forecasts that need to
be made up to *max_run_time*.
Parameters
----------
token : str
Access token for the API
max_run_time : pandas.Timestamp
Last possible run time of the forecast generation
base_url : str or None, default None
Alternate base_url of the API
"""
session = api.APISession(token, base_url=base_url)
forecasts = session.list_probabilistic_forecasts()
observations = session.list_observations()
params = generate_reference_persistence_forecast_parameters(
session, forecasts, observations, max_run_time)
for fx, obs, index, data_start, issue_times, fx_max_run_time in params:
_pers_loop(session, fx, obs, index, data_start, fx_max_run_time,
issue_times)
def generate_reference_persistence_forecast_gaps_parameters(
session, forecasts, observations, start, end):
"""Sort through all *forecasts* to find those with gaps in the data
that should be generated by the Arbiter from persisting
Observation values. The forecast must have
``'is_reference_persistence_forecast': true`` and an
observation_id in Forecast.extra_parameters (formatted as a JSON
string). A boolean value for "index_persistence" in
Forecast.extra_parameters controls whether the persistence
forecast should be made adjusting for clear-sky/AC power index or
not.
Parameters
----------
session : solarforecastarbiter.io.api.APISession
forecasts : list of datamodel.Forecasts
The forecasts that should be filtered to find references.
observations : list of datamodel.Observations
Observations that will are available to use to fetch values
and make persistence forecasts.
start : pandas.Timestamp
The start of the period to search for missing forecast values.
end : pandas.Timestamp
The end of the period to search for missing forecast values.
Returns
-------
generator of (Forecast, Observation, index, data_start, data_end, issue_times)
""" # NOQA: E501
user_info = session.get_user_info()
observation_dict = {obs.observation_id: obs for obs in observations}
out = namedtuple(
'PersistenceGapParameters',
['forecast', 'observation', 'index', 'data_start', 'data_end',
'issue_times'])
for fx in forecasts:
obs_ind_mint_maxt = _ref_persistence_check(
fx, observation_dict, user_info, session)
if obs_ind_mint_maxt is None:
continue
observation, index, obs_mint, obs_maxt = obs_ind_mint_maxt
times = set()
gaps = session.get_value_gaps(fx, start, end)
for gap in gaps:
times |= set(_issue_time_generator(
observation, fx, obs_mint, obs_maxt, gap[0],
gap[1] - pd.Timedelta('1ns')))
issue_times = tuple(sorted(times))
if len(issue_times) == 0:
continue
# get_data_start_end only looks for start/end of a single
# forecast run, so need to do for first and last issue times
# to get full range of data possibly needed
data_start, _ = utils.get_data_start_end(
observation, fx, issue_times[0], issue_times[0])
_, data_end = utils.get_data_start_end(
observation, fx, issue_times[-1], issue_times[-1])
yield out(fx, observation, index, data_start, data_end, issue_times)
def _fill_persistence_gaps(token, start, end, base_url, forecast_fnc):
session = api.APISession(token, base_url=base_url)
forecasts = getattr(session, forecast_fnc)()
observations = session.list_observations()
params = generate_reference_persistence_forecast_gaps_parameters(
session, forecasts, observations, start, end)
for fx, obs, index, data_start, data_end, issue_times in params:
_pers_loop(session, fx, obs, index, data_start, data_end,
issue_times)
[docs]def fill_persistence_forecasts_gaps(token, start, end, base_url=None):
"""Make all reference persistence forecasts that need to be made
between start and end.
Parameters
----------
token : str
Access token for the API
start : pandas.Timestamp
The start of the period to search for missing forecast values.
end : pandas.Timestamp
The end of the period to search for missing forecast values.
base_url : str or None, default None
Alternate base_url of the API
"""
_fill_persistence_gaps(token, start, end, base_url, 'list_forecasts')
[docs]def fill_probabilistic_persistence_forecasts_gaps(
token, start, end, base_url=None):
"""Make all reference probabilistic persistence forecasts that need to
be made between start and end.
Parameters
----------
token : str
Access token for the API
start : pandas.Timestamp
The start of the period to search for missing forecast values.
end : pandas.Timestamp
The end of the period to search for missing forecast values.
base_url : str or None, default None
Alternate base_url of the API
"""
_fill_persistence_gaps(token, start, end, base_url,
'list_probabilistic_forecasts')