"""
Provides preprocessing steps to be performed on the timeseries data.
"""
import logging
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from solarforecastarbiter import datamodel
from solarforecastarbiter.validation import quality_mapping
logger = logging.getLogger(__name__)

# Templates used to name the PreprocessingResult counts.
# {0} is an optional prefix such as "Reference " and {1} describes the
# forecast fill method that was applied.
FILL_RESULT_TOTAL_STRING = "Missing {0}Forecast Values {1}"
# {0} is the blurb of the object whose values were dropped by alignment.
DISCARD_DATA_STRING = "{0} Values Discarded by Alignment"
# Description of a constant-value forecast fill; {0} is the constant.
FORECAST_FILL_CONST_STRING = "Filled with {0}"
# {0} names the data ("Forecast", "Observation", ...) removed by outages.
OUTAGE_DISCARD_STRING = "{0} Values Discarded Due To Outage"
# Descriptions of the named (non-constant) forecast fill methods.
FORECAST_FILL_STRING_MAP = dict(
    drop="Discarded",
    forward="Forward Filled",
)
def apply_fill(fx_data, forecast, forecast_fill_method, start, end):
    """
    Apply fill procedure to the data from the start to end timestamps.

    Parameters
    ----------
    fx_data : pandas.Series or pandas.DataFrame
        Forecast data with pandas.DatetimeIndex.
    forecast : datamodel.Forecast
        Its ``interval_length`` and ``interval_label`` define the expected
        index used by the 'forward' and constant fill methods.
    forecast_fill_method : {'drop', 'forward', float}
        Indicates what process to use for handling missing forecasts.

        * _'drop'_ drops all missing values for any row with a missing value.
        * _'forward'_ fills missing values with the most recent real value.
          If any leading missing values fill with zeros.
        * _float_ fills any missing values with the given value.
    start : pandas.Timestamp
    end : pandas.Timestamp

    Returns
    -------
    filled: pandas.Series or pandas.DataFrame
        Forecast filled according to the specified logic
    count : int
        Number of values filled or dropped

    Raises
    ------
    ValueError
        If ``forecast_fill_method`` is not 'drop', 'forward', or a value
        that ``pandas.to_numeric`` can convert to a number.
    """
    forecast_fill_method = str(forecast_fill_method)

    if forecast_fill_method == 'drop':
        # Drop any missing values. The data is not reindexed, so only NaN
        # values already present are counted. If data is a DataFrame any row
        # that is missing a value is dropped (and counted) for all columns.
        if isinstance(fx_data, pd.DataFrame):
            count = fx_data.isna().any(axis=1).sum() * fx_data.shape[1]
        else:
            count = fx_data.isna().sum()
        return fx_data.dropna(how='any').astype(float), count

    if forecast_fill_method == 'forward':
        const_fill_value = None
    else:
        # Any other method must be a numeric constant. Validate it before
        # doing any work so a bad method fails fast.
        try:
            const_fill_value = pd.to_numeric(
                forecast_fill_method).astype(float)
        except ValueError as err:
            raise ValueError(
                f"Unsupported forecast fill missing data method: "
                f"{forecast_fill_method}") from err

    # Reindex with the full expected datetime range at the forecast
    # resolution so that absent timestamps are treated (and counted) as
    # missing values.
    full_dt_index = _expected_fill_index(
        start, end, forecast.interval_length,
        datamodel.CLOSED_MAPPING[forecast.interval_label],
        fx_data.index.name)
    filled = fx_data.reindex(index=full_dt_index)
    count = filled.isna().sum()
    if const_fill_value is None:
        # Fill with the most recent real value; leading missing values have
        # nothing to carry forward, so fill them with zeros.
        filled = filled.ffill().fillna(value=0)
    else:
        filled = filled.fillna(value=const_fill_value)

    # If data provided as DataFrame count will be a series, so sum over that
    # series to get the total count for all columns.
    if isinstance(count, pd.Series):
        count = count.sum()
    return filled, count


def _expected_fill_index(start, end, freq, closed, name):
    """Build the expected DatetimeIndex from ``start`` to ``end``.

    ``closed`` is an interval label convention ('left', 'right' or None)
    and selects which endpoints are included. pandas >= 1.4 spells this
    with the ``inclusive`` keyword and pandas 2.0 removed ``closed`` from
    ``pandas.date_range``, so the supported keyword is chosen at runtime.
    """
    import inspect

    if 'inclusive' in inspect.signature(pd.date_range).parameters:
        # closed=None (both endpoints) corresponds to inclusive='both'.
        inclusive = 'both' if closed is None else closed
        return pd.date_range(start=start, end=end, freq=freq,
                             inclusive=inclusive, name=name)
    return pd.date_range(start=start, end=end, freq=freq, closed=closed,
                         name=name)
