Source code for solarforecastarbiter.validation.validator

# -*- coding: utf-8 -*-
"""
Created on Fri Feb 15 14:08:20 2019

@author: cwhanse
"""

import numpy as np
import pandas as pd
from pvlib.tools import cosd
from pvlib.irradiance import clearsky_index


from solarforecastarbiter.validation.quality_mapping import mask_flags


QCRAD_LIMITS = {'ghi_ub': {'mult': 1.5, 'exp': 1.2, 'min': 100},
                'dhi_ub': {'mult': 0.95, 'exp': 1.2, 'min': 50},
                'dni_ub': {'mult': 1.0, 'exp': 0.0, 'min': 0},
                'ghi_lb': -4, 'dhi_lb': -4, 'dni_lb': -4}

QCRAD_CONSISTENCY = {
    'ghi_ratio':
        {'low_zenith':
            {'zenith_bounds': [0.0, 75], 'ghi_bounds': [50, np.Inf],
             'ratio_bounds': [0.92, 1.08]},
         'high_zenith':
            {'zenith_bounds': [75, 93], 'ghi_bounds': [50, np.Inf],
             'ratio_bounds': [0.85, 1.15]}},
    'dhi_ratio':
        {'low_zenith':
            {'zenith_bounds': [0.0, 75], 'ghi_bounds': [50, np.Inf],
             'ratio_bounds': [0.0, 1.05]},
         'high_zenith':
            {'zenith_bounds': [75, 93], 'ghi_bounds': [50, np.Inf],
             'ratio_bounds': [0.0, 1.10]}}}


def _check_limits(val, lb=None, ub=None, lb_ge=False, ub_le=False):
    """ Returns True where lb < (or <=) val < (or <=) ub
    """
    if lb_ge:
        lb_op = np.greater_equal
    else:
        lb_op = np.greater
    if ub_le:
        ub_op = np.less_equal
    else:
        ub_op = np.less

    if (lb is not None) & (ub is not None):
        return lb_op(val, lb) & ub_op(val, ub)
    elif lb is not None:
        return lb_op(val, lb)
    elif ub is not None:
        return ub_op(val, ub)
    else:
        raise ValueError('must provide either upper or lower bound')


def _QCRad_ub(dni_extra, sza, lim):
    cosd_sza = cosd(sza)
    cosd_sza[cosd_sza < 0] = 0
    return lim['mult'] * dni_extra * cosd_sza**lim['exp'] + lim['min']


