Source code for solarforecastarbiter.reports.figures.plotly_figures

"""
Functions to make all of the metrics figures for Solar Forecast Arbiter reports
using Plotly.
"""
import base64
import calendar
from copy import deepcopy
import datetime as dt
from itertools import cycle
from pathlib import Path
import logging


import pandas as pd
from plotly import __version__ as plotly_version
import plotly.graph_objects as go
import numpy as np
from matplotlib import cm
from matplotlib.colors import Normalize

from solarforecastarbiter import datamodel
from solarforecastarbiter.metrics.event import _event2count
import solarforecastarbiter.plotting.utils as plot_utils


logger = logging.getLogger(__name__)
# Twenty hex colors used as the base categorical palette.
# NOTE(review): the name suggests D3's 20-color scheme -- confirm the origin.
D3_PALETTE = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b',
              '#e377c2', '#7f7f7f', '#bcbd22', '#17becf', '#aec7e8', '#ffbb78',
              '#98df8a', '#ff9896', '#c5b0d5', '#c49c94', '#f7b6d2', '#c7c7c7',
              '#dbdb8d', '#9edae5']
# Reordered palette: all even-indexed entries of D3_PALETTE first, followed by
# the odd-indexed entries. This is the palette the plotting functions cycle
# through.
PALETTE = (D3_PALETTE[::2] + D3_PALETTE[1::2])


def gen_grays(num_colors):
    """Generate a grayscale color list of length num_colors.
    """
    rgb_delta = int(255/num_colors + 1)
    color_list = ["#{h}{h}{h}".format(h=hex(i*rgb_delta)[2:])
                  for i in range(num_colors)]
    return color_list


# Number of gray shades available for observation traces.
_num_obs_colors = 3
# drop white
OBS_PALETTE = gen_grays(_num_obs_colors)
# Reversed in place, so the lightest of the generated grays comes first.
OBS_PALETTE.reverse()
# Interval-length breakpoints (10 minutes apart, ending at 60 minutes), one
# per shade. _obs_color searches this range with np.searchsorted to pick the
# OBS_PALETTE index for a given interval length.
OBS_PALETTE_TD_RANGE = pd.timedelta_range(
            freq='10min', end='60min', periods=_num_obs_colors)

# list of matplotlib's perceptually uniform sequential color palettes;
# cycled through, one colormap per plotted probabilistic distribution
PROBABILISTIC_PALETTES = ['viridis', 'plasma', 'inferno', 'magma', 'cividis']

# Plot background color (white).
PLOT_BGCOLOR = '#FFF'
# Plot margins, using plotly's layout.margin keys.
PLOT_MARGINS = {'l': 50, 'r': 50, 'b': 50, 't': 100, 'pad': 4}
# Layout keyword arguments shared by the metric plots. `bar` deep-copies this
# dict before adjusting 'height', so the module-level defaults are not
# mutated.
PLOT_LAYOUT_DEFAULTS = {
    'autosize': True,
    'height': 250,
    'margin': PLOT_MARGINS,
    'plot_bgcolor': PLOT_BGCOLOR,
    'title_font_size': 16,
    'font': {'size': 14}
}

# Value for the plotly layout `updatemenus` argument: a dropdown with one
# "restyle" button per sort order. Each button makes exactly one of five
# traces visible, so a figure using this menu must add five traces in the
# matching order. `bar` adds them as: original order, value ascending, value
# descending, name ascending, name descending.
SORT_UPDATEMENU_DROPDOWN = [{
        "buttons": [
            dict(
                method="restyle",
                label="Original Order",
                args=[{'visible': [True, False, False, False, False]}],
            ),
            dict(
                method="restyle",
                label="ᐁ Value",
                args=[{'visible': [False, True, False, False, False]}],
            ),
            dict(
                method="restyle",
                label="ᐃ Value",
                args=[{'visible': [False, False, True, False, False]}],
            ),
            dict(
                method="restyle",
                label="ᐁ Name",
                args=[{'visible': [False, False, False, True, False]}],
            ),
            dict(
                method="restyle",
                label="ᐃ Name",
                args=[{'visible': [False, False, False, False, True]}],
            )
        ],
        "direction": "down",
        "showactive": True,
        "xanchor": 'center',
        "x": 0.025,
        "yanchor": 'bottom',
        "pad": {'b': 5},
        # index of the button selected initially ("Original Order")
        "active": 0,
    }
]

# Used to adjust plot height when many x axis labels or long labels are
# present. The length of the longest label of the plot will be multiplied by
# this value and added to the height of PLOT_LAYOUT_DEFAULTS to determine the
# new height.
X_LABEL_HEIGHT_FACTOR = 11

# If fail.pdf (just a PDF with some text saying that PDF generation failed)
# cannot be read for any reason, fall back to an empty PDF.
try:
    with open(Path(__file__).parent / 'fail.pdf', 'rb') as f:
        fail_pdf = base64.a85encode(f.read()).decode()
except Exception:
    fail_pdf = ',u@!!/MSk8$73+IY58P_+>=pV@VQ644<Q:NASu.&BHT/T0Ha7#+<Vd[7VQ[\\ATAnH7VlLTAOL*>De*Dd5!B<pFE1r$D$kNX1K6%.6<uqiV.X\\GOIXKoa;)c"!&3^A=pehYA92j5ARTE_ASu$s@VQ6-+>=pV@VQ5m+<WEu$>"*cDdmGg1E\\@oDdmGg4?Ns74pkk=A8bpl$8N_X+E(_($9UEn03!49AKWX&@:s-o,p4oL+<Vd[:gnBUDKI!U+>=p9$6UH6026"gBjj>HGT^350H`%l0J5:A+>>E,2\'?03+<Vd[6Z6jaASuU2+>b2p+ArOh+<W=-Ec6)>+?Van+<VdL+<W=:H#R=;01U&$F`7[1+<VdL+>6Y902ut#DKBc*Eb0,uGmYZ:+<VdL01d:.Eckq#+<VdL+<W=);]m_]AThctAPu#b$6UH65!B;r+<W=8ATMd4Ear[%+>Y,o+ArP14pkk=A8bpl$8EYW+E(_($9UEn03!49AKWX&@:s.m$6UH602$"iF!+[01*A7n;BT6P+<Vd[6Z7*bF<E:F5!B<bDIdZpC\'ljA0Hb:CC\'m\'c+>6Q3De+!#ATAnA@ps(lD]gbe0fCX<+=LoFFDu:^0/$gDBl\\-)Ea`p#Bk)3:DfTJ>.1.1?+>6*&ART[pDf.sOFCcRC6om(W1,(C>0K1^?0ebFC/MK+20JFp_5!B<bDIdZpC\'lmB0Hb:CC\'m\'c+>6]>E+L.F6Xb(FCi<qn+<Vd[:gn!JF!*1[0Ha7#5!B<bDIdZpC\'o3+AS)9\'+?0]^0JG170JG170H`822)@*4AfqF70JG170JG:B0d&/(0JG1\'DBK9?0JG170JG4>0d&/(0JG1\'DBK9?0JG170JG4<0H`&\'0JG1\'DBK9?0JG170JG182\'=S,0JG1\'DBK9?0JG170JG493?U"00JG1\'DBK9?0JG170JG=?2BX\\-0JG1\'DBK9?0JG170JG@B1*A8)0JG1\'DBK:.Ea`ZuATA,?4<Q:UBmO>53!pcN+>6W2Dfd*\\+>=p9$6UH601g%nD]gq\\0Ha7#5!B<pFCB33G]IA-$8sUq$7-ue:IYZ'  # NOQA


