| | """
|
| | PostgreSQL Database Module.
|
| |
|
| | Provides connection management and CRUD operations for:
|
| | - Conversations
|
| | - Messages
|
| | - Extracted Intelligence
|
| | """
|
| |
|
| | from typing import Dict, List, Optional, Any
|
| | import os
|
| | import time
|
| | from contextlib import contextmanager
|
| |
|
| | from sqlalchemy import create_engine, text, inspect
|
| | from sqlalchemy.orm import sessionmaker, Session
|
| | from sqlalchemy.exc import SQLAlchemyError
|
| |
|
| | from app.config import settings
|
| | from app.utils.logger import get_logger
|
| |
|
logger = get_logger(__name__)

# SQLAlchemy engine and session factory shared by every helper in this module.
# Both start as None and are populated lazily by init_engine(); they stay None
# when POSTGRES_URL is not configured.
engine = None
SessionLocal = None

# Circuit-breaker state used by save_conversation(): while
# _postgres_unavailable is set, saves are skipped until
# _POSTGRES_RECHECK_INTERVAL seconds have passed since _postgres_last_check
# (a time.time() value).
_POSTGRES_RECHECK_INTERVAL = 60
_postgres_unavailable: bool = False
_postgres_last_check: float = 0
|
| |
|
| |
|
def init_engine() -> None:
    """
    Initialize the module-level SQLAlchemy engine and session factory.

    The call is idempotent: once ``engine`` exists it returns immediately.
    When ``settings.POSTGRES_URL`` is empty, a warning is logged and nothing is
    created, so ``engine`` and ``SessionLocal`` remain ``None`` and callers
    handle that themselves. A missing URL does NOT raise.

    The engine is created with ``pool_pre_ping=True``, a pool of 5 connections
    (plus up to 10 overflow), and ``connect_timeout=2`` passed to the driver.

    Raises:
        Exception: Any error from ``create_engine`` / ``sessionmaker`` (for
            example a malformed URL or a missing DB driver) is logged and
            re-raised unchanged.
    """
    global engine, SessionLocal

    if engine is not None:
        return

    postgres_url = settings.POSTGRES_URL

    if not postgres_url:
        logger.warning("POSTGRES_URL not configured. Database operations will fail.")
        return

    try:
        new_engine = create_engine(
            postgres_url,
            pool_pre_ping=True,
            pool_size=5,
            max_overflow=10,
            echo=False,
            connect_args={"connect_timeout": 2},
        )
        new_session_factory = sessionmaker(bind=new_engine, autocommit=False, autoflush=False)
    except Exception as e:
        logger.error(f"Failed to initialize PostgreSQL engine: {e}")
        raise

    # Publish both globals only after both were built. Assigning `engine`
    # first (as before) meant a failing sessionmaker left an engine without a
    # session factory, and every later init_engine() call returned early.
    engine = new_engine
    SessionLocal = new_session_factory
    logger.info("PostgreSQL engine initialized successfully")
|
| |
|
| |
|
def get_db_connection():
    """
    Return a new connection checked out from the module's engine.

    The engine is lazily initialized via ``init_engine()`` on first use. The
    caller owns the returned connection and must close it, for example by
    using it as a context manager: ``with get_db_connection() as conn: ...``.

    Returns:
        The connection object returned by ``engine.connect()``.

    Raises:
        ConnectionError: If no engine exists after ``init_engine()`` (e.g.
            POSTGRES_URL is not configured), or if checking out a connection
            fails with a SQLAlchemy error.
        Exception: Errors raised by ``init_engine()`` itself while creating the
            engine (e.g. an invalid URL) propagate unchanged.
    """
    if engine is None:
        init_engine()

    if engine is None:
        raise ConnectionError("PostgreSQL engine not initialized. Check POSTGRES_URL configuration.")

    try:
        return engine.connect()
    except SQLAlchemyError as e:
        logger.error(f"Failed to get database connection: {e}")
        raise ConnectionError(f"Database connection failed: {e}") from e