def _resample_event_obs(
    obs: Union[datamodel.Observation, datamodel.Aggregate],
    fx: datamodel.EventForecast,
    obs_data: pd.DataFrame,
    quality_flags: Tuple[datamodel.QualityFlagFilter, ...]
) -> Tuple[pd.Series, List[datamodel.ValidationResult]]:
    """Resample the event observation.

    No resampling is actually performed because the interval lengths must
    match. Points that are NaN or flagged by a filter with
    ``discard_before_resample`` set are dropped.

    Parameters
    ----------
    obs : datamodel.Observation or datamodel.Aggregate
        The Observation being resampled.
    fx : datamodel.EventForecast
        The corresponding Forecast.
    obs_data : pandas.DataFrame
        Timeseries of values and quality flags of the
        observation/aggregate data.
    quality_flags : tuple of solarforecastarbiter.datamodel.QualityFlagFilter
        Flags to process and apply as filters during resampling.

    Returns
    -------
    obs_resampled : pandas.Series
        Timeseries data of the Observation resampled to match the Forecast.
    validation_results : list
        Elements are
        :py:class:`solarforecastarbiter.datamodel.ValidationResult`.

    Raises
    ------
    ValueError
        If the Forecast and Observation do not have the same interval length.
    """
    if fx.interval_length != obs.interval_length:
        raise ValueError("Event observation and forecast time-series "
                         "must have matching interval length.")

    # bools w/ has columns like NIGHTTIME, CLEARSKY EXCEEDED, but many of
    # these are not valid for event obs! Arguably only USER FLAGGED and
    # NIGHTTIME are valid for event obs.
    obs_flags = quality_mapping.convert_mask_into_dataframe(
        obs_data['quality_flag'])
    obs_flags['ISNAN'] = obs_data['value'].isna()

    # Share the discard-before-resample logic with _resample_obs. It selects
    # the flag columns with an ordered, de-duplicated list: pandas >= 2.0
    # rejects a set as a column indexer, and a list also keeps the order of
    # the validation results deterministic.
    to_discard_before_resample, validation_results = \
        _calc_discard_before_resample(obs_flags, quality_flags)
    obs_resampled = obs_data.loc[~to_discard_before_resample, 'value']

    # resampling not allowed, so fill in 0 for discard after resample
    validation_results += _counts_to_validation_results(
        {'TOTAL DISCARD AFTER RESAMPLE': 0},
        False
    )
    return obs_resampled, validation_results
def _validate_event_dtype(ser):
"""
Validate the event data dtype, converting to boolean values if possible.
Parameter
---------
ser : pandas.Series
The event time-series data (observation or forecast).
Returns
-------
ser : pandas.Series
The event time-series data as boolean values.
Raises
------
TypeError
If the event time-series data dtype cannot be converted to boolean.
"""
if ser.dtype == bool:
return ser
elif ser.dtype == int and np.all(np.isin(ser.unique(), [0, 1])):
return ser.astype(bool)
elif ser.dtype == float and np.all(np.isin(ser.unique(), [0.0, 1.0])):
return ser.astype(bool)
else:
raise TypeError("Invalid data type for event time-series; unable to "
"convert {} to boolean.".format(ser.dtype))
def _resample_obs(
    obs: Union[datamodel.Observation, datamodel.Aggregate],
    fx: datamodel.Forecast,
    obs_data: pd.DataFrame,
    quality_flags: Tuple[datamodel.QualityFlagFilter, ...],
    outages: Tuple[datamodel.TimePeriod, ...] = ()
) -> Tuple[pd.Series, List[datamodel.ValidationResult]]:
    """Resample observations to the forecast interval length.

    Outage periods are removed first, then points that are NaN or flagged
    by a ``discard_before_resample`` filter are excluded. The remaining
    points are averaged into forecast intervals. Finally, intervals with
    too many flagged points are dropped.

    Parameters
    ----------
    obs : datamodel.Observation or datamodel.Aggregate
        The Observation being resampled.
    fx : datamodel.Forecast
        The corresponding Forecast.
    obs_data : pandas.DataFrame
        Timeseries of values and quality flags of the
        observation/aggregate data.
    quality_flags : tuple of solarforecastarbiter.datamodel.QualityFlagFilter
        Flags to process and apply as filters during resampling.
    outages : tuple of solarforecastarbiter.datamodel.TimePeriod
        Time periods to drop from obs_data before resampling.

    Returns
    -------
    obs_resampled : pandas.Series
        The observation time series resampled to match the forecast
        interval_length. Labels are missing for intervals that failed
        validation.
    validation_results : list
        Elements are
        :py:class:`solarforecastarbiter.datamodel.ValidationResult`.
        Empty when ``obs_data`` is empty.

    Raises
    ------
    ValueError
        If fx.interval_length < obs.interval_length
    """
    if fx.interval_length < obs.interval_length:
        # ForecastObservation init normally prevents this combination, so
        # this is a defensive check.
        raise ValueError(
            'Cannot resample observation to match forecast because '
            'fx.interval_length < obs.interval_length.')
    if obs_data.empty:
        return obs_data['value'], []

    # interval label conventions used as resample ``label`` and ``closed``
    fx_closed = datamodel.CLOSED_MAPPING[fx.interval_label]
    obs_closed = datamodel.CLOSED_MAPPING[obs.interval_label]

    # outage data is dropped before any flag processing
    kept_data, n_outage_points = remove_outage_periods(
        outages, obs_data, obs.interval_label
    )
    outage_entry = datamodel.ValidationResult(
        flag="OUTAGE",
        count=int(n_outage_points),
        before_resample=True
    )

    # boolean columns such as NIGHTTIME, CLEARSKY EXCEEDED, plus ISNAN
    flags_df = quality_mapping.convert_mask_into_dataframe(
        kept_data['quality_flag'])
    flags_df['ISNAN'] = kept_data['value'].isna()

    drop_before, results = _calc_discard_before_resample(
        flags_df, quality_flags)
    results.append(outage_entry)

    # average only the points that survived the discard-before step
    surviving_values = kept_data.loc[~drop_before, 'value']
    interval_means = surviving_values.resample(
        fx.interval_length, closed=obs_closed, label=fx_closed).mean()

    drop_after, after_results = _calc_discard_after_resample(
        flags_df,
        quality_flags,
        drop_before,
        fx.interval_length,
        obs.interval_length,
        obs_closed,
        fx_closed
    )

    # interval_means has no label for intervals whose points were all
    # discarded, so intersect instead of indexing with keep_labels directly.
    keep_labels = drop_after.index[~drop_after]
    obs_resampled = interval_means.loc[
        interval_means.index.intersection(keep_labels)]
    return obs_resampled, results + after_results