def _value_frame_dict(idx, pfxobs, column=None):
    """Collect the values of one forecast/observation pair in a dict.

    Parameters
    ----------
    idx : int
        Pair index stored under ``'pair_index'``.
    pfxobs : object
        Object exposing ``forecast_values`` and ``observation_values``.
    column : str or None
        When given and forecast values are present, only
        ``pfxobs.forecast_values[column]`` is used as the forecast values.

    Returns
    -------
    dict
        Keys ``'pair_index'``, ``'observation_values'`` and
        ``'forecast_values'``. The last is None if the pair has no forecast
        values.
    """
    fx_values = pfxobs.forecast_values
    if column is not None and fx_values is not None:
        # select the single requested column of the forecast values
        fx_values = fx_values[column]
    return {
        'pair_index': idx,
        'observation_values': pfxobs.observation_values,
        'forecast_values': fx_values,
    }


def _meta_row_dict(idx, pfxobs, **kwargs):
    """Assemble the metadata row describing one forecast/observation pair.

    Parameters
    ----------
    idx : int
        Pair index stored under ``'pair_index'``.
    pfxobs : object
        Processed pair exposing ``original`` (with ``forecast`` and
        ``data_object``), ``interval_label``, ``interval_length`` and
        ``interval_value_type``.
    **kwargs
        ``forecast_object`` (optional) replaces ``pfxobs.original.forecast``
        as the forecast being described. Every other keyword is copied into
        the result last and overrides the computed entry of the same name.

    Returns
    -------
    dict
        Metadata keyed by column name.
    """
    original = pfxobs.original
    forecast_object = kwargs.pop('forecast_object', None)
    if forecast_object is None:
        forecast_object = original.forecast

    # A constant value described on its own while the pair holds the whole
    # ProbabilisticForecast: remember which distribution it belongs to.
    belongs_to_distribution = (
        isinstance(forecast_object,
                   datamodel.ProbabilisticForecastConstantValue)
        and isinstance(original.forecast, datamodel.ProbabilisticForecast)
    )
    distribution = None
    if belongs_to_distribution:
        distribution = str(hash((
            original.forecast,
            original.forecast.interval_length,
            original.forecast.interval_value_type,
            original.forecast.interval_label)))

    # Only some forecast objects carry these attributes.
    axis = getattr(forecast_object, 'axis', None)
    constant_value = getattr(forecast_object, 'constant_value', None)

    meta = {
        'pair_index': idx,
        'observation_name': _obs_name(pfxobs.original),
        'forecast_name': _fx_name(
            forecast_object, pfxobs.original.data_object),
        'interval_label': pfxobs.interval_label,
        'interval_length': pfxobs.interval_length,
        # NOTE(review): this is the class name of the pair object
        # (pfxobs.original), not of its forecast -- confirm that is intended.
        'forecast_type': pfxobs.original.__class__.__name__,
        'axis': axis,
        'constant_value': constant_value,
        'observation_hash': str(hash((
            pfxobs.original.data_object,
            pfxobs.interval_length,
            pfxobs.interval_value_type,
            pfxobs.interval_label))),
        'forecast_hash': str(hash((
            forecast_object,
            pfxobs.interval_length,
            pfxobs.interval_value_type,
            pfxobs.interval_label))),
        'observation_color': _obs_color(
            pfxobs.interval_length),
        'distribution': distribution
    }
    # caller-supplied entries win over the computed ones
    meta.update(kwargs)
    return meta


def construct_timeseries_dataframe(report):
    """Construct two standardized Dataframes for the timeseries and scatter
    plot functions. One with timeseries data for all observations,
    aggregates, and forecasts in the report, and the other with associated
    metadata sharing a common `pair_index` key.

    Parameters
    ----------
    report: :py:class:`solarforecastarbiter.datamodel.Report`

    Returns
    -------
    data : pandas.DataFrame
        Keys are an integer `pair_index` for pairing values with the metadata
        in the metadata dataframe, and two pandas.Series,
        `observation_values` and `forecast_values`. The index is converted to
        the report timezone and named `timestamp`.
    metadata : pandas.DataFrame
        This dataframe has the following columns:

        - `pair_index`: Integer for pairing metadata with the values in the
          data dataframe.
        - `observation_name`: Observation name.
        - `forecast_name`: Forecast name.
        - `interval_label`: Interval label of the processed forecast and
          observation data.
        - `interval_length`: Interval length of the processed forecast and
          observation data.
        - `forecast_type`: Class name recorded for the pair;
          'ProbabilisticForecast' for constant values that came from a
          probabilistic forecast.
        - `axis`: Axis of the forecast object, or None.
        - `constant_value`: Constant value of the forecast object, or None.
        - `observation_hash`: Hash of the original observation object and
          the `datamodel.ProcessedForecastObservations` metadata.
        - `forecast_hash`: Hash of the original forecast object and
          the `datamodel.ProcessedForecastObservations` metadata.
        - `observation_color`: Color used to plot the observation.
        - `distribution`: Hash identifying the probabilistic forecast a
          constant value belongs to, or None.
    """
    value_frames = []
    meta_rows = []
    # enumerate won't work because of the conditional for loop, so
    # manually keep track of the index
    idx = 0
    for pfxobs in report.raw_report.processed_forecasts_observations:
        if isinstance(pfxobs.original.forecast,
                      datamodel.ProbabilisticForecast):
            # one pair (and one pair index) per constant value
            for cvfx in pfxobs.original.forecast.constant_values:
                value_frame_dict = _value_frame_dict(
                    idx, pfxobs, column=str(cvfx.constant_value))
                if value_frame_dict['forecast_values'] is None:
                    continue
                # specify fx type so we know the const value fx came from a
                # ProbabilisticForecast
                meta_row_dict = _meta_row_dict(
                    idx, pfxobs,
                    forecast_object=cvfx,
                    forecast_type='ProbabilisticForecast')
                value_frames.append(pd.DataFrame(value_frame_dict))
                meta_rows.append(meta_row_dict)
                idx += 1
        else:
            value_frame_dict = _value_frame_dict(idx, pfxobs)
            if value_frame_dict['forecast_values'] is None:
                continue
            meta_row_dict = _meta_row_dict(idx, pfxobs)
            value_frames.append(pd.DataFrame(value_frame_dict))
            meta_rows.append(meta_row_dict)
            idx += 1
    if value_frames:
        data = pd.concat(value_frames)
    else:
        data = pd.DataFrame()
    metadata = pd.DataFrame(meta_rows)
    # convert data to report timezone
    data = data.tz_convert(report.raw_report.timezone)
    data = data.rename_axis('timestamp')
    return data, metadata
