Source code for xedocs.schemas.analysis.processing_requests
import datetime
import pydantic
import rframe
try:
import utilix
uconfig = utilix.uconfig
from utilix import xent_collection
except ImportError:
uconfig = None
def xent_collection(**kwargs):
raise RuntimeError("utilix not configured")
from pydantic import validator
from typing import Literal
from .base_analysis import BaseAnalysisSchema
from ..._settings import settings
RSE_TYPE = Literal[
"SURFSARA_USERDISK",
"SDSC_USERDISK",
"LNGS_USERDISK",
"UC_OSG_USERDISK",
"UC_DALI_USERDISK",
"CNAF_USERDISK",
]
def xeauth_user():
if uconfig is None:
return "unknown"
return uconfig.get("xeauth", "api_user", fallback="unknown")
[docs]
class ProcessingRequest(BaseAnalysisSchema):
"""Schema definition for a processing request"""
_ALIAS = "processing_requests"
data_type: str = rframe.Index()
lineage_hash: str = rframe.Index()
run_id: str = rframe.Index()
destination: RSE_TYPE = rframe.Index(default="UC_DALI_USERDISK")
user: str = pydantic.Field(default_factory=xeauth_user)
request_date: datetime.datetime = pydantic.Field(
default_factory=datetime.datetime.utcnow
)
priority: int = -1
comments: str = ""
[docs]
def pre_update(self, datasource, new):
if new.user != self.user:
raise ValueError(new.user)
if new.run_id != self.run_id:
raise ValueError(new.run)
[docs]
def latest_context(self):
import utilix
import pymongo
contexts = utilix.xent_collection("contexts")
ctx = contexts.find_one(
{f"hashes.{self.data_type}": self.lineage_hash},
projection={"context": "$name", "env": "$tag", "_id": 0},
sort=[("date_added", pymongo.DESCENDING)],
)
return dict(ctx)