def _calc_discard_before_resample(
    obs_flags: pd.DataFrame,
    quality_flags: Tuple[datamodel.QualityFlagFilter, ...]
) -> Tuple[pd.Series, List[datamodel.ValidationResult]]:
    """Determine which observation points to drop before resampling.

    A point is dropped when its ``ISNAN`` column is True, or when any flag
    listed by a ``QualityFlagFilter`` with ``discard_before_resample`` set
    is True for it.

    Parameters
    ----------
    obs_flags : pd.DataFrame
        Output of convert_mask_into_dataframe, plus ISNAN.
    quality_flags : tuple of solarforecastarbiter.datamodel.QualityFlagFilter
        Flags to process and apply as filters during resampling.

    Returns
    -------
    to_discard_before_resample : pd.Series
        True where a point should be discarded before the resample,
        False where it is kept.
    validation_results : list
        Elements are
        :py:class:`solarforecastarbiter.datamodel.ValidationResult`:
        one per selected flag, then 'TOTAL DISCARD BEFORE RESAMPLE'.
    """
    # ISNAN first, then the flags of every discard-before-resample filter in
    # the order given. dict.fromkeys drops duplicates while keeping order
    # (https://stackoverflow.com/a/39835527/2802993).
    candidate_columns = ['ISNAN'] + [
        flag
        for flag_filter in quality_flags
        if flag_filter.discard_before_resample
        for flag in flag_filter.quality_flags
    ]
    selected = obs_flags[list(dict.fromkeys(candidate_columns))]
    discard_mask = selected.any(axis=1)

    # per-flag counts, plus the number of points dropped for any reason
    counts = selected.astype(int).sum(axis=0).to_dict()
    counts['TOTAL DISCARD BEFORE RESAMPLE'] = discard_mask.sum()

    # TODO: add filters for time of day and value, OR with
    # the discard mask, add discarded number to counts
    return discard_mask, _counts_to_validation_results(counts, True)
def _calc_discard_after_resample(
    obs_flags: pd.DataFrame,
    quality_flags: Tuple[datamodel.QualityFlagFilter, ...],
    to_discard_before_resample: pd.Series,
    fx_interval_length: pd.Timedelta,
    obs_interval_length: pd.Timedelta,
    closed_obs: Optional[str],
    closed_fx: Optional[str]
) -> Tuple[pd.Series, List[datamodel.ValidationResult]]:
    """Determine resampled intervals to discard after resampling.

    Every filter in ``quality_flags`` is applied here, including those with
    ``discard_before_resample`` set. The filter's flags are OR-ed with
    ISNAN and the flagged points in each resampled interval are counted.
    The interval is flagged when that count reaches
    ``resample_threshold_percentage`` percent of
    ``fx_interval_length / obs_interval_length``. A threshold of 0 flags
    intervals with any flagged point.

    Parameters
    ----------
    obs_flags : pd.DataFrame
        Output of convert_mask_into_dataframe, plus ISNAN.
    quality_flags : tuple of solarforecastarbiter.datamodel.QualityFlagFilter
        Flags to process and apply as filters during resampling.
    to_discard_before_resample : pd.Series
        Boolean Series indicating if a point should be discarded before
        resampling. Its resampled index defines the intervals of the
        returned Series.
    fx_interval_length : pd.Timedelta
        Forecast interval length to resample to.
    obs_interval_length : pd.Timedelta
        Observation interval length.
    closed_obs : {'left', 'right', None}
        Observation interval label convention, used as the resample
        ``closed`` argument.
    closed_fx : {'left', 'right', None}
        Forecast interval label convention, used as the resample ``label``
        argument.

    Returns
    -------
    to_discard_after_resample : pd.Series
        Indexed by resampled interval; True where the interval should be
        discarded after the resample, False where it is kept.
    validation_results : list
        Elements are
        :py:class:`solarforecastarbiter.datamodel.ValidationResult`:
        one per filter, then 'TOTAL DISCARD AFTER RESAMPLE'.
    """
    def to_intervals(points):
        return points.resample(
            fx_interval_length, closed=closed_obs, label=closed_fx)

    # only the interval labels of this resample are used
    interval_labels = to_intervals(to_discard_before_resample).sum().index
    discard = pd.Series(False, index=interval_labels)
    # expected number of observation points per forecast interval
    points_per_interval = fx_interval_length / obs_interval_length
    counts = {}

    for flag_filter in quality_flags:
        # list so that DataFrame column selection works
        columns = list(flag_filter.quality_flags) + ['ISNAN']
        filter_name = ' OR '.join(columns)
        # TODO: add time of day and value boolean tests here,
        # then OR with the point flags and adjust filter_name.
        flagged_per_interval = to_intervals(
            obs_flags[columns].any(axis=1)).sum()
        threshold = (
            flag_filter.resample_threshold_percentage / 100.
            * points_per_interval)
        # with a zero threshold any flagged point counts, but unflagged
        # intervals must still be kept
        flagged = (flagged_per_interval > threshold) if threshold == 0 \
            else (flagged_per_interval >= threshold)
        discard |= flagged
        counts[filter_name] = flagged.sum()

    counts['TOTAL DISCARD AFTER RESAMPLE'] = discard.sum()
    return discard, _counts_to_validation_results(counts, False)