def _fill_timeseries(df, interval_length):
    """Returns a dataframe with a datetimeindex with regular frequency of
    interval_length minutes. Previously missing values will be filled with
    nans. Useful for creating gaps in plotted timeseries data.

    Parameters
    ----------
    df: pandas.DataFrame
        Dataframe with timeseries data.
    interval_length: numpy.timedelta64
        Interval length of the processed forecast observation.

    Returns
    -------
    pandas.DataFrame
        DataFrame with filled datetime index data.
    """
    if not df.index.empty:
        start = df.index[0]
        end = df.index[-1]
        freq_mins = int(interval_length / np.timedelta64(1, 'm'))
        filled_idx = pd.date_range(start, end, freq=f'{freq_mins}min')
        return df.reindex(filled_idx)
    else:
        return df


def _obs_name(fx_obs):
    """Return the display name of the observation or aggregate of a pair,
    with a suffix appended when it has the same name as the forecast."""
    # TODO: add code to ensure obs names are unique
    # should be unique if plotting by hash and name includes
    # pfxobs.interval_length, pfxobs.interval_value_type, pfxobs.interval_label
    # name doesn't need them if they are the same as the fx_obs parameters
    # since that would guarantee uniqueness
    name = fx_obs.data_object.name
    if fx_obs.forecast.name == fx_obs.data_object.name:
        if isinstance(fx_obs.data_object, datamodel.Observation):
            name += ' Observation'
        else:
            name += ' Aggregate'
    return name


def _fx_name(forecast, data_object):
    """Return the display name of a forecast. Constant values of
    probabilistic forecasts get a label describing the constant value, and
    ' Forecast' is appended when the name matches the data object's name."""
    # TODO: add code to ensure fx names are unique
    forecast_name = forecast.name
    if isinstance(forecast, datamodel.ProbabilisticForecastConstantValue):
        if forecast.axis == 'x':
            forecast_name += \
                f' Prob(x <= {forecast.constant_value} {forecast.units})'
        else:
            forecast_name += f' Prob(f <= x) = {forecast.constant_value}%'
    if forecast_name == data_object.name:
        forecast_name += ' Forecast'
    return forecast_name


def _obs_color(interval_length):
    """Return the OBS_PALETTE color for an observation interval length.

    Interval lengths longer than the last entry of OBS_PALETTE_TD_RANGE use
    the last color of the palette.
    """
    idx = np.searchsorted(OBS_PALETTE_TD_RANGE, interval_length)
    # searchsorted returns len(OBS_PALETTE_TD_RANGE) for values beyond the
    # last breakpoint, which would index past the end of the palette.
    idx = min(int(idx), len(OBS_PALETTE) - 1)
    obs_color = OBS_PALETTE[idx]
    return obs_color


def _boolean_filter_indices_by_pair(value_cds, pair_index):
    """Return a boolean mask of the entries of value_cds.data that belong to
    pair_index."""
    return value_cds.data['pair_index'] == pair_index


def _none_or_values0(metadata, key):
    """Return the first value of metadata[key], or None if key is missing."""
    value = metadata.get(key)
    if value is not None:
        value = value.values[0]
    return value


def _extract_metadata_from_df(metadata_df, hash_, hash_key, keep_pairs=False):
    """Return a dict of plotting metadata for the rows of metadata_df whose
    hash_key column equals hash_.

    Values are taken from the first matching row. With keep_pairs=True the
    'pair_index' entry is the Series of all matching pair indices instead of
    only the first one.
    """
    # dataframe that is subset of total metadata dataframe
    metadata = metadata_df[metadata_df[hash_key] == hash_]
    if keep_pairs:
        pair_index = metadata['pair_index']
    else:
        pair_index = metadata['pair_index'].values[0]
    # unclear why we don't use metadata.iloc[0] for most
    meta = {
        'pair_index': pair_index,
        'observation_name': metadata['observation_name'].values[0],
        'forecast_name': metadata['forecast_name'].values[0],
        'interval_label': metadata['interval_label'].values[0],
        # np.timedelta64. unclear why we'd want this
        'interval_length': metadata['interval_length'].values[0],
        'observation_color': metadata['observation_color'].values[0],
    }
    meta['forecast_type'] = _none_or_values0(metadata, 'forecast_type')
    meta['axis'] = _none_or_values0(metadata, 'axis')
    meta['constant_value'] = _none_or_values0(metadata, 'constant_value')
    return meta


def _legend_text(name, max_length=20):
    """Inserts <br> tags in a name to mimic word-wrap behavior for long names
    in the legend of timeseries plots.

    Parameters
    ----------
    name: str
        The name/string to apply word-wrap effect to.
    max_length: int
        The maximum length of any line of text. Note that this will not break
        words across lines, so a single word longer than max_length is kept
        whole on its own line.

    Returns
    -------
    str
        The name after it is split appropriately.
    """
    if len(name) <= max_length:
        return name
    lines = []
    current = []
    for part in name.split(' '):
        # Only start a new line when the current one has content; otherwise
        # an over-long first word would produce an empty leading line.
        if current and len(' '.join(current + [part])) > max_length:
            lines.append(' '.join(current))
            current = [part]
        else:
            current.append(part)
    if current:
        lines.append(' '.join(current))
    return '<br>'.join(lines)


def formatted_interval(interval):
    """Converts an interval_length timedelta into a string for display

    Parameters
    ----------
    interval: np.timedelta64

    Returns
    -------
    str
        The interval as a string, displayed in hours (e.g. '1h') when it is
        a whole number of hours and in minutes (e.g. '30m') otherwise.
    """
    if (interval % np.timedelta64(1, 'h') == 0):
        return f'{np.timedelta64(interval, "h").astype(int)}h'
    else:
        return f'{np.timedelta64(interval, "m").astype(int)}m'


def _plot_obs_timeseries(fig, timeseries_value_df, timeseries_meta_df):
    """Add one trace per unique observation to fig, in order of the first
    pair index that uses each observation."""
    # construct graph objects in random hash order. collect them in a list
    # along with the pair index. Then add traces in order of pair index.
    gos = []
    # construct graph objects in random hash order
    for obs_hash in np.unique(timeseries_meta_df['observation_hash']):
        # if observation is used multiple times, takes the plotting metadata
        # from the first use of it but returns pair_index for all instances
        metadata = _extract_metadata_from_df(
            timeseries_meta_df, obs_hash, 'observation_hash',
            keep_pairs=True)
        # bool Series for every point in timeseries_value_df
        ts_this_obs_hash = timeseries_value_df['pair_index'].isin(
            metadata['pair_index']
        )
        # DataFrame for only points with the right observation hash
        ts_this_obs_hash_true_only = timeseries_value_df[ts_this_obs_hash]
        # which of those remaining rows are duplicated?
        duplicated = ts_this_obs_hash_true_only.index.duplicated()
        # rows are initially ordered by pair_index, then dt index.
        # now we only want the dt index and no longer care about the pair_index
        # _fill_timeseries will use index[0] and index[-1] to determine time
        # range to plot, so failing to sort can prevent data from being plotted
        ts_to_plot = ts_this_obs_hash_true_only[~duplicated].sort_index()
        plot_kwargs = plot_utils.line_or_step_plotly(
            metadata['interval_label'])
        data = _fill_timeseries(
            ts_to_plot,
            metadata['interval_length'],
        )
        if data['observation_values'].isnull().all():
            continue
        # Append the interval length and labelling to each observation
        # to ensure unique names
        interval_text = formatted_interval(metadata["interval_length"])
        label_text = metadata['interval_label']
        observation_legend_name = _legend_text(
            f'{metadata["observation_name"]} {interval_text} {label_text}'
        )
        go_ = go.Scattergl(
            y=data['observation_values'],
            x=data.index,
            name=observation_legend_name,
            legendgroup=observation_legend_name,
            showlegend=True,
            marker=dict(color=metadata['observation_color']),
            connectgaps=False,
            **plot_kwargs)
        # collect in list. sorting can safely be done on the first index.
        first_pair_index = metadata['pair_index'].values[0]
        gos.append((first_pair_index, go_))
    # Add traces in order of pair index
    for idx, go_ in sorted(gos, key=lambda x: x[0]):
        fig.add_trace(go_)


