| | """ |
| | FastAPI endpoints for the ScamShield AI API. |
| | |
| | Implements Task 8.1: FastAPI Endpoints |
| | |
| | Acceptance Criteria: |
| | - AC-4.1.1: Returns 200 OK for valid requests |
| | - AC-4.1.2: Returns 400 for invalid input |
| | - AC-4.1.3: Response matches schema |
| | - AC-4.1.5: Response time <2s (p95) |
| | |
| | GUVI Integration: |
| | - Supports GUVI's exact input format with nested message object |
| | - Includes x-api-key authentication |
| | - Returns camelCase fields for GUVI evaluator compatibility |
| | - Sends callback to GUVI evaluation endpoint on completion |
| | """ |
| |
|
| | from typing import Optional, List, Any, Dict, Set, Union |
| | from datetime import datetime |
| | import uuid |
| | import time |
| |
|
| | from fastapi import APIRouter, HTTPException, Request, Depends, Body |
| | from fastapi.responses import JSONResponse |
| |
|
| | from app.api.schemas import ( |
| | EngageRequest, |
| | EngageResponse, |
| | HealthResponse, |
| | BatchRequest, |
| | BatchResponse, |
| | SessionResponse, |
| | ErrorResponse, |
| | ExtractedIntelligence, |
| | EngagementInfo, |
| | MessageEntry, |
| | ResponseMetadata, |
| | HealthDependencies, |
| | BatchResultItem, |
| | UnifiedEngageRequest, |
| | GUVICallbackPayload, |
| | ) |
| | from app.api.auth import verify_api_key |
| | from app.config import settings |
| | from app.utils.logger import get_logger |
| |
|
| | logger = get_logger(__name__) |
| |
|
| | router = APIRouter(prefix="/api/v1", tags=["honeypot"]) |
| |
|
| |
|
| | @router.post("/honeypot/engage", dependencies=[Depends(verify_api_key)]) |
| | async def engage_honeypot(request_body: Dict[str, Any] = Body(default={})): |
| | """ |
| | Detect scam messages and engage scammers with AI personas to extract intelligence. |
| | |
| | This is the primary endpoint for the ScamShield AI system. It: |
| | 1. Analyzes the incoming message for scam indicators |
| | 2. If scam detected, engages using an appropriate persona |
| | 3. Extracts financial intelligence from the conversation |
| | 4. Returns structured response with engagement and intelligence data |
| | 5. Sends callback to GUVI when engagement completes |
| | |
| | Supports both formats: |
| | - Our format: {"message": "text", "session_id": "uuid", "language": "auto"} |
| | - GUVI format: {"sessionId": "id", "message": {"sender": "scammer", "text": "..."}, ...} |
| | |
| | For GUVI format requests, returns camelCase fields for evaluator compatibility. |
| | |
| | Args: |
| | request_body: Request body (accepts both formats) |
| | |
| | Returns: |
| | JSONResponse with camelCase fields (GUVI) or EngageResponse (standard) |
| | |
| | Raises: |
| | HTTPException: For validation errors or internal failures |
| | """ |
| | start_time = time.time() |
| | |
| | logger.info(f"Received engage request: {request_body}") |
| | |
| | try: |
| | from app.models.detector import ScamDetector, get_detector |
| | from app.models.language import detect_language |
| | from app.models.extractor import extract_intelligence, extract_from_messages |
| | from app.agent.honeypot import HoneypotAgent |
| | from app.agent.personas import select_persona |
| | from app.database.redis_client import ( |
| | get_session_state_with_fallback, |
| | save_session_state_with_fallback, |
| | ) |
| | from app.utils.guvi_callback import ( |
| | send_final_result_to_guvi, |
| | should_send_callback, |
| | extract_suspicious_keywords, |
| | generate_agent_notes, |
| | identify_scam_type, |
| | identify_red_flags, |
| | count_elicitation_attempts, |
| | ) |
| | |
| | |
| | message_text, session_id, language, conversation_history = _parse_request(request_body) |
| | is_guvi = _is_guvi_format(request_body) |
| | |
| | if not session_id: |
| | session_id = str(uuid.uuid4()) |
| | |
| | if language == "auto": |
| | detected_language, _ = detect_language(message_text) |
| | else: |
| | detected_language = language |
| | |
| | |
| | session_state = None |
| | is_ongoing_scam_session = False |
| | provided_session_id = request_body.get("session_id") or request_body.get("sessionId") |
| | if provided_session_id: |
| | session_state = get_session_state_with_fallback(provided_session_id) |
| | if session_state and session_state.get("turn_count", 0) > 0: |
| | is_ongoing_scam_session = True |
| | logger.info(f"Continuing existing scam session {session_id}, turn={session_state.get('turn_count', 0)}") |
| | |
| | |
| | if conversation_history and not session_state: |
| | session_state = _rebuild_session_from_history(conversation_history, detected_language) |
| | |
| | |
| | detector = get_detector() |
| | detection_result = detector.detect(message_text, detected_language) |
| | |
| | scam_detected = detection_result.get("scam_detected", False) |
| | confidence = detection_result.get("confidence", 0.0) |
| | scam_indicators = detection_result.get("indicators", []) |
| | |
| | |
| | |
| | if is_guvi: |
| | scam_detected = True |
| | confidence = max(confidence, 0.85) |
| | logger.info(f"GUVI format detected - forcing scam_detected=True, confidence={confidence:.2f}") |
| | |
| | processing_time_ms = int((time.time() - start_time) * 1000) |
| | |
| | |
| | if not scam_detected and not is_ongoing_scam_session and not is_guvi: |
| | logger.info(f"No scam detected for session {session_id}, confidence={confidence:.2f}") |
| | return EngageResponse( |
| | status="success", |
| | scam_detected=False, |
| | confidence=confidence, |
| | language_detected=detected_language, |
| | session_id=session_id, |
| | reply="No scam detected. Message appears legitimate.", |
| | message="No scam detected. Message appears legitimate.", |
| | metadata=ResponseMetadata( |
| | processing_time_ms=processing_time_ms, |
| | model_version="1.0.0", |
| | detection_model="indic-bert", |
| | engagement_model=None, |
| | ), |
| | ) |
| | |
| | |
| | if is_ongoing_scam_session: |
| | existing_confidence = session_state.get("scam_confidence", 0.0) |
| | confidence = max(confidence, existing_confidence) |
| | scam_detected = True |
| | |
| | logger.info(f"Scam detected for session {session_id}, confidence={confidence:.2f}") |
| | |
| | agent = HoneypotAgent() |
| | result = agent.engage(message_text, session_state) |
| | |
| | |
| | |
| | |
| | all_scammer_texts: List[str] = [] |
| | |
| | |
| | if conversation_history: |
| | for hist_msg in conversation_history: |
| | sender = hist_msg.get("sender", "") |
| | if sender in ("scammer", ""): |
| | all_scammer_texts.append(hist_msg.get("message", "")) |
| | |
| | |
| | for msg in result.get("messages", []): |
| | if msg.get("sender") == "scammer": |
| | all_scammer_texts.append(msg.get("message", "")) |
| | |
| | combined_scammer_text = " ".join(all_scammer_texts) |
| | intel, extraction_confidence = extract_intelligence(combined_scammer_text) |
| | |
| | result["extracted_intel"] = intel |
| | result["extraction_confidence"] = extraction_confidence |
| | result["scam_confidence"] = confidence |
| | |
| | |
| | save_session_state_with_fallback(session_id, result) |
| | |
| | if settings.POSTGRES_URL: |
| | try: |
| | from app.database.postgres import save_conversation |
| | conversation_data = { |
| | "language": detected_language, |
| | "persona": result.get("persona"), |
| | "scam_confidence": confidence, |
| | "turn_count": result.get("turn_count", 1), |
| | "messages": result.get("messages", []), |
| | "extracted_intel": intel, |
| | "extraction_confidence": extraction_confidence, |
| | } |
| | conversation_id = save_conversation(session_id, conversation_data) |
| | if conversation_id > 0: |
| | logger.debug(f"Conversation saved to PostgreSQL: id={conversation_id}") |
| | except Exception as e: |
| | logger.warning(f"Failed to save conversation to PostgreSQL: {e}") |
| | |
| | |
| | agent_messages = [m for m in result.get("messages", []) if m.get("sender") == "agent"] |
| | agent_response = agent_messages[-1]["message"] if agent_messages else "I understand, please tell me more about this." |
| | |
| | |
| | turn_count = result.get("turn_count", 1) |
| | max_turns = settings.MAX_TURNS |
| | messages_list = result.get("messages", []) |
| | |
| | |
| | total_messages_exchanged = len(messages_list) |
| | if conversation_history: |
| | total_messages_exchanged += len(conversation_history) |
| | |
| | engagement_duration_seconds = _calculate_engagement_duration( |
| | conversation_history, messages_list, start_time |
| | ) |
| | |
| | suspicious_keywords = extract_suspicious_keywords(messages_list, scam_indicators) |
| | |
| | |
| | red_flags_identified = identify_red_flags(messages_list) |
| | elicitation_attempts = count_elicitation_attempts(messages_list) |
| | |
| | agent_notes = generate_agent_notes(messages_list, intel, scam_indicators) |
| | |
| | |
| | max_turns_reached = turn_count >= max_turns |
| | terminated = result.get("terminated", False) |
| | |
| | if should_send_callback(turn_count, max_turns_reached, extraction_confidence, terminated): |
| | try: |
| | send_final_result_to_guvi( |
| | session_id=session_id, |
| | scam_detected=True, |
| | total_messages=total_messages_exchanged, |
| | extracted_intel=intel, |
| | messages=messages_list, |
| | scam_indicators=scam_indicators, |
| | agent_notes=agent_notes, |
| | engagement_duration_seconds=engagement_duration_seconds, |
| | ) |
| | except Exception as e: |
| | logger.error(f"GUVI callback error: {e}") |
| | |
| | processing_time_ms = int((time.time() - start_time) * 1000) |
| | |
| | logger.info( |
| | f"Engagement complete for session {session_id}: " |
| | f"turn={turn_count}, messages={total_messages_exchanged}, " |
| | f"duration={engagement_duration_seconds}s, " |
| | f"intel_count={len(intel.get('upi_ids', [])) + len(intel.get('phone_numbers', []))}" |
| | ) |
| | |
| | |
| | if is_guvi: |
| | scammer_text = " ".join( |
| | m.get("message", "") for m in messages_list if m.get("sender") == "scammer" |
| | ) |
| | scam_type = identify_scam_type(scammer_text.lower(), scammer_text) |
| | |
| | return JSONResponse(content={ |
| | "sessionId": session_id, |
| | "status": "success", |
| | "reply": agent_response, |
| | "scamDetected": True, |
| | "confidenceLevel": round(confidence, 2), |
| | "scamType": scam_type or "Financial Fraud", |
| | "totalMessagesExchanged": total_messages_exchanged, |
| | "engagementDurationSeconds": engagement_duration_seconds, |
| | "extractedIntelligence": { |
| | "phoneNumbers": intel.get("phone_numbers", []), |
| | "bankAccounts": intel.get("bank_accounts", []), |
| | "upiIds": intel.get("upi_ids", []), |
| | "ifscCodes": intel.get("ifsc_codes", []), |
| | "phishingLinks": intel.get("phishing_links", []), |
| | "emailAddresses": intel.get("email_addresses", []), |
| | "caseIds": intel.get("case_ids", []), |
| | "policyNumbers": intel.get("policy_numbers", []), |
| | "orderNumbers": intel.get("order_numbers", []), |
| | "suspiciousKeywords": suspicious_keywords, |
| | }, |
| | "engagementMetrics": { |
| | "engagementDurationSeconds": engagement_duration_seconds, |
| | "totalMessagesExchanged": total_messages_exchanged, |
| | }, |
| | "conversationQuality": { |
| | "turnCount": turn_count, |
| | "redFlagsIdentified": red_flags_identified, |
| | "redFlagsCount": len(red_flags_identified), |
| | "elicitationAttempts": elicitation_attempts, |
| | "questionsAsked": elicitation_attempts, |
| | }, |
| | "agentNotes": agent_notes, |
| | }) |
| | |
| | |
| | engagement = EngagementInfo( |
| | agent_response=agent_response[:500], |
| | turn_count=turn_count, |
| | max_turns_reached=turn_count >= max_turns, |
| | strategy=result.get("strategy", "build_trust"), |
| | persona=result.get("persona"), |
| | ) |
| | |
| | conversation_history_response = [] |
| | for msg in messages_list: |
| | raw_ts = msg.get("timestamp") |
| | if isinstance(raw_ts, int): |
| | timestamp = datetime.utcfromtimestamp(raw_ts / 1000).isoformat() + "Z" |
| | elif isinstance(raw_ts, str): |
| | timestamp = raw_ts |
| | else: |
| | timestamp = datetime.utcnow().isoformat() + "Z" |
| | conversation_history_response.append( |
| | MessageEntry( |
| | turn=msg.get("turn", 0), |
| | sender=msg.get("sender", "unknown"), |
| | message=msg.get("message", ""), |
| | timestamp=timestamp, |
| | ) |
| | ) |
| | |
| | extracted_intelligence = ExtractedIntelligence( |
| | upi_ids=intel.get("upi_ids", []), |
| | bank_accounts=intel.get("bank_accounts", []), |
| | ifsc_codes=intel.get("ifsc_codes", []), |
| | phone_numbers=intel.get("phone_numbers", []), |
| | phishing_links=intel.get("phishing_links", []), |
| | email_addresses=intel.get("email_addresses", []), |
| | case_ids=intel.get("case_ids", []), |
| | policy_numbers=intel.get("policy_numbers", []), |
| | order_numbers=intel.get("order_numbers", []), |
| | suspicious_keywords=suspicious_keywords, |
| | extraction_confidence=extraction_confidence, |
| | ) |
| | |
| | return EngageResponse( |
| | status="success", |
| | scam_detected=True, |
| | confidence=confidence, |
| | language_detected=detected_language, |
| | session_id=session_id, |
| | reply=agent_response, |
| | agent_notes=agent_notes, |
| | engagement=engagement, |
| | extracted_intelligence=extracted_intelligence, |
| | conversation_history=conversation_history_response, |
| | metadata=ResponseMetadata( |
| | processing_time_ms=processing_time_ms, |
| | model_version="1.0.0", |
| | detection_model="indic-bert", |
| | engagement_model="groq-llama-3.1-8b-instant", |
| | ), |
| | ) |
| | |
| | except ValueError as e: |
| | logger.warning(f"Validation error: {e}") |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "status": "error", |
| | "code": "INVALID_REQUEST_BODY", |
| | "message": str(e), |
| | }, |
| | ) |
| | except Exception as e: |
| | logger.error(f"Error processing engage request: {e}", exc_info=True) |
| | |
| | |
| | |
| | |
| | if _is_guvi_format(request_body): |
| | error_session_id = request_body.get("sessionId") or request_body.get("session_id") or str(uuid.uuid4()) |
| | error_start_time = int(time.time()) |
| | return JSONResponse( |
| | status_code=200, |
| | content={ |
| | "sessionId": error_session_id, |
| | "status": "success", |
| | "reply": "I am having some trouble, please tell me more.", |
| | "scamDetected": True, |
| | "scamType": "Unknown", |
| | "confidenceLevel": 0.5, |
| | "totalMessagesExchanged": 1, |
| | "engagementDurationSeconds": 1, |
| | "extractedIntelligence": { |
| | "phoneNumbers": [], |
| | "bankAccounts": [], |
| | "upiIds": [], |
| | "ifscCodes": [], |
| | "phishingLinks": [], |
| | "emailAddresses": [], |
| | "caseIds": [], |
| | "policyNumbers": [], |
| | "orderNumbers": [], |
| | "suspiciousKeywords": [], |
| | }, |
| | "engagementMetrics": { |
| | "engagementDurationSeconds": 1, |
| | "totalMessagesExchanged": 1, |
| | }, |
| | "agentNotes": f"Error during processing at {error_start_time}. Partial engagement.", |
| | }, |
| | ) |
| | raise HTTPException( |
| | status_code=500, |
| | detail={ |
| | "status": "error", |
| | "code": "INTERNAL_ERROR", |
| | "message": "An error occurred while processing your request", |
| | "details": str(e) if settings.DEBUG else None, |
| | }, |
| | ) |
| |
|
| |
|
| | @router.get("/honeypot/session/{session_id}", response_model=SessionResponse) |
| | async def get_session(session_id: str) -> SessionResponse: |
| | """ |
| | Retrieve complete conversation history for a session. |
| | |
| | Args: |
| | session_id: UUID of the session to retrieve |
| | |
| | Returns: |
| | SessionResponse with conversation history and extracted intelligence |
| | |
| | Raises: |
| | HTTPException: 404 if session not found, 400 if invalid session ID |
| | """ |
| | |
| | try: |
| | uuid.UUID(session_id) |
| | except ValueError: |
| | raise HTTPException( |
| | status_code=400, |
| | detail={ |
| | "code": "INVALID_SESSION_ID", |
| | "message": "Session ID format invalid. Must be a valid UUID.", |
| | }, |
| | ) |
| | |
| | try: |
| | from app.database.redis_client import get_session_state_with_fallback |
| |
|
| | |
| | session_state = get_session_state_with_fallback(session_id) |
| |
|
| | if session_state: |
| | |
| | messages = session_state.get("messages", []) |
| | conversation_history = [] |
| | for msg in messages: |
| | |
| | raw_ts = msg.get("timestamp") |
| | if isinstance(raw_ts, int): |
| | timestamp = datetime.utcfromtimestamp(raw_ts / 1000).isoformat() + "Z" |
| | elif isinstance(raw_ts, str): |
| | timestamp = raw_ts |
| | else: |
| | timestamp = datetime.utcnow().isoformat() + "Z" |
| | |
| | conversation_history.append( |
| | MessageEntry( |
| | turn=msg.get("turn", 0), |
| | sender=msg.get("sender", "unknown"), |
| | message=msg.get("message", ""), |
| | timestamp=timestamp, |
| | ) |
| | ) |
| | |
| | intel = session_state.get("extracted_intel", {}) |
| | extracted_intelligence = ExtractedIntelligence( |
| | upi_ids=intel.get("upi_ids", []), |
| | bank_accounts=intel.get("bank_accounts", []), |
| | ifsc_codes=intel.get("ifsc_codes", []), |
| | phone_numbers=intel.get("phone_numbers", []), |
| | phishing_links=intel.get("phishing_links", []), |
| | email_addresses=intel.get("email_addresses", []), |
| | case_ids=intel.get("case_ids", []), |
| | policy_numbers=intel.get("policy_numbers", []), |
| | order_numbers=intel.get("order_numbers", []), |
| | extraction_confidence=session_state.get("extraction_confidence", 0.0), |
| | ) |
| | |
| | |
| | created_at = messages[0].get("timestamp") if messages else datetime.utcnow().isoformat() + "Z" |
| | updated_at = messages[-1].get("timestamp") if messages else datetime.utcnow().isoformat() + "Z" |
| | |
| | return SessionResponse( |
| | session_id=session_id, |
| | language=session_state.get("language", "en"), |
| | persona=session_state.get("persona"), |
| | scam_confidence=session_state.get("scam_confidence", 0.0), |
| | turn_count=session_state.get("turn_count", 0), |
| | conversation_history=conversation_history, |
| | extracted_intelligence=extracted_intelligence, |
| | created_at=created_at, |
| | updated_at=updated_at, |
| | ) |
| | |
| | |
| | conversation = None |
| | if settings.POSTGRES_URL: |
| | try: |
| | from app.database.postgres import get_conversation |
| | conversation = get_conversation(session_id) |
| | except Exception as e: |
| | logger.warning(f"Failed to get conversation from PostgreSQL: {e}") |
| |
|
| | if conversation: |
| | messages = conversation.get("messages", []) |
| | conversation_history = [] |
| | for msg in messages: |
| | |
| | raw_ts = msg.get("timestamp") |
| | if isinstance(raw_ts, int): |
| | timestamp = datetime.utcfromtimestamp(raw_ts / 1000).isoformat() + "Z" |
| | elif isinstance(raw_ts, str): |
| | timestamp = raw_ts |
| | else: |
| | timestamp = datetime.utcnow().isoformat() + "Z" |
| | |
| | conversation_history.append( |
| | MessageEntry( |
| | turn=msg.get("turn", 0), |
| | sender=msg.get("sender", "unknown"), |
| | message=msg.get("message", ""), |
| | timestamp=timestamp, |
| | ) |
| | ) |
| | |
| | intel = conversation.get("extracted_intel", {}) |
| | extracted_intelligence = ExtractedIntelligence( |
| | upi_ids=intel.get("upi_ids", []), |
| | bank_accounts=intel.get("bank_accounts", []), |
| | ifsc_codes=intel.get("ifsc_codes", []), |
| | phone_numbers=intel.get("phone_numbers", []), |
| | phishing_links=intel.get("phishing_links", []), |
| | email_addresses=intel.get("email_addresses", []), |
| | case_ids=intel.get("case_ids", []), |
| | policy_numbers=intel.get("policy_numbers", []), |
| | order_numbers=intel.get("order_numbers", []), |
| | extraction_confidence=conversation.get("extraction_confidence", 0.0), |
| | ) |
| | |
| | return SessionResponse( |
| | session_id=session_id, |
| | language=conversation.get("language", "en"), |
| | persona=conversation.get("persona"), |
| | scam_confidence=conversation.get("scam_confidence", 0.0), |
| | turn_count=conversation.get("turn_count", 0), |
| | conversation_history=conversation_history, |
| | extracted_intelligence=extracted_intelligence, |
| | created_at=conversation.get("created_at", datetime.utcnow().isoformat() + "Z"), |
| | updated_at=conversation.get("updated_at", datetime.utcnow().isoformat() + "Z"), |
| | ) |
| | |
| | |
| | raise HTTPException( |
| | status_code=404, |
| | detail={ |
| | "code": "SESSION_NOT_FOUND", |
| | "message": "No conversation found for the provided session ID", |
| | "session_id": session_id, |
| | }, |
| | ) |
| | |
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error retrieving session {session_id}: {e}", exc_info=True) |
| | raise HTTPException( |
| | status_code=500, |
| | detail={ |
| | "code": "INTERNAL_ERROR", |
| | "message": "An error occurred while retrieving the session", |
| | }, |
| | ) |
| |
|
| |
|
| | @router.get("/health", response_model=HealthResponse) |
| | async def health_check() -> HealthResponse: |
| | """ |
| | Service health check for monitoring and load balancing. |
| | |
| | Returns: |
| | HealthResponse with service status and dependency health |
| | """ |
| | from app.main import get_uptime_seconds |
| | |
| | |
| | groq_status = "online" |
| | postgres_status = "offline" |
| | redis_status = "offline" |
| | models_loaded = False |
| | |
| | |
| | try: |
| | from app.database.redis_client import health_check as redis_health |
| | redis_status = "online" if redis_health() else "offline" |
| | except Exception as e: |
| | logger.warning(f"Redis health check failed: {e}") |
| | redis_status = "offline" |
| | |
| | |
| | if settings.POSTGRES_URL: |
| | try: |
| | from app.database.postgres import verify_schema |
| | postgres_status = "online" if verify_schema() else "degraded" |
| | except Exception as e: |
| | logger.warning(f"PostgreSQL health check failed: {e}") |
| | postgres_status = "offline" |
| | |
| | |
| | try: |
| | groq_status = "online" if settings.GROQ_API_KEY else "not_configured" |
| | except Exception: |
| | groq_status = "unknown" |
| | |
| | |
| | try: |
| | from app.models.detector import get_detector |
| | detector = get_detector() |
| | models_loaded = detector is not None |
| | except Exception: |
| | models_loaded = False |
| | |
| | |
| | if redis_status == "offline" and postgres_status == "offline": |
| | overall_status = "unhealthy" |
| | elif redis_status == "offline" or postgres_status == "offline": |
| | overall_status = "degraded" |
| | else: |
| | overall_status = "healthy" |
| | |
| | return HealthResponse( |
| | status=overall_status, |
| | version="1.0.0", |
| | timestamp=datetime.utcnow().isoformat() + "Z", |
| | dependencies=HealthDependencies( |
| | groq_api=groq_status, |
| | postgres=postgres_status, |
| | redis=redis_status, |
| | models_loaded=models_loaded, |
| | ), |
| | uptime_seconds=get_uptime_seconds(), |
| | ) |
| |
|
| |
|
| | @router.post("/honeypot/batch", response_model=BatchResponse) |
| | async def batch_process(request: BatchRequest) -> BatchResponse: |
| | """ |
| | Process multiple messages in batch mode. |
| | |
| | Args: |
| | request: BatchRequest containing array of messages to process |
| | |
| | Returns: |
| | BatchResponse with processing results for each message |
| | """ |
| | start_time = time.time() |
| | |
| | try: |
| | from app.models.detector import get_detector |
| | from app.models.language import detect_language |
| | |
| | detector = get_detector() |
| | results: List[BatchResultItem] = [] |
| | failed_count = 0 |
| | |
| | for msg_item in request.messages: |
| | try: |
| | |
| | if msg_item.language == "auto": |
| | detected_language, _ = detect_language(msg_item.message) |
| | else: |
| | detected_language = msg_item.language |
| | |
| | |
| | detection_result = detector.detect(msg_item.message, detected_language) |
| | |
| | results.append( |
| | BatchResultItem( |
| | id=msg_item.id, |
| | status="success", |
| | scam_detected=detection_result.get("scam_detected", False), |
| | confidence=detection_result.get("confidence", 0.0), |
| | language_detected=detected_language, |
| | ) |
| | ) |
| | |
| | except Exception as e: |
| | logger.warning(f"Batch item {msg_item.id} failed: {e}") |
| | failed_count += 1 |
| | results.append( |
| | BatchResultItem( |
| | id=msg_item.id, |
| | status="error", |
| | error={ |
| | "code": "PROCESSING_ERROR", |
| | "message": str(e), |
| | }, |
| | ) |
| | ) |
| | |
| | processing_time_ms = int((time.time() - start_time) * 1000) |
| | |
| | return BatchResponse( |
| | status="success" if failed_count == 0 else "partial", |
| | processed=len(results) - failed_count, |
| | failed=failed_count, |
| | results=results, |
| | processing_time_ms=processing_time_ms, |
| | ) |
| | |
| | except Exception as e: |
| | logger.error(f"Batch processing failed: {e}", exc_info=True) |
| | raise HTTPException( |
| | status_code=500, |
| | detail={ |
| | "code": "BATCH_PROCESSING_ERROR", |
| | "message": "Failed to process batch request", |
| | }, |
| | ) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | def _is_guvi_format(request_body: Dict[str, Any]) -> bool: |
| | """ |
| | Detect if the request is in GUVI evaluation format. |
| | |
| | GUVI format is identified by: |
| | - A nested ``message`` object (dict, not string) |
| | - Presence of ``sessionId`` (camelCase) |
| | |
| | Args: |
| | request_body: Raw request body |
| | |
| | Returns: |
| | True if request matches GUVI format |
| | """ |
| | if not request_body: |
| | return False |
| | return isinstance(request_body.get("message"), dict) or "sessionId" in request_body |
| |
|
| |
|
| | def _calculate_engagement_duration( |
| | conversation_history: Optional[List[Dict]], |
| | messages: List[Dict], |
| | request_start_time: float, |
| | ) -> int: |
| | """ |
| | Calculate engagement duration in seconds from message timestamps. |
| | |
| | Uses the earliest available timestamp (from conversation history or |
| | current messages) and the current wall-clock time to compute elapsed |
| | seconds. Falls back to a turn-based estimate when no usable |
| | timestamps are present. |
| | |
| | Args: |
| | conversation_history: GUVI conversation history (may be None) |
| | messages: Messages from current agent workflow |
| | request_start_time: time.time() when the request started |
| | |
| | Returns: |
| | Engagement duration in whole seconds (always > 0) |
| | """ |
| | earliest_ts: Optional[float] = None |
| | |
| | |
| | if conversation_history: |
| | for msg in conversation_history: |
| | ts = msg.get("timestamp") |
| | parsed = _parse_timestamp_to_epoch(ts) |
| | if parsed is not None: |
| | if earliest_ts is None or parsed < earliest_ts: |
| | earliest_ts = parsed |
| | |
| | |
| | if earliest_ts is None and messages: |
| | for msg in messages: |
| | ts = msg.get("timestamp") |
| | parsed = _parse_timestamp_to_epoch(ts) |
| | if parsed is not None: |
| | if earliest_ts is None or parsed < earliest_ts: |
| | earliest_ts = parsed |
| | |
| | now = time.time() |
| | |
| | |
| | |
| | total_turns = len(messages) |
| | if conversation_history: |
| | total_turns += len(conversation_history) |
| | estimated_duration = max(total_turns * 20, 60) |
| | |
| | if earliest_ts is not None and earliest_ts < now: |
| | actual_duration = int(now - earliest_ts) |
| | |
| | duration = max(actual_duration, estimated_duration) |
| | else: |
| | duration = estimated_duration |
| | |
| | |
| | |
| | if duration <= 0: |
| | duration = estimated_duration |
| | |
| | return max(duration, 60) |
| |
|
| |
|
| | def _parse_timestamp_to_epoch(ts) -> Optional[float]: |
| | """ |
| | Parse a timestamp value (ISO-8601 string or epoch-ms integer) to epoch seconds. |
| | |
| | Returns None if the timestamp cannot be parsed. |
| | """ |
| | if ts is None: |
| | return None |
| | if isinstance(ts, (int, float)): |
| | |
| | if ts > 1e12: |
| | return ts / 1000.0 |
| | return float(ts) |
| | if isinstance(ts, str): |
| | try: |
| | |
| | ts_clean = ts.rstrip("Z") |
| | dt = datetime.fromisoformat(ts_clean) |
| | return dt.timestamp() |
| | except (ValueError, TypeError): |
| | return None |
| | return None |
| |
|
| |
|
| | def _parse_request(request_body: Dict[str, Any]) -> tuple: |
| | """ |
| | Parse request body and normalize to internal format. |
| | |
| | Supports both our format and GUVI format. |
| | Also handles test/empty payloads gracefully for API validation. |
| | |
| | Args: |
| | request_body: Raw request body dictionary |
| | |
| | Returns: |
| | Tuple of (message_text, session_id, language, conversation_history) |
| | """ |
| | |
| | if not request_body: |
| | logger.info("Empty request body received - treating as API test") |
| | return "API test message", None, "auto", None |
| | |
| | |
| | is_guvi_format = ( |
| | isinstance(request_body.get("message"), dict) or |
| | "sessionId" in request_body |
| | ) |
| | |
| | if is_guvi_format: |
| | return _parse_guvi_format(request_body) |
| | else: |
| | return _parse_standard_format(request_body) |
| |
|
| |
|
| | def _parse_standard_format(request_body: Dict[str, Any]) -> tuple: |
| | """ |
| | Parse our standard request format. |
| | |
| | Format: {"message": "text", "session_id": "uuid", "language": "auto"} |
| | |
| | Also handles test payloads with minimal/missing fields. |
| | """ |
| | message_text = request_body.get("message", "") |
| | |
| | |
| | if not message_text: |
| | |
| | message_text = request_body.get("text", "") |
| | |
| | if not message_text: |
| | |
| | logger.info("No message field found - using default test message") |
| | message_text = "Test message for API validation" |
| | |
| | |
| | if not isinstance(message_text, str): |
| | message_text = str(message_text) |
| | |
| | session_id = request_body.get("session_id") |
| | language = request_body.get("language", "auto") |
| | |
| | return message_text, session_id, language, None |
| |
|
| |
|
| | def _parse_guvi_format(request_body: Dict[str, Any]) -> tuple: |
| | """ |
| | Parse GUVI's request format. |
| | |
| | Format: |
| | { |
| | "sessionId": "id", |
| | "message": {"sender": "scammer", "text": "...", "timestamp": "..."}, |
| | "conversationHistory": [...], |
| | "metadata": {"channel": "SMS", "language": "English", "locale": "IN"} |
| | } |
| | |
| | Also handles test payloads with minimal/missing fields for API validation. |
| | """ |
| | |
| | message_obj = request_body.get("message", {}) |
| | |
| | if isinstance(message_obj, dict): |
| | message_text = message_obj.get("text", "") |
| | else: |
| | |
| | message_text = str(message_obj) if message_obj else "" |
| | |
| | |
| | if not message_text: |
| | |
| | logger.info("No message.text found in GUVI format - using default test message") |
| | message_text = "Test message for API validation" |
| | |
| | |
| | session_id = request_body.get("sessionId") or request_body.get("session_id") |
| | |
| | |
| | metadata = request_body.get("metadata", {}) |
| | guvi_language = metadata.get("language", "").lower() if metadata else "" |
| | |
| | |
| | language_map = { |
| | "english": "en", |
| | "hindi": "hi", |
| | "hinglish": "hinglish", |
| | } |
| | language = language_map.get(guvi_language, "auto") |
| | |
| | |
| | conversation_history = request_body.get("conversationHistory", []) |
| | |
| | |
| | normalized_history = [] |
| | for item in conversation_history: |
| | if isinstance(item, dict): |
| | |
| | sender = item.get("sender", "scammer") |
| | if sender == "user": |
| | sender = "agent" |
| | |
| | |
| | raw_ts = item.get("timestamp") |
| | if isinstance(raw_ts, int): |
| | |
| | timestamp = datetime.utcfromtimestamp(raw_ts / 1000).isoformat() + "Z" |
| | elif isinstance(raw_ts, str): |
| | timestamp = raw_ts |
| | else: |
| | timestamp = datetime.utcnow().isoformat() + "Z" |
| | |
| | normalized_history.append({ |
| | "sender": sender, |
| | "message": item.get("text", ""), |
| | "timestamp": timestamp, |
| | }) |
| | |
| | return message_text, session_id, language, normalized_history |
| |
|
| |
|
| | def _rebuild_session_from_history( |
| | conversation_history: List[Dict], |
| | language: str, |
| | ) -> Dict[str, Any]: |
| | """ |
| | Rebuild session state from GUVI conversation history. |
| | |
| | When GUVI sends conversationHistory, we need to reconstruct |
| | the session state to continue the conversation properly. |
| | |
| | Args: |
| | conversation_history: List of previous messages |
| | language: Detected/provided language |
| | |
| | Returns: |
| | Session state dictionary |
| | """ |
| | from app.agent.personas import select_persona |
| | |
| | |
| | messages = [] |
| | for i, msg in enumerate(conversation_history): |
| | |
| | raw_ts = msg.get("timestamp") |
| | if isinstance(raw_ts, int): |
| | timestamp = datetime.utcfromtimestamp(raw_ts / 1000).isoformat() + "Z" |
| | elif isinstance(raw_ts, str): |
| | timestamp = raw_ts |
| | else: |
| | timestamp = datetime.utcnow().isoformat() + "Z" |
| | |
| | messages.append({ |
| | "turn": i + 1, |
| | "sender": msg.get("sender", "scammer"), |
| | "message": msg.get("message", ""), |
| | "timestamp": timestamp, |
| | }) |
| | |
| | turn_count = len(messages) |
| | |
| | |
| | full_text = " ".join(m.get("message", "") for m in messages) |
| | persona = select_persona("unknown", language) |
| | |
| | |
| | if turn_count < 5: |
| | strategy = "build_trust" |
| | elif turn_count < 12: |
| | strategy = "express_confusion" |
| | else: |
| | strategy = "probe_details" |
| | |
| | return { |
| | "messages": messages, |
| | "scam_confidence": 0.85, |
| | "turn_count": turn_count, |
| | "extracted_intel": { |
| | "upi_ids": [], |
| | "bank_accounts": [], |
| | "ifsc_codes": [], |
| | "phone_numbers": [], |
| | "phishing_links": [], |
| | "email_addresses": [], |
| | "case_ids": [], |
| | "policy_numbers": [], |
| | "order_numbers": [], |
| | }, |
| | "extraction_confidence": 0.0, |
| | "strategy": strategy, |
| | "language": language, |
| | "persona": persona, |
| | "max_turns_reached": False, |
| | "terminated": False, |
| | } |
| |
|