Source code for solarforecastarbiter.metrics.calculator

"""
Metric calculation functions.
"""
import calendar
from functools import partial
import logging

import numpy as np
import pandas as pd


from solarforecastarbiter import datamodel
from solarforecastarbiter.metrics import (
    deterministic, probabilistic, event, summary)


logger = logging.getLogger(__name__)

SEASONS = ("DJF", "MAM", "JJA", "SON")


def calculate_metrics(processed_pairs, categories, metrics):
    """
    Loop through the forecast-observation pairs and calculate metrics.

    Normalization is determined by the attributes of the input objects.

    If ``processed_fx_obs.uncertainty`` is not ``None``, a deadband
    equal to the uncertainty will be used by the metrics that support it.

    Parameters
    ----------
    processed_pairs :
        List of datamodel.ProcessedForecastObservation
    categories : list of str
        List of categories to compute metrics over.
    metrics : list of str
        List of metrics to be computed.

    Returns
    -------
    list
        List of datamodel.MetricResult for each
        datamodel.ProcessedForecastObservation
    """
    calc_metrics = []

    for proc_fxobs in processed_pairs:
        # determine type of metrics to calculate
        if isinstance(proc_fxobs.original.forecast,
                      (datamodel.ProbabilisticForecast,
                       datamodel.ProbabilisticForecastConstantValue)):
            try:
                calc_metrics.append(calculate_probabilistic_metrics(
                    proc_fxobs,
                    categories,
                    metrics))
            except (RuntimeError, ValueError) as e:
                logger.error('Failed to calculate probabilistic metrics'
                             ' for %s: %s', proc_fxobs.name, e)
        elif isinstance(proc_fxobs.original.forecast,
                        datamodel.EventForecast):
            try:
                calc_metrics.append(calculate_event_metrics(
                    proc_fxobs, categories, metrics
                ))
            except RuntimeError as e:
                logger.error('Failed to calculate event metrics for %s: %s',
                             proc_fxobs.name, e)
        else:
            try:
                calc_metrics.append(calculate_deterministic_metrics(
                    proc_fxobs,
                    categories,
                    metrics))
            except RuntimeError as e:
                logger.error('Failed to calculate deterministic metrics'
                             ' for %s: %s', proc_fxobs.name, e)

    return calc_metrics
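
# Illustrative usage sketch (not part of the library): in a report workflow,
# ``processed_pairs`` would come from the preprocessing step; the variable
# names and chosen categories/metrics below are hypothetical.
#
#     results = calculate_metrics(
#         processed_pairs,
#         categories=('total', 'month', 'hour'),
#         metrics=('mae', 'rmse', 'mbe'))
#     for result in results:
#         print(result.name, len(result.values))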
def _apply_deterministic_metric_func(metric, fx, obs, **kwargs):
    """Helper function to deal with variable number of arguments possible for
    deterministic metric functions."""
    metric_func = deterministic._MAP[metric][0]
    # the keyword arguments that will be passed to the functions
    _kw = {}

    # deadband could be supplied as None, so this is a little cleaner
    # than a try/except pattern
    deadband = kwargs.get('deadband', None)
    if metric in deterministic._DEADBAND_ALLOWED and deadband:
        # metrics assumes fractional deadband, datamodel assumes %, so / 100
        deadband_frac = deadband / 100
        _kw['error_fnc'] = partial(
            deterministic.error_deadband, deadband=deadband_frac)

    if metric == 'cost':
        _kw['cost_params'] = kwargs['cost_params']

    # ref is an arg, but seems cleaner to handle as a kwarg here
    if metric in deterministic._REQ_REF_FX:
        _kw['ref'] = kwargs['ref_fx']

    # same arg/kwarg comment as for ref
    if metric in deterministic._REQ_NORM:
        _kw['norm'] = kwargs['normalization']

    return metric_func(obs, fx, **_kw)


def _apply_probabilistic_metric_func(metric, fx, fx_prob, obs, **kwargs):
    """Helper function to deal with variable number of arguments possible for
    probabilistic metric functions."""
    metric_func = probabilistic._MAP[metric][0]
    if metric in probabilistic._REQ_REF_FX:
        return metric_func(obs, fx, fx_prob,
                           kwargs['ref_fx'], kwargs['ref_fx_prob'])
    else:
        return metric_func(obs, fx, fx_prob)


def _apply_event_metric_func(metric, fx, obs, **kwargs):
    """Helper function to deal with variable number of arguments possible for
    event metric functions."""
    metric_func = event._MAP[metric][0]
    return metric_func(obs, fx)
def calculate_deterministic_metrics(processed_fx_obs, categories, metrics):
    """
    Calculate deterministic metrics for the processed data using the
    provided categories and metric types.

    Normalization is determined by the attributes of the input objects.

    If ``processed_fx_obs.uncertainty`` is not ``None``, a deadband
    equal to the uncertainty will be used by the metrics that support it.

    Parameters
    ----------
    processed_fx_obs : datamodel.ProcessedForecastObservation
    categories : list of str
        List of categories to compute metrics over.
    metrics : list of str
        List of metrics to be computed.

    Returns
    -------
    solarforecastarbiter.datamodel.MetricResult
        Contains all the computed metrics by categories.

    Raises
    ------
    RuntimeError
        If there is no forecast or observation timeseries data, or no
        metrics are specified.
    """
    out = {
        'name': processed_fx_obs.name,
        'forecast_id': processed_fx_obs.original.forecast.forecast_id,
    }

    try:
        out['observation_id'] = processed_fx_obs.original.observation.observation_id  # NOQA: E501
    except AttributeError:
        out['aggregate_id'] = processed_fx_obs.original.aggregate.aggregate_id

    fx = processed_fx_obs.forecast_values
    obs = processed_fx_obs.observation_values

    # Check reference forecast is from processed pair, if needed
    ref_fx = processed_fx_obs.reference_forecast_values
    if ref_fx is None:
        ref_fx = np.nan  # avoids issues with None and deadband masking

    # No data or metrics
    if fx.empty:
        raise RuntimeError("No forecast timeseries data.")
    elif obs.empty:
        raise RuntimeError("No observation timeseries data.")
    elif len(metrics) == 0:
        raise RuntimeError("No metrics specified.")

    # Dataframe for grouping
    df = pd.DataFrame({'forecast': fx,
                       'observation': obs,
                       'reference': ref_fx})

    # get normalization factor
    normalization = processed_fx_obs.normalization_factor

    # get uncertainty.
    deadband = processed_fx_obs.uncertainty

    cost_params = processed_fx_obs.cost

    # Force `groupby` to be consistent with `interval_label`, i.e., if
    # `interval_label == ending`, then the last interval should be in the bin
    if processed_fx_obs.interval_label == "ending":
        df.index -= pd.Timedelta("1ns")

    metric_vals = []
    # Calculate metrics
    for category in set(categories):
        index_category = _index_category(category, df)

        # Calculate each metric
        for metric_ in metrics:
            # Group by category
            for cat, group in df.groupby(index_category):

                # Calculate
                res = _apply_deterministic_metric_func(
                    metric_, group.forecast, group.observation,
                    ref_fx=group.reference, normalization=normalization,
                    deadband=deadband, cost_params=cost_params)

                # Change category label of the group from numbers
                # to e.g. January or Monday
                if category == 'month':
                    cat = calendar.month_abbr[cat]
                elif category == 'weekday':
                    cat = calendar.day_abbr[cat]

                metric_vals.append(datamodel.MetricValue(
                    category, metric_, str(cat), res))

    out['values'] = _sort_metrics_vals(
        metric_vals, datamodel.ALLOWED_DETERMINISTIC_METRICS)
    calc_metrics = datamodel.MetricResult.from_dict(out)
    return calc_metrics
def _sort_metrics_vals(metrics_vals, mapping):
    """
    Parameters
    ----------
    metrics_vals : list
        Elements are datamodel.MetricValue
    mapping : dict
        Metrics mapping defined in datamodel.

    Returns
    -------
    tuple
        Sorted elements. Sorting order is:
            * Category
            * Metric
            * Index
            * Value
    """
    metric_ordering = list(mapping.keys())
    category_ordering = list(datamodel.ALLOWED_CATEGORIES.keys())

    def sorter(metricval):
        if metricval.category == 'month':
            index_order = calendar.month_abbr[0:13].index(metricval.index)
        elif metricval.category == 'weekday':
            index_order = calendar.day_abbr[0:7].index(metricval.index)
        elif metricval.category == 'season':
            index_order = SEASONS.index(metricval.index)
        else:
            index_order = metricval.index

        return (
            category_ordering.index(metricval.category),
            metric_ordering.index(metricval.metric),
            index_order,
            metricval.value
        )

    metrics_sorted = tuple(sorted(metrics_vals, key=sorter))

    return metrics_sorted
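
# Illustrative sketch (not part of the library): ``_sort_metrics_vals``
# orders month-indexed values by calendar position rather than insertion
# order, provided the metric names appear in the supplied mapping.
#
#     vals = [
#         datamodel.MetricValue('month', 'mae', 'Feb', 2.0),
#         datamodel.MetricValue('month', 'mae', 'Jan', 1.5),
#     ]
#     _sort_metrics_vals(vals, datamodel.ALLOWED_DETERMINISTIC_METRICS)
#     # -> (MetricValue(..., 'Jan', 1.5), MetricValue(..., 'Feb', 2.0))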
def calculate_probabilistic_metrics(processed_fx_obs, categories, metrics):
    """
    Calculate probabilistic metrics for the processed data using the
    provided categories and metric types.

    Distribution metrics (e.g., Continuous Ranked Probability Score (CRPS))
    are calculated over all constant values if the forecast is a
    datamodel.ProbabilisticForecast, or single value metrics (e.g., Brier
    Score (BS)) if the forecast is a
    datamodel.ProbabilisticForecastConstantValue.

    Parameters
    ----------
    processed_fx_obs : datamodel.ProcessedForecastObservation
        Forecasts must be datamodel.ProbabilisticForecast (CDFs) or
        datamodel.ProbabilisticForecastConstantValue (single values)
    categories : list of str
        List of categories to compute metrics over.
    metrics : list of str
        List of metrics to be computed.

    Returns
    -------
    datamodel.MetricResult

    Raises
    ------
    RuntimeError
        If there is no forecast or observation timeseries data, or no
        metrics are specified.
    ValueError
        If original and reference forecast ``axis`` values do not match.
    """
    out = {'name': processed_fx_obs.name}
    try:
        obs_dict = {'observation_id':
                    processed_fx_obs.original.observation.observation_id}
    except AttributeError:
        obs_dict = {'aggregate_id':
                    processed_fx_obs.original.aggregate.aggregate_id}
    out.update(obs_dict)

    # Determine forecast physical and probability values by axis
    fx_fx_prob = _transform_prob_forecast_value_and_prob(processed_fx_obs)
    obs = processed_fx_obs.observation_values

    if processed_fx_obs.reference_forecast_values is None:
        ref_fx_fx_prob = [(np.nan, np.nan)]*len(fx_fx_prob)
    else:
        if (processed_fx_obs.original.forecast.axis !=
                processed_fx_obs.original.reference_forecast.axis):
            # could put this check in datamodel (and others from
            # preprocessing)
            raise ValueError("Mismatched `axis` between "
                             "forecast and reference forecast.")
        ref_fx_fx_prob = _transform_prob_forecast_value_and_prob(
            processed_fx_obs, attr='reference_forecast_values')
        out.update({'reference_forecast_id':
                    processed_fx_obs.original.reference_forecast.forecast_id})  # NOQA: E501

    # No data or metrics
    if (not fx_fx_prob or
            any([fx[0].empty or fx[1].empty for fx in fx_fx_prob])):
        raise RuntimeError("Missing probabilistic forecast timeseries data.")
    elif obs.empty:
        raise RuntimeError("No observation timeseries data.")
    elif len(metrics) == 0:
        raise RuntimeError("No metrics specified.")

    fx = processed_fx_obs.original.forecast
    out['forecast_id'] = fx.forecast_id

    # Determine proper metrics by instance type
    if isinstance(fx, datamodel.ProbabilisticForecastConstantValue):
        _metrics = list(set(metrics) - set(probabilistic._REQ_DIST))
    elif isinstance(fx, datamodel.ProbabilisticForecast):
        _metrics = list(set(metrics) & set(probabilistic._REQ_DIST))

    # Compute metrics and create MetricResult
    results = _calculate_probabilistic_metrics_from_df(
        _create_prob_dataframe(obs, fx_fx_prob, ref_fx_fx_prob),
        categories,
        _metrics,
        processed_fx_obs.original.forecast.interval_label)
    out['values'] = _sort_metrics_vals(
        results, datamodel.ALLOWED_PROBABILISTIC_METRICS)
    result = datamodel.MetricResult.from_dict(out)
    return result
def _calculate_probabilistic_metrics_from_df(data_df, categories, metrics,
                                             interval_label):
    """
    Calculate probabilistic metrics for the processed data using the provided
    categories and metric types.

    Parameters
    ----------
    data_df : pandas.DataFrame
        DataFrame that contains all timeseries values on the same index.
    categories : list of str
        List of categories to compute metrics over.
    metrics : list of str
        List of metrics to be computed.
    interval_label : str

    Returns
    -------
    list of tuples of datamodel.MetricValue
        Contains all the computed metrics by categories. Each tuple is
        associated with a datamodel.ProbabilisticForecastConstantValue.
    """
    metric_values = []

    # Force `groupby` to be consistent with `interval_label`, i.e., if
    # `interval_label == ending`, then the last interval should be in the
    # bin
    if interval_label == "ending":
        data_df.index -= pd.Timedelta("1ns")

    # Calculate metrics
    for category in set(categories):
        index_category = _index_category(category, data_df)

        # Calculate each metric
        for metric_ in set(metrics):

            # Group by category
            for cat, group in data_df.groupby(index_category):
                try:
                    ref_fx_vals = group.xs('reference_forecast', level=1, axis=1).to_numpy()  # NOQA: E501
                    ref_fx_prob_vals = group.xs('reference_probability', level=1, axis=1).to_numpy()  # NOQA: E501
                    if ref_fx_vals.size == ref_fx_vals.shape[0]:
                        ref_fx_vals = ref_fx_vals.T[0]
                        ref_fx_prob_vals = ref_fx_prob_vals.T[0]
                except KeyError:
                    ref_fx_vals = np.nan
                    ref_fx_prob_vals = np.nan
                fx_vals = group.xs('forecast', level=1, axis=1).to_numpy()
                fx_prob_vals = group.xs('probability', level=1, axis=1).to_numpy()  # NOQA: E501
                if fx_vals.size == fx_vals.shape[0]:
                    fx_vals = fx_vals.T[0]
                    fx_prob_vals = fx_prob_vals.T[0]
                obs_vals = group[(None, 'observation')].to_numpy()

                # Calculate
                res = _apply_probabilistic_metric_func(
                    metric_, fx_vals, fx_prob_vals, obs_vals,
                    ref_fx=ref_fx_vals, ref_fx_prob=ref_fx_prob_vals)

                # Change category label of the group from numbers
                # to e.g. January or Monday
                if category == 'month':
                    cat = calendar.month_abbr[cat]
                elif category == 'weekday':
                    cat = calendar.day_abbr[cat]

                metric_values.append(datamodel.MetricValue(
                    category, metric_, str(cat), res))

    return metric_values


def _create_prob_dataframe(obs, fx_fx_prob, ref_fx_fx_prob):
    """
    Creates a DataFrame for grouping for the probabilistic forecasts.

    Parameters
    ----------
    obs : pandas.Series
    fx_fx_prob : list of tuple of pandas.Series
    ref_fx_fx_prob : list of tuple of pandas.Series

    Returns
    -------
    pandas.DataFrame
        DataFrame combining all data with column names being tuples of
        constant_value and one of 'observation', 'forecast', 'probability',
        'reference_forecast' or 'reference_probability'. For 'observation'
        the constant_value is None.
    """
    data_dict = {(None, 'observation'): obs}
    for orig, ref in zip(fx_fx_prob, ref_fx_fx_prob):
        fx, fx_prob = orig
        ref_fx, ref_fx_prob = ref
        data_dict[(fx.name, 'forecast')] = fx
        data_dict[(fx_prob.name, 'probability')] = fx_prob
        try:
            data_dict[(ref_fx.name, 'reference_forecast')] = ref_fx
            data_dict[(ref_fx_prob.name, 'reference_probability')] = ref_fx_prob  # NOQA: E501
        except AttributeError:
            pass
    return pd.DataFrame(data_dict)


def _transform_prob_forecast_value_and_prob(proc_fx_obs,
                                            attr='forecast_values'):
    """
    Helper function that returns an ordered list of series of physical values
    and probabilities for a datamodel.ProcessedForecastObservation

    Parameters
    ----------
    proc_fx_obs : datamodel.ProcessedForecastObservation
    attr : str
        The attribute of proc_fx_obs that contains the forecast values of
        interest. Typically forecast_values or reference_forecast_values.

    Returns
    -------
    fx_fx_prob : list of tuples of (n,) array_like
        Forecast physical values and forecast probabilities.
    """
    fx_fx_prob = []
    data = getattr(proc_fx_obs, attr)
    # Need to convert to DataFrame of ProbabilisticForecastConstantValue
    # for consistency with ProbabilisticForecast that provides a DataFrame
    if isinstance(data, pd.Series):
        # set the name to a float because it will default to 'value'
        data = data.to_frame(
            name=proc_fx_obs.original.forecast.constant_value)
    for col in data.columns:
        if proc_fx_obs.original.forecast.axis == 'x':
            fx_prob = data[col]
            fx = pd.Series([float(col)]*fx_prob.size, index=fx_prob.index,
                           name=col)
        else:  # 'y'
            fx = data[col]
            fx_prob = pd.Series([float(col)]*fx.size, index=fx.index,
                                name=col)
        fx_fx_prob.append((fx, fx_prob))
    return fx_fx_prob
def calculate_event_metrics(proc_fx_obs, categories, metrics):
    """
    Calculate event metrics for the processed data using the provided
    categories and metric types.

    Parameters
    ----------
    proc_fx_obs : datamodel.ProcessedForecastObservation
    categories : list of str
        List of categories to compute metrics over.
    metrics : list of str
        List of metrics to be computed.

    Returns
    -------
    solarforecastarbiter.datamodel.MetricResult
        Contains all the computed metrics by categories.

    Raises
    ------
    RuntimeError
        If there is no forecast or observation timeseries data, or no
        metrics are specified.
    """
    out = {
        'name': proc_fx_obs.name,
        'forecast_id': proc_fx_obs.original.forecast.forecast_id,
        'observation_id': proc_fx_obs.original.observation.observation_id
    }

    fx = proc_fx_obs.forecast_values
    obs = proc_fx_obs.observation_values

    # No data or metrics
    if fx.empty:
        raise RuntimeError("No Forecast timeseries data.")
    elif obs.empty:
        raise RuntimeError("No Observation timeseries data.")
    elif len(metrics) == 0:
        raise RuntimeError("No metrics specified.")

    # Dataframe for grouping
    df = pd.concat({'forecast': fx, 'observation': obs}, axis=1)

    metric_vals = []
    # Calculate metrics
    for category in set(categories):
        index_category = _index_category(category, df)

        # Calculate each metric
        for metric_ in set(metrics):
            # Group by category
            for cat, group in df.groupby(index_category):

                # Calculate
                res = _apply_event_metric_func(
                    metric_, group.forecast, group.observation
                )

                # Change category label of the group from numbers
                # to e.g. January or Monday
                if category == 'month':
                    cat = calendar.month_abbr[cat]
                elif category == 'weekday':
                    cat = calendar.day_abbr[cat]

                metric_vals.append(datamodel.MetricValue(
                    category, metric_, str(cat), res))

    out['values'] = _sort_metrics_vals(metric_vals,
                                       datamodel.ALLOWED_EVENT_METRICS)
    calc_metrics = datamodel.MetricResult.from_dict(out)
    return calc_metrics
def _index_category(category, df):
    # total (special category)
    if category == 'total':
        index_category = lambda x: 0  # NOQA: E731
    elif category == 'season':
        index_category = _season_from_months(df.index.month)
    else:
        index_category = getattr(df.index, category)
    return index_category


def _season_from_months(months):
    """Compute season (DJF, MAM, JJA, SON) from month ordinal"""
    # Copied from xarray. see xarray license in LICENSES
    seasons = np.array(SEASONS)
    months = np.asarray(months)
    return seasons[(months // 3) % 4]


def _is_deterministic_forecast(proc_fxobs):
    return type(proc_fxobs.original.forecast) is datamodel.Forecast


def _is_event_forecast(proc_fxobs):
    return type(proc_fxobs.original.forecast) is datamodel.EventForecast


def _assemble_summary_stats_df(processed_fx_obs):
    # calculate stats for all obs, deterministic fx, and event fx
    # do not calculate stats for prob fx
    dfd = {'observation': processed_fx_obs.observation_values}

    def add_forecasts_to_frame():
        dfd['forecast'] = processed_fx_obs.forecast_values
        ref_fx = processed_fx_obs.reference_forecast_values
        if ref_fx is not None:
            dfd['reference_forecast'] = ref_fx

    if _is_event_forecast(processed_fx_obs):
        mapping = summary._EVENT_MAP
        add_forecasts_to_frame()
    elif _is_deterministic_forecast(processed_fx_obs):
        mapping = summary._DETERMINISTIC_MAP
        add_forecasts_to_frame()
    else:
        mapping = summary._DETERMINISTIC_MAP

    df = pd.DataFrame(dfd)
    return df, mapping
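
# Illustrative check (not part of the library): ``_season_from_months``
# above maps month ordinals to meteorological seasons, e.g.
#
#     _season_from_months([1, 4, 7, 10, 12])
#     # -> array(['DJF', 'MAM', 'JJA', 'SON', 'DJF'], dtype='<U3')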
def calculate_summary_statistics(processed_fx_obs, categories):
    """
    Calculate summary statistics for the processed data using the
    provided categories and all metrics defined in :py:mod:`.summary`.

    Parameters
    ----------
    processed_fx_obs : datamodel.ProcessedForecastObservation
    categories : list of str
        List of categories to compute metrics over.

    Returns
    -------
    solarforecastarbiter.datamodel.MetricResult
        Contains all the computed statistics by category.

    Raises
    ------
    RuntimeError
        If there is no data to summarize.
    """
    out = {'name': processed_fx_obs.name,
           'forecast_id': processed_fx_obs.original.forecast.forecast_id,
           'is_summary': True}
    try:
        out['observation_id'] = \
            processed_fx_obs.original.observation.observation_id
    except AttributeError:
        out['aggregate_id'] = \
            processed_fx_obs.original.aggregate.aggregate_id

    df, mapping = _assemble_summary_stats_df(processed_fx_obs)
    if df.empty:
        raise RuntimeError('No data to calculate summary statistics for.')

    # Force `groupby` to be consistent with `interval_label`, i.e., if
    # `interval_label == ending`, then the last interval should be in the bin
    if processed_fx_obs.interval_label == "ending":
        df.index -= pd.Timedelta("1ns")

    metric_vals = []
    # Calculate metrics
    for category in set(categories):
        index_category = _index_category(category, df)

        # Calculate each summary statistic
        for metric, (metric_func, _) in mapping.items():
            # Group by category
            for cat, group in df.groupby(index_category):
                res = group.apply(metric_func)

                # Change category label of the group from numbers
                # to e.g. January or Monday
                if category == 'month':
                    cat = calendar.month_abbr[cat]
                elif category == 'weekday':
                    cat = calendar.day_abbr[cat]

                metric_vals.extend([
                    datamodel.MetricValue(
                        category, f'{obj}_{metric}', str(cat), val)
                    for obj, val in res.items()
                ])

    out['values'] = _sort_metrics_vals(
        metric_vals, {f'{type_}_{k}': v for k, v in
                      datamodel.ALLOWED_SUMMARY_STATISTICS.items()
                      for type_ in ('forecast', 'observation',
                                    'reference_forecast')})
    calc_stats = datamodel.MetricResult.from_dict(out)
    return calc_stats
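
# Illustrative sketch (not part of the library): summary statistic values
# are named ``<object>_<statistic>``, e.g. 'observation_mean' or
# 'forecast_max', assuming those statistics are defined in ``summary``.
# ``proc_fxobs`` below is a hypothetical ProcessedForecastObservation.
#
#     result = calculate_summary_statistics(proc_fxobs, ('total', 'month'))
#     obs_means = [v for v in result.values
#                  if v.metric == 'observation_mean']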
def calculate_all_summary_statistics(processed_pairs, categories):
    """
    Loop through the forecast-observation pairs and calculate summary
    statistics.

    Parameters
    ----------
    processed_pairs : list
        List of datamodel.ProcessedForecastObservation
    categories : list of str
        List of categories to compute metrics over.

    Returns
    -------
    list
        List of datamodel.MetricResult for each
        datamodel.ProcessedForecastObservation
    """
    stats = []
    for proc_fxobs in processed_pairs:
        try:
            stats.append(calculate_summary_statistics(proc_fxobs, categories))
        except RuntimeError as e:
            logger.error('Failed to calculate summary statistics'
                         ' for %s: %s', proc_fxobs.name, e)
    return stats