def _counts_to_validation_results(
    counts: Dict[str, int], before_resample: bool
) -> List[datamodel.ValidationResult]:
    """Convert a mapping of flag name to count into ValidationResults.

    Parameters
    ----------
    counts : dict
        Keys are flag names; values are counts, cast with ``int``.
    before_resample : bool
        Stored as ``before_resample`` on every result.

    Returns
    -------
    list of datamodel.ValidationResult
        One result per item of ``counts``, in the mapping's order.
    """
    return [
        datamodel.ValidationResult(
            flag=flag_name, count=int(flag_count),
            before_resample=before_resample)
        for flag_name, flag_count in counts.items()
    ]
def _search_validation_results(val_results, key):
for res in val_results:
if res.flag == key:
return res.count
def filter_resample(
    fx_obs: Union[datamodel.ForecastObservation, datamodel.ForecastAggregate],
    fx_data: Union[pd.Series, pd.DataFrame],
    obs_data: pd.DataFrame,
    quality_flags: Tuple[datamodel.QualityFlagFilter, ...],
    outages: Tuple[datamodel.TimePeriod, ...] = ()
) -> Tuple[
    Union[pd.Series, pd.DataFrame],
    pd.Series,
    List[datamodel.ValidationResult]
]:
    """Filter and resample the observation to the forecast interval length.

    Parameters
    ----------
    fx_obs : solarforecastarbiter.datamodel.ForecastObservation, solarforecastarbiter.datamodel.ForecastAggregate
        Pair of forecast and observation.
    fx_data : pandas.Series or pandas.DataFrame
        Timeseries data of the forecast.
    obs_data : pandas.DataFrame
        Timeseries of values and quality flags of the
        observation/aggregate data. It is not modified.
    quality_flags : tuple of solarforecastarbiter.datamodel.QualityFlagFilter
        Flags to process and apply as filters during resampling.
    outages: tuple of :py:class:`solarforecastarbiter.datamodel.TimePeriod`
        Time periods to drop from data prior to filtering or alignment.
        Not used when ``fx_obs.forecast`` is an EventForecast.

    Returns
    -------
    forecast_values : pandas.Series or pandas.DataFrame
        Same as input data except may be coerced to a safer dtype.
    observation_values : pandas.Series
        Observation values filtered and resampled.
    validation_results : list
        Elements are
        :py:class:`solarforecastarbiter.datamodel.ValidationResult`.

    Notes
    -----
    The keep/exclude result of each element of the ``quality_flags``
    tuple is combined with the OR operation.

    For ``quality_flags`` tuple elements where
    ``QualityFlagFilter.discard_before_resample`` is ``False``, the
    ``QualityFlagFilter.quality_flags`` are considered during the
    resampling operation. The flags of the raw observations are combined
    with ``OR``, the total number of flagged points within a resample
    period is computed, and intervals are discarded where that number
    reaches ``QualityFlagFilter.resample_threshold_percentage`` of the
    expected number of points.

    Therefore, the following examples can produce different results:

    >>> # separate flags. OR computed after resampling.
    >>> qflag_1 = QualityFlagFilter(('NIGHTTIME', ), discard_before_resample=False)
    >>> qflag_2 = QualityFlagFilter(('CLEARSKY', ), discard_before_resample=False)
    >>> # combined flags. OR computed during resampling.
    >>> qflag_combined = QualityFlagFilter(('NIGHTTIME', 'CLEARSKY'),
    ...                                    discard_before_resample=False)

    Raises
    ------
    ValueError
        If fx_obs.forecast.interval_length is less than
        fx_obs.observation.interval_length
    ValueError
        If fx_obs.forecast is an EventForecast and
        fx_obs.forecast.interval_length is not equal to
        fx_obs.observation.interval_length
    TypeError
        If fx_obs.forecast is an EventForecast and the forecast or
        observation values cannot be converted to boolean.
    """  # noqa: E501
    fx = fx_obs.forecast
    obs = fx_obs.data_object
    # Resample based on forecast type
    if isinstance(fx, datamodel.EventForecast):
        fx_data = _validate_event_dtype(fx_data)
        # assign returns a new DataFrame, so the caller's obs_data (e.g. the
        # frame held in the ``data`` dict of process_forecast_observations)
        # is not modified in place.
        obs_data = obs_data.assign(
            value=_validate_event_dtype(obs_data['value']))
        obs_resampled, validation_results = _resample_event_obs(
            obs, fx, obs_data, quality_flags)
    else:
        obs_resampled, validation_results = _resample_obs(
            obs, fx, obs_data, quality_flags, outages)
    return fx_data, obs_resampled, validation_results