|
| |
|
| |
|
@contextmanager
def get_db_session():
    """
    Provide a transactional ORM session as a context manager.

    The session factory is lazily initialized on first use. When the ``with``
    block finishes normally the session is committed. If the block or the
    commit raises an ``Exception``, the session is rolled back and the error
    re-raised. The session is always closed afterwards.

    Yields:
        SQLAlchemy Session

    Raises:
        ConnectionError: If the session factory is still None after
            ``init_engine()`` (POSTGRES_URL not configured).

    Example:
        with get_db_session() as session:
            session.execute(...)
    """
    if SessionLocal is None:
        init_engine()

    factory = SessionLocal
    if factory is None:
        raise ConnectionError("PostgreSQL session factory not initialized. Check POSTGRES_URL configuration.")

    db_session = factory()
    try:
        yield db_session
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise
    finally:
        db_session.close()
|
| |
|
| |
|
def _is_ignorable_schema_error(error_text: str, statement: str) -> bool:
    """Return True if a schema statement's error can be safely skipped.

    Tolerated cases:
      * "already exists" / "duplicate" errors. These can surface even with
        IF NOT EXISTS, e.g. when two processes initialize the schema
        concurrently.
      * "does not exist" errors raised by an index statement.
    """
    message = error_text.lower()
    if "already exists" in message or "duplicate" in message:
        return True
    return "does not exist" in message and "index" in statement.lower()