def _plot_fx_timeseries(fig, timeseries_value_df, timeseries_meta_df, axis):
    """Add one line/step trace to fig for each forecast whose axis is in
    `axis`, in order of pair index."""
    palette = cycle(PALETTE)
    # pull metadata to plot in random hash order. collect them in a list
    # along with the pair index. Then add traces in order of pair index.
    metadatas = []
    # pull metadata to plot in random hash order
    for fx_hash in np.unique(timeseries_meta_df['forecast_hash']):
        metadata = _extract_metadata_from_df(
            timeseries_meta_df, fx_hash, 'forecast_hash')
        if metadata['axis'] not in axis:
            # we're looking at a different kind of forecast than what we wanted
            # to plot
            continue
        # collect in list
        metadatas.append((metadata['pair_index'], metadata))

    for idx, metadata in sorted(metadatas, key=lambda x: x[0]):
        pair_idcs = timeseries_value_df['pair_index'] == metadata['pair_index']
        # probably treat axis == None and axis == y separately in the future.
        # currently no need for a separate axis == x treatment either, so
        # removed an if statement on the axis.
        plot_kwargs = plot_utils.line_or_step_plotly(
            metadata['interval_label'])
        data = _fill_timeseries(
            timeseries_value_df[pair_idcs],
            metadata['interval_length'],
        )
        plot_kwargs['marker'] = dict(color=next(palette))
        go_ = go.Scattergl(
            y=data['forecast_values'],
            x=data.index,
            name=_legend_text(metadata['forecast_name']),
            legendgroup=metadata['forecast_name'],
            showlegend=True,
            connectgaps=False,
            **plot_kwargs)
        fig.add_trace(go_)


def _plot_fx_distribution_timeseries(
        fig, timeseries_value_df, timeseries_meta_df, axis):
    """Add shaded traces to fig for each probabilistic forecast distribution
    in timeseries_meta_df, one trace per constant value, filled down to the
    previous constant value. Traces are added in order of pair index.

    `axis` is accepted for signature parity with _plot_fx_timeseries and is
    not used.
    """
    palette = cycle(PROBABILISTIC_PALETTES)
    gos = []
    for dist_hash in np.unique(timeseries_meta_df['distribution']):
        # indices to constant values in the metadata df
        cv_indices = timeseries_meta_df['distribution'] == dist_hash
        # sort constant values
        cv_metadata = timeseries_meta_df[cv_indices]
        cv_metadata = cv_metadata.sort_values('constant_value')
        cv_metadata = cv_metadata.reset_index()

        # Get a colormap for mapping fill colors. The colormap name is passed
        # directly to ScalarMappable, which resolves registered names itself;
        # cm.get_cmap is not available in recent matplotlib releases.
        color_scaler = cm.ScalarMappable(
            Normalize(vmin=0, vmax=1),
            next(palette),
        )

        symmetric_percentiles = plot_utils.percentiles_are_symmetric(
            cv_metadata['constant_value'].tolist())

        # Plot confidence intervals
        for idx, cv in cv_metadata.iterrows():
            pair_idcs = timeseries_value_df['pair_index'] == cv['pair_index']
            data = _fill_timeseries(
                timeseries_value_df[pair_idcs], cv['interval_length'])
            # Fill missing data with 0 to avoid plotly bugs encountered with
            # go.Scatter fill and missing data.
            data = data.fillna(0)
            if idx == 0:
                # The first value will act as the lower bound for other values
                # to fill down to.
                fill = None
                showlegend = True
            else:
                fill = 'tonexty'
                showlegend = False

            # Split name of the distribution from the current constant value
            constant_label_index = cv['forecast_name'].find('Prob(') - 1
            fx_name = cv['forecast_name'][:constant_label_index]
            cv_label = cv['forecast_name'][constant_label_index:]

            if symmetric_percentiles:
                # Since plotly always fills below the line, for constants below
                # 50%, use the previous value to mimic fill upward behavior.
                # E.g. fill downward from 5% to 0% with the 100% interval.
                if cv['constant_value'] <= 50 and idx != 0:
                    fill_value = cv_metadata.iloc[idx - 1]['constant_value']
                else:
                    fill_value = cv['constant_value']
                # When constant values are symmetric, create intervals
                # centered around the 50th percentile
                fill_value = 2 * abs(fill_value - 50)
            else:
                # convert to complement percentile to invert shading, such that
                # bright colors appear at 0 and dark at 100 when plotted.
                fill_value = 100 - cv['constant_value']

            fill_color = plot_utils.distribution_fill_color(
                color_scaler, fill_value)
            plot_kwargs = plot_utils.line_or_step_plotly(cv['interval_label'])
            go_ = go.Scatter(
                x=data.index,
                y=data['forecast_values'],
                name=_legend_text(fx_name),
                hovertemplate=(
                    f'<b>{ cv_label }<br>'
                    '<b>Value<b>: %{y}<br>'
                    '<b>Time<b>: %{x}<br>'),
                connectgaps=False,
                mode='lines',
                fill=fill,
                showlegend=showlegend,
                legendgroup=cv['distribution'],
                fillcolor=fill_color,
                line=dict(
                    color=fill_color,
                ),
                **plot_kwargs,
            )
            # Add traces in order of pair index
            gos.append((cv['pair_index'], go_))
    for idx, go_ in sorted(gos, key=lambda x: x[0]):
        fig.add_trace(go_)
