zico-agent / src /agents /dca /storage.py
ColettoG's picture
add: lending agent
a64d26e
"""State management for DCA intents with optional Panorama gateway persistence."""
from __future__ import annotations
import copy
import time
import logging
from datetime import datetime, timezone
from threading import Lock
from typing import Any, Dict, List, Optional
from src.integrations.panorama_gateway import (
PanoramaGatewayClient,
PanoramaGatewayError,
PanoramaGatewaySettings,
get_panorama_settings,
)
DCA_SESSION_ENTITY = "dca-sessions"
DCA_HISTORY_ENTITY = "dca-histories"
def _utc_now_iso() -> str:
return datetime.utcnow().replace(tzinfo=timezone.utc).isoformat()
def _identifier(user_id: str, conversation_id: str) -> str:
return f"{user_id}:{conversation_id}"
def _as_float(value: Any) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
class DcaStateRepository:
"""Stores DCA agent state via Panorama's gateway with local fallback."""
_instance: "DcaStateRepository" | None = None
_instance_lock: Lock = Lock()
def __init__(
self,
*,
client: PanoramaGatewayClient | None = None,
settings: PanoramaGatewaySettings | None = None,
history_limit: int = 10,
) -> None:
self._logger = logging.getLogger(__name__)
self._history_limit = history_limit
try:
self._settings = settings or get_panorama_settings()
self._client = client or PanoramaGatewayClient(self._settings)
self._use_gateway = True
except ValueError:
self._settings = None
self._client = None
self._use_gateway = False
self._init_local_store()
def _init_local_store(self) -> None:
if not hasattr(self, "_state"):
self._state = {"intents": {}, "metadata": {}, "history": {}}
def _fallback_to_local_store(self) -> None:
if self._use_gateway:
self._logger.warning("Panorama gateway unavailable for DCA state; switching to in-memory fallback.")
self._use_gateway = False
self._init_local_store()
def _handle_gateway_failure(self, exc: PanoramaGatewayError) -> None:
self._logger.warning(
"Panorama gateway error (%s) for DCA repository: %s",
getattr(exc, "status_code", "unknown"),
getattr(exc, "payload", exc),
)
self._fallback_to_local_store()
# ---- Singleton helpers -------------------------------------------------
@classmethod
def instance(cls) -> "DcaStateRepository":
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
@classmethod
def reset(cls) -> None:
with cls._instance_lock:
cls._instance = None
# ---- Core API ----------------------------------------------------------
def load_intent(self, user_id: str, conversation_id: str) -> Optional[Dict[str, Any]]:
if not self._use_gateway:
self._init_local_store()
record = self._state["intents"].get(_identifier(user_id, conversation_id))
if not record:
return None
return copy.deepcopy(record.get("intent"))
session = self._get_session(user_id, conversation_id)
if not self._use_gateway:
return self.load_intent(user_id, conversation_id)
if not session:
return None
return session.get("intent") or None
def persist_intent(
self,
user_id: str,
conversation_id: str,
intent: Dict[str, Any],
metadata: Dict[str, Any],
done: bool,
summary: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
if not self._use_gateway:
self._init_local_store()
key = _identifier(user_id, conversation_id)
now = time.time()
if done:
self._state["intents"].pop(key, None)
else:
self._state["intents"][key] = {"intent": copy.deepcopy(intent), "updated_at": now}
if metadata:
meta_copy = copy.deepcopy(metadata)
meta_copy["updated_at"] = now
self._state["metadata"][key] = meta_copy
if done and summary:
history = self._state["history"].setdefault(key, [])
summary_copy = copy.deepcopy(summary)
summary_copy.setdefault("timestamp", now)
history.append(summary_copy)
self._state["history"][key] = history[-self._history_limit :]
return self.get_history(user_id, conversation_id)
try:
if done:
if summary:
self._create_history_entry(user_id, conversation_id, summary)
self._delete_session(user_id, conversation_id)
else:
payload = self._session_payload(intent, metadata)
self._upsert_session(user_id, conversation_id, payload)
return self.get_history(user_id, conversation_id)
except PanoramaGatewayError as exc:
self._handle_gateway_failure(exc)
return self.persist_intent(user_id, conversation_id, intent, metadata, done, summary)
def set_metadata(
self,
user_id: str,
conversation_id: str,
metadata: Dict[str, Any],
) -> None:
if not self._use_gateway:
self._init_local_store()
key = _identifier(user_id, conversation_id)
if metadata:
meta_copy = copy.deepcopy(metadata)
meta_copy["updated_at"] = time.time()
self._state["metadata"][key] = meta_copy
else:
self._state["metadata"].pop(key, None)
return
try:
if not metadata:
self._delete_session(user_id, conversation_id)
return
session = self._get_session(user_id, conversation_id)
if not self._use_gateway:
return self.set_metadata(user_id, conversation_id, metadata)
intent = session.get("intent") if session else {}
payload = self._session_payload(intent or {}, metadata)
self._upsert_session(user_id, conversation_id, payload)
except PanoramaGatewayError as exc:
self._handle_gateway_failure(exc)
self.set_metadata(user_id, conversation_id, metadata)
def clear_metadata(self, user_id: str, conversation_id: str) -> None:
self.set_metadata(user_id, conversation_id, {})
def clear_intent(self, user_id: str, conversation_id: str) -> None:
if not self._use_gateway:
self._init_local_store()
self._state["intents"].pop(_identifier(user_id, conversation_id), None)
self._state["metadata"].pop(_identifier(user_id, conversation_id), None)
return
try:
self._delete_session(user_id, conversation_id)
except PanoramaGatewayError as exc:
self._handle_gateway_failure(exc)
self.clear_intent(user_id, conversation_id)
def get_metadata(self, user_id: str, conversation_id: str) -> Dict[str, Any]:
if not self._use_gateway:
self._init_local_store()
record = self._state["metadata"].get(_identifier(user_id, conversation_id))
if not record:
return {}
entry = copy.deepcopy(record)
ts = entry.pop("updated_at", None)
if ts is not None:
entry["updated_at"] = datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
return entry
session = self._get_session(user_id, conversation_id)
if not self._use_gateway:
return self.get_metadata(user_id, conversation_id)
if not session:
return {}
intent = session.get("intent") or {}
metadata: Dict[str, Any] = {
"event": session.get("event"),
"status": session.get("status"),
"stage": session.get("stage"),
"missing_fields": session.get("missingFields") or [],
"next_field": session.get("nextField"),
"pending_question": session.get("pendingQuestion"),
"choices": session.get("choices") or [],
"error": session.get("errorMessage"),
"user_id": user_id,
"conversation_id": conversation_id,
}
metadata.update(intent)
history = self.get_history(user_id, conversation_id)
if history:
metadata["history"] = history
updated_at = session.get("updatedAt")
if updated_at:
metadata["updated_at"] = updated_at
return metadata
def get_history(
self,
user_id: str,
conversation_id: str,
limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
if not self._use_gateway:
self._init_local_store()
history = self._state["history"].get(_identifier(user_id, conversation_id), [])
if not history:
return []
records = copy.deepcopy(history)
if limit:
records = records[-limit:]
for record in records:
ts = record.get("timestamp")
if ts is not None:
record["timestamp"] = datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
return records
effective_limit = limit or self._history_limit
try:
result = self._client.list(
DCA_HISTORY_ENTITY,
{
"where": {"userId": user_id, "conversationId": conversation_id},
"orderBy": {"recordedAt": "desc"},
"take": effective_limit,
},
)
except PanoramaGatewayError as exc:
if exc.status_code == 404:
return []
self._handle_gateway_failure(exc)
return self.get_history(user_id, conversation_id, limit)
except ValueError:
self._logger.warning("Invalid DCA history response from gateway; falling back to local store.")
self._fallback_to_local_store()
return self.get_history(user_id, conversation_id, limit)
data = result.get("data", []) if isinstance(result, dict) else []
history: List[Dict[str, Any]] = []
for entry in data:
summary_text = entry.get("summary")
meta_payload = entry.get("metadata") or {}
if not meta_payload:
meta_payload = {
"workflow_type": entry.get("workflowType"),
"cadence": entry.get("cadence"),
"tokens": entry.get("tokens"),
"amounts": entry.get("amounts"),
"strategy": entry.get("strategy"),
"venue": entry.get("venue"),
"slippage_bps": entry.get("slippageBps"),
"stop_conditions": entry.get("stopConditions"),
}
error_message = entry.get("errorMessage")
if error_message:
meta_payload = dict(meta_payload)
meta_payload["error"] = error_message
history.append(
{
"timestamp": entry.get("recordedAt"),
"summary": summary_text,
"metadata": meta_payload,
}
)
return history
# ---- Gateway helpers ---------------------------------------------------
def _session_payload(self, intent: Dict[str, Any], metadata: Dict[str, Any]) -> Dict[str, Any]:
payload = {
"intent": intent,
"event": metadata.get("event"),
"status": metadata.get("status"),
"stage": metadata.get("stage"),
"missingFields": metadata.get("missing_fields") or [],
"nextField": metadata.get("next_field"),
"pendingQuestion": metadata.get("ask") or metadata.get("pending_question"),
"choices": metadata.get("choices") or [],
"errorMessage": metadata.get("error"),
"updatedAt": metadata.get("updated_at") or _utc_now_iso(),
}
return payload
def _get_session(self, user_id: str, conversation_id: str) -> Dict[str, Any] | None:
identifier = _identifier(user_id, conversation_id)
try:
return self._client.get(DCA_SESSION_ENTITY, identifier)
except PanoramaGatewayError as exc:
if exc.status_code == 404:
return None
self._handle_gateway_failure(exc)
return None
def _upsert_session(self, user_id: str, conversation_id: str, payload: Dict[str, Any]) -> None:
identifier = _identifier(user_id, conversation_id)
record = {**payload, "updatedAt": _utc_now_iso()}
try:
self._client.update(DCA_SESSION_ENTITY, identifier, record)
except PanoramaGatewayError as exc:
if exc.status_code != 404:
raise
create_payload = {
"userId": user_id,
"conversationId": conversation_id,
"tenantId": self._settings.tenant_id,
**record,
}
try:
self._client.create(DCA_SESSION_ENTITY, create_payload)
except PanoramaGatewayError as create_exc:
if create_exc.status_code == 409:
return
if create_exc.status_code == 404:
raise
raise
def _delete_session(self, user_id: str, conversation_id: str) -> None:
identifier = _identifier(user_id, conversation_id)
try:
self._client.delete(DCA_SESSION_ENTITY, identifier)
except PanoramaGatewayError as exc:
if exc.status_code != 404:
self._handle_gateway_failure(exc)
raise
def _create_history_entry(self, user_id: str, conversation_id: str, summary: Dict[str, Any]) -> None:
history_payload = {
"userId": user_id,
"conversationId": conversation_id,
"summary": summary.get("summary"),
"workflowType": summary.get("workflow_type"),
"cadence": summary.get("cadence"),
"tokens": summary.get("tokens"),
"amounts": summary.get("amounts"),
"strategy": summary.get("strategy"),
"venue": summary.get("venue"),
"slippageBps": summary.get("slippage_bps"),
"stopConditions": summary.get("stop_conditions"),
"metadata": summary,
"errorMessage": summary.get("error"),
"recordedAt": _utc_now_iso(),
"tenantId": self._settings.tenant_id,
}
try:
self._client.create(DCA_HISTORY_ENTITY, history_payload)
except PanoramaGatewayError as exc:
if exc.status_code == 404:
raise
elif exc.status_code != 409:
raise