Gankit12 commited on
Commit
365a2db
·
verified ·
1 Parent(s): a4e031f

Update app/api/endpoints.py

Browse files
Files changed (1) hide show
  1. app/api/endpoints.py +793 -764
app/api/endpoints.py CHANGED
@@ -1,764 +1,793 @@
1
- """
2
- FastAPI endpoints for the ScamShield AI API.
3
-
4
- Implements Task 8.1: FastAPI Endpoints
5
-
6
- Acceptance Criteria:
7
- - AC-4.1.1: Returns 200 OK for valid requests
8
- - AC-4.1.2: Returns 400 for invalid input
9
- - AC-4.1.3: Response matches schema
10
- - AC-4.1.5: Response time <2s (p95)
11
-
12
- GUVI Integration:
13
- - Supports GUVI's exact input format with nested message object
14
- - Includes x-api-key authentication
15
- - Sends callback to GUVI evaluation endpoint on completion
16
- """
17
-
18
- from typing import Optional, List, Any, Dict, Union
19
- from datetime import datetime
20
- import uuid
21
- import time
22
-
23
- from fastapi import APIRouter, HTTPException, Request, Depends, Body
24
-
25
- from app.api.schemas import (
26
- EngageRequest,
27
- EngageResponse,
28
- HealthResponse,
29
- BatchRequest,
30
- BatchResponse,
31
- SessionResponse,
32
- ErrorResponse,
33
- ExtractedIntelligence,
34
- EngagementInfo,
35
- MessageEntry,
36
- ResponseMetadata,
37
- HealthDependencies,
38
- BatchResultItem,
39
- UnifiedEngageRequest,
40
- )
41
- from app.api.auth import verify_api_key
42
- from app.config import settings
43
- from app.utils.logger import get_logger
44
-
45
- logger = get_logger(__name__)
46
-
47
- router = APIRouter(prefix="/api/v1", tags=["honeypot"])
48
-
49
-
50
- @router.post("/honeypot/engage", response_model=EngageResponse, dependencies=[Depends(verify_api_key)])
51
- async def engage_honeypot(request_body: Dict[str, Any] = Body(...)) -> EngageResponse:
52
- """
53
- Detect scam messages and engage scammers with AI personas to extract intelligence.
54
-
55
- This is the primary endpoint for the ScamShield AI system. It:
56
- 1. Analyzes the incoming message for scam indicators
57
- 2. If scam detected, engages using an appropriate persona
58
- 3. Extracts financial intelligence from the conversation
59
- 4. Returns structured response with engagement and intelligence data
60
- 5. Sends callback to GUVI when engagement completes
61
-
62
- Supports both formats:
63
- - Our format: {"message": "text", "session_id": "uuid", "language": "auto"}
64
- - GUVI format: {"sessionId": "id", "message": {"sender": "scammer", "text": "..."}, ...}
65
-
66
- Args:
67
- request_body: Request body (accepts both formats)
68
-
69
- Returns:
70
- EngageResponse with detection results, engagement, and extracted intelligence
71
-
72
- Raises:
73
- HTTPException: For validation errors or internal failures
74
- """
75
- start_time = time.time()
76
-
77
- try:
78
- # Import required modules
79
- from app.models.detector import ScamDetector, get_detector
80
- from app.models.language import detect_language
81
- from app.models.extractor import extract_intelligence
82
- from app.agent.honeypot import HoneypotAgent
83
- from app.agent.personas import select_persona
84
- from app.database.redis_client import (
85
- get_session_state_with_fallback,
86
- save_session_state_with_fallback,
87
- )
88
- from app.utils.guvi_callback import (
89
- send_final_result_to_guvi,
90
- should_send_callback,
91
- extract_suspicious_keywords,
92
- generate_agent_notes,
93
- )
94
-
95
- # Parse request - detect format and normalize
96
- message_text, session_id, language, conversation_history = _parse_request(request_body)
97
-
98
- # Generate session ID if not provided
99
- if not session_id:
100
- session_id = str(uuid.uuid4())
101
-
102
- # Detect language if auto
103
- if language == "auto":
104
- detected_language, _ = detect_language(message_text)
105
- else:
106
- detected_language = language
107
-
108
- # Retrieve existing session state if session_id was provided
109
- session_state = None
110
- is_ongoing_scam_session = False
111
- provided_session_id = request_body.get("session_id") or request_body.get("sessionId")
112
- if provided_session_id:
113
- session_state = get_session_state_with_fallback(provided_session_id)
114
- # Check if this is an ongoing scam conversation
115
- if session_state and session_state.get("turn_count", 0) > 0:
116
- is_ongoing_scam_session = True
117
- logger.info(f"Continuing existing scam session {session_id}, turn={session_state.get('turn_count', 0)}")
118
-
119
- # If conversation history provided (GUVI format), rebuild session state
120
- if conversation_history and not session_state:
121
- session_state = _rebuild_session_from_history(conversation_history, detected_language)
122
-
123
- # Run scam detection
124
- detector = get_detector()
125
- detection_result = detector.detect(message_text, detected_language)
126
-
127
- scam_detected = detection_result.get("scam_detected", False)
128
- confidence = detection_result.get("confidence", 0.0)
129
- scam_indicators = detection_result.get("indicators", [])
130
-
131
- # Calculate processing time so far
132
- processing_time_ms = int((time.time() - start_time) * 1000)
133
-
134
- # If not a scam AND not part of an ongoing scam conversation, return simple response
135
- if not scam_detected and not is_ongoing_scam_session:
136
- logger.info(f"No scam detected for session {session_id}, confidence={confidence:.2f}")
137
-
138
- return EngageResponse(
139
- status="success",
140
- scam_detected=False,
141
- confidence=confidence,
142
- language_detected=detected_language,
143
- session_id=session_id,
144
- reply=None, # No reply for non-scam messages
145
- message="No scam detected. Message appears legitimate.",
146
- metadata=ResponseMetadata(
147
- processing_time_ms=processing_time_ms,
148
- model_version="1.0.0",
149
- detection_model="indic-bert",
150
- engagement_model=None,
151
- ),
152
- )
153
-
154
- # Scam detected OR continuing an ongoing scam conversation - engage with honeypot agent
155
- if is_ongoing_scam_session:
156
- logger.info(f"Continuing scam conversation for session {session_id}")
157
- # Use the higher confidence from detection or existing session
158
- existing_confidence = session_state.get("scam_confidence", 0.0)
159
- confidence = max(confidence, existing_confidence)
160
- scam_detected = True # It's a scam conversation
161
- else:
162
- logger.info(f"Scam detected for session {session_id}, confidence={confidence:.2f}")
163
-
164
- # Create honeypot agent and engage
165
- agent = HoneypotAgent()
166
-
167
- # Engage the agent
168
- result = agent.engage(message_text, session_state)
169
-
170
- # Extract intelligence from conversation
171
- full_text = " ".join(msg.get("message", "") for msg in result.get("messages", []))
172
- intel, extraction_confidence = extract_intelligence(full_text)
173
-
174
- # Update result with extracted intelligence
175
- result["extracted_intel"] = intel
176
- result["extraction_confidence"] = extraction_confidence
177
- result["scam_confidence"] = confidence
178
-
179
- # Save session state to Redis (with in-memory fallback)
180
- save_session_state_with_fallback(session_id, result)
181
-
182
- # Save conversation to PostgreSQL (optional, graceful degradation)
183
- try:
184
- from app.database.postgres import save_conversation
185
-
186
- conversation_data = {
187
- "language": detected_language,
188
- "persona": result.get("persona"),
189
- "scam_confidence": confidence,
190
- "turn_count": result.get("turn_count", 1),
191
- "messages": result.get("messages", []),
192
- "extracted_intel": intel,
193
- "extraction_confidence": extraction_confidence,
194
- }
195
-
196
- conversation_id = save_conversation(session_id, conversation_data)
197
- if conversation_id > 0:
198
- logger.debug(f"Conversation saved to PostgreSQL: id={conversation_id}")
199
- except Exception as e:
200
- # PostgreSQL save failed, but continue - Redis already saved the session
201
- logger.warning(f"Failed to save conversation to PostgreSQL: {e}")
202
- logger.info("Session state saved to Redis, continuing without PostgreSQL persistence")
203
-
204
- # Build conversation history for response
205
- conversation_history_response = [
206
- MessageEntry(
207
- turn=msg.get("turn", 0),
208
- sender=msg.get("sender", "unknown"),
209
- message=msg.get("message", ""),
210
- timestamp=msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
211
- )
212
- for msg in result.get("messages", [])
213
- ]
214
-
215
- # Get the agent's response (last message from agent)
216
- agent_messages = [m for m in result.get("messages", []) if m.get("sender") == "agent"]
217
- agent_response = agent_messages[-1]["message"] if agent_messages else "Engaged with scammer."
218
-
219
- # Build engagement info
220
- turn_count = result.get("turn_count", 1)
221
- max_turns = settings.MAX_TURNS
222
-
223
- engagement = EngagementInfo(
224
- agent_response=agent_response[:500], # Limit to 500 chars
225
- turn_count=turn_count,
226
- max_turns_reached=turn_count >= max_turns,
227
- strategy=result.get("strategy", "build_trust"),
228
- persona=result.get("persona"),
229
- )
230
-
231
- # Extract suspicious keywords for GUVI format
232
- messages_list = result.get("messages", [])
233
- suspicious_keywords = extract_suspicious_keywords(messages_list, scam_indicators)
234
-
235
- # Build extracted intelligence
236
- extracted_intelligence = ExtractedIntelligence(
237
- upi_ids=intel.get("upi_ids", []),
238
- bank_accounts=intel.get("bank_accounts", []),
239
- ifsc_codes=intel.get("ifsc_codes", []),
240
- phone_numbers=intel.get("phone_numbers", []),
241
- phishing_links=intel.get("phishing_links", []),
242
- suspicious_keywords=suspicious_keywords,
243
- extraction_confidence=extraction_confidence,
244
- )
245
-
246
- # Generate agent notes (summary of scammer behavior)
247
- agent_notes = generate_agent_notes(messages_list, intel, scam_indicators)
248
-
249
- # Check if we should send GUVI callback
250
- max_turns_reached = turn_count >= max_turns
251
- terminated = result.get("terminated", False)
252
-
253
- if should_send_callback(turn_count, max_turns_reached, extraction_confidence, terminated):
254
- # Send callback to GUVI (async-safe, non-blocking)
255
- try:
256
- total_messages = len(messages_list)
257
- callback_success = send_final_result_to_guvi(
258
- session_id=session_id,
259
- scam_detected=True,
260
- total_messages=total_messages,
261
- extracted_intel=intel,
262
- messages=messages_list,
263
- scam_indicators=scam_indicators,
264
- agent_notes=agent_notes,
265
- )
266
- if callback_success:
267
- logger.info(f"GUVI callback sent successfully for session {session_id}")
268
- else:
269
- logger.warning(f"GUVI callback failed for session {session_id}")
270
- except Exception as e:
271
- logger.error(f"GUVI callback error: {e}")
272
-
273
- # Calculate final processing time
274
- processing_time_ms = int((time.time() - start_time) * 1000)
275
-
276
- logger.info(
277
- f"Engagement complete for session {session_id}: "
278
- f"turn={turn_count}, strategy={engagement.strategy}, "
279
- f"intel_count={len(intel.get('upi_ids', [])) + len(intel.get('phone_numbers', []))}"
280
- )
281
-
282
- return EngageResponse(
283
- status="success",
284
- scam_detected=True,
285
- confidence=confidence,
286
- language_detected=detected_language,
287
- session_id=session_id,
288
- reply=agent_response, # GUVI format requirement
289
- agent_notes=agent_notes, # Summary of scammer behavior
290
- engagement=engagement,
291
- extracted_intelligence=extracted_intelligence,
292
- conversation_history=conversation_history_response,
293
- metadata=ResponseMetadata(
294
- processing_time_ms=processing_time_ms,
295
- model_version="1.0.0",
296
- detection_model="indic-bert",
297
- engagement_model="groq-llama-3.1-70b",
298
- ),
299
- )
300
-
301
- except ValueError as e:
302
- logger.warning(f"Validation error: {e}")
303
- raise HTTPException(
304
- status_code=400,
305
- detail={
306
- "code": "VALIDATION_ERROR",
307
- "message": str(e),
308
- },
309
- )
310
- except Exception as e:
311
- logger.error(f"Error processing engage request: {e}", exc_info=True)
312
- raise HTTPException(
313
- status_code=500,
314
- detail={
315
- "code": "INTERNAL_ERROR",
316
- "message": "An error occurred while processing your request",
317
- "details": str(e) if settings.DEBUG else None,
318
- },
319
- )
320
-
321
-
322
- @router.get("/honeypot/session/{session_id}", response_model=SessionResponse)
323
- async def get_session(session_id: str) -> SessionResponse:
324
- """
325
- Retrieve complete conversation history for a session.
326
-
327
- Args:
328
- session_id: UUID of the session to retrieve
329
-
330
- Returns:
331
- SessionResponse with conversation history and extracted intelligence
332
-
333
- Raises:
334
- HTTPException: 404 if session not found, 400 if invalid session ID
335
- """
336
- # Validate session_id format
337
- try:
338
- uuid.UUID(session_id)
339
- except ValueError:
340
- raise HTTPException(
341
- status_code=400,
342
- detail={
343
- "code": "INVALID_SESSION_ID",
344
- "message": "Session ID format invalid. Must be a valid UUID.",
345
- },
346
- )
347
-
348
- try:
349
- from app.database.redis_client import get_session_state_with_fallback
350
- from app.database.postgres import get_conversation
351
-
352
- # Try Redis first (active sessions)
353
- session_state = get_session_state_with_fallback(session_id)
354
-
355
- if session_state:
356
- # Build response from Redis session state
357
- messages = session_state.get("messages", [])
358
- conversation_history = [
359
- MessageEntry(
360
- turn=msg.get("turn", 0),
361
- sender=msg.get("sender", "unknown"),
362
- message=msg.get("message", ""),
363
- timestamp=msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
364
- )
365
- for msg in messages
366
- ]
367
-
368
- intel = session_state.get("extracted_intel", {})
369
- extracted_intelligence = ExtractedIntelligence(
370
- upi_ids=intel.get("upi_ids", []),
371
- bank_accounts=intel.get("bank_accounts", []),
372
- ifsc_codes=intel.get("ifsc_codes", []),
373
- phone_numbers=intel.get("phone_numbers", []),
374
- phishing_links=intel.get("phishing_links", []),
375
- extraction_confidence=session_state.get("extraction_confidence", 0.0),
376
- )
377
-
378
- # Get timestamps from first and last messages
379
- created_at = messages[0].get("timestamp") if messages else datetime.utcnow().isoformat() + "Z"
380
- updated_at = messages[-1].get("timestamp") if messages else datetime.utcnow().isoformat() + "Z"
381
-
382
- return SessionResponse(
383
- session_id=session_id,
384
- language=session_state.get("language", "en"),
385
- persona=session_state.get("persona"),
386
- scam_confidence=session_state.get("scam_confidence", 0.0),
387
- turn_count=session_state.get("turn_count", 0),
388
- conversation_history=conversation_history,
389
- extracted_intelligence=extracted_intelligence,
390
- created_at=created_at,
391
- updated_at=updated_at,
392
- )
393
-
394
- # Try PostgreSQL for archived sessions
395
- conversation = get_conversation(session_id)
396
-
397
- if conversation:
398
- messages = conversation.get("messages", [])
399
- conversation_history = [
400
- MessageEntry(
401
- turn=msg.get("turn", 0),
402
- sender=msg.get("sender", "unknown"),
403
- message=msg.get("message", ""),
404
- timestamp=msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
405
- )
406
- for msg in messages
407
- ]
408
-
409
- intel = conversation.get("extracted_intel", {})
410
- extracted_intelligence = ExtractedIntelligence(
411
- upi_ids=intel.get("upi_ids", []),
412
- bank_accounts=intel.get("bank_accounts", []),
413
- ifsc_codes=intel.get("ifsc_codes", []),
414
- phone_numbers=intel.get("phone_numbers", []),
415
- phishing_links=intel.get("phishing_links", []),
416
- extraction_confidence=conversation.get("extraction_confidence", 0.0),
417
- )
418
-
419
- return SessionResponse(
420
- session_id=session_id,
421
- language=conversation.get("language", "en"),
422
- persona=conversation.get("persona"),
423
- scam_confidence=conversation.get("scam_confidence", 0.0),
424
- turn_count=conversation.get("turn_count", 0),
425
- conversation_history=conversation_history,
426
- extracted_intelligence=extracted_intelligence,
427
- created_at=conversation.get("created_at", datetime.utcnow().isoformat() + "Z"),
428
- updated_at=conversation.get("updated_at", datetime.utcnow().isoformat() + "Z"),
429
- )
430
-
431
- # Session not found in either Redis or PostgreSQL
432
- raise HTTPException(
433
- status_code=404,
434
- detail={
435
- "code": "SESSION_NOT_FOUND",
436
- "message": "No conversation found for the provided session ID",
437
- "session_id": session_id,
438
- },
439
- )
440
-
441
- except HTTPException:
442
- raise
443
- except Exception as e:
444
- logger.error(f"Error retrieving session {session_id}: {e}", exc_info=True)
445
- raise HTTPException(
446
- status_code=500,
447
- detail={
448
- "code": "INTERNAL_ERROR",
449
- "message": "An error occurred while retrieving the session",
450
- },
451
- )
452
-
453
-
454
- @router.get("/health", response_model=HealthResponse)
455
- async def health_check() -> HealthResponse:
456
- """
457
- Service health check for monitoring and load balancing.
458
-
459
- Returns:
460
- HealthResponse with service status and dependency health
461
- """
462
- from app.main import get_uptime_seconds
463
-
464
- # Check dependency health
465
- groq_status = "online"
466
- postgres_status = "offline"
467
- redis_status = "offline"
468
- models_loaded = False
469
-
470
- # Check Redis
471
- try:
472
- from app.database.redis_client import health_check as redis_health
473
- redis_status = "online" if redis_health() else "offline"
474
- except Exception as e:
475
- logger.warning(f"Redis health check failed: {e}")
476
- redis_status = "offline"
477
-
478
- # Check PostgreSQL
479
- try:
480
- from app.database.postgres import verify_schema
481
- postgres_status = "online" if verify_schema() else "degraded"
482
- except Exception as e:
483
- logger.warning(f"PostgreSQL health check failed: {e}")
484
- postgres_status = "offline"
485
-
486
- # Check Groq API (just check if API key is configured)
487
- try:
488
- groq_status = "online" if settings.GROQ_API_KEY else "not_configured"
489
- except Exception:
490
- groq_status = "unknown"
491
-
492
- # Check if models are loaded
493
- try:
494
- from app.models.detector import get_detector
495
- detector = get_detector()
496
- models_loaded = detector is not None
497
- except Exception:
498
- models_loaded = False
499
-
500
- # Determine overall status
501
- if redis_status == "offline" and postgres_status == "offline":
502
- overall_status = "unhealthy"
503
- elif redis_status == "offline" or postgres_status == "offline":
504
- overall_status = "degraded"
505
- else:
506
- overall_status = "healthy"
507
-
508
- return HealthResponse(
509
- status=overall_status,
510
- version="1.0.0",
511
- timestamp=datetime.utcnow().isoformat() + "Z",
512
- dependencies=HealthDependencies(
513
- groq_api=groq_status,
514
- postgres=postgres_status,
515
- redis=redis_status,
516
- models_loaded=models_loaded,
517
- ),
518
- uptime_seconds=get_uptime_seconds(),
519
- )
520
-
521
-
522
- @router.post("/honeypot/batch", response_model=BatchResponse)
523
- async def batch_process(request: BatchRequest) -> BatchResponse:
524
- """
525
- Process multiple messages in batch mode.
526
-
527
- Args:
528
- request: BatchRequest containing array of messages to process
529
-
530
- Returns:
531
- BatchResponse with processing results for each message
532
- """
533
- start_time = time.time()
534
-
535
- try:
536
- from app.models.detector import get_detector
537
- from app.models.language import detect_language
538
-
539
- detector = get_detector()
540
- results: List[BatchResultItem] = []
541
- failed_count = 0
542
-
543
- for msg_item in request.messages:
544
- try:
545
- # Detect language
546
- if msg_item.language == "auto":
547
- detected_language, _ = detect_language(msg_item.message)
548
- else:
549
- detected_language = msg_item.language
550
-
551
- # Run scam detection
552
- detection_result = detector.detect(msg_item.message, detected_language)
553
-
554
- results.append(
555
- BatchResultItem(
556
- id=msg_item.id,
557
- status="success",
558
- scam_detected=detection_result.get("scam_detected", False),
559
- confidence=detection_result.get("confidence", 0.0),
560
- language_detected=detected_language,
561
- )
562
- )
563
-
564
- except Exception as e:
565
- logger.warning(f"Batch item {msg_item.id} failed: {e}")
566
- failed_count += 1
567
- results.append(
568
- BatchResultItem(
569
- id=msg_item.id,
570
- status="error",
571
- error={
572
- "code": "PROCESSING_ERROR",
573
- "message": str(e),
574
- },
575
- )
576
- )
577
-
578
- processing_time_ms = int((time.time() - start_time) * 1000)
579
-
580
- return BatchResponse(
581
- status="success" if failed_count == 0 else "partial",
582
- processed=len(results) - failed_count,
583
- failed=failed_count,
584
- results=results,
585
- processing_time_ms=processing_time_ms,
586
- )
587
-
588
- except Exception as e:
589
- logger.error(f"Batch processing failed: {e}", exc_info=True)
590
- raise HTTPException(
591
- status_code=500,
592
- detail={
593
- "code": "BATCH_PROCESSING_ERROR",
594
- "message": "Failed to process batch request",
595
- },
596
- )
597
-
598
-
599
- # =====================================================
600
- # Helper Functions for GUVI Format Support
601
- # =====================================================
602
-
603
- def _parse_request(request_body: Dict[str, Any]) -> tuple:
604
- """
605
- Parse request body and normalize to internal format.
606
-
607
- Supports both our format and GUVI format.
608
-
609
- Args:
610
- request_body: Raw request body dictionary
611
-
612
- Returns:
613
- Tuple of (message_text, session_id, language, conversation_history)
614
- """
615
- # Check if this is GUVI format (nested message object or sessionId)
616
- is_guvi_format = (
617
- isinstance(request_body.get("message"), dict) or
618
- "sessionId" in request_body
619
- )
620
-
621
- if is_guvi_format:
622
- return _parse_guvi_format(request_body)
623
- else:
624
- return _parse_standard_format(request_body)
625
-
626
-
627
- def _parse_standard_format(request_body: Dict[str, Any]) -> tuple:
628
- """
629
- Parse our standard request format.
630
-
631
- Format: {"message": "text", "session_id": "uuid", "language": "auto"}
632
- """
633
- message_text = request_body.get("message", "")
634
-
635
- if not message_text or not isinstance(message_text, str):
636
- raise ValueError("message field is required and must be a string")
637
-
638
- session_id = request_body.get("session_id")
639
- language = request_body.get("language", "auto")
640
-
641
- return message_text, session_id, language, None
642
-
643
-
644
- def _parse_guvi_format(request_body: Dict[str, Any]) -> tuple:
645
- """
646
- Parse GUVI's request format.
647
-
648
- Format:
649
- {
650
- "sessionId": "id",
651
- "message": {"sender": "scammer", "text": "...", "timestamp": "..."},
652
- "conversationHistory": [...],
653
- "metadata": {"channel": "SMS", "language": "English", "locale": "IN"}
654
- }
655
- """
656
- # Extract message text from nested object
657
- message_obj = request_body.get("message", {})
658
-
659
- if isinstance(message_obj, dict):
660
- message_text = message_obj.get("text", "")
661
- else:
662
- # Fallback: message might be a string in hybrid format
663
- message_text = str(message_obj) if message_obj else ""
664
-
665
- if not message_text:
666
- raise ValueError("message.text field is required")
667
-
668
- # Get session ID (GUVI uses camelCase)
669
- session_id = request_body.get("sessionId") or request_body.get("session_id")
670
-
671
- # Get language from metadata
672
- metadata = request_body.get("metadata", {})
673
- guvi_language = metadata.get("language", "").lower() if metadata else ""
674
-
675
- # Map GUVI language names to our codes
676
- language_map = {
677
- "english": "en",
678
- "hindi": "hi",
679
- "hinglish": "hinglish",
680
- }
681
- language = language_map.get(guvi_language, "auto")
682
-
683
- # Parse conversation history
684
- conversation_history = request_body.get("conversationHistory", [])
685
-
686
- # Normalize conversation history format
687
- normalized_history = []
688
- for item in conversation_history:
689
- if isinstance(item, dict):
690
- # Map 'user' sender to 'agent' for our internal format
691
- sender = item.get("sender", "scammer")
692
- if sender == "user":
693
- sender = "agent"
694
-
695
- normalized_history.append({
696
- "sender": sender,
697
- "message": item.get("text", ""),
698
- "timestamp": item.get("timestamp", datetime.utcnow().isoformat() + "Z"),
699
- })
700
-
701
- return message_text, session_id, language, normalized_history
702
-
703
-
704
- def _rebuild_session_from_history(
705
- conversation_history: List[Dict],
706
- language: str,
707
- ) -> Dict[str, Any]:
708
- """
709
- Rebuild session state from GUVI conversation history.
710
-
711
- When GUVI sends conversationHistory, we need to reconstruct
712
- the session state to continue the conversation properly.
713
-
714
- Args:
715
- conversation_history: List of previous messages
716
- language: Detected/provided language
717
-
718
- Returns:
719
- Session state dictionary
720
- """
721
- from app.agent.personas import select_persona
722
-
723
- # Build messages list with turn numbers
724
- messages = []
725
- for i, msg in enumerate(conversation_history):
726
- messages.append({
727
- "turn": i + 1,
728
- "sender": msg.get("sender", "scammer"),
729
- "message": msg.get("message", ""),
730
- "timestamp": msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
731
- })
732
-
733
- turn_count = len(messages)
734
-
735
- # Select persona based on conversation content
736
- full_text = " ".join(m.get("message", "") for m in messages)
737
- persona = select_persona("unknown", language)
738
-
739
- # Determine strategy based on turn count
740
- if turn_count < 5:
741
- strategy = "build_trust"
742
- elif turn_count < 12:
743
- strategy = "express_confusion"
744
- else:
745
- strategy = "probe_details"
746
-
747
- return {
748
- "messages": messages,
749
- "scam_confidence": 0.7, # Assume scam since history provided
750
- "turn_count": turn_count,
751
- "extracted_intel": {
752
- "upi_ids": [],
753
- "bank_accounts": [],
754
- "ifsc_codes": [],
755
- "phone_numbers": [],
756
- "phishing_links": [],
757
- },
758
- "extraction_confidence": 0.0,
759
- "strategy": strategy,
760
- "language": language,
761
- "persona": persona,
762
- "max_turns_reached": False,
763
- "terminated": False,
764
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ FastAPI endpoints for the ScamShield AI API.
3
+
4
+ Implements Task 8.1: FastAPI Endpoints
5
+
6
+ Acceptance Criteria:
7
+ - AC-4.1.1: Returns 200 OK for valid requests
8
+ - AC-4.1.2: Returns 400 for invalid input
9
+ - AC-4.1.3: Response matches schema
10
+ - AC-4.1.5: Response time <2s (p95)
11
+
12
+ GUVI Integration:
13
+ - Supports GUVI's exact input format with nested message object
14
+ - Includes x-api-key authentication
15
+ - Sends callback to GUVI evaluation endpoint on completion
16
+ """
17
+
18
+ from typing import Optional, List, Any, Dict, Union
19
+ from datetime import datetime
20
+ import uuid
21
+ import time
22
+
23
+ from fastapi import APIRouter, HTTPException, Request, Depends, Body
24
+
25
+ from app.api.schemas import (
26
+ EngageRequest,
27
+ EngageResponse,
28
+ HealthResponse,
29
+ BatchRequest,
30
+ BatchResponse,
31
+ SessionResponse,
32
+ ErrorResponse,
33
+ ExtractedIntelligence,
34
+ EngagementInfo,
35
+ MessageEntry,
36
+ ResponseMetadata,
37
+ HealthDependencies,
38
+ BatchResultItem,
39
+ UnifiedEngageRequest,
40
+ )
41
+ from app.api.auth import verify_api_key
42
+ from app.config import settings
43
+ from app.utils.logger import get_logger
44
+
45
+ logger = get_logger(__name__)
46
+
47
+ router = APIRouter(prefix="/api/v1", tags=["honeypot"])
48
+
49
+
50
+ @router.post("/honeypot/engage", response_model=EngageResponse, dependencies=[Depends(verify_api_key)])
51
+ async def engage_honeypot(request_body: Dict[str, Any] = Body(default={})) -> EngageResponse:
52
+ """
53
+ Detect scam messages and engage scammers with AI personas to extract intelligence.
54
+
55
+ This is the primary endpoint for the ScamShield AI system. It:
56
+ 1. Analyzes the incoming message for scam indicators
57
+ 2. If scam detected, engages using an appropriate persona
58
+ 3. Extracts financial intelligence from the conversation
59
+ 4. Returns structured response with engagement and intelligence data
60
+ 5. Sends callback to GUVI when engagement completes
61
+
62
+ Supports both formats:
63
+ - Our format: {"message": "text", "session_id": "uuid", "language": "auto"}
64
+ - GUVI format: {"sessionId": "id", "message": {"sender": "scammer", "text": "..."}, ...}
65
+
66
+ Args:
67
+ request_body: Request body (accepts both formats)
68
+
69
+ Returns:
70
+ EngageResponse with detection results, engagement, and extracted intelligence
71
+
72
+ Raises:
73
+ HTTPException: For validation errors or internal failures
74
+ """
75
+ start_time = time.time()
76
+
77
+ # Log the incoming request for debugging
78
+ logger.info(f"Received engage request: {request_body}")
79
+
80
+ try:
81
+ # Import required modules
82
+ from app.models.detector import ScamDetector, get_detector
83
+ from app.models.language import detect_language
84
+ from app.models.extractor import extract_intelligence
85
+ from app.agent.honeypot import HoneypotAgent
86
+ from app.agent.personas import select_persona
87
+ from app.database.redis_client import (
88
+ get_session_state_with_fallback,
89
+ save_session_state_with_fallback,
90
+ )
91
+ from app.utils.guvi_callback import (
92
+ send_final_result_to_guvi,
93
+ should_send_callback,
94
+ extract_suspicious_keywords,
95
+ generate_agent_notes,
96
+ )
97
+
98
+ # Parse request - detect format and normalize
99
+ message_text, session_id, language, conversation_history = _parse_request(request_body)
100
+
101
+ # Generate session ID if not provided
102
+ if not session_id:
103
+ session_id = str(uuid.uuid4())
104
+
105
+ # Detect language if auto
106
+ if language == "auto":
107
+ detected_language, _ = detect_language(message_text)
108
+ else:
109
+ detected_language = language
110
+
111
+ # Retrieve existing session state if session_id was provided
112
+ session_state = None
113
+ is_ongoing_scam_session = False
114
+ provided_session_id = request_body.get("session_id") or request_body.get("sessionId")
115
+ if provided_session_id:
116
+ session_state = get_session_state_with_fallback(provided_session_id)
117
+ # Check if this is an ongoing scam conversation
118
+ if session_state and session_state.get("turn_count", 0) > 0:
119
+ is_ongoing_scam_session = True
120
+ logger.info(f"Continuing existing scam session {session_id}, turn={session_state.get('turn_count', 0)}")
121
+
122
+ # If conversation history provided (GUVI format), rebuild session state
123
+ if conversation_history and not session_state:
124
+ session_state = _rebuild_session_from_history(conversation_history, detected_language)
125
+
126
+ # Run scam detection
127
+ detector = get_detector()
128
+ detection_result = detector.detect(message_text, detected_language)
129
+
130
+ scam_detected = detection_result.get("scam_detected", False)
131
+ confidence = detection_result.get("confidence", 0.0)
132
+ scam_indicators = detection_result.get("indicators", [])
133
+
134
+ # Calculate processing time so far
135
+ processing_time_ms = int((time.time() - start_time) * 1000)
136
+
137
+ # If not a scam AND not part of an ongoing scam conversation, return simple response
138
+ if not scam_detected and not is_ongoing_scam_session:
139
+ logger.info(f"No scam detected for session {session_id}, confidence={confidence:.2f}")
140
+
141
+ return EngageResponse(
142
+ status="success",
143
+ scam_detected=False,
144
+ confidence=confidence,
145
+ language_detected=detected_language,
146
+ session_id=session_id,
147
+ reply=None, # No reply for non-scam messages
148
+ message="No scam detected. Message appears legitimate.",
149
+ metadata=ResponseMetadata(
150
+ processing_time_ms=processing_time_ms,
151
+ model_version="1.0.0",
152
+ detection_model="indic-bert",
153
+ engagement_model=None,
154
+ ),
155
+ )
156
+
157
+ # Scam detected OR continuing an ongoing scam conversation - engage with honeypot agent
158
+ if is_ongoing_scam_session:
159
+ logger.info(f"Continuing scam conversation for session {session_id}")
160
+ # Use the higher confidence from detection or existing session
161
+ existing_confidence = session_state.get("scam_confidence", 0.0)
162
+ confidence = max(confidence, existing_confidence)
163
+ scam_detected = True # It's a scam conversation
164
+ else:
165
+ logger.info(f"Scam detected for session {session_id}, confidence={confidence:.2f}")
166
+
167
+ # Create honeypot agent and engage
168
+ agent = HoneypotAgent()
169
+
170
+ # Engage the agent
171
+ result = agent.engage(message_text, session_state)
172
+
173
+ # Extract intelligence from conversation
174
+ full_text = " ".join(msg.get("message", "") for msg in result.get("messages", []))
175
+ intel, extraction_confidence = extract_intelligence(full_text)
176
+
177
+ # Update result with extracted intelligence
178
+ result["extracted_intel"] = intel
179
+ result["extraction_confidence"] = extraction_confidence
180
+ result["scam_confidence"] = confidence
181
+
182
+ # Save session state to Redis (with in-memory fallback)
183
+ save_session_state_with_fallback(session_id, result)
184
+
185
+ # Save conversation to PostgreSQL (optional, graceful degradation)
186
+ try:
187
+ from app.database.postgres import save_conversation
188
+
189
+ conversation_data = {
190
+ "language": detected_language,
191
+ "persona": result.get("persona"),
192
+ "scam_confidence": confidence,
193
+ "turn_count": result.get("turn_count", 1),
194
+ "messages": result.get("messages", []),
195
+ "extracted_intel": intel,
196
+ "extraction_confidence": extraction_confidence,
197
+ }
198
+
199
+ conversation_id = save_conversation(session_id, conversation_data)
200
+ if conversation_id > 0:
201
+ logger.debug(f"Conversation saved to PostgreSQL: id={conversation_id}")
202
+ except Exception as e:
203
+ # PostgreSQL save failed, but continue - Redis already saved the session
204
+ logger.warning(f"Failed to save conversation to PostgreSQL: {e}")
205
+ logger.info("Session state saved to Redis, continuing without PostgreSQL persistence")
206
+
207
+ # Build conversation history for response
208
+ conversation_history_response = [
209
+ MessageEntry(
210
+ turn=msg.get("turn", 0),
211
+ sender=msg.get("sender", "unknown"),
212
+ message=msg.get("message", ""),
213
+ timestamp=msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
214
+ )
215
+ for msg in result.get("messages", [])
216
+ ]
217
+
218
+ # Get the agent's response (last message from agent)
219
+ agent_messages = [m for m in result.get("messages", []) if m.get("sender") == "agent"]
220
+ agent_response = agent_messages[-1]["message"] if agent_messages else "Engaged with scammer."
221
+
222
+ # Build engagement info
223
+ turn_count = result.get("turn_count", 1)
224
+ max_turns = settings.MAX_TURNS
225
+
226
+ engagement = EngagementInfo(
227
+ agent_response=agent_response[:500], # Limit to 500 chars
228
+ turn_count=turn_count,
229
+ max_turns_reached=turn_count >= max_turns,
230
+ strategy=result.get("strategy", "build_trust"),
231
+ persona=result.get("persona"),
232
+ )
233
+
234
+ # Extract suspicious keywords for GUVI format
235
+ messages_list = result.get("messages", [])
236
+ suspicious_keywords = extract_suspicious_keywords(messages_list, scam_indicators)
237
+
238
+ # Build extracted intelligence
239
+ extracted_intelligence = ExtractedIntelligence(
240
+ upi_ids=intel.get("upi_ids", []),
241
+ bank_accounts=intel.get("bank_accounts", []),
242
+ ifsc_codes=intel.get("ifsc_codes", []),
243
+ phone_numbers=intel.get("phone_numbers", []),
244
+ phishing_links=intel.get("phishing_links", []),
245
+ suspicious_keywords=suspicious_keywords,
246
+ extraction_confidence=extraction_confidence,
247
+ )
248
+
249
+ # Generate agent notes (summary of scammer behavior)
250
+ agent_notes = generate_agent_notes(messages_list, intel, scam_indicators)
251
+
252
+ # Check if we should send GUVI callback
253
+ max_turns_reached = turn_count >= max_turns
254
+ terminated = result.get("terminated", False)
255
+
256
+ if should_send_callback(turn_count, max_turns_reached, extraction_confidence, terminated):
257
+ # Send callback to GUVI (async-safe, non-blocking)
258
+ try:
259
+ total_messages = len(messages_list)
260
+ callback_success = send_final_result_to_guvi(
261
+ session_id=session_id,
262
+ scam_detected=True,
263
+ total_messages=total_messages,
264
+ extracted_intel=intel,
265
+ messages=messages_list,
266
+ scam_indicators=scam_indicators,
267
+ agent_notes=agent_notes,
268
+ )
269
+ if callback_success:
270
+ logger.info(f"GUVI callback sent successfully for session {session_id}")
271
+ else:
272
+ logger.warning(f"GUVI callback failed for session {session_id}")
273
+ except Exception as e:
274
+ logger.error(f"GUVI callback error: {e}")
275
+
276
+ # Calculate final processing time
277
+ processing_time_ms = int((time.time() - start_time) * 1000)
278
+
279
+ logger.info(
280
+ f"Engagement complete for session {session_id}: "
281
+ f"turn={turn_count}, strategy={engagement.strategy}, "
282
+ f"intel_count={len(intel.get('upi_ids', [])) + len(intel.get('phone_numbers', []))}"
283
+ )
284
+
285
+ return EngageResponse(
286
+ status="success",
287
+ scam_detected=True,
288
+ confidence=confidence,
289
+ language_detected=detected_language,
290
+ session_id=session_id,
291
+ reply=agent_response, # GUVI format requirement
292
+ agent_notes=agent_notes, # Summary of scammer behavior
293
+ engagement=engagement,
294
+ extracted_intelligence=extracted_intelligence,
295
+ conversation_history=conversation_history_response,
296
+ metadata=ResponseMetadata(
297
+ processing_time_ms=processing_time_ms,
298
+ model_version="1.0.0",
299
+ detection_model="indic-bert",
300
+ engagement_model="groq-llama-3.1-70b",
301
+ ),
302
+ )
303
+
304
+ except ValueError as e:
305
+ logger.warning(f"Validation error: {e}")
306
+ raise HTTPException(
307
+ status_code=400,
308
+ detail={
309
+ "status": "error",
310
+ "code": "INVALID_REQUEST_BODY",
311
+ "message": str(e),
312
+ },
313
+ )
314
+ except Exception as e:
315
+ logger.error(f"Error processing engage request: {e}", exc_info=True)
316
+ raise HTTPException(
317
+ status_code=500,
318
+ detail={
319
+ "status": "error",
320
+ "code": "INTERNAL_ERROR",
321
+ "message": "An error occurred while processing your request",
322
+ "details": str(e) if settings.DEBUG else None,
323
+ },
324
+ )
325
+
326
+
327
+ @router.get("/honeypot/session/{session_id}", response_model=SessionResponse)
328
+ async def get_session(session_id: str) -> SessionResponse:
329
+ """
330
+ Retrieve complete conversation history for a session.
331
+
332
+ Args:
333
+ session_id: UUID of the session to retrieve
334
+
335
+ Returns:
336
+ SessionResponse with conversation history and extracted intelligence
337
+
338
+ Raises:
339
+ HTTPException: 404 if session not found, 400 if invalid session ID
340
+ """
341
+ # Validate session_id format
342
+ try:
343
+ uuid.UUID(session_id)
344
+ except ValueError:
345
+ raise HTTPException(
346
+ status_code=400,
347
+ detail={
348
+ "code": "INVALID_SESSION_ID",
349
+ "message": "Session ID format invalid. Must be a valid UUID.",
350
+ },
351
+ )
352
+
353
+ try:
354
+ from app.database.redis_client import get_session_state_with_fallback
355
+ from app.database.postgres import get_conversation
356
+
357
+ # Try Redis first (active sessions)
358
+ session_state = get_session_state_with_fallback(session_id)
359
+
360
+ if session_state:
361
+ # Build response from Redis session state
362
+ messages = session_state.get("messages", [])
363
+ conversation_history = [
364
+ MessageEntry(
365
+ turn=msg.get("turn", 0),
366
+ sender=msg.get("sender", "unknown"),
367
+ message=msg.get("message", ""),
368
+ timestamp=msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
369
+ )
370
+ for msg in messages
371
+ ]
372
+
373
+ intel = session_state.get("extracted_intel", {})
374
+ extracted_intelligence = ExtractedIntelligence(
375
+ upi_ids=intel.get("upi_ids", []),
376
+ bank_accounts=intel.get("bank_accounts", []),
377
+ ifsc_codes=intel.get("ifsc_codes", []),
378
+ phone_numbers=intel.get("phone_numbers", []),
379
+ phishing_links=intel.get("phishing_links", []),
380
+ extraction_confidence=session_state.get("extraction_confidence", 0.0),
381
+ )
382
+
383
+ # Get timestamps from first and last messages
384
+ created_at = messages[0].get("timestamp") if messages else datetime.utcnow().isoformat() + "Z"
385
+ updated_at = messages[-1].get("timestamp") if messages else datetime.utcnow().isoformat() + "Z"
386
+
387
+ return SessionResponse(
388
+ session_id=session_id,
389
+ language=session_state.get("language", "en"),
390
+ persona=session_state.get("persona"),
391
+ scam_confidence=session_state.get("scam_confidence", 0.0),
392
+ turn_count=session_state.get("turn_count", 0),
393
+ conversation_history=conversation_history,
394
+ extracted_intelligence=extracted_intelligence,
395
+ created_at=created_at,
396
+ updated_at=updated_at,
397
+ )
398
+
399
+ # Try PostgreSQL for archived sessions
400
+ conversation = get_conversation(session_id)
401
+
402
+ if conversation:
403
+ messages = conversation.get("messages", [])
404
+ conversation_history = [
405
+ MessageEntry(
406
+ turn=msg.get("turn", 0),
407
+ sender=msg.get("sender", "unknown"),
408
+ message=msg.get("message", ""),
409
+ timestamp=msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
410
+ )
411
+ for msg in messages
412
+ ]
413
+
414
+ intel = conversation.get("extracted_intel", {})
415
+ extracted_intelligence = ExtractedIntelligence(
416
+ upi_ids=intel.get("upi_ids", []),
417
+ bank_accounts=intel.get("bank_accounts", []),
418
+ ifsc_codes=intel.get("ifsc_codes", []),
419
+ phone_numbers=intel.get("phone_numbers", []),
420
+ phishing_links=intel.get("phishing_links", []),
421
+ extraction_confidence=conversation.get("extraction_confidence", 0.0),
422
+ )
423
+
424
+ return SessionResponse(
425
+ session_id=session_id,
426
+ language=conversation.get("language", "en"),
427
+ persona=conversation.get("persona"),
428
+ scam_confidence=conversation.get("scam_confidence", 0.0),
429
+ turn_count=conversation.get("turn_count", 0),
430
+ conversation_history=conversation_history,
431
+ extracted_intelligence=extracted_intelligence,
432
+ created_at=conversation.get("created_at", datetime.utcnow().isoformat() + "Z"),
433
+ updated_at=conversation.get("updated_at", datetime.utcnow().isoformat() + "Z"),
434
+ )
435
+
436
+ # Session not found in either Redis or PostgreSQL
437
+ raise HTTPException(
438
+ status_code=404,
439
+ detail={
440
+ "code": "SESSION_NOT_FOUND",
441
+ "message": "No conversation found for the provided session ID",
442
+ "session_id": session_id,
443
+ },
444
+ )
445
+
446
+ except HTTPException:
447
+ raise
448
+ except Exception as e:
449
+ logger.error(f"Error retrieving session {session_id}: {e}", exc_info=True)
450
+ raise HTTPException(
451
+ status_code=500,
452
+ detail={
453
+ "code": "INTERNAL_ERROR",
454
+ "message": "An error occurred while retrieving the session",
455
+ },
456
+ )
457
+
458
+
459
+ @router.get("/health", response_model=HealthResponse)
460
+ async def health_check() -> HealthResponse:
461
+ """
462
+ Service health check for monitoring and load balancing.
463
+
464
+ Returns:
465
+ HealthResponse with service status and dependency health
466
+ """
467
+ from app.main import get_uptime_seconds
468
+
469
+ # Check dependency health
470
+ groq_status = "online"
471
+ postgres_status = "offline"
472
+ redis_status = "offline"
473
+ models_loaded = False
474
+
475
+ # Check Redis
476
+ try:
477
+ from app.database.redis_client import health_check as redis_health
478
+ redis_status = "online" if redis_health() else "offline"
479
+ except Exception as e:
480
+ logger.warning(f"Redis health check failed: {e}")
481
+ redis_status = "offline"
482
+
483
+ # Check PostgreSQL
484
+ try:
485
+ from app.database.postgres import verify_schema
486
+ postgres_status = "online" if verify_schema() else "degraded"
487
+ except Exception as e:
488
+ logger.warning(f"PostgreSQL health check failed: {e}")
489
+ postgres_status = "offline"
490
+
491
+ # Check Groq API (just check if API key is configured)
492
+ try:
493
+ groq_status = "online" if settings.GROQ_API_KEY else "not_configured"
494
+ except Exception:
495
+ groq_status = "unknown"
496
+
497
+ # Check if models are loaded
498
+ try:
499
+ from app.models.detector import get_detector
500
+ detector = get_detector()
501
+ models_loaded = detector is not None
502
+ except Exception:
503
+ models_loaded = False
504
+
505
+ # Determine overall status
506
+ if redis_status == "offline" and postgres_status == "offline":
507
+ overall_status = "unhealthy"
508
+ elif redis_status == "offline" or postgres_status == "offline":
509
+ overall_status = "degraded"
510
+ else:
511
+ overall_status = "healthy"
512
+
513
+ return HealthResponse(
514
+ status=overall_status,
515
+ version="1.0.0",
516
+ timestamp=datetime.utcnow().isoformat() + "Z",
517
+ dependencies=HealthDependencies(
518
+ groq_api=groq_status,
519
+ postgres=postgres_status,
520
+ redis=redis_status,
521
+ models_loaded=models_loaded,
522
+ ),
523
+ uptime_seconds=get_uptime_seconds(),
524
+ )
525
+
526
+
527
+ @router.post("/honeypot/batch", response_model=BatchResponse)
528
+ async def batch_process(request: BatchRequest) -> BatchResponse:
529
+ """
530
+ Process multiple messages in batch mode.
531
+
532
+ Args:
533
+ request: BatchRequest containing array of messages to process
534
+
535
+ Returns:
536
+ BatchResponse with processing results for each message
537
+ """
538
+ start_time = time.time()
539
+
540
+ try:
541
+ from app.models.detector import get_detector
542
+ from app.models.language import detect_language
543
+
544
+ detector = get_detector()
545
+ results: List[BatchResultItem] = []
546
+ failed_count = 0
547
+
548
+ for msg_item in request.messages:
549
+ try:
550
+ # Detect language
551
+ if msg_item.language == "auto":
552
+ detected_language, _ = detect_language(msg_item.message)
553
+ else:
554
+ detected_language = msg_item.language
555
+
556
+ # Run scam detection
557
+ detection_result = detector.detect(msg_item.message, detected_language)
558
+
559
+ results.append(
560
+ BatchResultItem(
561
+ id=msg_item.id,
562
+ status="success",
563
+ scam_detected=detection_result.get("scam_detected", False),
564
+ confidence=detection_result.get("confidence", 0.0),
565
+ language_detected=detected_language,
566
+ )
567
+ )
568
+
569
+ except Exception as e:
570
+ logger.warning(f"Batch item {msg_item.id} failed: {e}")
571
+ failed_count += 1
572
+ results.append(
573
+ BatchResultItem(
574
+ id=msg_item.id,
575
+ status="error",
576
+ error={
577
+ "code": "PROCESSING_ERROR",
578
+ "message": str(e),
579
+ },
580
+ )
581
+ )
582
+
583
+ processing_time_ms = int((time.time() - start_time) * 1000)
584
+
585
+ return BatchResponse(
586
+ status="success" if failed_count == 0 else "partial",
587
+ processed=len(results) - failed_count,
588
+ failed=failed_count,
589
+ results=results,
590
+ processing_time_ms=processing_time_ms,
591
+ )
592
+
593
+ except Exception as e:
594
+ logger.error(f"Batch processing failed: {e}", exc_info=True)
595
+ raise HTTPException(
596
+ status_code=500,
597
+ detail={
598
+ "code": "BATCH_PROCESSING_ERROR",
599
+ "message": "Failed to process batch request",
600
+ },
601
+ )
602
+
603
+
604
+ # =====================================================
605
+ # Helper Functions for GUVI Format Support
606
+ # =====================================================
607
+
608
+ def _parse_request(request_body: Dict[str, Any]) -> tuple:
609
+ """
610
+ Parse request body and normalize to internal format.
611
+
612
+ Supports both our format and GUVI format.
613
+ Also handles test/empty payloads gracefully for API validation.
614
+
615
+ Args:
616
+ request_body: Raw request body dictionary
617
+
618
+ Returns:
619
+ Tuple of (message_text, session_id, language, conversation_history)
620
+ """
621
+ # Handle empty or None request body (API test/validation request)
622
+ if not request_body:
623
+ logger.info("Empty request body received - treating as API test")
624
+ return "API test message", None, "auto", None
625
+
626
+ # Check if this is GUVI format (nested message object or sessionId)
627
+ is_guvi_format = (
628
+ isinstance(request_body.get("message"), dict) or
629
+ "sessionId" in request_body
630
+ )
631
+
632
+ if is_guvi_format:
633
+ return _parse_guvi_format(request_body)
634
+ else:
635
+ return _parse_standard_format(request_body)
636
+
637
+
638
+ def _parse_standard_format(request_body: Dict[str, Any]) -> tuple:
639
+ """
640
+ Parse our standard request format.
641
+
642
+ Format: {"message": "text", "session_id": "uuid", "language": "auto"}
643
+
644
+ Also handles test payloads with minimal/missing fields.
645
+ """
646
+ message_text = request_body.get("message", "")
647
+
648
+ # Handle missing or empty message gracefully for test requests
649
+ if not message_text:
650
+ # Check if there's any text field as fallback
651
+ message_text = request_body.get("text", "")
652
+
653
+ if not message_text:
654
+ # If still empty, this might be an API test - use default test message
655
+ logger.info("No message field found - using default test message")
656
+ message_text = "Test message for API validation"
657
+
658
+ # Ensure message is a string
659
+ if not isinstance(message_text, str):
660
+ message_text = str(message_text)
661
+
662
+ session_id = request_body.get("session_id")
663
+ language = request_body.get("language", "auto")
664
+
665
+ return message_text, session_id, language, None
666
+
667
+
668
+ def _parse_guvi_format(request_body: Dict[str, Any]) -> tuple:
669
+ """
670
+ Parse GUVI's request format.
671
+
672
+ Format:
673
+ {
674
+ "sessionId": "id",
675
+ "message": {"sender": "scammer", "text": "...", "timestamp": "..."},
676
+ "conversationHistory": [...],
677
+ "metadata": {"channel": "SMS", "language": "English", "locale": "IN"}
678
+ }
679
+
680
+ Also handles test payloads with minimal/missing fields for API validation.
681
+ """
682
+ # Extract message text from nested object
683
+ message_obj = request_body.get("message", {})
684
+
685
+ if isinstance(message_obj, dict):
686
+ message_text = message_obj.get("text", "")
687
+ else:
688
+ # Fallback: message might be a string in hybrid format
689
+ message_text = str(message_obj) if message_obj else ""
690
+
691
+ # Handle missing message gracefully for test/validation requests
692
+ if not message_text:
693
+ # This might be an API test request with just sessionId
694
+ logger.info("No message.text found in GUVI format - using default test message")
695
+ message_text = "Test message for API validation"
696
+
697
+ # Get session ID (GUVI uses camelCase)
698
+ session_id = request_body.get("sessionId") or request_body.get("session_id")
699
+
700
+ # Get language from metadata
701
+ metadata = request_body.get("metadata", {})
702
+ guvi_language = metadata.get("language", "").lower() if metadata else ""
703
+
704
+ # Map GUVI language names to our codes
705
+ language_map = {
706
+ "english": "en",
707
+ "hindi": "hi",
708
+ "hinglish": "hinglish",
709
+ }
710
+ language = language_map.get(guvi_language, "auto")
711
+
712
+ # Parse conversation history
713
+ conversation_history = request_body.get("conversationHistory", [])
714
+
715
+ # Normalize conversation history format
716
+ normalized_history = []
717
+ for item in conversation_history:
718
+ if isinstance(item, dict):
719
+ # Map 'user' sender to 'agent' for our internal format
720
+ sender = item.get("sender", "scammer")
721
+ if sender == "user":
722
+ sender = "agent"
723
+
724
+ normalized_history.append({
725
+ "sender": sender,
726
+ "message": item.get("text", ""),
727
+ "timestamp": item.get("timestamp", datetime.utcnow().isoformat() + "Z"),
728
+ })
729
+
730
+ return message_text, session_id, language, normalized_history
731
+
732
+
733
+ def _rebuild_session_from_history(
734
+ conversation_history: List[Dict],
735
+ language: str,
736
+ ) -> Dict[str, Any]:
737
+ """
738
+ Rebuild session state from GUVI conversation history.
739
+
740
+ When GUVI sends conversationHistory, we need to reconstruct
741
+ the session state to continue the conversation properly.
742
+
743
+ Args:
744
+ conversation_history: List of previous messages
745
+ language: Detected/provided language
746
+
747
+ Returns:
748
+ Session state dictionary
749
+ """
750
+ from app.agent.personas import select_persona
751
+
752
+ # Build messages list with turn numbers
753
+ messages = []
754
+ for i, msg in enumerate(conversation_history):
755
+ messages.append({
756
+ "turn": i + 1,
757
+ "sender": msg.get("sender", "scammer"),
758
+ "message": msg.get("message", ""),
759
+ "timestamp": msg.get("timestamp", datetime.utcnow().isoformat() + "Z"),
760
+ })
761
+
762
+ turn_count = len(messages)
763
+
764
+ # Select persona based on conversation content
765
+ full_text = " ".join(m.get("message", "") for m in messages)
766
+ persona = select_persona("unknown", language)
767
+
768
+ # Determine strategy based on turn count
769
+ if turn_count < 5:
770
+ strategy = "build_trust"
771
+ elif turn_count < 12:
772
+ strategy = "express_confusion"
773
+ else:
774
+ strategy = "probe_details"
775
+
776
+ return {
777
+ "messages": messages,
778
+ "scam_confidence": 0.7, # Assume scam since history provided
779
+ "turn_count": turn_count,
780
+ "extracted_intel": {
781
+ "upi_ids": [],
782
+ "bank_accounts": [],
783
+ "ifsc_codes": [],
784
+ "phone_numbers": [],
785
+ "phishing_links": [],
786
+ },
787
+ "extraction_confidence": 0.0,
788
+ "strategy": strategy,
789
+ "language": language,
790
+ "persona": persona,
791
+ "max_turns_reached": False,
792
+ "terminated": False,
793
+ }