from typing import List import asyncio from sqlalchemy.ext.asyncio import AsyncSession from app.db.models import ScamEvent, Entity, EventEntity from app.schemas.response import AnalyzeResponse from app.db.session import AsyncSessionLocal from sqlalchemy import select, update import uuid from datetime import datetime from app.services.scam_graph import scam_graph from app.services.simulation_engine import simulation_engine class MemoryService: async def save_event(self, response: AnalyzeResponse, metadata: dict, embedding: list = None): async with AsyncSessionLocal() as session: # 1. Save Event to Postgres event = ScamEvent( id=uuid.UUID(response.id), text=response.text, source=response.source, risk_score=response.risk_score, decision=response.decision, event_metadata={ "reasons": response.reasons, "intent": response.intent.dict(), "entities": response.entities.dict(), "live_intel": { "evidence": response.evidence, "claimed_brand": response.claimed_brand, "official_verify": response.official_verify.dict() if response.official_verify else None, "next_steps": response.next_steps, "verdict_synthesis": response.verdict_synthesis, "breadcrumbs": response.breadcrumbs, "similarity": response.similarity }, **metadata }, embedding=embedding ) session.add(event) # 2. Add to Scam Journey Graph (Depth) graph_entities = { "phones": response.entities.phones, "upi_ids": response.entities.upi_ids, "links": response.entities.domains } # Normalize intent signals for graph signals = { "urgency": int(response.intent.urgency * 10), "impersonation": int(response.intent.impersonation * 10), "payment": int(response.intent.payment * 10) } scam_graph.add_event(response.source, graph_entities, signals) # 3. Optimization: Autonomous Security Drill if response.risk_score > 85: drill_scenario = f"The user has received this suspicious intake: '{response.text[:200]}...'. Simulation: What is the most likely social engineering escalation the scammer will try next?" # Trigger background simulation asyncio.create_task(simulation_engine.run_simulation(drill_scenario, {"source": response.source, "risk": response.risk_score})) # 4. Process Entities for relational mapping for phone in response.entities.phones: await self._upsert_entity(session, "phone", phone, event.id) for upi in response.entities.upi_ids: await self._upsert_entity(session, "upi", upi, event.id) for domain in response.entities.domains: await self._upsert_entity(session, "domain", domain, event.id) await session.commit() async def _upsert_entity(self, session: AsyncSession, e_type: str, value: str, event_id: uuid.UUID): stmt = select(Entity).where(Entity.type == e_type, Entity.value == value) result = await session.execute(stmt) entity = result.scalar_one_or_none() if not entity: entity = Entity(type=e_type, value=value) session.add(entity) await session.flush() else: entity.last_seen = datetime.utcnow() # Link to event link = EventEntity(event_id=event_id, entity_id=entity.id) session.add(link) async def check_overlap(self, phones: List[str], upis: List[str], domains: List[str]) -> List[dict]: """Detect if these entities have appeared in previous reports.""" async with AsyncSessionLocal() as session: overlaps = [] all_vals = [("phone", p) for p in phones] + [("upi", u) for u in upis] + [("domain", d) for d in domains] for e_type, val in all_vals: stmt = select(Entity).where(Entity.type == e_type, Entity.value == val) result = await session.execute(stmt) entity = result.scalar_one_or_none() if entity: overlaps.append({ "source": "Janus Memory Graph", "signal": "infrastructure_overlap", "value": val, "severity": "high", "explanation": f"This '{val}' has been encountered in previous reports. Recidivism detected." }) return overlaps memory_service = MemoryService()