Source code for xedocs.schemas.corrections.base_corrections

import re
import rframe
import datetime
import pandas as pd

from typing import ClassVar, List
from pydantic import validator, BaseModel, Field, root_validator
from rframe.dispatchers import are_equal
from rframe.types import TimeInterval

from ..._settings import settings
from ..base_schemas import VersionedXeDoc


def camel_to_snake(name):
    name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
    return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()


class Review(BaseModel):
    reviewer: str = Field(max_length=80)
    approved: bool = False
    comments: str = ""


[docs] class BaseCorrectionSchema(VersionedXeDoc): """Base class for all correction schemas. This class ensures: - the _ALIAS attribute is always unique - schema includes a version index - changing already set values is disallowed """ _ALIAS: ClassVar = "" _CATEGORY = "corrections" _CORRECTIONS = {} __MUTABLE__ = ('comments',) created_date: datetime.datetime = Field(default_factory=settings.clock.current_datetime) comments: str = ""
[docs] @validator("created_date") def normalize_tz(cls, v): """Normalize timezone info. Pandas requires uniform timezone awareness """ return settings.clock.normalize_tz(v)
def __init_subclass__(cls) -> None: if cls._ALIAS in cls.__dict__ and cls._ALIAS not in cls._CORRECTIONS: cls._CORRECTIONS[cls._ALIAS] = cls super().__init_subclass__()
[docs] def pre_update(self, datasource, new): """This method is called if the `new` document is being saved and self was found to already exist in the datasource. By default we check that all values are the same. The reason this execption is needed is because the found document may not actually exist in the datasource and may be interpolated, so we allow updating documents with identical values. Otherwise we raise an error, preventing the update. """ for name, value in self.column_values.items(): if name in self.__MUTABLE__: # This field is mutable, ignore. continue assert are_equal(value, getattr(new, name)), f"{name} field is immutable."
[docs] def pre_delete(self, datasource, **kwargs): raise RuntimeError("Corrections are append only.")
[docs] class TimeIntervalCorrection(BaseCorrectionSchema): """Base class for time-interval corrections - Adds an Interval index of type datetime - Enforces rules on updating intervals: Can only change the right side of an interval if right side is None and the new right side is after the cutoff time (default is 2 hours after current time). The cutoff is set to prevent values changing after already being used for processing data. """ class Config: allow_population_by_field_name = True _ALIAS = "" time: rframe.Interval[datetime.datetime] = rframe.IntervalIndex()
[docs] @validator("time", pre=True) def time_string_to_interval(cls, v): """Convert str to time interval""" if isinstance(v, str): try: if "," in v: utc = settings.clock.utc left, right = v.split(",") left, right = pd.to_datetime(left, utc=utc), pd.to_datetime(right, utc=utc) v = TimeInterval(left=left, right=right) else: v = pd.to_datetime(v, utc=utc) except: pass return v
[docs] @root_validator(pre=True) def run_id_to_time_interval(cls, values): if "run_id" in values: run_id = values.pop("run_id") try: values["time"] = settings.run_id_to_interval(run_id) except: values["time"] = run_id return values
[docs] @classmethod def url_protocol(cls, attr, **labels): labels["time"] = settings.extract_time(labels) return super().url_protocol(attr, **labels)
[docs] def pre_update(self, datasource, new): """Since intervals can extend beyond the current time, we want to allow changes to the end time shortening the interval to a point in the future since these values have not yet been used for processing. """ clock = settings.clock current_left = clock.normalize_tz(self.time.left) current_right = clock.normalize_tz(self.time.right) new_left = clock.normalize_tz(new.time.left) new_right = clock.normalize_tz(new.time.right) cutoff = clock.cutoff_datetime(buffer=60) if clock.after_cutoff(current_left) and clock.after_cutoff(new_left): # current and new interval are completely in the future # all changes are allowed return if current_right > new_right: # Interval is being shortened. # We only allow shortening intervals that extend beyong the cutoff time assert clock.after_cutoff( current_right ), f"Can only shorten intervals \ that ends after {cutoff}" # The resulting interval must extend beyong the cutoff time assert clock.after_cutoff( new_right ), f"Can only shorten an interval \ to end after {cutoff}" # Only allow changes to the right side of the interval assert ( current_left == new_left ), f"Can only change endtime of existing interval. \ start time must be {self.time.left}" for name, value in self.column_values.items(): if name in self.__MUTABLE__: # This field is mutable, ignore. continue assert are_equal(value, getattr(new, name)), f"{name} field is immutable."
[docs] def pre_delete(self, datasource, **kwargs): if settings.clock.after_cutoff(self.time.left): # We allow deletion of future values for all versions # if they are completely in the future return # all other cases, deletion is forbiden. raise RuntimeError("Corrections are append only.")
[docs] @classmethod def validity_intervals(cls, datasource=None, **labels): """Returns a list of intervals that are valid for the given labels""" ivs = cls.unique(datasource, fields="time", **labels) ivs = sorted(ivs) if not ivs: return [] merged = ivs[:1] for iv in ivs[1:]: if iv.left == merged[-1].right: merged[-1] = merged[-1].clone(right=iv.right) else: merged.append(iv) return merged
def can_extrapolate(doc): # only extrapolate ONLINE versions # and up until the current time. if doc["version"] == "ONLINE": ts = pd.to_datetime(doc["time"]).to_pydatetime() clock = settings.clock return not clock.after_cutoff(ts) return False
[docs] class TimeSampledCorrection(BaseCorrectionSchema): """Base class for time-sampled corrections - Adds an interpolating index of type datetime - Enforces rules on inserting new data points Since extrapolation is allowed for ONLINE versions Inserting new points before the cutoff is disallowed This is to prevent setting values for times already processed using the extrapolated values. When inserting an ONLINE value after the cutoff, a new document with equal values to the extrapolated values is inserted at the current time to prevent the inserted document from affecting the interpolated values that have already been used for processing. """ class Config: allow_population_by_field_name = True _ALIAS = "" time: datetime.datetime = rframe.InterpolatingIndex( extrapolate=can_extrapolate, )
[docs] @validator("time", pre=True) def normalize_time(cls, v): """Normalize time""" return settings.clock.normalize_tz(v)
[docs] @root_validator(pre=True) def run_id_to_time(cls, values): if "run_id" in values: try: run_id = values.pop("run_id") values["time"] = settings.run_id_to_time(run_id) except: values["time"] = run_id return values
[docs] @classmethod def url_protocol(cls, attr, **labels): labels["time"] = settings.extract_time(labels) return super().url_protocol(attr, **labels)
[docs] def freeze_values(self, datasource): new_index = self.index_labels new_index["time"] = settings.clock.cutoff_datetime(buffer=1) existing = self.find(datasource, **new_index) # If values for the cutoff time are already set, the values may have been # used for processing. We add a sample at the # cutoff time to force interpolation and extrapolation # to match from the last existing sample until the cutoff. if existing: new_doc = existing[0] new_doc.save(datasource)
[docs] def pre_insert(self, datasource): # Inserting ONLINE versions can affect the past # since extrapolation until the current time is allowed # extrapolation will just give the last value, # verses interpolation which is calculated from the last # value and also the newly inserted value. # For this reason, when inserting ONLINE versions # an additional document is inserted with the current time # and values equal to the latest existing document. # This sets all values from the latest document time until now # permanently to the values that would have been used in processing. if self.version == "ONLINE": clock = settings.clock cutoff = clock.cutoff_datetime(buffer=60) assert clock.after_cutoff( self.time ), f"Can only insert online \ values after {cutoff}." self.freeze_values(datasource)
[docs] def pre_delete(self, datasource, **kwargs): cutoff = settings.clock.cutoff_datetime(buffer=60) assert settings.clock.after_cutoff( self.time ), f"Can only delete \ values after {cutoff}." if self.version == "ONLINE": # deleting ONLINE values can affect interpolation of # older values so we need to freeze the values up # until the current time. self.freeze_values(datasource)
[docs] @classmethod def validity_intervals(cls, datasource=None, **labels): left = cls.min(datasource, fields="time", **labels) right = cls.max(datasource, fields="time", **labels) if left is None or right is None: return [] if "version" not in labels or labels["version"] == "ONLINE": right = max(settings.clock.cutoff_datetime(), right) return [rframe.Interval[datetime.datetime](left=left, right=right)]