def align(fx_obs, fx_data, obs_data, ref_data, tz):
    """Align the observation data to the forecast data.

    Parameters
    ----------
    fx_obs : solarforecastarbiter.datamodel.ForecastObservation, solarforecastarbiter.datamodel.ForecastAggregate
        Pair of forecast and observation.
    fx_data : pandas.Series or pandas.DataFrame
        Timeseries data of the forecast.
    obs_data : pandas.Series
        Timeseries data of the observation/aggregate after processing
        the quality flag column and resampling to match
        fx_obs.forecast.interval_length.
    ref_data : pandas.Series or pandas.DataFrame or None
        Timeseries data of the reference forecast.
    tz : str
        Timezone to which processed data will be converted.

    Returns
    -------
    forecast_values : pandas.Series or pandas.DataFrame
    observation_values : pandas.Series
    reference_forecast_values : pandas.Series or pandas.DataFrame or None
    results : dict
        Keys are strings and values are typically integers that
        describe number of discarded and undefined data points.
        Points that were already missing (NaN) are not counted as
        discarded.

    Notes
    -----
    This function does not currently account for mismatches in the
    `interval_label` of the `fx_obs.observation` and `fx_obs.forecast`.

    ``obs_data`` will be subsampled if it is higher frequency than
    ``fx_data``, but users should not rely on this behavior. Instead, use
    :py:func:`~.filter_resample` to match the input observations to the
    forecast data.
    """  # noqa: E501
    fx = fx_obs.forecast
    obs = fx_obs.data_object
    ref_fx = fx_obs.reference_forecast

    # Align (forecast is unchanged)
    # Remove missing values, then keep only timestamps present in both the
    # observation and the forecast (inner join).
    obs_data = obs_data.dropna(how="any")
    fx_valid = fx_data.dropna(how="any")
    obs_aligned, fx_aligned = obs_data.align(fx_valid, join='inner')

    # another alignment step if reference forecast exists.
    # here we drop points that don't exist in all 3 series.
    # could set reference forecast to NaN where missing instead.
    # could set to 0 instead.
    # could build a DataFrame (implicit outer-join), then perform
    # alignment using ['forecast', 'observation'] or
    # ['forecast', 'observation', 'reference'] selections
    if ref_data is not None:
        ref_valid = ref_data.dropna(how="any")
        obs_aligned, ref_fx_aligned = obs_aligned.align(
            ref_valid, join='inner')
        fx_aligned = fx_aligned.reindex(obs_aligned.index)
        ref_values = ref_fx_aligned.tz_convert(tz)
    else:
        ref_values = None

    # Determine series with timezone conversion
    forecast_values = fx_aligned.tz_convert(tz)
    observation_values = obs_aligned.tz_convert(tz)

    # Return dict summarizing results
    discarded_fx_intervals = len(fx_valid) - len(fx_aligned)
    discarded_obs_intervals = len(obs_data) - len(observation_values)
    obs_blurb = "Validated, Resampled " + obs.__blurb__
    results = {
        DISCARD_DATA_STRING.format(fx.__blurb__): discarded_fx_intervals,
        DISCARD_DATA_STRING.format(obs_blurb): discarded_obs_intervals
    }
    if ref_data is not None:
        k = DISCARD_DATA_STRING.format("Reference " + ref_fx.__blurb__)
        results[k] = len(ref_valid) - len(ref_fx_aligned)
    return forecast_values, observation_values, ref_values, results
def check_reference_forecast_consistency(fx_obs, ref_data):
    """Check that the reference forecast and its data match the forecast.

    Parameters
    ----------
    fx_obs : solarforecastarbiter.datamodel.ForecastObservation, solarforecastarbiter.datamodel.ForecastAggregate
        Pair of forecast and observation.
    ref_data : pandas.Series or pandas.DataFrame or None
        Timeseries data of the reference forecast.

    Raises
    ------
    ValueError
        If fx_obs.reference_forecast is not None but ref_data is None
        or vice versa
    ValueError
        If fx_obs.reference_forecast.interval_label or interval_length
        does not match fx_obs.forecast.interval_label or interval_length
    ValueError
        If fx_obs.forecast is a ProbabilisticForecast and its axis does
        not match fx_obs.reference_forecast.axis
    """  # noqa: E501
    fx = fx_obs.forecast
    ref_fx = fx_obs.reference_forecast
    if ref_fx is not None and ref_data is None:
        raise ValueError(
            'ref_data must be supplied if fx_obs.reference_forecast is not '
            'None')
    elif ref_fx is None and ref_data is not None:
        raise ValueError(
            'ref_data was supplied but fx_obs.reference_forecast is None')

    if ref_fx is not None:
        if fx.interval_length != ref_fx.interval_length:
            raise ValueError(
                f'forecast.interval_length "{fx.interval_length}" must match '
                'reference_forecast.interval_length '
                f'"{ref_fx.interval_length}"')
        if fx.interval_label != ref_fx.interval_label:
            raise ValueError(
                f'forecast.interval_label "{fx.interval_label}" must match '
                f'reference_forecast.interval_label "{ref_fx.interval_label}"')
        if isinstance(fx, datamodel.ProbabilisticForecast):
            if fx.axis != ref_fx.axis:
                raise ValueError(
                    f'forecast.axis "{fx.axis}" must match '
                    f'reference_forecast.axis "{ref_fx.axis}"')
