Source code for xedocs.schemas.corrections.base_references

import datetime
from typing import ClassVar, Literal
from pydantic import constr
import rframe

from xedocs import settings

from .base_corrections import BaseCorrectionSchema, TimeIntervalCorrection


[docs] class CorrectionReference(TimeIntervalCorrection): """A CorrectionReference document references one or more corrections by storing the name and labels required to locate the correction in a datasource """ _ALIAS = "" # arbitrary alias for this reference, # this should match the straxen config name alias: str = rframe.Index(max_length=50) # the global version version: str = rframe.Index(max_length=20) # validity interval of the document time: rframe.Interval[datetime.datetime] = rframe.IntervalIndex() # Name of the correction being referenced correction: constr(max_length=50) # The attribute in the correction being referenced e.g `value` attribute: constr(max_length=50) # The index labels being referenced, eg pmt=[1,2,3], version='v3' etc. labels: dict
[docs] def load(self, datasource=None, **overrides): """Load the referenced documents from the given datasource. """ labels = dict(self.labels, **overrides) if self.correction not in BaseCorrectionSchema._CORRECTIONS: raise KeyError(f"Reference to undefined schema name {self.correction}") schema = BaseCorrectionSchema._CORRECTIONS[self.correction] return schema.find(datasource, **labels)
@property def url_config(self): """Convert reference to a URLConfig URL""" import straxen url = f"{self.correction}://{self.attribute}" url = straxen.URLConfig.format_url_kwargs(url, **self.labels) return url @property def config_dict(self): return {self.name: self.url_config}
[docs] class BaseResourceReference(TimeIntervalCorrection): _ALIAS = "" fmt: ClassVar = "text" value: str
[docs] def pre_insert(self, datasource): """require the existence of the resource being referenced prior to inserting a new document. This is to avoid typos etc. """ self.load() super().pre_insert(datasource)
[docs] def load(self, **kwargs): import straxen kwargs.setdefault("fmt", self.fmt) return straxen.get_resource(self.value, **kwargs)
@property def url_config(self): return f"resource://{self.value}?fmt={self.fmt}"
[docs] class BaseMap(BaseResourceReference): _ALIAS = "" _ALLOWED_FORMATS = ['json.gz', 'json', 'tar.gz', 'tar', 'pkl.gz', 'pkl', 'npy', 'dill.gz','dill', 'npy_pickle', 'binary', 'text', 'txt', 'csv'] algorithm: Literal["cnn", "gcn", "mlp"] = rframe.Index() value: str @property def fmt(self): for fmt in self._ALLOWED_FORMATS: if self.value.endswith(fmt): break else: raise ValueError(f"Unknown format for {self.value}") return fmt @property def local_file(self): import straxen downloader = straxen.MongoDownloader() return downloader.download_single(self.value) @property def file(self): import straxen file = straxen.get_resource(self.value, fmt=self.fmt) return file @property def map(self): import straxen return straxen.InterpolatingMap(self.file)