Source code for solarforecastarbiter.reference_forecasts.forecast

"""
Functions for forecasting.
"""
import datetime

import pandas as pd
import numpy as np

from solarforecastarbiter import pvmodel

# some functions originally implemented in pvlib-python.
# find pvlib-python license in licenses directory.


[docs]def cloud_cover_to_ghi_linear(cloud_cover, ghi_clear, offset=35): """ Convert cloud cover to GHI using a linear relationship. 0% cloud cover returns ghi_clear. 100% cloud cover returns offset*ghi_clear. Parameters ---------- cloud_cover: numeric Cloud cover in %. ghi_clear: numeric GHI under clear sky conditions. offset: numeric, default 35 Determines the minimum GHI. Returns ------- ghi: numeric Cloudy sky GHI. References ---------- Larson et. al. "Day-ahead forecasting of solar power output from photovoltaic plants in the American Southwest" Renewable Energy 91, 11-20 (2016). """ offset = offset / 100. cloud_cover = cloud_cover / 100. ghi = (offset + (1 - offset) * (1 - cloud_cover)) * ghi_clear return ghi
[docs]def cloud_cover_to_irradiance_ghi_clear(cloud_cover, ghi_clear, zenith): """ Estimates irradiance from cloud cover in the following steps: 1. Estimate cloudy sky GHI using a function of cloud_cover and ghi_clear: :py:func:`cloud_cover_to_ghi_linear` 2. Estimate cloudy sky DNI and DHI using the Erbs model. Parameters ---------- site : datamodel.Site cloud_cover : Series Cloud cover in %. zenith : Series Solar zenith Returns ------- ghi : pd.Series, dni : pd.Series, dhi : pd.Series """ ghi = cloud_cover_to_ghi_linear(cloud_cover, ghi_clear) dni, dhi = pvmodel.complete_irradiance_components(ghi, zenith) return ghi, dni, dhi
[docs]def cloud_cover_to_irradiance(latitude, longitude, elevation, cloud_cover, apparent_zenith, zenith): """ Estimates irradiance from cloud cover in the following steps: 1. Determine clear sky GHI using Ineichen model and climatological turbidity. 2. Estimate cloudy sky GHI using a function of cloud_cover and ghi_clear: :py:func:`cloud_cover_to_ghi_linear` 3. Estimate cloudy sky DNI and DHI using the Erbs model. Don't use this function if you already have clear sky GHI. Instead, use :py:func:`cloud_cover_to_irradiance_ghi_clear` Parameters ---------- latitude : float longitude : float elevation : float cloud_cover : Series Cloud cover in %. apparent_zenith : Series Solar apparent zenith zenith : Series Solar zenith Returns ------- ghi : pd.Series dni : pd.Series dhi : pd.Series See also -------- cloud_cover_to_irradiance_ghi_clear cloud_cover_to_ghi_linear """ cs = pvmodel.calculate_clearsky(latitude, longitude, elevation, apparent_zenith) ghi, dni, dhi = cloud_cover_to_irradiance_ghi_clear( cloud_cover, cs['ghi'], zenith) return ghi, dni, dhi
[docs]def resample(arg, freq='1h', label=None): """Resamples an argument, allowing for None. Use with map. Useful for applying resample to unknown model output (e.g. AC power is None if no plant metadata is provided). Parameters ---------- arg : pd.Series or None freq : str label : str Sets pandas resample's label and closed kwargs. Returns ------- pd.Series or None """ if arg is None: return None else: return arg.resample(freq, label=label, closed=label).mean()
[docs]def reindex_fill_slice(arg, freq='5min', label=None, start=None, end=None, start_slice=None, end_slice=None, fill_method='interpolate'): """Reindex data to shorter intervals (create NaNs) from `start` to `end`, fill NaNs in gaps using `fill_method`, fill NaNs before first valid time using bfill, fill NaNs after last valid time using ffill, then slice output from `start_slice` to `end_slice`. """ if arg is None or arg.empty: return arg if start is None: start_reindex = arg.index[0] else: start_reindex = min(pd.Timestamp(start), arg.index[0]) if end is None: end_reindex = arg.index[-1] else: end_reindex = max(pd.Timestamp(end), arg.index[-1]) index = pd.date_range(start=start_reindex, end=end_reindex, freq=freq, closed=label) reindexed = arg.reindex(index) filled = getattr(reindexed, fill_method)() sliced = filled.loc[start_slice:end_slice].bfill().ffill() return sliced
[docs]def unmix_intervals(mixed, lower=0, upper=100): """Convert mixed interval averages into pure interval averages. For example, the GFS 3 hour output contains the following data: * forecast hour 3: average cloud cover from 0 - 3 hours * forecast hour 6: average cloud cover from 0 - 6 hours * forecast hour 9: average cloud cover from 6 - 9 hours * forecast hour 12: average cloud cover from 6 - 12 hours and so on. This function returns: * forecast hour 3: average cloud cover from 0 - 3 hours * forecast hour 6: average cloud cover from 3 - 6 hours * forecast hour 9: average cloud cover from 6 - 9 hours * forecast hour 12: average cloud cover from 9 - 12 hours Parameters ---------- data : pd.Series The first time must be the first output of a cycle. lower : None or float Lower bound. Useful for handling numerical precision issues in input data. upper : None or float Upper bound of output. Useful for handling numerical precision issues in input data. Returns ------- pd.Series Data is the unmixed interval average with ending label. """ intervals = (mixed.index[1:] - mixed.index[:-1]).unique() if len(intervals) > 1: raise ValueError('multiple interval lengths detected. slice forecasts ' 'into sections with unique interval lengths first.') interval = intervals[0] start = mixed.index[0] mixed_vals = np.array(mixed) if interval == pd.Timedelta('1h'): _check_start_time(start, interval) # mixed_1...mixed_6 are the values in the raw forecast file at # each hour of the mixed interval period. Let f1...f6 be unmixed, # true hourly average values. The relationship is: # mixed_1 = f1 # mixed_2 = (f1 + f2) / 2 # mixed_3 = (f1 + f2 + f3) / 3 # mixed_4 = (f1 + f2 + f3 + f4) / 4 # mixed_5 = (f1 + f2 + f3 + f4 + f5) / 5 # mixed_6 = (f1 + f2 + f3 + f4 + f5 + f6) / 6 # some algebra will show that the f1...f6 can be obtained as # coded below. # the cycle repeats itself after 6 hours so # mixed_7 = f7 # mixed_8 = (f8 + f9) / 2 ... # To efficiently compute the values for all forecast times, # we use slices for every 6th element, calculate the forecasts # at every 6th point, then interleave them by constructing a 2D # array and reshaping it to a 1D array. mixed_1 = mixed_vals[0::6] mixed_2 = mixed_vals[1::6] mixed_3 = mixed_vals[2::6] mixed_4 = mixed_vals[3::6] mixed_5 = mixed_vals[4::6] mixed_6 = mixed_vals[5::6] f1 = mixed_1 f2 = 2 * mixed_2 - mixed_1 f3 = 3 * mixed_3 - 2 * mixed_2 f4 = 4 * mixed_4 - 3 * mixed_3 f5 = 5 * mixed_5 - 4 * mixed_4 f6 = 6 * mixed_6 - 5 * mixed_5 f = np.array([f1, f2, f3, f4, f5, f6]) elif interval == pd.Timedelta('3h'): _check_start_time(start, interval) # similar to above, but # mixed_3 = f_0_3 # mixed_6 = (f_0_3 + f_3_6) / 2 f3 = mixed_vals[0::2] f6 = 2 * mixed_vals[1::2] - f3 f = np.array([f3, f6]) else: raise ValueError('mixed period must be 6 hours and data interval must ' 'be 3 hours or 1 hour') unmixed = pd.Series(f.reshape(-1, order='F'), index=mixed.index) unmixed = unmixed.clip(lower=0, upper=100) return unmixed
def _check_start_time(start, interval): """ start : pd.Timestamp Must be timezone aware. interval : pd.Timedelta Time between data points. """ # for this to work, the first value must belong to the # first time of a mixed interval period. Assuming GFS data... start_time = start.tz_convert('UTC').time() allowed_times_interval = { pd.Timedelta('1h'): (1, 7, 13, 19), pd.Timedelta('3h'): (3, 9, 15, 21) } allowed_times = allowed_times_interval[interval] if any(start_time == datetime.time(t) for t in allowed_times): return else: raise ValueError(f'for {interval} mixed intervals, start time must ' f'be one of {allowed_times}Z hours')
[docs]def sort_gefs_frame(frame): """Sort a DataFrame from a GEFS forecast. Column 0 is the smallest value at each time. Column 20 is the largest value at each time. """ if frame is None: return frame else: return pd.DataFrame(np.sort(frame), index=frame.index)