File size: 2,249 Bytes
c2b7a7b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""Blueprint framework defining facts, notes, and execution workflow."""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

from ..data.catalog import DataCatalog


@dataclass
class Fact:
    """Atomic fact used to ground LLM responses.

    A plain data holder: no field is validated or normalized on
    construction. ``subject``, ``predicate`` and ``value`` are required;
    ``source`` and ``confidence`` have defaults.
    """

    subject: str
    predicate: str
    # Typed as Any, so the payload may be of any type.
    value: Any
    # Defaults to None when not supplied.
    # NOTE(review): presumably identifies where the fact came from - confirm.
    source: Optional[str] = None
    # Defaults to 1.0. NOTE(review): presumably a 0..1 score; no range is
    # enforced by this class - confirm against callers.
    confidence: float = 1.0


@dataclass
class BlueprintResult:
    """Structured output of a blueprint execution.

    Holds the blueprint's name, the parameters it was run with, the facts
    it produced and an optional list of free-form notes (empty by default).
    """

    name: str
    parameters: Dict[str, Any]
    facts: List[Fact]
    notes: List[str] = field(default_factory=list)

    def as_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dictionary.

        Returns:
            A dict with the keys ``"blueprint"``, ``"parameters"``,
            ``"facts"`` and ``"notes"``. Each fact is rendered as a new
            dict of its attributes, so editing the returned fact dicts
            does not modify the ``Fact`` objects held by this result.
            ``parameters`` and ``notes`` are the same objects stored on
            the result, not copies.
        """
        return {
            "blueprint": self.name,
            "parameters": self.parameters,
            # Copy each attribute dict: handing out ``fact.__dict__`` itself
            # would let callers mutate the fact through the serialized form.
            "facts": [dict(vars(fact)) for fact in self.facts],
            "notes": self.notes,
        }


@dataclass
class AnalysisContext:
    """Inputs provided to every blueprint.

    Both fields are required; the class adds no validation or behavior.
    """

    # DataCatalog is imported from ``..data.catalog``; its interface is not
    # visible in this module.
    catalog: DataCatalog
    # List of string-keyed dicts. NOTE(review): the expected keys of each
    # event are not defined here - confirm against the blueprints that read them.
    events: List[Dict[str, Any]]


class Blueprint(ABC):
    """Abstract base for reusable analytic plans.

    ``run`` is declared with ``@abstractmethod``, so a subclass must
    override it before it can be instantiated.
    """

    # Annotated but not assigned here. NOTE(review): presumably each concrete
    # blueprint sets its own name - confirm against subclasses.
    name: str

    @abstractmethod
    def run(self, context: AnalysisContext, **kwargs: Any) -> BlueprintResult:
        """Execute the blueprint and return its structured result.

        Args:
            context: The ``AnalysisContext`` holding the catalog and events.
            **kwargs: Blueprint-specific keyword arguments; none are
                defined at this level.

        Returns:
            A ``BlueprintResult``.

        Raises:
            NotImplementedError: If this base implementation is invoked.
        """
        raise NotImplementedError


def normalize_datetime(value: Any) -> Optional[datetime]:
    """Utility used by event blueprints to normalize timestamps.

    Accepted inputs:

    * ``None`` -> ``None``.
    * ``datetime`` -> returned unchanged (same object).
    * ``int`` / ``float`` -> passed to ``datetime.fromtimestamp`` with no
      ``tz`` argument, which yields a naive datetime in local time.
      Booleans are rejected even though ``bool`` subclasses ``int``.
    * ``str`` -> stripped, then parsed with ``datetime.fromisoformat`` (a
      trailing ``"Z"`` is treated as ``"+00:00"``), falling back to the
      formats ``"%Y-%m-%d %H:%M:%S"`` and ``"%Y-%m-%d"``.

    Note that results may be timezone-aware (ISO strings that carry an
    offset or ``Z``) or naive (everything else); callers comparing results
    must account for that.

    Args:
        value: The raw timestamp value.

    Returns:
        The parsed ``datetime``, or ``None`` when the value is missing,
        blank, of an unsupported type, or cannot be parsed.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    # bool is a subclass of int; True/False are flags, not epoch seconds.
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        try:
            return datetime.fromtimestamp(value)
        except (OSError, OverflowError, ValueError):
            # Out-of-range, NaN or infinite timestamps are treated as unparseable.
            return None
    if not isinstance(value, str):
        return None

    cleaned = value.strip()
    if not cleaned:
        return None

    # Rewrite only the trailing UTC designator; a "Z" elsewhere in the text
    # must not be touched.
    iso_text = cleaned[:-1] + "+00:00" if cleaned.endswith("Z") else cleaned
    try:
        return datetime.fromisoformat(iso_text)
    except ValueError:
        pass  # Not ISO 8601; try the explicit formats below.

    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(cleaned, fmt)
        except ValueError:
            continue
    return None