def process_forecast_observations(forecast_observations, filters,
                                  forecast_fill_method, start, end,
                                  data, timezone, costs=tuple(),
                                  outages=tuple()):
    """
    Convert ForecastObservations into ProcessedForecastObservations
    applying any filters and resampling to align forecast and observation.

    Parameters
    ----------
    forecast_observations : list of solarforecastarbiter.datamodel.ForecastObservation, solarforecastarbiter.datamodel.ForecastAggregate
        Pairs to process
    filters : list of solarforecastarbiter.datamodel.BaseFilter
        Filters to apply to each pair. Only QualityFlagFilter objects are
        used; other filters are discarded with a warning.
    forecast_fill_method : str
        Indicates what process to use for handling missing forecasts.
        Currently supports : 'drop', 'forward', or a numeric value.
    start : pandas.Timestamp
        Start date and time for assessing forecast performance.
    end : pandas.Timestamp
        End date and time for assessing forecast performance.
    data : dict
        Dict with keys that are the Forecast/Observation/Aggregate object
        and values that are the corresponding pandas.Series/DataFrame for
        the object. Keys must also include all Forecast objects assigned
        to the ``reference_forecast`` attributes of the
        ``forecast_observations``.
    timezone : str
        Timezone that data should be converted to
    costs : tuple of :py:class:`solarforecastarbiter.datamodel.Cost`
        Costs that are referenced by any pairs. Pairs and costs are matched
        by the Cost name.
    outages : tuple of :py:class:`solarforecastarbiter.datamodel.TimePeriod`
        Tuple of time periods during which forecast submissions will be
        excluded from analysis.

    Returns
    -------
    tuple of ProcessedForecastObservation

    Raises
    ------
    ValueError
        If ``forecast_fill_method`` is not supported.

    Notes
    -----
    In the case where the `interval_label` of the `obs` and `fx` do not
    match, this function currently returns a
    `ProcessedForecastObservation` object with a `interval_label` the
    same as the `fx`, regardless of whether the `interval_length` of the
    `fx` and `obs` are the same or different.

    The processing logic is as follows. For each forecast, observation
    pair in ``forecast_observations``:

    1. Fill missing forecast data points according to
       ``forecast_fill_method``.
    2. Remove any forecast points associated with an outage.
    3. Fill missing reference forecast data points according to
       ``forecast_fill_method``.
    4. Remove any reference forecast or observation points associated
       with an outage.
    5. Remove observation data points with ``quality_flag`` in
       filters. Remaining observation series is discontinuous.
    6. Resample observations to match forecast intervals. The value of
       each forecast interval is the mean of its remaining observation
       values. Intervals in which too many observation points are missing
       or flagged by ``filters`` (as set by each filter's
       ``resample_threshold_percentage``) are discarded.
    7. Drop NaN observation values.
    8. Align observations to match forecast times. Observation times
       for which there is not a matching forecast time are dropped on
       a forecast by forecast basis.
    9. Create
       :py:class:`~solarforecastarbiter.datamodel.ProcessedForecastObservation`
       with resampled, aligned data and metadata.
    """  # NOQA: E501
    # Materialize once so an iterator of filters is not exhausted by the
    # type check before the QualityFlagFilters are selected.
    filters = tuple(filters)
    quality_flag_filters = tuple(
        f for f in filters if isinstance(f, datamodel.QualityFlagFilter))
    if len(quality_flag_filters) != len(filters):
        logger.warning(
            'Only filtering on Quality Flag is currently implemented. '
            'Other filters will be discarded.')
    filters = quality_flag_filters

    # create string for tracking forecast fill results.
    # this approach supports known methods or filling with constant values.
    forecast_fill_str = FORECAST_FILL_STRING_MAP.get(
        forecast_fill_method,
        FORECAST_FILL_CONST_STRING.format(forecast_fill_method)
    )

    costs_dict = {c.name: c for c in costs}
    # accumulate ProcessedForecastObservations in a dict.
    # use a dict so we can keep track of existing names and avoid repeats.
    processed_fxobs = {}
    for fxobs in forecast_observations:
        # accumulate PreprocessingResults from various stages in a list
        preproc_results = []

        # extract fx and obs data from data dict
        try:
            fx_data = data[fxobs.forecast]
        except KeyError as e:
            logger.error(
                'Failed to find data for forecast %s: %s',
                fxobs.forecast.name, e)
            continue

        try:
            obs_data = data[fxobs.data_object]
        except KeyError as e:
            logger.error(
                'Failed to find data for observation %s: %s',
                fxobs.data_object.name, e)
            continue

        # Get periods where data should be excluded from analysis due
        # to outages.
        forecast_outage_periods = outage_periods(
            fxobs.forecast,
            start,
            end,
            outages
        )

        # Apply fill to forecast and reference forecast
        fx_data, count = apply_fill(fx_data, fxobs.forecast,
                                    forecast_fill_method, start, end)
        preproc_results.append(datamodel.PreprocessingResult(
            name=FILL_RESULT_TOTAL_STRING.format('', forecast_fill_str),
            count=int(count)))

        outages_exist = len(outages) > 0
        if outages_exist:
            # Remove any forecast data that would have been submitted
            # during an outage
            fx_data, fx_outage_points = remove_outage_periods(
                forecast_outage_periods, fx_data, fxobs.forecast.interval_label
            )
            preproc_results.append(datamodel.PreprocessingResult(
                name=OUTAGE_DISCARD_STRING.format('Forecast'),
                count=int(fx_outage_points)))

        ref_data = data.get(fxobs.reference_forecast, None)
        try:
            check_reference_forecast_consistency(fxobs, ref_data)
        except ValueError as e:
            logger.error('Incompatible reference forecast and data: %s', e)
            continue

        if fxobs.reference_forecast is not None:
            ref_data, count = apply_fill(ref_data, fxobs.reference_forecast,
                                         forecast_fill_method, start, end)
            preproc_results.append(datamodel.PreprocessingResult(
                name=FILL_RESULT_TOTAL_STRING.format(
                    "Reference ", forecast_fill_str),
                count=int(count)))

            if outages_exist:
                ref_data, ref_outage_points = remove_outage_periods(
                    forecast_outage_periods, ref_data,
                    fxobs.reference_forecast.interval_label
                )
                preproc_results.append(datamodel.PreprocessingResult(
                    name=OUTAGE_DISCARD_STRING.format('Reference Forecast'),
                    count=int(ref_outage_points))
                )

        # filter and resample observation/aggregate data
        try:
            forecast_values, observation_values, val_results = filter_resample(
                fxobs, fx_data, obs_data, filters, forecast_outage_periods)
        except Exception as e:
            # should figure out the specific exception types to catch
            logger.error(
                'Failed to filter and resample data for pair (%s, %s): %s',
                fxobs.forecast.name, fxobs.data_object.name, e)
            continue

        if outages_exist:
            obs_outage_points_dropped = _search_validation_results(
                val_results, 'OUTAGE')
            if obs_outage_points_dropped is None:
                logger.warning(
                    'Observation Values Discarded Due To Outage Not Available '
                    'For Pair (%s, %s)', fxobs.forecast.name,
                    fxobs.data_object.name)
            else:
                preproc_results.append(datamodel.PreprocessingResult(
                    name=OUTAGE_DISCARD_STRING.format('Observation'),
                    count=int(obs_outage_points_dropped)))

        # the total count ultimately shows up in both the validation
        # results table and the preprocessing summary table.
        total_discard_before_resample = _search_validation_results(
            val_results, 'TOTAL DISCARD BEFORE RESAMPLE')
        if total_discard_before_resample is None:
            logger.warning(
                'TOTAL DISCARD BEFORE RESAMPLE not available for pair '
                '(%s, %s)', fxobs.forecast.name, fxobs.data_object.name)
        else:
            preproc_results.append(datamodel.PreprocessingResult(
                name='Observation Values Discarded Before Resampling',
                count=int(total_discard_before_resample)))

        total_discard_after_resample = _search_validation_results(
            val_results, 'TOTAL DISCARD AFTER RESAMPLE')
        if total_discard_after_resample is None:
            logger.warning(
                'TOTAL DISCARD AFTER RESAMPLE not available for pair (%s, %s)',
                fxobs.forecast.name, fxobs.data_object.name)
        else:
            preproc_results.append(datamodel.PreprocessingResult(
                name='Resampled Observation Values Discarded',
                count=int(total_discard_after_resample)))

        # Align and create processed pair
        try:
            forecast_values, observation_values, ref_fx_values, results = \
                align(fxobs, forecast_values, observation_values, ref_data,
                      timezone)
            preproc_results.extend(
                [datamodel.PreprocessingResult(name=k, count=int(v))
                 for k, v in results.items()])
        except Exception as e:
            logger.error(
                'Failed to align data for pair (%s, %s): %s',
                fxobs.forecast.name, fxobs.data_object.name, e)
            continue

        logger.info('Processed data successfully for pair (%s, %s)',
                    fxobs.forecast.name, fxobs.data_object.name)
        name = _name_pfxobs(processed_fxobs.keys(), fxobs.forecast)
        cost_name = fxobs.cost
        cost = costs_dict.get(cost_name)
        if cost_name is not None and cost is None:
            logger.warning(
                'Cannot calculate cost metrics for %s, cost parameters '
                'not supplied for cost: %s', name, cost_name)
        processed = datamodel.ProcessedForecastObservation(
            name=name,
            original=fxobs,
            interval_value_type=fxobs.forecast.interval_value_type,
            interval_length=fxobs.forecast.interval_length,
            interval_label=fxobs.forecast.interval_label,
            valid_point_count=len(forecast_values),
            # tuples, like preprocessing_results, so the (immutable)
            # processed pair does not hold a mutable list
            validation_results=tuple(val_results),
            preprocessing_results=tuple(preproc_results),
            forecast_values=forecast_values,
            observation_values=observation_values,
            reference_forecast_values=ref_fx_values,
            normalization_factor=fxobs.normalization,
            uncertainty=fxobs.uncertainty,
            cost=cost
        )
        processed_fxobs[name] = processed
    return tuple(processed_fxobs.values())
