| """Thread-safe state management for Magentic agents. |
| |
| Uses contextvars to ensure isolation between concurrent requests (e.g., multiple users |
| searching simultaneously via Gradio). |
| """ |
|
|
| from contextvars import ContextVar |
| from typing import TYPE_CHECKING, Any, cast |
|
|
| from pydantic import BaseModel |
|
|
| from src.services.research_memory import ResearchMemory |
|
|
| if TYPE_CHECKING: |
| from src.services.embedding_protocol import EmbeddingServiceProtocol |
| from src.utils.models import Evidence |
|
|
|
|
class MagenticState(BaseModel):
    """Mutable state for a Magentic workflow session.

    Holds an optional research-memory object and exposes thin helpers that
    delegate to it. Each helper returns a neutral value (``0`` / ``None``)
    when ``memory`` has not been set.
    """

    # Populated by ``init_magentic_state`` with a ``ResearchMemory`` instance.
    # NOTE(review): presumably typed as ``Any`` so pydantic does not attempt to
    # validate or resolve the ResearchMemory type - confirm before narrowing.
    memory: Any = None

    model_config = {"arbitrary_types_allowed": True}

    async def add_evidence(self, evidence: list["Evidence"]) -> int:
        """Add evidence to memory and report how many items were new.

        Delegates to ``ResearchMemory.store_evidence()``. Per that method's
        documented contract (its implementation is not visible here), it:

        1. Performs semantic deduplication (threshold 0.9)
        2. Stores unique evidence in the vector store
        3. Caches evidence for retrieval

        Args:
            evidence: List of Evidence objects to store.

        Returns:
            Number of new (non-duplicate) evidence items stored, measured as
            the growth of ``memory.evidence_ids`` across the call. Returns 0
            when no memory is attached.
        """
        if self.memory is None:
            return 0

        memory: ResearchMemory = self.memory
        # Snapshot the id count first so only items added by this call count.
        count_before = len(memory.evidence_ids)
        await memory.store_evidence(evidence)
        return len(memory.evidence_ids) - count_before

    @property
    def embedding_service(self) -> "EmbeddingServiceProtocol | None":
        """Get the embedding service held by memory.

        Returns:
            The memory object's ``_embedding_service`` attribute, or ``None``
            when no memory is attached.
        """
        if self.memory is None:
            return None

        # The cast target is quoted: the protocol is imported only under
        # TYPE_CHECKING, and typing.cast never evaluates a string target, so
        # no runtime import of the protocol module is needed here.
        return cast("EmbeddingServiceProtocol | None", self.memory._embedding_service)
|
|
|
|
| |
# Context-local slot for the active MagenticState. It defaults to None until
# init_magentic_state() stores a state in the current context.
_magentic_state_var: ContextVar[MagenticState | None] = ContextVar(
    "magentic_state",
    default=None,
)
|
|
|
|
def init_magentic_state(
    query: str, embedding_service: "EmbeddingServiceProtocol | None" = None
) -> MagenticState:
    """Create a fresh state and install it in the current context.

    Args:
        query: Passed to ``ResearchMemory`` as its ``query`` argument.
        embedding_service: Passed to ``ResearchMemory`` as its
            ``embedding_service`` argument; may be ``None``.

    Returns:
        The newly created ``MagenticState``, which is also stored in the
        module's context variable.
    """
    fresh_state = MagenticState(
        memory=ResearchMemory(query=query, embedding_service=embedding_service)
    )
    _magentic_state_var.set(fresh_state)
    return fresh_state
|
|
|
|
def get_magentic_state() -> MagenticState:
    """Return the state stored in the current context.

    Returns:
        The ``MagenticState`` currently held by the module's context variable.

    Raises:
        RuntimeError: If the context variable holds ``None``, i.e. no state
            has been set in this context.
    """
    current = _magentic_state_var.get()
    if current is not None:
        return current
    raise RuntimeError("MagenticState not initialized. Call init_magentic_state() first.")
|
|