"""State management for DCA intents with optional Panorama gateway persistence.""" from __future__ import annotations import copy import time import logging from datetime import datetime, timezone from threading import Lock from typing import Any, Dict, List, Optional from src.integrations.panorama_gateway import ( PanoramaGatewayClient, PanoramaGatewayError, PanoramaGatewaySettings, get_panorama_settings, ) DCA_SESSION_ENTITY = "dca-sessions" DCA_HISTORY_ENTITY = "dca-histories" def _utc_now_iso() -> str: return datetime.utcnow().replace(tzinfo=timezone.utc).isoformat() def _identifier(user_id: str, conversation_id: str) -> str: return f"{user_id}:{conversation_id}" def _as_float(value: Any) -> Optional[float]: if value is None: return None try: return float(value) except (TypeError, ValueError): return None class DcaStateRepository: """Stores DCA agent state via Panorama's gateway with local fallback.""" _instance: "DcaStateRepository" | None = None _instance_lock: Lock = Lock() def __init__( self, *, client: PanoramaGatewayClient | None = None, settings: PanoramaGatewaySettings | None = None, history_limit: int = 10, ) -> None: self._logger = logging.getLogger(__name__) self._history_limit = history_limit try: self._settings = settings or get_panorama_settings() self._client = client or PanoramaGatewayClient(self._settings) self._use_gateway = True except ValueError: self._settings = None self._client = None self._use_gateway = False self._init_local_store() def _init_local_store(self) -> None: if not hasattr(self, "_state"): self._state = {"intents": {}, "metadata": {}, "history": {}} def _fallback_to_local_store(self) -> None: if self._use_gateway: self._logger.warning("Panorama gateway unavailable for DCA state; switching to in-memory fallback.") self._use_gateway = False self._init_local_store() def _handle_gateway_failure(self, exc: PanoramaGatewayError) -> None: self._logger.warning( "Panorama gateway error (%s) for DCA repository: %s", getattr(exc, "status_code", "unknown"), getattr(exc, "payload", exc), ) self._fallback_to_local_store() # ---- Singleton helpers ------------------------------------------------- @classmethod def instance(cls) -> "DcaStateRepository": if cls._instance is None: with cls._instance_lock: if cls._instance is None: cls._instance = cls() return cls._instance @classmethod def reset(cls) -> None: with cls._instance_lock: cls._instance = None # ---- Core API ---------------------------------------------------------- def load_intent(self, user_id: str, conversation_id: str) -> Optional[Dict[str, Any]]: if not self._use_gateway: self._init_local_store() record = self._state["intents"].get(_identifier(user_id, conversation_id)) if not record: return None return copy.deepcopy(record.get("intent")) session = self._get_session(user_id, conversation_id) if not self._use_gateway: return self.load_intent(user_id, conversation_id) if not session: return None return session.get("intent") or None def persist_intent( self, user_id: str, conversation_id: str, intent: Dict[str, Any], metadata: Dict[str, Any], done: bool, summary: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: if not self._use_gateway: self._init_local_store() key = _identifier(user_id, conversation_id) now = time.time() if done: self._state["intents"].pop(key, None) else: self._state["intents"][key] = {"intent": copy.deepcopy(intent), "updated_at": now} if metadata: meta_copy = copy.deepcopy(metadata) meta_copy["updated_at"] = now self._state["metadata"][key] = meta_copy if done and summary: history = self._state["history"].setdefault(key, []) summary_copy = copy.deepcopy(summary) summary_copy.setdefault("timestamp", now) history.append(summary_copy) self._state["history"][key] = history[-self._history_limit :] return self.get_history(user_id, conversation_id) try: if done: if summary: self._create_history_entry(user_id, conversation_id, summary) self._delete_session(user_id, conversation_id) else: payload = self._session_payload(intent, metadata) self._upsert_session(user_id, conversation_id, payload) return self.get_history(user_id, conversation_id) except PanoramaGatewayError as exc: self._handle_gateway_failure(exc) return self.persist_intent(user_id, conversation_id, intent, metadata, done, summary) def set_metadata( self, user_id: str, conversation_id: str, metadata: Dict[str, Any], ) -> None: if not self._use_gateway: self._init_local_store() key = _identifier(user_id, conversation_id) if metadata: meta_copy = copy.deepcopy(metadata) meta_copy["updated_at"] = time.time() self._state["metadata"][key] = meta_copy else: self._state["metadata"].pop(key, None) return try: if not metadata: self._delete_session(user_id, conversation_id) return session = self._get_session(user_id, conversation_id) if not self._use_gateway: return self.set_metadata(user_id, conversation_id, metadata) intent = session.get("intent") if session else {} payload = self._session_payload(intent or {}, metadata) self._upsert_session(user_id, conversation_id, payload) except PanoramaGatewayError as exc: self._handle_gateway_failure(exc) self.set_metadata(user_id, conversation_id, metadata) def clear_metadata(self, user_id: str, conversation_id: str) -> None: self.set_metadata(user_id, conversation_id, {}) def clear_intent(self, user_id: str, conversation_id: str) -> None: if not self._use_gateway: self._init_local_store() self._state["intents"].pop(_identifier(user_id, conversation_id), None) self._state["metadata"].pop(_identifier(user_id, conversation_id), None) return try: self._delete_session(user_id, conversation_id) except PanoramaGatewayError as exc: self._handle_gateway_failure(exc) self.clear_intent(user_id, conversation_id) def get_metadata(self, user_id: str, conversation_id: str) -> Dict[str, Any]: if not self._use_gateway: self._init_local_store() record = self._state["metadata"].get(_identifier(user_id, conversation_id)) if not record: return {} entry = copy.deepcopy(record) ts = entry.pop("updated_at", None) if ts is not None: entry["updated_at"] = datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat() return entry session = self._get_session(user_id, conversation_id) if not self._use_gateway: return self.get_metadata(user_id, conversation_id) if not session: return {} intent = session.get("intent") or {} metadata: Dict[str, Any] = { "event": session.get("event"), "status": session.get("status"), "stage": session.get("stage"), "missing_fields": session.get("missingFields") or [], "next_field": session.get("nextField"), "pending_question": session.get("pendingQuestion"), "choices": session.get("choices") or [], "error": session.get("errorMessage"), "user_id": user_id, "conversation_id": conversation_id, } metadata.update(intent) history = self.get_history(user_id, conversation_id) if history: metadata["history"] = history updated_at = session.get("updatedAt") if updated_at: metadata["updated_at"] = updated_at return metadata def get_history( self, user_id: str, conversation_id: str, limit: Optional[int] = None, ) -> List[Dict[str, Any]]: if not self._use_gateway: self._init_local_store() history = self._state["history"].get(_identifier(user_id, conversation_id), []) if not history: return [] records = copy.deepcopy(history) if limit: records = records[-limit:] for record in records: ts = record.get("timestamp") if ts is not None: record["timestamp"] = datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat() return records effective_limit = limit or self._history_limit try: result = self._client.list( DCA_HISTORY_ENTITY, { "where": {"userId": user_id, "conversationId": conversation_id}, "orderBy": {"recordedAt": "desc"}, "take": effective_limit, }, ) except PanoramaGatewayError as exc: if exc.status_code == 404: return [] self._handle_gateway_failure(exc) return self.get_history(user_id, conversation_id, limit) except ValueError: self._logger.warning("Invalid DCA history response from gateway; falling back to local store.") self._fallback_to_local_store() return self.get_history(user_id, conversation_id, limit) data = result.get("data", []) if isinstance(result, dict) else [] history: List[Dict[str, Any]] = [] for entry in data: summary_text = entry.get("summary") meta_payload = entry.get("metadata") or {} if not meta_payload: meta_payload = { "workflow_type": entry.get("workflowType"), "cadence": entry.get("cadence"), "tokens": entry.get("tokens"), "amounts": entry.get("amounts"), "strategy": entry.get("strategy"), "venue": entry.get("venue"), "slippage_bps": entry.get("slippageBps"), "stop_conditions": entry.get("stopConditions"), } error_message = entry.get("errorMessage") if error_message: meta_payload = dict(meta_payload) meta_payload["error"] = error_message history.append( { "timestamp": entry.get("recordedAt"), "summary": summary_text, "metadata": meta_payload, } ) return history # ---- Gateway helpers --------------------------------------------------- def _session_payload(self, intent: Dict[str, Any], metadata: Dict[str, Any]) -> Dict[str, Any]: payload = { "intent": intent, "event": metadata.get("event"), "status": metadata.get("status"), "stage": metadata.get("stage"), "missingFields": metadata.get("missing_fields") or [], "nextField": metadata.get("next_field"), "pendingQuestion": metadata.get("ask") or metadata.get("pending_question"), "choices": metadata.get("choices") or [], "errorMessage": metadata.get("error"), "updatedAt": metadata.get("updated_at") or _utc_now_iso(), } return payload def _get_session(self, user_id: str, conversation_id: str) -> Dict[str, Any] | None: identifier = _identifier(user_id, conversation_id) try: return self._client.get(DCA_SESSION_ENTITY, identifier) except PanoramaGatewayError as exc: if exc.status_code == 404: return None self._handle_gateway_failure(exc) return None def _upsert_session(self, user_id: str, conversation_id: str, payload: Dict[str, Any]) -> None: identifier = _identifier(user_id, conversation_id) record = {**payload, "updatedAt": _utc_now_iso()} try: self._client.update(DCA_SESSION_ENTITY, identifier, record) except PanoramaGatewayError as exc: if exc.status_code != 404: raise create_payload = { "userId": user_id, "conversationId": conversation_id, "tenantId": self._settings.tenant_id, **record, } try: self._client.create(DCA_SESSION_ENTITY, create_payload) except PanoramaGatewayError as create_exc: if create_exc.status_code == 409: return if create_exc.status_code == 404: raise raise def _delete_session(self, user_id: str, conversation_id: str) -> None: identifier = _identifier(user_id, conversation_id) try: self._client.delete(DCA_SESSION_ENTITY, identifier) except PanoramaGatewayError as exc: if exc.status_code != 404: self._handle_gateway_failure(exc) raise def _create_history_entry(self, user_id: str, conversation_id: str, summary: Dict[str, Any]) -> None: history_payload = { "userId": user_id, "conversationId": conversation_id, "summary": summary.get("summary"), "workflowType": summary.get("workflow_type"), "cadence": summary.get("cadence"), "tokens": summary.get("tokens"), "amounts": summary.get("amounts"), "strategy": summary.get("strategy"), "venue": summary.get("venue"), "slippageBps": summary.get("slippage_bps"), "stopConditions": summary.get("stop_conditions"), "metadata": summary, "errorMessage": summary.get("error"), "recordedAt": _utc_now_iso(), "tenantId": self._settings.tenant_id, } try: self._client.create(DCA_HISTORY_ENTITY, history_payload) except PanoramaGatewayError as exc: if exc.status_code == 404: raise elif exc.status_code != 409: raise