def _name_pfxobs(current_names, forecast, i=1):
"""Create unique, descriptive name for forecast.
Users should call this function with a ``Forecast`` object. This
will augment the ``Forecast.name`` attribute with probabilistic
descriptors (if needed). The function will then inspect the list of
names in ``current_names``. If the name is not unique, a recursive
call, using a string, will allow up to 99 variants of the name.
Parameters
----------
current_names : list of str
forecast : datamodel.Forecast or str
Returns
-------
str
"""
if isinstance(forecast, str):
# handle input when called recursively
forecast_name = forecast
else:
# handle initial augmentation of forecast.name
forecast_name = forecast.name
if isinstance(forecast, datamodel.ProbabilisticForecastConstantValue):
if forecast.axis == 'x':
forecast_name += (
f' Prob(x <= {forecast.constant_value} '
f'{forecast.constant_value_units})')
else:
forecast_name += f' Prob(f <= x) = {forecast.constant_value}%'
if i > 99:
logger.warning(
'Limit of unique names for identically named forecasts reached.'
' Aligned pairs may have duplicate names.')
return forecast_name
if forecast_name in current_names:
if i == 1:
new_name = f'{forecast_name}-{i:02d}'
else:
new_name = f'{forecast_name[:-3]}-{i:02d}'
return _name_pfxobs(current_names, new_name, i + 1)
else:
return forecast_name
def forecast_report_issue_times(
    forecast: 'datamodel.Forecast',
    start: pd.Timestamp,
    end: pd.Timestamp
) -> pd.DatetimeIndex:
    """Returns all of the issue times that contribute data
    to a report for this forecast. May include issue times that
    correspond with data before and after the report to ensure
    report coverage.

    Parameters
    ----------
    forecast: datamodel.Forecast
        Forecast to find issue times for. Its ``lead_time_to_start`` and
        ``run_length`` are read, plus the hour and minute of its
        ``issue_time_of_day``.
    start: pd.Timestamp
        Start of the report. Must be timezone aware.
    end: pd.Timestamp
        End of the report. Must be timezone aware.

    Returns
    -------
    pandas.DatetimeIndex
        UTC issue times spaced by ``run_length``. The first is the first
        issue time strictly after
        ``start - (lead_time_to_start + run_length)``. All are strictly
        before ``end - lead_time_to_start``.
    """
    lead_time = forecast.lead_time_to_start
    run_length = forecast.run_length
    # Time from an issue time to the end of the last value it produces.
    total_forecast_horizon = lead_time + run_length
    # Convert to UTC so the start can be aligned with the issue time of day.
    # This is necessary because the report start/end are not necessarily
    # aligned with forecast issue times or forecast start times.
    utc_start = start.tz_convert('UTC')
    # The last potential issue time that does not contribute data to the
    # report: its data runs only up until the report start. We want the
    # first issue time strictly after this time.
    lookback_start = utc_start - total_forecast_horizon
    # Anchor on an issue time on the same (UTC) day as lookback_start.
    # Flooring to the minute first discards any seconds/sub-seconds of the
    # report start. ``replace`` would otherwise carry them over and shift
    # every returned issue time off the forecast's issue schedule.
    issue_search_start = lookback_start.floor('min').replace(
        hour=forecast.issue_time_of_day.hour,
        minute=forecast.issue_time_of_day.minute
    )
    # Count whole runs between the anchor and lookback_start. Timedelta
    # floor division rounds toward negative infinity, so the step below
    # moves the anchor forward when it is before lookback_start and
    # backward when it is after. It always lands in
    # [lookback_start, lookback_start + run_length).
    runs_until_start = (issue_search_start - lookback_start) // run_length
    first_issue_time = issue_search_start - runs_until_start * run_length
    if first_issue_time == lookback_start:
        # An issue time exactly at lookback_start contributes nothing to
        # the report; the next one provides its first values.
        first_issue_time += run_length
    # Every issue time from the first one up to, but excluding, the lead
    # time before the end of the report.
    last_issue_bound = end.tz_convert('UTC') - lead_time
    issue_times = pd.date_range(
        first_issue_time, last_issue_bound, freq=run_length)
    # Exclude the bound itself. This is equivalent to ``closed="left"``,
    # which pandas 2.0 removed from ``pd.date_range``.
    if len(issue_times) and issue_times[-1] == last_issue_bound:
        issue_times = issue_times[:-1]
    return issue_times