def timeseries(timeseries_value_df, timeseries_meta_df,
               start, end, units, axis, timezone='UTC'):
    """
    Timeseries plot of one or more forecasts and observations.

    Parameters
    ----------
    timeseries_value_df: pandas.DataFrame
        DataFrame of timeseries data. See
        :py:func:`solarforecastarbiter.reports.figures.construct_timeseries_dataframe`
        for format.
    timeseries_meta_df: pandas.DataFrame
        DataFrame of metadata for each Observation Forecast pair. See
        :py:func:`solarforecastarbiter.reports.figures.construct_timeseries_dataframe`
        for format.
    start : pandas.Timestamp
        Report start time. Currently not used by this function.
    end : pandas.Timestamp
        Report end time. Currently not used by this function.
    units : str
        Units of the data, shown in the y axis label unless 'x' is in axis.
    axis : {(None,), ('x',), ('y',), (None, 'y')}
        Specifies the kinds of forecast to plot. None is appropriate for
        deterministic forecasts, 'x' for probabilistic forecasts with
        axis = 'x', and 'y' for probabilistic forecasts with axis = 'y'.
        Observations, deterministic forecasts, and probabilistic forecasts
        may all be plotted together if axis = (None, 'y'). Observations will
        not be plotted if axis = ('x',).
    timezone : str
        Timezone consistent with the data in the timeseries_metadata_df.

    Returns
    -------
    plotly.Figure
    """  # NOQA: E501
    # might want to make fig=None a kwarg and modify this line to
    # fig = fig if fig is not None else go.Figure()
    fig = go.Figure()
    if 'x' in axis:
        ylabel = 'Probability (%)'
    else:
        ylabel = f'Data ({units})'
        # adds observation traces to fig
        _plot_obs_timeseries(fig, timeseries_value_df, timeseries_meta_df)

    # add forecast traces that have correct axis to fig
    # get indices of probabilistic forecasts with axis y to create special
    # shaded distribution plots
    y_distribution_indices = (
        timeseries_meta_df['distribution'].notna() &
        (timeseries_meta_df['axis'] == 'y')
    )
    non_y_distribution_meta_df = timeseries_meta_df[~y_distribution_indices]
    distribution_meta_df = timeseries_meta_df[y_distribution_indices]
    _plot_fx_timeseries(
        fig, timeseries_value_df, non_y_distribution_meta_df, axis)
    _plot_fx_distribution_timeseries(
        fig, timeseries_value_df, distribution_meta_df, axis)

    fig.update_xaxes(title_text=f'Time ({timezone})', showgrid=True,
                     gridwidth=1, gridcolor='#CCC', showline=True,
                     linewidth=1, linecolor='black', ticks='outside')
    fig.update_yaxes(title_text=ylabel, showgrid=True,
                     gridwidth=1, gridcolor='#CCC', showline=True,
                     linewidth=1, linecolor='black', ticks='outside',
                     fixedrange=True)
    fig.update_layout(
        legend=dict(font=dict(size=10)),
    )
    return fig
def _get_scatter_limits(df):
    """Return the (min, max) over the forecast and observation values of df.

    NaNs are ignored. When there are no non-NaN values at all, the fallback
    limits (-999, 999) are returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have the columns 'forecast_values' and 'observation_values'.

    Returns
    -------
    min_, max_ : float, float
    """
    values = np.concatenate([
        np.asarray(df[kind]).astype(float).ravel()
        for kind in ('forecast_values', 'observation_values')
    ])
    # Drop NaNs up front rather than calling nanmin/nanmax on data that may
    # be entirely NaN, which emits a RuntimeWarning.
    values = values[~np.isnan(values)]
    if values.size == 0:
        return -999, 999
    return values.min(), values.max()
def scatter(timeseries_value_df, timeseries_meta_df, units):
    """
    Scatter plot of forecast values against observation values for one or
    more forecast/observation pairs.

    Parameters
    ----------
    timeseries_value_df: pandas.DataFrame
        DataFrame of timeseries data. See
        :py:func:`solarforecastarbiter.reports.figures.construct_timeseries_dataframe`
        for format.
    timeseries_meta_df: pandas.DataFrame
        DataFrame of metadata for each Observation Forecast pair. See
        :py:func:`solarforecastarbiter.reports.figures.construct_timeseries_dataframe`
        for format.
    units : str
        Units of the data, shown in both axis labels.

    Returns
    -------
    plotly.Figure
    """  # NOQA
    scatter_range = _get_scatter_limits(timeseries_value_df)
    palette = cycle(PALETTE)
    fig = go.Figure()
    # pull metadata to plot in random hash order. collect them in a list
    # along with the pair index. Then add traces in order of pair index.
    metadatas = []
    # accumulate labels and plot objects for manual legend
    for fxhash in np.unique(timeseries_meta_df['forecast_hash']):
        metadata = _extract_metadata_from_df(
            timeseries_meta_df, fxhash, 'forecast_hash')
        if metadata['axis'] == 'x':
            # don't know how to represent probability forecasts on a
            # physical value vs. physical value plot.
            continue
        # collect in list
        metadatas.append((metadata['pair_index'], metadata))

    # plot in order of pair index
    for idx, metadata in sorted(metadatas, key=lambda x: x[0]):
        pair_idcs = timeseries_value_df['pair_index'] == metadata['pair_index']
        data = timeseries_value_df[pair_idcs]
        if data['observation_values'].isnull().all():
            # observation values were not included, skip pair
            continue
        go_ = go.Scattergl(
            x=data['observation_values'],
            y=data['forecast_values'],
            name=_legend_text(metadata['forecast_name']),
            showlegend=True,
            legendgroup=metadata['forecast_name'],
            marker=dict(color=next(palette), opacity=0.25),
            mode='markers')
        fig.add_trace(go_)

    label = f'({units})'
    x_label = 'Observed ' + label
    y_label = 'Forecast ' + label
    nticks = 10
    # both axes share the same range and tick count
    fig.update_xaxes(title_text=x_label, showgrid=True,
                     gridwidth=1, gridcolor='#CCC', showline=True,
                     linewidth=1, linecolor='black', ticks='outside',
                     range=scatter_range, nticks=nticks)
    fig.update_yaxes(title_text=y_label, showgrid=True,
                     gridwidth=1, gridcolor='#CCC', showline=True,
                     linewidth=1, linecolor='black', ticks='outside',
                     range=scatter_range, nticks=nticks)
    return fig
def event_histogram(timeseries_value_df, timeseries_meta_df):
    """
    Bar plot of the event outcome counts (true/false positives and
    negatives) of one or more event forecasts and observations.

    Parameters
    ----------
    timeseries_value_df: pandas.DataFrame
        DataFrame of timeseries data. See
        :py:func:`solarforecastarbiter.reports.figures.construct_timeseries_dataframe`
        for format.
    timeseries_meta_df: pandas.DataFrame
        DataFrame of metadata for each Observation Forecast pair. See
        :py:func:`solarforecastarbiter.reports.figures.construct_timeseries_dataframe`
        for format.

    Returns
    -------
    plotly.Figure
    """  # NOQA
    fig = go.Figure()
    palette = cycle(PALETTE)

    # accumulate labels and plot objects for manual legend
    for fxhash in np.unique(timeseries_meta_df['forecast_hash']):
        metadata = _extract_metadata_from_df(
            timeseries_meta_df, fxhash, 'forecast_hash')
        pair_idcs = timeseries_value_df['pair_index'] == metadata['pair_index']
        data = timeseries_value_df[pair_idcs]
        if data['observation_values'].isnull().all():
            # no observation values for this pair, nothing to count
            continue
        tp, fp, tn, fn = _event2count(data["observation_values"],
                                      data["forecast_values"])
        x = ["True Pos.", "False Pos.", "True Neg.", "False Neg."]
        y = [tp, fp, tn, fn]
        fig.add_trace(go.Bar(
            x=x,
            y=y,
            name=_legend_text(metadata['forecast_name']),
            showlegend=True,
            legendgroup=metadata['forecast_name'],
            marker_color=next(palette),
        ))

    # update axes
    x_label = "Outcome"
    y_label = "Count"
    fig.update_xaxes(title_text=x_label, showgrid=True,
                     gridwidth=0, gridcolor='#CCC', showline=True,
                     linewidth=1, linecolor='black', ticks='outside')
    fig.update_yaxes(title_text=y_label, showgrid=True,
                     gridwidth=1, gridcolor='#CCC', showline=True,
                     linewidth=1, linecolor='black', ticks='outside')
    return fig