[docs]@mask_flags('LIMITS EXCEEDED') def check_ghi_limits_QCRad(ghi, solar_zenith, dni_extra, limits=None): """ Tests for physical limits on GHI using the QCRad criteria. Test passes if a value > lower bound and value < upper bound. Lower bounds are constant for all tests. Upper bounds are calculated as .. math:: ub = min + mult * dni_extra * cos( solar_zenith)^exp Parameters ---------- ghi : Series Global horizontal irradiance in W/m^2 solar_zenith : Series Solar zenith angle in degrees dni_extra : Series Extraterrestrial normal irradiance in W/m^2 limits : dict, default QCRAD_LIMITS for keys 'ghi_ub', 'dhi_ub', 'dni_ub', value is a dict with keys {'mult', 'exp', 'min'}. For keys 'ghi_lb', 'dhi_lb', 'dni_lb', value is a float. Returns ------- ghi_limit_flag : Series True if value passes physically-possible test """ if not limits: limits = QCRAD_LIMITS ghi_ub = _QCRad_ub(dni_extra, solar_zenith, limits['ghi_ub']) ghi_limit_flag = _check_limits(ghi, limits['ghi_lb'], ghi_ub) ghi_limit_flag.name = 'ghi_limit_flag' return ghi_limit_flag
[docs]@mask_flags('LIMITS EXCEEDED') def check_dhi_limits_QCRad(dhi, solar_zenith, dni_extra, limits=None): """ Tests for physical limits on DHI using the QCRad criteria. Test passes if a value > lower bound and value < upper bound. Lower bounds are constant for all tests. Upper bounds are calculated as .. math:: ub = min + mult * dni_extra * cos( solar_zenith)^exp Parameters ---------- dhi : Series Diffuse horizontal irradiance in W/m^2 solar_zenith : Series Solar zenith angle in degrees dni_extra : Series Extraterrestrial normal irradiance in W/m^2 limits : dict, default QCRAD_LIMITS for keys 'ghi_ub', 'dhi_ub', 'dni_ub', value is a dict with keys {'mult', 'exp', 'min'}. For keys 'ghi_lb', 'dhi_lb', 'dni_lb', value is a float. Returns ------- dhi_limit_flag : Series True if value passes physically-possible test """ if not limits: limits = QCRAD_LIMITS dhi_ub = _QCRad_ub(dni_extra, solar_zenith, limits['dhi_ub']) dhi_limit_flag = _check_limits(dhi, limits['dhi_lb'], dhi_ub) dhi_limit_flag.name = 'dhi_limit_flag' return dhi_limit_flag
[docs]@mask_flags('LIMITS EXCEEDED') def check_dni_limits_QCRad(dni, solar_zenith, dni_extra, limits=None): """ Tests for physical limits on DNI using the QCRad criteria. Test passes if a value > lower bound and value < upper bound. Lower bounds are constant for all tests. Upper bounds are calculated as .. math:: ub = min + mult * dni_extra * cos( solar_zenith)^exp Parameters ---------- dni : Series Direct normal irradiance in W/m^2 solar_zenith : Series Solar zenith angle in degrees dni_extra : Series Extraterrestrial normal irradiance in W/m^2 limits : dict, default QCRAD_LIMITS for keys 'ghi_ub', 'dhi_ub', 'dni_ub', value is a dict with keys {'mult', 'exp', 'min'}. For keys 'ghi_lb', 'dhi_lb', 'dni_lb', value is a float. Returns ------- dni_limit_flag : Series True if value passes physically-possible test """ if not limits: limits = QCRAD_LIMITS dni_ub = _QCRad_ub(dni_extra, solar_zenith, limits['dni_ub']) dni_limit_flag = _check_limits(dni, limits['dni_lb'], dni_ub) dni_limit_flag.name = 'dni_limit_flag' return dni_limit_flag
[docs]@mask_flags('LIMITS EXCEEDED') def check_irradiance_limits_QCRad(solar_zenith, dni_extra, ghi=None, dhi=None, dni=None, limits=None): """ Tests for physical limits on GHI, DHI or DNI using the QCRad criteria. Test passes if a value > lower bound and value < upper bound. Lower bounds are constant for all tests. Upper bounds are calculated as .. math:: ub = min + mult * dni_extra * cos( solar_zenith)^exp Parameters ---------- solar_zenith : Series Solar zenith angle in degrees dni_extra : Series Extraterrestrial normal irradiance in W/m^2 ghi : Series or None, default None Global horizontal irradiance in W/m^2 dhi : Series or None, default None Diffuse horizontal irradiance in W/m^2 dni : Series or None, default None Direct normal irradiance in W/m^2 limits : dict, default QCRAD_LIMITS for keys 'ghi_ub', 'dhi_ub', 'dni_ub', value is a dict with keys {'mult', 'exp', 'min'}. For keys 'ghi_lb', 'dhi_lb', 'dni_lb', value is a float. Returns ------- ghi_limit_flag : Series or None, default None True if value passes physically-possible test dhi_limit_flag : Series or None, default None dhi_limit_flag : Series or None, default None References ---------- [1] C. N. Long and Y. Shi, An Automated Quality Assessment and Control Algorithm for Surface Radiation Measurements, The Open Atmospheric Science Journal 2, pp. 23-37, 2008. """ if not limits: limits = QCRAD_LIMITS if ghi is not None: ghi_limit_flag = check_ghi_limits_QCRad(ghi, solar_zenith, dni_extra, limits=None) else: ghi_limit_flag = None if dhi is not None: dhi_limit_flag = check_dhi_limits_QCRad(dhi, solar_zenith, dni_extra, limits=None) else: dhi_limit_flag = None if dni is not None: dni_limit_flag = check_dni_limits_QCRad(dni, solar_zenith, dni_extra, limits=None) else: dni_limit_flag = None return ghi_limit_flag, dhi_limit_flag, dni_limit_flag
def _get_bounds(bounds): return (bounds['ghi_bounds'][0], bounds['ghi_bounds'][1], bounds['zenith_bounds'][0], bounds['zenith_bounds'][1], bounds['ratio_bounds'][0], bounds['ratio_bounds'][1]) def _check_irrad_ratio(ratio, ghi, sza, bounds): # unpack bounds dict ghi_lb, ghi_ub, sza_lb, sza_ub, ratio_lb, ratio_ub = _get_bounds(bounds) # for zenith set lb_ge to handle edge cases, e.g., zenith=0 return ((_check_limits(sza, lb=sza_lb, ub=sza_ub, lb_ge=True)) & (_check_limits(ghi, lb=ghi_lb, ub=ghi_ub)) & (_check_limits(ratio, lb=ratio_lb, ub=ratio_ub)))
[docs]@mask_flags('INCONSISTENT IRRADIANCE COMPONENTS') def check_irradiance_consistency_QCRad(ghi, solar_zenith, dni_extra, dhi, dni, param=None): """ Checks consistency of GHI, DHI and DNI. Not valid for night time. Parameters ---------- ghi : Series Global horizontal irradiance in W/m^2 solar_zenith : Series Solar zenith angle in degrees dni_extra : Series Extraterrestrial normal irradiance in W/m^2 dhi : Series Diffuse horizontal irradiance in W/m^2 dni : Series Direct normal irradiance in W/m^2 param : dict keys are 'ghi_ratio' and 'dhi_ratio'. For each key, value is a dict with keys 'high_zenith' and 'low_zenith'; for each of these keys, value is a dict with keys 'zenith_bounds', 'ghi_bounds', and 'ratio_bounds' and value is an ordered pair [lower, upper] of float. Returns ------- consistent_components : Series True if ghi, dhi and dni components are consistent. diffuse_ratio_limit : Series True if diffuse to ghi ratio passes limit test. References ---------- [1] C. N. Long and Y. Shi, An Automated Quality Assessment and Control Algorithm for Surface Radiation Measurements, The Open Atmospheric Science Journal 2, pp. 23-37, 2008. """ if not param: param = QCRAD_CONSISTENCY # sum of components component_sum = dni * cosd(solar_zenith) + dhi ghi_ratio = ghi / component_sum dhi_ratio = dhi / ghi bounds = param['ghi_ratio'] consistent_components = ( _check_irrad_ratio(ratio=ghi_ratio, ghi=component_sum, sza=solar_zenith, bounds=bounds['high_zenith']) | _check_irrad_ratio(ratio=ghi_ratio, ghi=component_sum, sza=solar_zenith, bounds=bounds['low_zenith'])) consistent_components.name = 'consistent_components' bounds = param['dhi_ratio'] diffuse_ratio_limit = ( _check_irrad_ratio(ratio=dhi_ratio, ghi=ghi, sza=solar_zenith, bounds=bounds['high_zenith']) | _check_irrad_ratio(ratio=dhi_ratio, ghi=ghi, sza=solar_zenith, bounds=bounds['low_zenith'])) diffuse_ratio_limit.name = 'diffuse_ratio_limit' return consistent_components, diffuse_ratio_limit
[docs]@mask_flags('LIMITS EXCEEDED') def check_temperature_limits(temp_air, temp_limits=(-10., 50.)): """ Checks for extreme temperatures. Parameters ---------- temp_air : Series Air temperature in Celsius temp_limits : tuple, default (-10, 50) (lower bound, upper bound) for temperature. Returns ------- extreme_temp_flag : Series True if temp_air > lower bound and temp_air < upper bound. """ extreme_temp_flag = _check_limits(temp_air, lb=temp_limits[0], ub=temp_limits[1]) extreme_temp_flag.name = 'extreme_temp_flag' return extreme_temp_flag
[docs]@mask_flags('LIMITS EXCEEDED') def check_wind_limits(wind_speed, wind_limits=(0., 60.)): """ Checks for extreme wind speeds. Parameters ---------- wind_speed : Series Wind speed m/s wind_limits : tuple, default (0, 60) (lower bound, upper bound) for wind speed. Returns ------- extreme_wind_flag : Series True if wind_speed > lower bound and wind_speed < upper bound. """ extreme_wind_flag = _check_limits(wind_speed, lb=wind_limits[0], ub=wind_limits[1], lb_ge=True) extreme_wind_flag.name = 'extreme_wind_flag' return extreme_wind_flag
[docs]@mask_flags('LIMITS EXCEEDED') def check_rh_limits(rh, rh_limits=(0, 100)): """ Checks for extreme relative humidity. Parameters ---------- rh : Series Relative humidity in % rh_limits : tuple, default (0, 100) (lower bound, upper bound) for relative humidity Returns ------- flags : Series True if rh >= lower bound and rh <= upper bound. """ flags = _check_limits(rh, lb=rh_limits[0], ub=rh_limits[1], lb_ge=True, ub_le=True) return flags
[docs]@mask_flags('CLEARSKY EXCEEDED') def check_ghi_clearsky(ghi, ghi_clearsky, kt_max=1.1): """ Flags GHI values greater than clearsky values. Parameters ---------- ghi : Series Global horizontal irradiance in W/m^2 ghi_clearsky : Series Global horizontal irradiance in W/m^2 under clear sky conditions kt_max : float maximum clearness index that defines when ghi exceeds clear-sky value. Returns ------- flags : Series True if ghi is less than or equal to clear sky value. """ kt = clearsky_index(ghi, ghi_clearsky, max_clearsky_index=np.Inf) flags = _check_limits(kt, ub=kt_max, ub_le=True) return flags
[docs]@mask_flags('CLEARSKY EXCEEDED') def check_poa_clearsky(poa_global, poa_clearsky, kt_max=1.1): """ Flags plane of array irradiance values greater than clearsky values. Parameters ---------- poa_global : Series Plane of array irradiance in W/m^2 poa_clearsky : Series Plane of array irradiance under clear sky conditions, in W/m^2 kt_max : float maximum allowed ratio of poa_global to poa_clearsky Returns ------- flags : Series True if poa_global is less than or equal to clear sky value. """ kt = clearsky_index(poa_global, poa_clearsky, max_clearsky_index=np.Inf) flags = _check_limits(kt, ub=kt_max, ub_le=True) return flags
[docs]@mask_flags('NIGHTTIME') def check_irradiance_day_night(solar_zenith, max_zenith=87): """ Checks for day/night periods based on solar zenith. Parameters ---------- solar_zenith : Series Solar zenith angle in degrees max_zenith : maximum zenith angle for a daylight time Returns ------- flags : Series True when solar zenith is less than max_zenith. """ flags = _check_limits(solar_zenith, ub=max_zenith) return flags
[docs]@mask_flags('UNEVEN FREQUENCY') def check_timestamp_spacing(times, freq): """ Checks if spacing between times conforms to freq. Parameters ---------- times : DatetimeIndex freq : string or Timedelta Expected frequency of times Returns ------- flags : Series True when the difference between one time and the time before conforms to freq """ if not isinstance(freq, pd.Timedelta): freq = pd.Timedelta(freq) # first value is NaT, rest are timedeltas delta = times.to_series(keep_tz=True).diff() flags = delta == freq flags.iloc[0] = True return flags
def _all_close_to_first(x, rtol=1e-5, atol=1e-8): """ Returns True if all values in x are close to x[0]. Parameters ---------- x : array rtol : float, default 1e-5 relative tolerance for detecting a change in data values atol : float, default 1e-8 absolute tolerance for detecting a change in data values Returns ------- Boolean """ return np.allclose(a=x, b=x[0], rtol=rtol, atol=atol)
[docs]@mask_flags('STALE VALUES', invert=False) def detect_stale_values(x, window=3, rtol=1e-5, atol=1e-8): """ Detects stale data. For a window of length N, the last value (index N-1) is considered stale if all values in the window are close to the first value (index 0). Parameters ---------- x : Series data to be processed window : int, default 3 number of consecutive values which, if unchanged, indicates stale data rtol : float, default 1e-5 relative tolerance for detecting a change in data values atol : float, default 1e-8 absolute tolerance for detecting a change in data values Parameters rtol and atol have the same meaning as in numpy.allclose Returns ------- flags : Series True if the value is part of a stale sequence of data Raises ------ ValueError if window < 2 """ if window < 2: raise ValueError('window set to {}, must be at least 2'.format(window)) flags = x.rolling(window=window).apply( _all_close_to_first, raw=True, kwargs={'rtol': rtol, 'atol': atol} ).fillna(False).astype(bool) return flags
[docs]@mask_flags('INTERPOLATED VALUES', invert=False) def detect_interpolation(x, window=3, rtol=1e-5, atol=1e-8): """ Detects sequences of data which appear linear. Sequences are linear if the first difference appears to be constant. For a window of length N, the last value (index N-1) is flagged if all values in the window appear to be a line segment. Parameters ---------- x : series data to be processed window : int, default 3 number of sequential values that, if the first difference is constant, are classified as a linear sequence rtol : float, default 1e-5 tolerance relative to max(abs(x.diff()) for detecting a change atol : float, default 1e-8 absolute tolerance for detecting a change in first difference Returns ------- flags : Series True if the value is part of a linear sequence Raises ------ ValueError if window < 3 """ if window < 3: raise ValueError('window set to {}, must be at least 3'.format(window)) # reduce window by 1 because we're passing the first difference flags = detect_stale_values(x.diff(periods=1), window=window-1, rtol=rtol, atol=atol) return flags
[docs]def detect_levels(x, count=3, num_bins=100): """ Detects plateau levels in data. Parameters ---------- x : Series data to be processed count : int number of plateaus to return num_bins : int number of bins to use in histogram that finds plateau levels Returns ------- levels : list of tuples (left, right) values of the interval in x with a detected plateau, in decreasing order of count of x values in the interval. List length is given by the kwarg count """ hist, bin_edges = np.histogram(x, bins=num_bins, density=True) level_index = np.argsort(hist * -1) # decreasing order levels = [(bin_edges[i], bin_edges[i + 1]) for i in level_index[:count]] return levels, bin_edges
def _label_clipping(x, window, frac): """ Returns Series with True at the end of each window with sum(x(window)) >= window * frac. """ tmp = x.rolling(window).sum() y = (tmp >= window * frac) & x.astype(bool) return y
[docs]@mask_flags('CLIPPED VALUES', invert=False) def detect_clipping(ac_power, window=4, fraction_in_window=0.75, rtol=5e-3, levels=2): """ Detects clipping in a series of AC power. Possible clipped power levels are found by detect_levels. Within each sliding window, clipping is indicated when at least fraction_in_window of points are close to a clipped power level. Parameters ---------- ac_power : Series data to be processed window : int number of data points defining the length of a rolling window fraction_in_window : float fraction of points which indicate clipping if AC power at each point is close to the plateau level rtol : float a point is close to a clipped level M if abs(ac_power - M) < rtol * max(ac_power) levels : int number of clipped power levels to consider. Returns ------- flags : Series True when clipping is indicated. """ num_bins = np.ceil(1.0 / rtol).astype(int) flags = pd.Series(index=ac_power.index, data=False) power_plateaus, bins = detect_levels(ac_power, count=levels, num_bins=num_bins) for lower, upper in power_plateaus: temp = pd.Series(index=ac_power.index, data=0.0) temp.loc[(ac_power >= lower) & (ac_power <= upper)] = 1.0 flags = flags | _label_clipping(temp, window=window, frac=fraction_in_window) return flags