def outage_periods(
    forecast: datamodel.Forecast,
    start: pd.Timestamp,
    end: pd.Timestamp,
    outages: Tuple[datamodel.TimePeriod, ...]
) -> Tuple[datamodel.TimePeriod, ...]:
    """Converts report outage periods to forecast data periods to
    drop from analysis. The returned periods do not account for
    interval label.

    A forecast run is excluded when its issue time falls within an
    outage (both outage bounds inclusive). The excluded period for that
    run starts ``lead_time_to_start`` after the issue time and lasts
    ``run_length``.

    Parameters
    ----------
    forecast: solarforecastarbiter.datamodel.Forecast
    start: pandas.Timestamp
        Start of the report.
    end: pandas.Timestamp
        End of the report.
    outages: tuple of solarforecastarbiter.datamodel.TimePeriod
        List of time ranges to check for forecast issue times.

    Returns
    -------
    tuple of solarforecastarbiter.datamodel.TimePeriod
        Times between these values should not be included in analysis.
        Empty when ``outages`` is empty.
    """
    if not outages:
        # Nothing to exclude; skip computing the report's issue times.
        return ()
    # Forecast issue times that include data falling within the report.
    issue_times = forecast_report_issue_times(forecast, start, end)
    lead_time = forecast.lead_time_to_start
    run_length = forecast.run_length
    periods = []
    for outage in outages:
        # Issue times (submissions) that fall within this outage.
        in_outage = (
            (issue_times >= outage.start) & (issue_times <= outage.end)
        )
        for issue_time in issue_times[in_outage]:
            fx_start = issue_time + lead_time
            periods.append(datamodel.TimePeriod(
                start=fx_start,
                end=fx_start + run_length
            ))
    return tuple(periods)
def remove_outage_periods(
    outages: Tuple['datamodel.TimePeriod', ...],
    data: pd.DataFrame,
    interval_label: str
) -> Tuple[pd.DataFrame, int]:
    """Returns the data with all values within an outage period dropped.

    Parameters
    ----------
    outages: tuple of :py:class:`solarforecastarbiter.datamodel.TimePeriod`
        Periods to remove. Each item's ``start`` and ``end`` attributes
        are timestamps bounding a period to remove.
    data: pandas.DataFrame
        The data to drop outage values from, indexed by timestamp.
    interval_label: str
        Interval label of ``data``. With ``"ending"`` an index value ``t``
        is dropped when ``start < t <= end``; with any other label when
        ``start <= t < end``.

    Returns
    -------
    pandas.DataFrame, int
        The data with outage values dropped, and the number of rows
        removed (rows covered by overlapping outages count once). When
        ``outages`` is empty, ``data`` itself is returned, not a copy,
        with a count of 0.
    """ # NOQA
    if len(outages) == 0:
        return data, 0
    index = data.index
    # True for every row that falls inside at least one outage.
    in_outage = np.zeros(len(index), dtype=bool)
    for outage in outages:
        if interval_label == "ending":
            in_outage |= (index > outage.start) & (index <= outage.end)
        else:
            in_outage |= (index >= outage.start) & (index < outage.end)
    # Plain int so both return paths yield the same count type.
    return data[~in_outage], int(in_outage.sum())