def configure_axes(fig, x_axis_kwargs, y_axis_kwargs):
    """Applies plotly axes configuration to display zero line and grid, and
    the configuration passed in x_axis_kwargs and y_axis kwargs. Currently
    configured to supply base layout for metric plots.

    Parameters
    ----------
    fig: plotly.graph_objects.Figure

    x_axis_kwargs: dict
        Dictionary to expand as arguments to fig.update_xaxes. May be empty
        or None, in which case only the base configuration is applied.
    y_axis_kwargs: dict
        Dictionary to expand as arguments to fig.update_yaxes. May be empty
        or None, in which case only the base configuration is applied.
    """
    fig.update_xaxes(showline=True, linewidth=1, linecolor='black',
                     ticks='outside')
    if x_axis_kwargs:
        fig.update_xaxes(**x_axis_kwargs)
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#CCC')
    fig.update_yaxes(showline=True, linewidth=1, linecolor='black',
                     ticks='outside')
    if y_axis_kwargs:
        fig.update_yaxes(**y_axis_kwargs)
def construct_metrics_dataframe(metrics, rename=None):
    """
    Flatten metric results into a single dataframe with one row per metric
    value.

    Possibly bad assumptions:
    * metrics contains keys: name, Total, etc.

    Parameters
    ----------
    metrics : list of datamodel.MetricResults
        Each metric dict is for a different forecast. Forecast name is
        specified by the name key.
    rename : function or None
        Function of one argument that is applied to each forecast name to
        produce the `abbrev` column. Names are used unchanged when omitted.

    Returns
    -------
    df: pandas.DataFrame
        Dataframe of computed metrics for the report, with the columns
        `name`, `abbrev`, `category`, `metric`, `value` and `index`.
    """
    if rename:
        f = rename
    else:
        def f(x): return x  # NOQA

    data = []
    for metric_result in metrics:
        for mvalue in metric_result.values:
            new = {
                'name': metric_result.name,
                'abbrev': f(metric_result.name),
                'category': mvalue.category,
                'metric': mvalue.metric,
                'value': mvalue.value
            }
            if new['category'] == 'date':
                # date indices arrive as 'YYYY-MM-DD' strings
                new['index'] = dt.datetime.strptime(
                    mvalue.index, '%Y-%m-%d')
            else:
                new['index'] = mvalue.index
            data.append(new)
    # explicit columns so that an empty result still has the expected schema
    df = pd.DataFrame(data, columns=[
        'name', 'abbrev', 'category', 'metric', 'value', 'index'
    ])
    return df
def abbreviate(x, limit=3):
    """Abbreviate each space-separated word of x.

    Words longer than `limit` characters are cut to their first `limit`
    characters followed by a period. Words that are all upper case
    (probably acronyms) and the probabilistic forecast labels 'Prob(f' and
    'Prob(x' are kept as they are.

    Parameters
    ----------
    x : str
        Name to abbreviate.
    limit : int
        Maximum word length that is left untouched.

    Returns
    -------
    str
    """
    # might need to add logic to ensure uniqueness
    # and/or enforce max length using textwrap.shorten
    components = x.split(' ')
    out_components = []
    for c in components:
        if len(c) <= limit:
            out = c
        elif c.upper() == c:
            # probably an acronym
            out = c
        elif c == 'Prob(f' or c == 'Prob(x':
            # special case for probabilistic forecast labelling
            out = c
        else:
            out = f'{c[0:limit]}.'
        out_components.append(out)
    return ' '.join(out_components)
def bar(df, metric):
    """
    Create a bar graph comparing a single metric across forecasts.

    The figure holds five traces with the same bars in different orders
    (original, value ascending/descending, name ascending/descending). Only
    one is visible at a time, selected through the dropdown defined by
    SORT_UPDATEMENU_DROPDOWN.

    Parameters
    ----------
    df: pandas Dataframe
        Metric dataframe by
        :py:func:`solarforecastarbiter.reports.figures.construct_metrics_dataframe`
    metric: str
        The metric to plot. This value should be found in df['metric'].

    Returns
    -------
    plotly.Figure
        A bar chart representing the total category of the metric for each
        forecast.
    """  # NOQA
    data = df[(df['category'] == 'total') & (df['metric'] == metric)]
    y_range = None
    x_axis_kwargs = {}
    x_values = []
    # Ensure data aligns with the x labels by pre-sorting. x_labels are sorted
    # by the groupby process below.
    data = data.sort_values('abbrev')

    # to avoid stacking, add BOM characters to fx with
    # same abbreviated name. GH463
    for val, ser in data[['abbrev']].groupby('abbrev'):
        x_values += [val + ('\ufeff' * i) for i in range(len(ser))]
    x_values = pd.Series(x_values, name='abbrev')
    palette = cycle(PALETTE)
    palette = [next(palette) for _ in x_values]
    # keep each bar's color attached to its row so it survives re-sorting
    data = data.assign(palette=palette)
    metric_name = datamodel.ALLOWED_METRICS[metric]

    # remove height limit when long abbreviations are used or there are more
    # than 5 pairs to avoid problems with labels being cut off.
    plot_layout_args = deepcopy(PLOT_LAYOUT_DEFAULTS)
    # ok to cut off BOM characters at the end of the labels
    longest_x_label = x_values.map(lambda x: len(x.rstrip('\ufeff'))).max()
    if longest_x_label > 15 or x_values.size > 6:
        # Set explicit height and set automargin on x axis to allow for dynamic
        # sizing to accommodate long x axis labels. Height is set based on
        # length of longest x axis label, due to a failure that can occur when
        # plotly determines there is not enough space for automargins to work.
        plot_height = plot_layout_args['height'] + (
            longest_x_label * X_LABEL_HEIGHT_FACTOR)
        plot_layout_args['height'] = plot_height
        x_axis_kwargs = {'automargin': True}
    if longest_x_label > 60:
        x_axis_kwargs.update({'tickangle': 90})
    elif longest_x_label > 30:
        x_axis_kwargs.update({'tickangle': 45})

    # Create dataframes for each sort (name, value). The order of this list
    # must match the button order of SORT_UPDATEMENU_DROPDOWN.
    sorted_frames = [
        data.sort_values(by=['value', 'name'], ascending=True),
        data.sort_values(by=['value', 'name'], ascending=False),
        data.sort_values(by=['name'], ascending=True),
        data.sort_values(by=['name'], ascending=False),
    ]

    fig = go.Figure()
    # original order, labelled with the (de-duplicated) abbreviations
    fig.add_trace(go.Bar(x=x_values, y=data['value'],
                         text=data['name'],
                         visible=True,
                         marker=go.bar.Marker(color=palette),
                         hovertemplate='(%{text}, %{y})<extra></extra>'))
    # sorted variants, hidden until selected from the dropdown
    for sorted_data in sorted_frames:
        fig.add_trace(go.Bar(x=sorted_data['name'], y=sorted_data['value'],
                             text=sorted_data['abbrev'],
                             visible=False,
                             marker=go.bar.Marker(
                                 color=sorted_data['palette']),
                             hovertemplate='(%{text}, %{y})<extra></extra>'))

    updatemenus = SORT_UPDATEMENU_DROPDOWN
    if len(x_values) <= 1:
        # nothing to sort with at most one bar
        updatemenus = None
    fig.update_layout(
        title=f'<b>{metric_name}</b>',
        xaxis_title=metric_name,
        updatemenus=updatemenus,
        **plot_layout_args)
    configure_axes(fig, x_axis_kwargs, y_range)
    return fig