def init_database() -> None:
    """
    Initialize database with schema.

    Creates tables (if missing):
    - conversations
    - messages
    - extracted_intelligence

    Also creates the indexes idx_session_id, idx_created_at and
    idx_scam_detected on conversations, and idx_conversation_id on messages.

    All statements run in one transaction. Each statement is wrapped in a
    SAVEPOINT, so a tolerated failure (see ``_is_ignorable_schema_error``)
    does not abort the surrounding PostgreSQL transaction.

    Raises:
        ConnectionError: If no engine exists after ``init_engine()``
        SQLAlchemyError: If a statement fails with a non-tolerated error
    """
    if engine is None:
        init_engine()

    if engine is None:
        raise ConnectionError("PostgreSQL engine not initialized. Check POSTGRES_URL configuration.")

    schema_statements = [
        """
        CREATE TABLE IF NOT EXISTS conversations (
            id SERIAL PRIMARY KEY,
            session_id VARCHAR(255) UNIQUE NOT NULL,
            language VARCHAR(10) NOT NULL,
            persona VARCHAR(50),
            scam_detected BOOLEAN DEFAULT FALSE,
            confidence FLOAT,
            turn_count INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS messages (
            id SERIAL PRIMARY KEY,
            conversation_id INTEGER REFERENCES conversations(id) ON DELETE CASCADE,
            turn_number INTEGER NOT NULL,
            sender VARCHAR(50) NOT NULL,
            message TEXT NOT NULL,
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS extracted_intelligence (
            id SERIAL PRIMARY KEY,
            conversation_id INTEGER REFERENCES conversations(id) ON DELETE CASCADE,
            upi_ids TEXT[],
            bank_accounts TEXT[],
            ifsc_codes TEXT[],
            phone_numbers TEXT[],
            phishing_links TEXT[],
            extraction_confidence FLOAT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_session_id ON conversations(session_id)",
        "CREATE INDEX IF NOT EXISTS idx_conversation_id ON messages(conversation_id)",
        "CREATE INDEX IF NOT EXISTS idx_created_at ON conversations(created_at)",
        "CREATE INDEX IF NOT EXISTS idx_scam_detected ON conversations(scam_detected)",
    ]

    try:
        with engine.begin() as conn:
            for raw_statement in schema_statements:
                statement = raw_statement.strip()
                if not statement:
                    continue
                try:
                    # In PostgreSQL any error aborts the whole transaction;
                    # the savepoint confines a tolerated failure to this
                    # statement so the remaining ones can still run.
                    with conn.begin_nested():
                        conn.execute(text(statement))
                except SQLAlchemyError as e:
                    if not _is_ignorable_schema_error(str(e), statement):
                        raise
                    logger.warning(f"Skipping schema statement: {e}")

        logger.info("Database schema initialized successfully")
    except SQLAlchemyError as e:
        logger.error(f"Failed to initialize database schema: {e}")
        raise
|
| |
|
| |
|
def verify_schema() -> bool:
    """
    Check that the required tables exist and report missing indexes.

    Only tables decide the result. Missing indexes (those created by
    ``init_database``) are logged as warnings but do not make the check fail.
    This function does not call ``init_engine()``; if no engine exists yet it
    returns False.

    Returns:
        True if conversations, messages and extracted_intelligence all exist.
        False if any is missing, the engine is not initialized, or inspection
        raises.
    """
    if engine is None:
        return False

    required_tables = ["conversations", "messages", "extracted_intelligence"]
    # Index names created by init_database, grouped by table.
    expected_indexes = {
        "conversations": ["idx_session_id", "idx_created_at", "idx_scam_detected"],
        "messages": ["idx_conversation_id"],
    }

    try:
        inspector = inspect(engine)
        tables = set(inspector.get_table_names())

        missing_tables = [t for t in required_tables if t not in tables]
        if missing_tables:
            logger.warning(f"Missing tables: {missing_tables}")
            return False

        for table, wanted in expected_indexes.items():
            present = {idx["name"] for idx in inspector.get_indexes(table)}
            missing_indexes = [name for name in wanted if name not in present]
            if missing_indexes:
                logger.warning(f"Missing indexes on {table}: {missing_indexes}")

        return True
    except Exception as e:
        logger.error(f"Failed to verify schema: {e}")
        return False
|
| |
|
| |
|
# Scam confidence at or above which a conversation is stored as scam_detected.
_SCAM_CONFIDENCE_THRESHOLD = 0.7


def _conversation_row_params(session_id: str, conversation_data: Dict[str, Any]) -> Dict[str, Any]:
    """Build the ``conversations`` bind parameters from *conversation_data*.

    Missing keys fall back to language ``"en"``, persona ``None``, confidence
    ``0.0`` and turn count ``0``. An explicit ``None`` confidence is stored as
    NULL and never marks the conversation as a scam (it previously raised
    TypeError on the ``>=`` comparison).
    """
    confidence = conversation_data.get("scam_confidence", 0.0)
    return {
        "session_id": session_id,
        "language": conversation_data.get("language", "en"),
        "persona": conversation_data.get("persona"),
        "scam_detected": confidence is not None and confidence >= _SCAM_CONFIDENCE_THRESHOLD,
        "confidence": confidence,
        "turn_count": conversation_data.get("turn_count", 0),
    }


def _upsert_conversation(conn: Any, params: Dict[str, Any]) -> int:
    """Update or insert the ``conversations`` row for ``params["session_id"]``.

    The common case (the row already exists) is a single UPDATE. Otherwise an
    ``INSERT ... ON CONFLICT (session_id) DO UPDATE`` runs, so two concurrent
    first saves of the same session cannot fail on UNIQUE(session_id).

    Returns:
        The row's ``id``, or 0 if the database returned no row.
    """
    update_sql = text("""
        UPDATE conversations
        SET language = :language,
            persona = :persona,
            scam_detected = :scam_detected,
            confidence = :confidence,
            turn_count = :turn_count,
            updated_at = CURRENT_TIMESTAMP
        WHERE session_id = :session_id
        RETURNING id
    """)
    row = conn.execute(update_sql, params).fetchone()

    if row is None:
        upsert_sql = text("""
            INSERT INTO conversations
                (session_id, language, persona, scam_detected, confidence, turn_count)
            VALUES (:session_id, :language, :persona, :scam_detected, :confidence, :turn_count)
            ON CONFLICT (session_id) DO UPDATE
            SET language = EXCLUDED.language,
                persona = EXCLUDED.persona,
                scam_detected = EXCLUDED.scam_detected,
                confidence = EXCLUDED.confidence,
                turn_count = EXCLUDED.turn_count,
                updated_at = CURRENT_TIMESTAMP
            RETURNING id
        """)
        row = conn.execute(upsert_sql, params).fetchone()

    return row[0] if row else 0


def _is_connection_error(exc: Exception) -> bool:
    """Return True if *exc* means PostgreSQL is unreachable.

    Errors caused by the query or the data (integrity, type, syntax errors)
    return False.
    """
    from sqlalchemy.exc import DBAPIError, InterfaceError, OperationalError
    from sqlalchemy.exc import TimeoutError as PoolTimeoutError

    if isinstance(exc, (OperationalError, InterfaceError, PoolTimeoutError)):
        return True
    return isinstance(exc, DBAPIError) and bool(exc.connection_invalidated)


def save_conversation(session_id: str, conversation_data: Dict[str, Any]) -> int:
    """
    Save (insert or update) a conversation in PostgreSQL.

    Implements AC-2.3.3: PostgreSQL stores complete logs.

    The conversation row is upserted by ``session_id``. Afterwards, non-empty
    ``messages`` are passed to ``save_messages`` and non-empty
    ``extracted_intel`` to ``save_intelligence``.

    A simple circuit breaker protects callers from an unreachable database.
    When a save fails with a connection-level error, later calls return 0
    without touching the database for ``_POSTGRES_RECHECK_INTERVAL`` seconds.
    Query/data errors are logged but do not trip the breaker.

    Args:
        session_id: Unique session identifier
        conversation_data: Conversation data including:
            - language: Detected language (default "en")
            - persona: Active persona name
            - scam_confidence: Detection confidence; >= 0.7 sets scam_detected
            - turn_count: Number of turns
            - messages: List of message dicts
            - extracted_intel: Optional intelligence data
            - extraction_confidence: Confidence for extracted_intel (default 0.0)

    Returns:
        Conversation ID (0 if failed or skipped)
    """
    global _postgres_unavailable, _postgres_last_check

    if _postgres_unavailable:
        if time.time() - _postgres_last_check < _POSTGRES_RECHECK_INTERVAL:
            logger.debug("PostgreSQL unavailable (cached), skipping save")
            return 0
        # Recheck interval elapsed: let this call probe the database again.
        _postgres_unavailable = False

    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot save conversation: Database not initialized")
        return 0

    params = _conversation_row_params(session_id, conversation_data)

    try:
        with engine.connect() as conn:
            conversation_id = _upsert_conversation(conn, params)
            conn.commit()

        # Child rows are written after the parent connection has been
        # returned to the pool; each helper checks out its own connection.
        messages = conversation_data.get("messages", [])
        if messages and conversation_id > 0:
            save_messages(conversation_id, messages)

        extracted_intel = conversation_data.get("extracted_intel", {})
        extraction_confidence = conversation_data.get("extraction_confidence", 0.0)
        if extracted_intel and conversation_id > 0:
            save_intelligence(conversation_id, extracted_intel, extraction_confidence)

        logger.info(f"Conversation saved: session_id={session_id}, id={conversation_id}")
        return conversation_id

    except SQLAlchemyError as e:
        logger.error(f"Failed to save conversation: {e}")
        # Only an unreachable database should suppress future saves; a bad
        # row for one session must not disable persistence for everyone.
        if _is_connection_error(e):
            _postgres_unavailable = True
            _postgres_last_check = time.time()
        return 0
|
| |
|
| |
|
def get_conversation(session_id: str) -> Optional[Dict[str, Any]]:
    """
    Fetch a conversation with its messages and latest intelligence record.

    Args:
        session_id: Session identifier to look up.

    Returns:
        A dict with the conversation columns (confidence exposed as
        ``scam_confidence``), ``messages`` ordered by turn number, the most
        recent ``extracted_intel`` (empty dict if none) and
        ``extraction_confidence``. Returns None if the session is unknown,
        the database is not configured, or a SQLAlchemy error occurs.
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot get conversation: Database not initialized")
        return None

    def _iso(value):
        return value.isoformat() if value else None

    try:
        with engine.connect() as conn:
            conv_row = conn.execute(
                text("""
                    SELECT id, session_id, language, persona, scam_detected,
                           confidence, turn_count, created_at, updated_at
                    FROM conversations
                    WHERE session_id = :session_id
                """),
                {"session_id": session_id},
            ).fetchone()

            if not conv_row:
                return None

            (conv_id, sid, language, persona, scam_detected,
             confidence, turn_count, created_at, updated_at) = conv_row
            by_conversation = {"conversation_id": conv_id}

            message_rows = conn.execute(
                text("""
                    SELECT turn_number, sender, message, timestamp
                    FROM messages
                    WHERE conversation_id = :conversation_id
                    ORDER BY turn_number
                """),
                by_conversation,
            ).fetchall()

            messages = []
            for turn_number, sender, body, sent_at in message_rows:
                messages.append({
                    "turn": turn_number,
                    "sender": sender,
                    "message": body,
                    "timestamp": _iso(sent_at),
                })

            intel_row = conn.execute(
                text("""
                    SELECT upi_ids, bank_accounts, ifsc_codes, phone_numbers,
                           phishing_links, extraction_confidence
                    FROM extracted_intelligence
                    WHERE conversation_id = :conversation_id
                    ORDER BY created_at DESC
                    LIMIT 1
                """),
                by_conversation,
            ).fetchone()

            extracted_intel: Dict[str, Any] = {}
            extraction_confidence = 0.0
            if intel_row:
                intel_keys = ("upi_ids", "bank_accounts", "ifsc_codes", "phone_numbers", "phishing_links")
                extracted_intel = {key: intel_row[pos] or [] for pos, key in enumerate(intel_keys)}
                extraction_confidence = intel_row[5] or 0.0

            return {
                "id": conv_id,
                "session_id": sid,
                "language": language,
                "persona": persona,
                "scam_detected": scam_detected,
                "scam_confidence": confidence,
                "turn_count": turn_count,
                "created_at": _iso(created_at),
                "updated_at": _iso(updated_at),
                "messages": messages,
                "extracted_intel": extracted_intel,
                "extraction_confidence": extraction_confidence,
            }

    except SQLAlchemyError as e:
        logger.error(f"Failed to get conversation: {e}")
        return None
|
| |
|
| |
|
def update_conversation(session_id: str, updates: Dict[str, Any]) -> bool:
    """
    Apply a partial update to an existing conversation row.

    Only whitelisted columns (language, persona, scam_detected, confidence,
    turn_count) are written; other keys in *updates* are ignored.
    ``updated_at`` is always refreshed. Column names are interpolated into
    the SQL only after passing the whitelist; values are bound parameters.

    Args:
        session_id: Session identifier of the row to update.
        updates: Mapping of column name to new value.

    Returns:
        True if a row was updated, or if *updates* is empty. False if the
        database is not configured, no whitelisted field was given, no row
        matched, or a SQLAlchemy error occurred.
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot update conversation: Database not initialized")
        return False

    if not updates:
        return True

    allowed_fields = {"language", "persona", "scam_detected", "confidence", "turn_count"}
    update_fields: Dict[str, Any] = {}
    for field_name, value in updates.items():
        if field_name in allowed_fields:
            update_fields[field_name] = value

    if not update_fields:
        logger.warning(f"No valid fields to update: {updates.keys()}")
        return False

    try:
        with engine.connect() as conn:
            assignments = [f"{name} = :{name}" for name in update_fields]
            assignments.append("updated_at = CURRENT_TIMESTAMP")

            update_sql = text(f"""
                UPDATE conversations
                SET {", ".join(assignments)}
                WHERE session_id = :session_id
            """)

            result = conn.execute(update_sql, {"session_id": session_id, **update_fields})
            conn.commit()
            updated_rows = result.rowcount

        if updated_rows <= 0:
            logger.warning(f"No conversation found to update: session_id={session_id}")
            return False

        logger.info(f"Conversation updated: session_id={session_id}")
        return True

    except SQLAlchemyError as e:
        logger.error(f"Failed to update conversation: {e}")
        return False
|
| |
|
| |
|
def _new_message_rows(
    conversation_id: int,
    messages: List[Dict[str, Any]],
    existing_turns: set,
) -> List[Dict[str, Any]]:
    """Return insert parameters for messages whose turn number is not yet stored.

    A message is skipped if its turn number is already in *existing_turns*
    (not modified) or appeared earlier in *messages*; the first one wins.
    Missing keys default to turn 0, sender "unknown" and an empty message.

    NOTE(review): dedup is by turn number only. If both parties share a turn
    number, only the first message of that turn is kept; confirm with callers.
    """
    seen_turns = set(existing_turns)
    rows: List[Dict[str, Any]] = []
    for msg in messages:
        turn = msg.get("turn", 0)
        if turn in seen_turns:
            continue
        seen_turns.add(turn)
        rows.append({
            "conversation_id": conversation_id,
            "turn_number": turn,
            "sender": msg.get("sender", "unknown"),
            "message": msg.get("message", ""),
        })
    return rows


def save_messages(conversation_id: int, messages: List[Dict[str, Any]]) -> int:
    """
    Save messages that are not yet stored for a conversation.

    Messages whose turn number already exists for *conversation_id* are
    skipped. A per-message ``timestamp`` key is not written; the column
    default (CURRENT_TIMESTAMP at insert) applies.

    Args:
        conversation_id: Parent conversation ID
        messages: List of message dictionaries with turn, sender, message

    Returns:
        Number of messages saved (0 on error or when nothing was new)
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot save messages: Database not initialized")
        return 0

    if not messages:
        return 0

    try:
        with engine.connect() as conn:
            existing_sql = text("""
                SELECT turn_number FROM messages
                WHERE conversation_id = :conversation_id
            """)
            result = conn.execute(existing_sql, {"conversation_id": conversation_id})
            existing_turns = {row[0] for row in result.fetchall()}

            rows = _new_message_rows(conversation_id, messages, existing_turns)
            if rows:
                insert_sql = text("""
                    INSERT INTO messages (conversation_id, turn_number, sender, message)
                    VALUES (:conversation_id, :turn_number, :sender, :message)
                """)
                # A list of parameter dicts runs as one executemany batch
                # instead of one round trip (and one text() build) per message.
                conn.execute(insert_sql, rows)

            conn.commit()

        saved_count = len(rows)
        logger.debug(f"Saved {saved_count} messages for conversation {conversation_id}")
        return saved_count

    except SQLAlchemyError as e:
        logger.error(f"Failed to save messages: {e}")
        return 0
|
| |
|
| |
|
# Intelligence fields persisted as TEXT[] columns of extracted_intelligence.
_INTEL_ARRAY_FIELDS = ("upi_ids", "bank_accounts", "ifsc_codes", "phone_numbers", "phishing_links")


def _as_text_list(value: Any) -> List[str]:
    """Normalize one intelligence field to a list of strings for a TEXT[] column.

    ``None`` or ``""`` become ``[]``. A single string becomes a one-element
    list (instead of being split into characters). Any other iterable (list,
    tuple, set) becomes a list, and a non-iterable scalar becomes a
    one-element list. Elements are converted with ``str()``; ``None``
    elements are dropped.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value] if value else []
    try:
        items = list(value)
    except TypeError:
        items = [value]  # scalar, e.g. a phone number given as an int
    return [str(item) for item in items if item is not None]


def save_intelligence(
    conversation_id: int,
    intelligence: Dict[str, List[str]],
    confidence: float,
) -> int:
    """
    Save extracted intelligence to database.

    Each call inserts a new extracted_intelligence row; existing rows for the
    conversation are not modified. Field values are normalized with
    ``_as_text_list``, so sets/tuples are accepted and missing or None fields
    are stored as empty arrays.

    Args:
        conversation_id: Parent conversation ID
        intelligence: Extracted intelligence data with keys:
            - upi_ids, bank_accounts, ifsc_codes, phone_numbers, phishing_links
        confidence: Extraction confidence score

    Returns:
        Intelligence record ID (0 if failed)
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot save intelligence: Database not initialized")
        return 0

    intelligence = intelligence or {}
    params: Dict[str, Any] = {"conversation_id": conversation_id}
    params.update({field: _as_text_list(intelligence.get(field)) for field in _INTEL_ARRAY_FIELDS})
    params["extraction_confidence"] = confidence

    try:
        with engine.connect() as conn:
            insert_sql = text("""
                INSERT INTO extracted_intelligence
                    (conversation_id, upi_ids, bank_accounts, ifsc_codes,
                     phone_numbers, phishing_links, extraction_confidence)
                VALUES (:conversation_id, :upi_ids, :bank_accounts, :ifsc_codes,
                        :phone_numbers, :phishing_links, :extraction_confidence)
                RETURNING id
            """)

            result = conn.execute(insert_sql, params)
            row = result.fetchone()
            intel_id = row[0] if row else 0

            conn.commit()
            logger.info(f"Intelligence saved: conversation_id={conversation_id}, id={intel_id}")
            return intel_id

    except SQLAlchemyError as e:
        logger.error(f"Failed to save intelligence: {e}")
        return 0
|
| |
|
| |
|
def get_conversations_by_date(start_date: str, end_date: str) -> List[Dict[str, Any]]:
    """
    List conversations whose created_at lies in [start_date, end_date).

    The lower bound is inclusive and the upper bound exclusive.

    Args:
        start_date: Start date (ISO format: YYYY-MM-DD), inclusive
        end_date: End date (ISO format: YYYY-MM-DD), exclusive

    Returns:
        Conversation records, newest first, with ISO-formatted timestamps.
        Empty if the database is not configured or the query fails.
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot query conversations: Database not initialized")
        return []

    def _to_record(record) -> Dict[str, Any]:
        (row_id, sid, language, persona, scam_detected,
         confidence, turn_count, created_at, updated_at) = record
        return {
            "id": row_id,
            "session_id": sid,
            "language": language,
            "persona": persona,
            "scam_detected": scam_detected,
            "confidence": confidence,
            "turn_count": turn_count,
            "created_at": created_at.isoformat() if created_at else None,
            "updated_at": updated_at.isoformat() if updated_at else None,
        }

    try:
        with engine.connect() as conn:
            query_sql = text("""
                SELECT id, session_id, language, persona, scam_detected,
                       confidence, turn_count, created_at, updated_at
                FROM conversations
                WHERE created_at >= :start_date AND created_at < :end_date
                ORDER BY created_at DESC
            """)
            fetched = conn.execute(
                query_sql,
                {"start_date": start_date, "end_date": end_date},
            ).fetchall()

            return [_to_record(record) for record in fetched]

    except SQLAlchemyError as e:
        logger.error(f"Failed to get conversations by date: {e}")
        return []
|
| |
|
| |
|
def _aggregate_scammer_profiles(rows: List[Any]) -> List[Dict[str, Any]]:
    """
    Group joined conversation/intelligence rows into scammer profiles.

    Each row must be indexable in the column order selected by
    ``get_scammer_profiles``: (session_id, language, persona, confidence,
    upi_ids, bank_accounts, ifsc_codes, phone_numbers, phishing_links,
    extraction_confidence, created_at).

    A row is keyed by its first phone number, else its first UPI ID. Rows
    whose key is missing or empty are skipped. Rows with a NULL created_at
    still count towards the profile but do not move first_seen/last_seen.
    """
    profiles: Dict[str, Dict[str, Any]] = {}

    for row in rows:
        phone_numbers = row[7] or []
        upi_ids = row[4] or []

        profile_key = None
        if phone_numbers:
            profile_key = phone_numbers[0]
        elif upi_ids:
            profile_key = upi_ids[0]

        if not profile_key:
            continue

        profile = profiles.get(profile_key)
        if profile is None:
            profile = profiles[profile_key] = {
                "identifier": profile_key,
                "phone_numbers": set(),
                "upi_ids": set(),
                "bank_accounts": set(),
                "ifsc_codes": set(),
                "phishing_links": set(),
                "languages": set(),
                "personas_encountered": set(),
                "session_count": 0,
                "avg_confidence": 0.0,
                "confidence_sum": 0.0,
                "first_seen": None,
                "last_seen": None,
            }

        profile["phone_numbers"].update(phone_numbers)
        profile["upi_ids"].update(upi_ids)
        profile["bank_accounts"].update(row[5] or [])
        profile["ifsc_codes"].update(row[6] or [])
        profile["phishing_links"].update(row[8] or [])
        profile["languages"].add(row[1])
        if row[2]:
            profile["personas_encountered"].add(row[2])
        profile["session_count"] += 1
        profile["confidence_sum"] += row[3] or 0.0

        created_at = row[10]
        if created_at is not None:
            # None-safe bounds: the first row seen for this profile may have
            # had no timestamp, and comparing datetime with None raises.
            if profile["first_seen"] is None or created_at < profile["first_seen"]:
                profile["first_seen"] = created_at
            if profile["last_seen"] is None or created_at > profile["last_seen"]:
                profile["last_seen"] = created_at

    set_fields = (
        "phone_numbers", "upi_ids", "bank_accounts", "ifsc_codes",
        "phishing_links", "languages", "personas_encountered",
    )
    result_profiles = []
    for profile in profiles.values():
        for field in set_fields:
            profile[field] = list(profile[field])
        session_count = profile["session_count"]
        confidence_sum = profile.pop("confidence_sum")
        profile["avg_confidence"] = confidence_sum / session_count if session_count > 0 else 0.0
        profile["first_seen"] = profile["first_seen"].isoformat() if profile["first_seen"] else None
        profile["last_seen"] = profile["last_seen"].isoformat() if profile["last_seen"] else None
        result_profiles.append(profile)

    return result_profiles


def get_scammer_profiles() -> List[Dict[str, Any]]:
    """
    Get aggregated scammer profiles from extracted intelligence.

    Only conversations with ``scam_detected = true`` are considered. See
    ``_aggregate_scammer_profiles`` for how rows are grouped.

    Returns:
        List of scammer profile data with aggregated phone numbers, UPI IDs,
        etc. Empty if the database is not configured or the query fails.
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot get scammer profiles: Database not initialized")
        return []

    try:
        with engine.connect() as conn:
            query_sql = text("""
                SELECT c.session_id, c.language, c.persona, c.confidence,
                       e.upi_ids, e.bank_accounts, e.ifsc_codes,
                       e.phone_numbers, e.phishing_links,
                       e.extraction_confidence, c.created_at
                FROM extracted_intelligence e
                JOIN conversations c ON e.conversation_id = c.id
                WHERE c.scam_detected = true
                ORDER BY c.created_at DESC
            """)
            rows = conn.execute(query_sql).fetchall()

        return _aggregate_scammer_profiles(rows)

    except SQLAlchemyError as e:
        logger.error(f"Failed to get scammer profiles: {e}")
        return []
|
| |
|
| |
|
def delete_conversation(session_id: str) -> bool:
    """
    Delete a conversation row by session identifier.

    The schema created by ``init_database`` declares ``ON DELETE CASCADE`` on
    the messages and extracted_intelligence foreign keys, so their rows for
    this conversation are removed together with it.

    Args:
        session_id: Session identifier

    Returns:
        True if a row was deleted. False if none matched, the database is
        not configured, or a SQLAlchemy error occurred.
    """
    if engine is None:
        init_engine()

    if engine is None:
        logger.error("Cannot delete conversation: Database not initialized")
        return False

    try:
        with engine.connect() as conn:
            result = conn.execute(
                text("""
                    DELETE FROM conversations
                    WHERE session_id = :session_id
                """),
                {"session_id": session_id},
            )
            conn.commit()
            deleted_rows = result.rowcount

        if deleted_rows <= 0:
            return False

        logger.info(f"Conversation deleted: session_id={session_id}")
        return True

    except SQLAlchemyError as e:
        logger.error(f"Failed to delete conversation: {e}")
        return False
|
| |
|
| |
|
def get_conversation_stats() -> Dict[str, Any]:
    """
    Summarize the conversations table.

    Returns:
        A dict with ``total_conversations``, ``scam_count``,
        ``avg_confidence``, ``avg_turns`` and ``language_count``; NULL/zero
        aggregates become 0 or 0.0. Returns ``{"error": ...}`` if the
        database is not configured or the query fails.
    """
    if engine is None:
        init_engine()

    if engine is None:
        return {"error": "Database not initialized"}

    try:
        with engine.connect() as conn:
            stats_sql = text("""
                SELECT
                    COUNT(*) as total_conversations,
                    SUM(CASE WHEN scam_detected THEN 1 ELSE 0 END) as scam_count,
                    AVG(confidence) as avg_confidence,
                    AVG(turn_count) as avg_turns,
                    COUNT(DISTINCT language) as language_count
                FROM conversations
            """)
            stats_row = conn.execute(stats_sql).fetchone()

            if not stats_row:
                return {
                    "total_conversations": 0,
                    "scam_count": 0,
                    "avg_confidence": 0.0,
                    "avg_turns": 0.0,
                    "language_count": 0,
                }

            total, scams, mean_confidence, mean_turns, languages = stats_row
            return {
                "total_conversations": total or 0,
                "scam_count": scams or 0,
                "avg_confidence": float(mean_confidence) if mean_confidence else 0.0,
                "avg_turns": float(mean_turns) if mean_turns else 0.0,
                "language_count": languages or 0,
            }

    except SQLAlchemyError as e:
        logger.error(f"Failed to get conversation stats: {e}")
        return {"error": str(e)}
|
| |
|