def calc_y_start_end(y_min, y_max, pad_factor=1.03):
    """
    Compute padded y axis limits that always include zero.

    Parameters
    ----------
    y_min : float
        Smallest data value. NaN is treated as 0 and +/- inf as the
        largest finite float of matching sign.
    y_max : float
        Largest data value, sanitized the same way as ``y_min``.
    pad_factor : float
        Number by which to multiply the start, end.

    Returns
    -------
    start, end : float, float
    """
    # axis limits must be finite numbers
    lower = np.nan_to_num(y_min)
    upper = np.nan_to_num(y_max)

    if upper < 0:
        # everything is below zero: anchor the top of the axis at 0
        bounds = (lower, 0)
    elif lower > 0:
        # everything is above zero: anchor the bottom of the axis at 0
        bounds = (0, upper)
    else:
        bounds = (lower, upper)

    # An infinite input was mapped to the largest finite float, so padding
    # it overflows back to inf; that overflow is expected and harmless.
    with np.errstate(over='ignore'):
        start = pad_factor * bounds[0]
        end = pad_factor * bounds[1]
    return start, end
def bar_subdivisions(df, category, metric):
    """
    Create one bar graph per forecast showing a single metric across the
    subdivisions of a category.

    e.g.::

        Fx 1 MAE |
                 |_________________
        Fx 2 MAE |
                 |_________________
                   Year, Month of the year, etc.

    All figures share the same y axis range, derived from the metric
    values of every forecast in the selection.

    Parameters
    ----------
    df: pandas.DataFrame
        Metrics dataframe; the columns 'category', 'metric', 'name',
        'index' and 'value' are read here.
    category : str
        One of the available metrics grouping categories (e.g., total);
        must be a key of ``datamodel.ALLOWED_CATEGORIES``.
    metric : str
        One of the available metrics (e.g. mae); must be a key of
        ``datamodel.ALLOWED_METRICS``.

    Returns
    -------
    figs : dict of figures
        Keyed by forecast name.
    """
    human_category = datamodel.ALLOWED_CATEGORIES[category]
    metric_name = datamodel.ALLOWED_METRICS[metric]
    selection = df[(df['category'] == category) & (df['metric'] == metric)]

    # Category specific x axis configuration
    x_offset = None
    x_ticks = None
    x_axis_kwargs = {}
    if category in ('weekday', 'hour'):
        if category == 'weekday':
            x_ticks = calendar.day_abbr[0:]
        else:
            x_ticks = list(range(25))
            # plotly's offset of 0 left-justifies each bar at its tick
            x_offset = 0
        x_axis_kwargs = {'tickvals': x_ticks,
                         'range': (-.5, len(x_ticks))}
    elif category == 'year':
        x_axis_kwargs = {'dtick': 1}
    elif category == 'date':
        # Use a '{month} {day}' tick label while zoomed in to a week or
        # less; plotly would otherwise show date and time there, which
        # crowds the axis. dtickrange is in milliseconds: 604800000 is
        # 7 days and None is the absolute minimum. Beyond one week
        # plotly's own formatting takes over, reducing to month and year
        # as the user zooms out.
        x_axis_kwargs = {'tickformatstops': [
            dict(dtickrange=[None, 604800000], value='%b %e'),
            ]
        }

    # Common y range across every forecast's figure
    y_data = np.asarray(selection['value'])
    if len(y_data) and not np.isnan(y_data).all():
        y_range = calc_y_start_end(np.nanmin(y_data), np.nanmax(y_data))
    else:
        y_range = (None, None)
    y_axis_kwargs = {'range': y_range}

    figs = {}
    unique_names = np.unique(np.asarray(selection['name']))
    for name, color in zip(unique_names, cycle(PALETTE)):
        plot_data = selection[selection['name'] == name]
        if category == 'weekday':
            # Emit a value for every tick of the week, using NaN where a
            # day has no data, so the whole week is always displayed.
            x_values = x_ticks
            y_values = []
            for day in x_ticks:
                day_rows = plot_data[plot_data['index'] == day]
                if day_rows.empty:
                    y_values.append(np.nan)
                else:
                    y_values.append(day_rows['value'].iloc[0])
        else:
            x_values = plot_data['index'] if len(plot_data['index']) else []
            y_values = plot_data['value']

        title = name + ' ' + metric_name
        fig = go.Figure()
        fig.add_trace(go.Bar(x=x_values, y=y_values, offset=x_offset,
                             marker=go.bar.Marker(color=color)))
        fig.update_layout(
            title=f'<b>{title}</b>',
            xaxis_title=human_category,
            yaxis_title=metric_name,
            **PLOT_LAYOUT_DEFAULTS)
        configure_axes(fig, x_axis_kwargs, y_axis_kwargs)
        figs[name] = fig
    return figs
def nested_bar():
    """Placeholder for the ``nested_bar`` figure; always raises."""
    raise NotImplementedError()


def joint_distribution():
    """Placeholder for the ``joint_distribution`` figure; always raises."""
    raise NotImplementedError()


def marginal_distribution():
    """Placeholder for the ``marginal_distribution`` figure; always raises.
    """
    raise NotImplementedError()


def taylor_diagram():
    """Placeholder for the ``taylor_diagram`` figure; always raises."""
    raise NotImplementedError()


def probabilistic_timeseries():
    """Placeholder for the ``probabilistic_timeseries`` figure; always
    raises.
    """
    raise NotImplementedError()


def reliability_diagram():
    """Placeholder for the ``reliability_diagram`` figure; always raises."""
    raise NotImplementedError()


def rank_histogram():
    """Placeholder for the ``rank_histogram`` figure; always raises."""
    raise NotImplementedError()
def output_svg(fig):
    """
    Render a Plotly figure as an SVG string.

    Rendering is best effort: if it fails for any reason the failure is
    logged and a small SVG containing an error message is returned in its
    place.

    Parameters
    ----------
    fig : plotly.graph_objects.Figure

    Returns
    -------
    svg : str
    """
    try:
        return fig.to_image(format='svg').decode('utf-8')
    except Exception:
        # Identify the figure in the log by its title, dropping the
        # surrounding '<b>' and '</b>' tags.
        try:
            name = fig.layout.title['text'][3:-4]
        except Exception:
            name = 'unnamed'
        logger.error('Could not generate SVG for figure %s', name)

    fallback_svg = (
        '<svg width="100%" height="100%">'
        '<text x="50" y="50" class="alert alert-error">'
        'Unable to generate SVG plot.'
        '</text>'
        '</svg>')
    return fallback_svg
def output_pdf(fig):
    """
    Generates a PDF from the Plotly figure. Errors in the process are
    logged and a PDF with error text is returned.

    Any explicit layout height is removed while the PDF is rendered and is
    always put back before this function exits, even when rendering is
    interrupted, so the caller's figure is left unchanged.

    Parameters
    ----------
    fig : plotly.graph_objects.Figure

    Returns
    -------
    pdf : str
        An ASCII-85 encoded PDF
    """
    # If height is explicitly set on the plot, remove it before generating
    # a pdf.
    height = None
    if fig.layout.height is not None:
        height = fig.layout.pop('height')
    try:
        pdf = base64.a85encode(
            fig.to_image(format='pdf')
        ).decode('utf-8')
    except Exception:
        # Identify the figure in the log by its title, dropping the
        # surrounding '<b>' and '</b>' tags.
        try:
            name = fig.layout.title['text'][3:-4]
        except Exception:
            name = 'unnamed'
        logger.error('Could not generate PDF for figure %s', name)
        # should have same text as fail SVG
        pdf = fail_pdf
    finally:
        # Restore the height on every exit path, including exceptions that
        # are not caught above (e.g. KeyboardInterrupt).
        if height is not None:
            fig.layout.height = height
    return pdf
def raw_report_plots(report, metrics):
    """Build the RawReportPlots for a report from its computed metrics.

    One bar figure is made per metric for the 'total' category, and one
    per metric and forecast name for every other category. Each figure is
    stored as its JSON spec together with the result of
    :py:func:`output_pdf`.

    Parameters
    ----------
    report: :py:class:`solarforecastarbiter.datamodel.Report`
    metrics: tuple of :py:class:`solarforecastarbiter.datamodel.MetricResult`

    Returns
    -------
    :py:class:`solarforecastarbiter.datamodel.RawReportPlots`
    """
    metrics_df = construct_metrics_dataframe(metrics, rename=abbreviate)
    params = report.report_parameters

    # Figures keyed as '<category>::<metric>::<name>'
    figure_dict = {}
    for category in params.categories:
        for metric in params.metrics:
            if category != 'total':
                sub_figs = bar_subdivisions(metrics_df, category, metric)
                for name, fig in sub_figs.items():
                    figure_dict[f'{category}::{metric}::{name}'] = fig
                continue
            figure_dict[f'total::{metric}::all'] = bar(metrics_df, metric)

    def _as_report_figure(key, fig):
        # Split at most twice so a name containing '::' stays intact.
        cat, met, name = key.split('::', 2)
        spec = fig.to_json()
        pdf = output_pdf(fig)
        return datamodel.PlotlyReportFigure(
            name=name, category=cat, metric=met, spec=spec, pdf=pdf,
            figure_type='bar')

    report_figures = tuple(
        _as_report_figure(key, fig) for key, fig in figure_dict.items())
    return datamodel.RawReportPlots(report_figures, plotly_version)
def timeseries_plots(report):
    """Build the JSON specs for the timeseries plot and the secondary
    (scatter or event histogram) plot of a report's processed forecasts
    and observations.

    Parameters
    ----------
    report: :py:class:`solarforecastarbiter.datamodel.Report`

    Returns
    -------
    timeseries_spec: str
        String json specification of the timeseries plot. None if no
        forecast values are available.
    scatter_spec: None or str
        String json specification of the scatter plot (or of the event
        histogram when the first forecast is an EventForecast). None if
        the figure holds no traces.
    timeseries_prob_spec: None or str
        If report contains a probabilistic forecast with axis='x', string
        json specification of the probability vs. time plot. Otherwise
        None.
    includes_distribution: bool
        True if a timeseries spec was produced and some pair contains a
        ProbabilisticForecast with axis='y'.
    """
    value_df, meta_df = construct_timeseries_dataframe(report)
    if value_df.empty:
        # Without forecast data there is nothing to plot
        return None, None, None, False

    def _base_layout():
        # Layout settings shared by every figure built here
        return dict(
            plot_bgcolor=PLOT_BGCOLOR,
            font=dict(size=14),
            margin=PLOT_MARGINS,
        )

    def _spec_or_none(fig):
        # Only figures that actually hold traces are serialized
        return fig.to_json() if fig.data else None

    pfxobs = report.raw_report.processed_forecasts_observations
    first_fx = pfxobs[0].original.forecast
    units = first_fx.units.replace('^2', '<sup>2</sup>')
    params = report.report_parameters
    timezone = report.raw_report.timezone

    # data (units) vs time plot for the observation, deterministic fx,
    # and y-axis probabilistic fx
    ts_fig = timeseries(
        value_df, meta_df, params.start, params.end, units, (None, 'y'),
        timezone)
    ts_fig.update_layout(**_base_layout())
    ts_fig_json = _spec_or_none(ts_fig)

    # probability vs time plot, only when an x-axis probabilistic fx exists
    prob_types = (datamodel.ProbabilisticForecast,
                  datamodel.ProbabilisticForecastConstantValue)
    has_x_axis_prob = any(
        isinstance(pfxob.original.forecast, prob_types)
        and pfxob.original.forecast.axis == 'x'
        for pfxob in pfxobs)
    ts_prob_fig_json = None
    if has_x_axis_prob:
        ts_prob_fig = timeseries(
            value_df, meta_df, params.start, params.end, units, ('x',),
            timezone)
        ts_prob_fig.update_layout(**_base_layout())
        ts_prob_fig_json = _spec_or_none(ts_prob_fig)

    # the secondary plot depends on the type of the first forecast
    if isinstance(first_fx, datamodel.EventForecast):
        scat_fig = event_histogram(value_df, meta_df)
        scat_fig.update_layout(**_base_layout())
    else:
        # fixed-size square scatter plot; 'pad' is dropped from the margins
        scatter_margin = PLOT_MARGINS.copy()
        scatter_margin.pop('pad', None)
        scat_fig = scatter(value_df, meta_df, units)
        scat_fig.update_layout(
            plot_bgcolor=PLOT_BGCOLOR,
            font=dict(size=14),
            width=700,
            height=500,
            autosize=False,
            xaxis=dict(scaleanchor="y", scaleratio=1, constrain="domain"),
            yaxis=dict(constrain="domain"),
            margin=scatter_margin,
        )
    scat_fig_json = _spec_or_none(scat_fig)

    has_y_axis_distribution = ts_fig_json is not None and any(
        isinstance(pfxob.original.forecast, datamodel.ProbabilisticForecast)
        and pfxob.original.forecast.axis == 'y'
        for pfxob in pfxobs)
    return (ts_fig_json, scat_fig_json, ts_prob_fig_json,
            has_y_axis_distribution)