Spaces:
Sleeping
Sleeping
| import os | |
| import pymssql | |
| import time | |
| import base64 | |
| import uuid | |
| import pickle | |
| import requests | |
| import pyodbc | |
| import faiss | |
| import numpy as np | |
| import cv2 | |
| from mtcnn import MTCNN | |
| # Robust import for FER, works with different fer versions | |
| try: | |
| from fer import FER | |
| except ImportError: | |
| from fer.fer import FER | |
| from dotenv import load_dotenv | |
| from flask import Flask, request, jsonify | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from flask_cors import CORS | |
| from sentence_transformers import SentenceTransformer | |
| import json | |
| # ------------------------------------------------------------ | |
| # INITIAL SETUP | |
| # ------------------------------------------------------------ | |
| app = Flask(__name__) | |
| CORS(app, | |
| resources={r"/*": {"origins": "*"}}, | |
| supports_credentials=True, | |
| allow_headers=["Content-Type", "Authorization"], | |
| expose_headers=["Content-Type", "Authorization"], | |
| methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"]) | |
| def handle_options_request(): | |
| if request.method == "OPTIONS": | |
| return jsonify({"status": "CORS Preflight OK"}), 200 | |
| # ------------------------------------------------------------ | |
| # ENVIRONMENT VARIABLES | |
| # ------------------------------------------------------------ | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| load_dotenv(os.path.join(BASE_DIR, ".env")) | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") | |
| if not OPENAI_API_KEY: | |
| print("⚠️ Warning: OPENAI_API_KEY not found. OpenAI features will be disabled.") | |
| # ------------------------------------------------------------ | |
| MODE = os.getenv("MODE", "local").lower() | |
| USE_DATABASE = os.getenv("USE_DATABASE", "1") == "1" | |
| DB_HOST = os.getenv("DB_HOST") or os.getenv("RDS_SQL_SERVER", r"localhost") | |
| DB_PORT = int(os.getenv("DB_PORT") or os.getenv("RDS_SQL_PORT", "1433")) | |
| DB_NAME = os.getenv("DB_NAME") or os.getenv("RDS_SQL_DATABASE", "PyDetect") | |
| DB_USER = os.getenv("DB_USER") or os.getenv("RDS_SQL_USER", "") | |
| DB_PASSWORD = os.getenv("DB_PASSWORD") or os.getenv("RDS_SQL_PASSWORD", "") | |
| if USE_DATABASE: | |
| print(f"🔍 Database mode enabled. Target SQL Server: {DB_HOST}:{DB_PORT}, db={DB_NAME}") | |
| else: | |
| print("ℹ️ USE_DATABASE=0 - running without database (demo mode).") | |
| def get_db_connection(): | |
| """Return connection to SQL Server using pymssql. | |
| - On local: you can point DB_HOST to your local SQL Server or to AWS. | |
| - On Hugging Face: DB_HOST should be your AWS RDS endpoint. | |
| """ | |
| if not USE_DATABASE: | |
| return None | |
| try: | |
| conn = pymssql.connect( | |
| server=DB_HOST, | |
| user=DB_USER, | |
| password=DB_PASSWORD, | |
| database=DB_NAME, | |
| port=DB_PORT, | |
| login_timeout=10, | |
| timeout=30, | |
| as_dict=False # we access rows by index | |
| ) | |
| print(f"✅ Connected to SQL Server {DB_HOST}:{DB_PORT}") | |
| return conn | |
| except Exception as e: | |
| print(f"❌ Could not connect to SQL Server at {DB_HOST}:{DB_PORT} - {e}") | |
| raise | |
| def create_user_table(): | |
| """Create Users table if it does not exist.""" | |
| if not USE_DATABASE: | |
| print("ℹ️ Skipping user table creation (demo mode, no DB).") | |
| return | |
| try: | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| IF NOT EXISTS ( | |
| SELECT * FROM sysobjects | |
| WHERE name = 'Users' AND xtype = 'U' | |
| ) | |
| CREATE TABLE Users ( | |
| id INT IDENTITY(1,1) PRIMARY KEY, | |
| name NVARCHAR(120) NOT NULL, | |
| role NVARCHAR(50) NOT NULL, | |
| email NVARCHAR(120) UNIQUE NOT NULL, | |
| password NVARCHAR(255) NOT NULL | |
| ) | |
| """) | |
| conn.commit() | |
| cursor.close() | |
| conn.close() | |
| print("✅ Users table verified/created successfully.") | |
| except Exception as e: | |
| print(f"❌ Database setup failed: {str(e)}") | |
| create_user_table() | |
| # ------------------------------------------------------------ | |
| # LOAD VECTOR INDEX AND CHUNKS | |
| # ------------------------------------------------------------ | |
| MODEL = SentenceTransformer('all-MiniLM-L6-v2') | |
| #Paths for the old and new files | |
| FAISS_PATH = os.path.join(BASE_DIR, "crime_scene_index.faiss") | |
| CHUNKS_PATH = os.path.join(BASE_DIR, "crime_scene_chunks.pkl") | |
| NEW_FAISS_PATH = os.path.join(BASE_DIR, "Manual on Investigative Interviewing for Criminal Investigation.faiss") | |
| NEW_CHUNKS_PATH = os.path.join(BASE_DIR, "Manual on Investigative Interviewing for Criminal Investigation.pkl") | |
| # Load old FAISS index and text chunks | |
| if os.path.exists(FAISS_PATH) and os.path.exists(CHUNKS_PATH): | |
| print("📘 Loading old FAISS index and text chunks...") | |
| old_index = faiss.read_index(FAISS_PATH) | |
| with open(CHUNKS_PATH, "rb") as f: | |
| old_text_chunks = pickle.load(f) | |
| print(f"✅ Loaded {len(old_text_chunks)} chunks from the old reference guide.") | |
| else: | |
| old_index = None | |
| old_text_chunks = [] | |
| print("⚠️ Old FAISS or chunks file not found. Context retrieval disabled.") | |
| # Load new FAISS index and text chunks | |
| if os.path.exists(NEW_FAISS_PATH) and os.path.exists(NEW_CHUNKS_PATH): | |
| print("📘 Loading new FAISS index and text chunks...") | |
| new_index = faiss.read_index(NEW_FAISS_PATH) | |
| with open(NEW_CHUNKS_PATH, "rb") as f: | |
| new_text_chunks = pickle.load(f) | |
| print(f"✅ Loaded {len(new_text_chunks)} chunks from the new reference guide.") | |
| else: | |
| new_index = None | |
| new_text_chunks = [] | |
| print("⚠️ New FAISS or chunks file not found. Context retrieval for new book is disabled.") | |
| # ------------------------------------------------------------ | |
| # BODY LANGUAGE BOOK FAISS INDEX (using provided FAISS file) | |
| # ------------------------------------------------------------ | |
| BODY_BOOK_FAISS_PATH = os.path.join(BASE_DIR, "what-everybody-is-saying.faiss") | |
| BODY_BOOK_CHUNKS_PATH = os.path.join(BASE_DIR, "what-everybody-is-saying_chunks.pkl") | |
| MODEL_BODY = SentenceTransformer('all-MiniLM-L6-v2') | |
| if os.path.exists(BODY_BOOK_FAISS_PATH) and os.path.exists(BODY_BOOK_CHUNKS_PATH): | |
| body_book_index = faiss.read_index(BODY_BOOK_FAISS_PATH) | |
| with open(BODY_BOOK_CHUNKS_PATH, "rb") as f: | |
| body_book_entries = pickle.load(f) | |
| print(f"✅ Loaded body language FAISS index and chunks: {len(body_book_entries)} entries.") | |
| else: | |
| body_book_index = None | |
| body_book_entries = [] | |
| print("⚠️ Body language FAISS or chunks file not found. Context retrieval disabled.") | |
| # ------------------------------------------------------------ | |
| # HELPER FUNCTIONS | |
| # ------------------------------------------------------------ | |
| # --- Computer Vision: lightweight face analysis (OpenCV Haar cascades) --- | |
| detector_mtcnn = MTCNN() | |
| fer_detector = FER() | |
| def analyze_frame_mtcnn(image_bgr, previous=None): | |
| """Analyze a single BGR frame using MTCNN. Returns metrics dict.""" | |
| if image_bgr is None: | |
| return { | |
| "face_present": False, | |
| "faces_count": 0, | |
| "jitter": None, | |
| "face_box": None, | |
| "quality": 0, | |
| "behavior_tags": ["no_face"], | |
| "investigative_expression": "no_face" | |
| } | |
| image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB) | |
| faces = detector_mtcnn.detect_faces(image_rgb) | |
| if not faces: | |
| return { | |
| "face_present": False, | |
| "faces_count": 0, | |
| "jitter": None, | |
| "face_box": None, | |
| "quality": 0, | |
| "behavior_tags": ["no_face"], | |
| "investigative_expression": "absence" | |
| } | |
| # Choose largest face | |
| face = max(faces, key=lambda f: f['box'][2] * f['box'][3]) | |
| x, y, w, h = face['box'] | |
| # Jitter (movement between frames) normalized by face width | |
| cx, cy = x + w / 2.0, y + h / 2.0 | |
| jitter = None | |
| if previous and previous.get("face_box"): | |
| px, py, pw, ph = previous["face_box"] | |
| pcx, pcy = px + pw / 2.0, py + ph / 2.0 | |
| dist = ((cx - pcx) ** 2 + (cy - pcy) ** 2) ** 0.5 | |
| jitter = float(dist / max(1.0, w)) | |
| # Heuristic quality: use face confidence | |
| quality = round(face.get('confidence', 0) * 100, 1) | |
| tags = [] | |
| if jitter is not None: | |
| if jitter > 0.08: | |
| tags.append("stress_head_movement") | |
| elif jitter > 0.04: | |
| tags.append("elevated_movement") | |
| if not tags: | |
| tags.append("baseline") | |
| # Map tags to investigative_expression (similar to original logic) | |
| if "stress_head_movement" in tags: | |
| investigative_expression = "stress" | |
| elif "elevated_movement" in tags: | |
| investigative_expression = "elevated" | |
| elif "baseline" in tags: | |
| investigative_expression = "neutral" | |
| else: | |
| investigative_expression = tags[0] if tags else "unknown" | |
| return { | |
| "face_present": True, | |
| "faces_count": len(faces), | |
| "jitter": round(jitter, 4) if jitter is not None else None, | |
| "face_box": [int(x), int(y), int(w), int(h)], | |
| "quality": quality, | |
| "behavior_tags": tags, | |
| "investigative_expression": investigative_expression | |
| } | |
| try: | |
| FACE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml") | |
| EYE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_eye.xml") | |
| SMILE_CASCADE = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_smile.xml") | |
| except Exception: | |
| FACE_CASCADE = None | |
| EYE_CASCADE = None | |
| SMILE_CASCADE = None | |
| def _variance_of_laplacian(image_gray): | |
| return float(cv2.Laplacian(image_gray, cv2.CV_64F).var()) | |
| def analyze_frame(image_bgr, previous=None): | |
| """Analyze a single BGR frame. Returns metrics dict. | |
| previous: optional dict from prior frame to compute jitter. | |
| """ | |
| if image_bgr is None or FACE_CASCADE is None: | |
| return { | |
| "face_present": False, | |
| "faces_count": 0, | |
| "jitter": None, | |
| "blur": None, | |
| "brightness": None, | |
| "eyes": 0, | |
| "smile": 0, | |
| "face_box": None, | |
| "quality": 0, | |
| "expression": "no_face", | |
| "investigative_expression": "no_face", | |
| "behavior_tags": ["no_face"], | |
| "emotion": None | |
| } | |
| gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY) | |
| faces = FACE_CASCADE.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5, minSize=(60, 60)) | |
| # Basic quality metrics | |
| blur_val = _variance_of_laplacian(gray) | |
| # Normalize blur to 0-100 range heuristically | |
| quality = max(0, min(100, (blur_val / 200.0) * 100)) | |
| brightness = float(np.mean(gray)) | |
| if len(faces) == 0: | |
| return { | |
| "face_present": False, | |
| "faces_count": 0, | |
| "jitter": None, | |
| "blur": round(blur_val, 2), | |
| "brightness": round(brightness, 2), | |
| "eyes": 0, | |
| "smile": 0, | |
| "face_box": None, | |
| "quality": round(quality, 1), | |
| "expression": "no_face", | |
| "investigative_expression": "absence", | |
| "behavior_tags": ["avoidance", "absence"], | |
| "emotion": None | |
| } | |
| # Choose largest face | |
| x, y, w, h = max(faces, key=lambda b: b[2] * b[3]) | |
| face_roi_gray = gray[y:y+h, x:x+w] | |
| eyes = EYE_CASCADE.detectMultiScale(face_roi_gray, scaleFactor=1.1, minNeighbors=5, minSize=(20, 20)) if EYE_CASCADE is not None else [] | |
| smiles = SMILE_CASCADE.detectMultiScale(face_roi_gray, scaleFactor=1.3, minNeighbors=20) if SMILE_CASCADE is not None else [] | |
| # Jitter (movement between frames) normalized by face width | |
| cx, cy = x + w / 2.0, y + h / 2.0 | |
| jitter = None | |
| if previous and previous.get("face_box"): | |
| px, py, pw, ph = previous["face_box"] | |
| pcx, pcy = px + pw / 2.0, py + ph / 2.0 | |
| dist = ((cx - pcx) ** 2 + (cy - pcy) ** 2) ** 0.5 | |
| jitter = float(dist / max(1.0, w)) # normalized | |
| # FER emotion detection | |
| emotion = None | |
| try: | |
| fer_results = fer_detector.detect_emotions(image_bgr) | |
| if fer_results: | |
| emotions = fer_results[0]["emotions"] | |
| emotion = max(emotions, key=emotions.get) | |
| except Exception: | |
| emotion = None | |
| # Investigative-oriented heuristic classification | |
| eyes_cnt = int(len(eyes) if eyes is not None else 0) | |
| smile_cnt = int(len(smiles) if smiles is not None else 0) | |
| expr_basic = "neutral" | |
| if smile_cnt >= 1 and quality >= 40: | |
| expr_basic = "smiling" | |
| elif smile_cnt == 0 and eyes_cnt >= 1: | |
| expr_basic = "flat" | |
| tags = [] | |
| # Head movement / jitter cues | |
| if jitter is not None: | |
| if jitter > 0.08: | |
| tags.append("stress_head_movement") | |
| elif jitter > 0.04: | |
| tags.append("elevated_movement") | |
| # Avoidance (low eyes or poor quality + no smile) | |
| if eyes_cnt == 0 and smile_cnt == 0: | |
| tags.append("possible_avoidance") | |
| # Masking smile (smile plus low eyes or movement) | |
| if smile_cnt >= 1 and (eyes_cnt <= 1 or (jitter is not None and jitter > 0.04)): | |
| tags.append("masking_smile") | |
| # Calm/composed | |
| if jitter is not None and jitter <= 0.02 and smile_cnt == 0 and eyes_cnt >= 2: | |
| tags.append("composed") | |
| # Potential concealment (smile with minimal eye engagement) | |
| if smile_cnt >= 1 and eyes_cnt == 0: | |
| tags.append("potential_concealment") | |
| if not tags: | |
| tags.append("baseline") | |
| # Derive a single investigative_expression label preference order | |
| investigative_expression = ( | |
| "masking_smile" if "masking_smile" in tags else | |
| "stress" if "stress_head_movement" in tags else | |
| "avoidance" if "possible_avoidance" in tags else | |
| "concealment" if "potential_concealment" in tags else | |
| "composed" if "composed" in tags else | |
| expr_basic | |
| ) | |
| return { | |
| "face_present": True, | |
| "faces_count": int(len(faces)), | |
| "jitter": round(jitter, 4) if jitter is not None else None, | |
| "blur": round(blur_val, 2), | |
| "brightness": round(brightness, 2), | |
| "eyes": eyes_cnt, | |
| "smile": smile_cnt, | |
| "face_box": [int(x), int(y), int(w), int(h)], | |
| "quality": round(quality, 1), | |
| "expression": expr_basic, # keep backward-compatible key | |
| "investigative_expression": investigative_expression, | |
| "behavior_tags": tags, | |
| "emotion": emotion | |
| } | |
| def recommend_command(metrics): | |
| """Derive a simple guidance command based on metrics.""" | |
| if not metrics or not metrics.get("face_present"): | |
| return "Please position your face in the frame and face the camera." | |
| if metrics.get("quality", 0) < 40: | |
| return "Increase lighting and hold steady for a clearer view." | |
| if metrics.get("jitter") is not None and metrics["jitter"] > 0.08: | |
| return "Try to keep your head steady while answering." | |
| if metrics.get("eyes", 0) == 0: | |
| return "Ensure your eyes are visible; avoid looking away." | |
| return "Proceed with your answer." | |
| def _normalize_epoch_to_seconds(value): | |
| """Normalize a numeric epoch timestamp to seconds. | |
| Accepts seconds (e.g., 1730971974) or milliseconds (e.g., 1730971974123). | |
| Returns float seconds or None if invalid. | |
| """ | |
| try: | |
| if value is None: | |
| return None | |
| v = float(value) | |
| # Heuristic: treat large values as ms | |
| if v > 1e11: | |
| return v / 1000.0 | |
| return v | |
| except Exception: | |
| return None | |
| def _aggregate_interval_from_history(face_state, start_s, end_s): | |
| """Aggregate metrics from face_state['history'] within [start_s, end_s]. | |
| Returns dict with per-answer stats. | |
| """ | |
| history = face_state.get("history", []) | |
| if not history or start_s is None or end_s is None or end_s < start_s: | |
| return { | |
| "duration": 0.0, | |
| "frames": 0, | |
| "face_presence_rate": 0.0, | |
| "avg_quality": None, | |
| "avg_brightness": None, | |
| "avg_jitter": None, | |
| "avg_eyes": None, | |
| "smile_rate": 0.0 | |
| } | |
| samples = [h for h in history if start_s <= h.get("t", 0) <= end_s] | |
| if not samples: | |
| return { | |
| "duration": round(float(max(0.0, end_s - start_s)), 3), | |
| "frames": 0, | |
| "face_presence_rate": 0.0, | |
| "avg_quality": None, | |
| "avg_brightness": None, | |
| "avg_jitter": None, | |
| "avg_eyes": None, | |
| "smile_rate": 0.0 | |
| } | |
| def _avg(arr): | |
| return round(float(sum(arr) / len(arr)), 3) if arr else None | |
| frames = len(samples) | |
| presence = [1.0 if s.get("face_present") else 0.0 for s in samples] | |
| qualities = [s.get("quality") for s in samples if s.get("quality") is not None] | |
| brights = [s.get("brightness") for s in samples if s.get("brightness") is not None] | |
| jitters = [s.get("jitter") for s in samples if s.get("jitter") is not None] | |
| eyes = [s.get("eyes") for s in samples if s.get("eyes") is not None] | |
| smiles = [s.get("smile") for s in samples if s.get("smile") is not None] | |
| # Expression distribution | |
| expr_hist = {} | |
| inv_expr_hist = {} | |
| tag_hist = {} | |
| for s in samples: | |
| ex = s.get("investigative_expression") or s.get("expression") or "unknown" | |
| expr_hist[ex] = expr_hist.get(ex, 0) + 1 | |
| inv = s.get("investigative_expression") or "unknown" | |
| inv_expr_hist[inv] = inv_expr_hist.get(inv, 0) + 1 | |
| tags = s.get("behavior_tags") or [] | |
| for t in tags: | |
| tag_hist[t] = tag_hist.get(t, 0) + 1 | |
| dominant_expr = None | |
| if expr_hist: | |
| dominant_expr = max(expr_hist.items(), key=lambda kv: kv[1])[0] | |
| dominant_investigative = None | |
| if inv_expr_hist: | |
| dominant_investigative = max(inv_expr_hist.items(), key=lambda kv: kv[1])[0] | |
| return { | |
| "duration": round(float(max(0.0, end_s - start_s)), 3), | |
| "frames": frames, | |
| "face_presence_rate": round(_avg(presence) if presence else 0.0, 3), | |
| "avg_quality": _avg(qualities), | |
| "avg_brightness": _avg(brights), | |
| "avg_jitter": _avg(jitters), | |
| "avg_eyes": _avg(eyes), | |
| "smile_rate": round(float(sum(1 for v in smiles if v and v > 0) / frames), 3) if frames else 0.0, | |
| "expression_distribution": expr_hist, | |
| "investigative_expression_distribution": inv_expr_hist, | |
| "behavior_tag_distribution": tag_hist, | |
| "dominant_expression": dominant_expr, | |
| "dominant_investigative_expression": dominant_investigative | |
| } | |
| def _clamp(v, lo=0.0, hi=100.0): | |
| return float(max(lo, min(hi, v))) | |
| def _safe_ratio(a, b): | |
| try: | |
| if b == 0: | |
| return 0.0 | |
| return float(a) / float(b) | |
| except Exception: | |
| return 0.0 | |
| def compute_investigative_assessment(final_result, face_body): | |
| """Produce investigation-oriented cues and an involvement score (0-100). | |
| Inputs: | |
| - final_result: dict with keys like {"truth_score": number, "label": str} | |
| - face_body: {"metrics": { ... aggregation ... }} or None | |
| Output schema: | |
| { | |
| "nonverbal_score": number, # 0..100 (higher means more concerning) | |
| "involvement_score": number, # 0..100 (higher means likely involved) | |
| "cues": [str], # textual cues detected | |
| "rationale": str # brief explanation | |
| } | |
| """ | |
| cues = [] | |
| nonverbal = 0.0 | |
| metrics = (face_body or {}).get("metrics") if face_body else None | |
| # Base involvement from AI label (maps investigative label to base risk) | |
| label = (final_result or {}).get("label", "").upper() | |
| base_involvement = 50.0 | |
| if "GUILTY" in label: | |
| base_involvement = 80.0 | |
| elif "INNOCENT" in label: | |
| base_involvement = 20.0 | |
| elif "EVASIVE" in label: | |
| base_involvement = 60.0 | |
| if metrics: | |
| fpr = float(metrics.get("face_presence_rate", 0.0) or 0.0) | |
| jitter = metrics.get("avg_jitter") | |
| smile_rate = float(metrics.get("smile_rate") or 0.0) | |
| avg_eyes = metrics.get("avg_eyes") | |
| expr_hist = metrics.get("investigative_expression_distribution", {}) or {} | |
| dominant_expr = (metrics.get("dominant_investigative_expression") or "").lower() | |
| # Avoidance cue: low presence in window | |
| if fpr < 0.5: | |
| cues.append("face_avoidance") | |
| nonverbal += 25.0 * (0.5 - fpr) / 0.5 # up to +25 | |
| # Movement (jitter) cue: normalized | |
| if jitter is not None: | |
| # Typical steady jitter ~0.0-0.04; higher suggests agitation | |
| if jitter > 0.08: | |
| cues.append("high_head_movement") | |
| nonverbal += _clamp(((float(jitter) - 0.03) / 0.12) * 40.0, 0.0, 40.0) | |
| # Expression cues | |
| total_frames = sum(expr_hist.values()) or 0 | |
| stress_ratio = _safe_ratio(expr_hist.get("stress", 0), total_frames) | |
| avoidance_ratio = _safe_ratio(expr_hist.get("avoidance", 0), total_frames) | |
| conceal_ratio = _safe_ratio(expr_hist.get("concealment", 0), total_frames) | |
| masking_ratio = _safe_ratio(expr_hist.get("masking_smile", 0), total_frames) | |
| composed_ratio = _safe_ratio(expr_hist.get("composed", 0), total_frames) | |
| if stress_ratio >= 0.15: | |
| cues.append("stress_cue") | |
| nonverbal += 12.0 * (stress_ratio / 0.5) | |
| if avoidance_ratio >= 0.15: | |
| cues.append("avoidance_cue") | |
| nonverbal += 14.0 * (avoidance_ratio / 0.5) | |
| if conceal_ratio >= 0.10: | |
| cues.append("concealment_cue") | |
| nonverbal += 10.0 * (conceal_ratio / 0.4) | |
| if masking_ratio >= 0.10: | |
| cues.append("masking_smile_cue") | |
| nonverbal += 8.0 * (masking_ratio / 0.4) | |
| # Incongruent affect: many smiles but low eyes -> suspicion | |
| if smile_rate > 0.35 and (avg_eyes is not None and avg_eyes < 1.0): | |
| cues.append("incongruent_affect") | |
| nonverbal += 10.0 | |
| # Calming / mitigating cues reduce score | |
| if composed_ratio >= 0.40 and jitter is not None and jitter <= 0.03: | |
| nonverbal -= 12.0 | |
| if dominant_expr == "composed" and fpr >= 0.85: | |
| nonverbal -= 6.0 | |
| nonverbal = _clamp(nonverbal, 0.0, 100.0) | |
| # Combine with AI judgement into involvement score | |
| involvement = _clamp(0.7 * base_involvement + 0.3 * nonverbal, 0.0, 100.0) | |
| rationale = ( | |
| f"Base={int(base_involvement)} from label '{label}', " | |
| f"Nonverbal={int(nonverbal)} via cues: {', '.join(cues) if cues else 'none'}" | |
| ) | |
| return { | |
| "nonverbal_score": round(nonverbal, 1), | |
| "involvement_score": round(involvement, 1), | |
| "cues": cues, | |
| "rationale": rationale | |
| } | |
| def retrieve_relevant_context(query, top_k=3, use_new_reference=False): | |
| """Retrieve relevant book context from FAISS.""" | |
| if use_new_reference: | |
| index = new_index | |
| text_chunks = new_text_chunks | |
| else: | |
| index = old_index # Default to old reference if new is not selected | |
| text_chunks = old_text_chunks | |
| if index is None or len(text_chunks) == 0: | |
| return "No reference context found (FAISS not loaded)." | |
| query_vector = MODEL.encode([query]).astype('float32') | |
| D, I = index.search(query_vector, k=top_k) | |
| valid_indices = [i for i in I[0] if i < len(text_chunks)] | |
| results = [text_chunks[i] for i in valid_indices] | |
| return "\n".join(results) | |
| def detect_crime_type(brief_description: str): | |
| """Automatically detect crime type from description.""" | |
| if not brief_description or len(brief_description.strip()) == 0: | |
| return "Unknown" | |
| text = brief_description.lower() | |
| crime_keywords = { | |
| "kidnap": "Kidnapping", | |
| "abduct": "Kidnapping", | |
| "murder": "Murder", | |
| "kill": "Murder", | |
| "stab": "Murder", | |
| "shoot": "Murder", | |
| "theft": "Theft", | |
| "steal": "Theft", | |
| "rob": "Robbery", | |
| "burglar": "Burglary", | |
| "attack": "Assault", | |
| "assault": "Assault", | |
| "fraud": "Fraud", | |
| "scam": "Fraud", | |
| "arson": "Arson", | |
| "fire": "Arson", | |
| "rape": "Sexual Assault", | |
| "harass": "Harassment", | |
| "poison": "Attempted Murder" | |
| } | |
| for keyword, crime_type in crime_keywords.items(): | |
| if keyword in text: | |
| return crime_type | |
| return "Unknown" | |
| # ------------------------------------------------------------ | |
| # AUTH ROUTES | |
| # ------------------------------------------------------------ | |
| # @app.route('/sign-in', methods=['POST']) | |
| # def sign_in(): | |
| # data = request.json | |
| # email = data.get('email') | |
| # password = data.get('password') | |
| # conn = get_db_connection() | |
| # cursor = conn.cursor() | |
| # cursor.execute('SELECT * FROM Users WHERE email = ?', (email,)) | |
| # user = cursor.fetchone() | |
| # cursor.close() | |
| # conn.close() | |
| # if user and check_password_hash(user[4], password): | |
| # return jsonify({"message": "Login successful", "user": { | |
| # "id": user[0], "name": user[1], "role": user[2], "email": user[3] | |
| # }}), 200 | |
| # elif user: | |
| # return jsonify({"message": "Invalid password"}), 401 | |
| # else: | |
| # return jsonify({"message": "Email not found"}), 404 | |
| # @app.route('/sign-up', methods=['POST']) | |
| # def sign_up(): | |
| # data = request.json | |
| # name, role, email, password = data.get('name'), data.get('role'), data.get('email'), data.get('password') | |
| # if not email or not password: | |
| # return jsonify({"message": "Email and password are required"}), 400 | |
| # conn = get_db_connection() | |
| # cursor = conn.cursor() | |
| # cursor.execute('SELECT * FROM Users WHERE email = ?', (email,)) | |
| # if cursor.fetchone(): | |
| # return jsonify({"message": "Email already exists"}), 400 | |
| # hashed_password = generate_password_hash(password) | |
| # cursor.execute('INSERT INTO Users (name, role, email, password) VALUES (?, ?, ?, ?)', | |
| # (name, role, email, hashed_password)) | |
| # conn.commit() | |
| # cursor.close() | |
| # conn.close() | |
| # return jsonify({"message": "User created successfully"}), 201 | |
| def sign_in(): | |
| data = request.get_json() or {} | |
| email = data.get('email') | |
| password = data.get('password') | |
| if not email or not password: | |
| return jsonify({"message": "Email and password are required"}), 400 | |
| # Optional: if you kept USE_DATABASE from earlier | |
| if USE_DATABASE is False: | |
| return jsonify({"message": "Database is disabled (USE_DATABASE=0)."}), 503 | |
| try: | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| # IMPORTANT: pymssql uses %s, not ? | |
| cursor.execute( | |
| "SELECT id, name, role, email, password FROM Users WHERE email = %s", | |
| (email,) | |
| ) | |
| user = cursor.fetchone() | |
| cursor.close() | |
| conn.close() | |
| except Exception as e: | |
| print(f"❌ Sign-in DB error: {e}") | |
| return jsonify({"message": "Login service unavailable"}), 503 | |
| if user and check_password_hash(user[4], password): | |
| return jsonify({ | |
| "message": "Login successful", | |
| "user": { | |
| "id": user[0], | |
| "name": user[1], | |
| "role": user[2], | |
| "email": user[3] | |
| } | |
| }), 200 | |
| elif user: | |
| return jsonify({"message": "Invalid password"}), 401 | |
| else: | |
| return jsonify({"message": "Email not found"}), 404 | |
| def sign_up(): | |
| data = request.get_json() or {} | |
| name = data.get('name') | |
| role = data.get('role') | |
| email = data.get('email') | |
| password = data.get('password') | |
| if not email or not password: | |
| return jsonify({"message": "Email and password are required"}), 400 | |
| # Optional: if you kept USE_DATABASE from earlier | |
| if USE_DATABASE is False: | |
| return jsonify({"message": "Sign-up is disabled because the database is not enabled (USE_DATABASE=0)."}), 503 | |
| try: | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| # Check if email already exists | |
| cursor.execute( | |
| "SELECT id FROM Users WHERE email = %s", | |
| (email,) | |
| ) | |
| if cursor.fetchone(): | |
| cursor.close() | |
| conn.close() | |
| return jsonify({"message": "Email already exists"}), 400 | |
| hashed_password = generate_password_hash(password) | |
| # Insert new user | |
| cursor.execute( | |
| "INSERT INTO Users (name, role, email, password) VALUES (%s, %s, %s, %s)", | |
| (name, role, email, hashed_password) | |
| ) | |
| conn.commit() | |
| cursor.close() | |
| conn.close() | |
| return jsonify({"message": "User created successfully"}), 201 | |
| except Exception as e: | |
| print(f"❌ Sign-up DB error: {e}") | |
| return jsonify({"message": "Sign-up service unavailable"}), 503 | |
| # ------------------------------------------------------------ | |
| # CORE LOGIC | |
| # ------------------------------------------------------------ | |
| sessions = {} | |
| def health_check(): | |
| return jsonify({ | |
| "status": "healthy", | |
| "service": "PyDetect backend", | |
| "features": { | |
| "crime_type_detection": "enabled", | |
| "question_generation": "enabled", | |
| "response_analysis": "enabled", | |
| "report_generation": "enabled", | |
| "validation_results": "enabled" | |
| }, | |
| "endpoints": [ | |
| "/sign-in", "/sign-up", "/start_session", | |
| "/submit_profile", "/submit_case", "/submit_response", | |
| "/get_report", "/get_validation_results", "/ask_question" | |
| ] | |
| }), 200 | |
| def start_session(): | |
| data = request.get_json(silent=True) | |
| brief_description = "" | |
| if data: | |
| brief_description = data.get("briefDescription", "") | |
| print(f"[SESSION START] brief_description: {brief_description}") | |
| sid = str(uuid.uuid4()) | |
| sessions[sid] = {"profile": {"brief_description": brief_description} if brief_description else {}, "crime_type": "", "responses": [], "last_question": ""} | |
| return jsonify({"session_id": sid}) | |
| def submit_profile(): | |
| data = request.get_json(force=True) | |
| sid = data.get("session_id") | |
| if sid not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| profile = data.get("profile", {}) | |
| brief = profile.get("brief_description", "") | |
| detected = detect_crime_type(brief) | |
| profile["crime_type"] = detected | |
| sessions[sid]["profile"] = profile | |
| sessions[sid]["crime_type"] = detected | |
| return jsonify({"status": "Profile saved", "detected_crime_type": detected}) | |
| def submit_case_alias(): | |
| """For older frontend compatibility.""" | |
| data = request.get_json(force=True) | |
| sid = data.get("session_id") | |
| if sid not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| profile = data.get("case_data") or data.get("profile") or {} | |
| brief = profile.get("brief_description", "") | |
| detected = detect_crime_type(brief) | |
| profile["crime_type"] = detected | |
| sessions[sid]["profile"] = profile | |
| sessions[sid]["crime_type"] = detected | |
| return jsonify({"status": "Case saved", "detected_crime_type": detected}), 200 | |
| # ------------------------------------------------------------ | |
| # AI RESPONSE + REPORT + QUESTIONS | |
| # ------------------------------------------------------------ | |
| def submit_response(): | |
| try: | |
| data = request.get_json(force=True) | |
| sid, text = data.get("session_id"), data.get("text", "") | |
| if sid not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| session = sessions[sid] | |
| profile = session.get("profile", {}) | |
| case_desc = profile.get("brief_description", "No description.") | |
| last_q = session.get("last_question", "") | |
| responses = session.get("responses", []) | |
| history = "\n".join([f"Q: {r['question']}\nA: {r['text']}" for r in responses[-3:]]) if responses else "" | |
| # Optional frontend timing fields | |
| answer_start_at_ms = data.get("answer_start_at") | |
| answer_end_at_ms = data.get("answer_end_at") | |
| duration_ms = data.get("duration_ms") | |
| mode = data.get("mode") # 'voice' | 'text' | 'mixed' | |
| question_id = data.get("question_id") | |
| response_id = data.get("response_id") | |
| # Retrieve relevant context from reference material (if available) | |
| context = retrieve_relevant_context(case_desc, top_k=3, use_new_reference=True) | |
| # Calculate semantic similarity between answer and context | |
| if context and context != "No reference context found (FAISS not loaded).": | |
| answer_vec = MODEL.encode([text])[0] | |
| context_vec = MODEL.encode([context])[0] | |
| similarity = float(np.dot(answer_vec, context_vec) / (np.linalg.norm(answer_vec) * np.linalg.norm(context_vec))) | |
| similarity_score = round(similarity * 100, 2) | |
| else: | |
| similarity_score = None | |
| # Enhanced prompt for OpenAI | |
| prompt = f""" | |
| You are a detective analyzing a suspect's reply. | |
| CASE: {case_desc} | |
| QUESTION: {last_q} | |
| ANSWER: {text} | |
| HISTORY: | |
| {history if history else 'None'} | |
| REFERENCE CONTEXT (from official guide): | |
| {context if context else 'None'} | |
| First, compare the answer to the reference context and rate its factual alignment (0-100). | |
| Second, classify the reply as CONFESSION, DENIAL, or EVASIVE. | |
| Third, give detailed reasoning, confidence (0–100), and final label: LIKELY GUILTY, LIKELY INNOCENT, or EVASIVE. | |
| Return JSON like: | |
| {{"truth_score": number, "label": "string", "reason": "string", "similarity_score": number}} | |
| """ | |
| response = requests.post( | |
| "https://api.openai.com/v1/chat/completions", | |
| headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}, | |
| json={"model": "gpt-3.5-turbo", "messages": [ | |
| {"role": "system", "content": "You are a professional detective."}, | |
| {"role": "user", "content": prompt} | |
| ], "temperature": 0.3, "max_tokens": 250}, timeout=60) | |
| import json | |
| result = json.loads(response.json()["choices"][0]["message"]["content"]) | |
| # Optionally, blend the AI's truth_score with the semantic similarity | |
| if similarity_score is not None and "truth_score" in result: | |
| # Weighted average: 70% AI, 30% similarity | |
| result["truth_score"] = round(0.7 * result["truth_score"] + 0.3 * similarity_score, 2) | |
| result["similarity_score"] = similarity_score | |
| record = {"question": last_q, "text": text, "final": result} | |
| # Persist optional timing/meta for traceability | |
| trace = {} | |
| if question_id: trace["question_id"] = question_id | |
| if response_id: trace["response_id"] = response_id | |
| if mode: trace["mode"] = mode | |
| if answer_start_at_ms is not None: trace["answer_start_at"] = answer_start_at_ms | |
| if answer_end_at_ms is not None: trace["answer_end_at"] = answer_end_at_ms | |
| if duration_ms is not None: trace["duration_ms"] = duration_ms | |
| if trace: | |
| record["timing"] = trace | |
| # If timing provided, aggregate face/body metrics over that interval from history | |
| try: | |
| face_state = session.setdefault("face", {}) | |
| start_s = _normalize_epoch_to_seconds(answer_start_at_ms) | |
| end_s = _normalize_epoch_to_seconds(answer_end_at_ms) | |
| if start_s is None and end_s is None and duration_ms is not None: | |
| # If only duration provided, use server 'now' as end | |
| end_s = time.time() | |
| start_s = end_s - max(0.0, float(duration_ms) / 1000.0) | |
| elif start_s is not None and end_s is None and duration_ms is not None: | |
| end_s = start_s + max(0.0, float(duration_ms) / 1000.0) | |
| elif end_s is not None and start_s is None and duration_ms is not None: | |
| start_s = end_s - max(0.0, float(duration_ms) / 1000.0) | |
| if start_s is not None and end_s is not None: | |
| agg = _aggregate_interval_from_history(face_state, start_s, end_s) | |
| record["face_body"] = { | |
| "start": start_s, | |
| "end": end_s, | |
| "metrics": agg | |
| } | |
| except Exception: | |
| # Do not fail the request because of metrics aggregation | |
| pass | |
| session["responses"].append(record) | |
| # After appending, compute investigative assessment (uses face_body if present) | |
| try: | |
| assessment = compute_investigative_assessment(record.get("final"), record.get("face_body")) | |
| record["investigative_assessment"] = assessment | |
| except Exception: | |
| pass | |
| # If there's a recent, unattached answer segment, attach it to this response | |
| try: | |
| segs = session.get("answer_segments") or [] | |
| if segs: | |
| last_seg = segs[-1] | |
| if not last_seg.get("attached"): | |
| session["responses"][-1]["face_body"] = last_seg | |
| last_seg["attached"] = True | |
| except Exception: | |
| pass | |
| session["last_answer"] = text | |
| return jsonify(result) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def get_report(session_id): | |
| if session_id not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| session = sessions[session_id] | |
| profile = session.get("profile", {}) | |
| crime = profile.get("brief_description", "unspecified") | |
| responses = session.get("responses", []) | |
| if not responses: | |
| return jsonify({ | |
| "report": "No responses yet.", | |
| "verdict": "Inconclusive", | |
| "truePercentage": 0, | |
| "falsePercentage": 0, | |
| "truthScore": 0, | |
| "avg_truth_score": 0, | |
| "validationResult": "Inconclusive", | |
| "session_duration": "0 minutes", | |
| "questions_answered": 0 | |
| }), 200 | |
| # Calculate truth scores and statistics | |
| truth_scores = [r["final"]["truth_score"] for r in responses] | |
| avg_truth_score = sum(truth_scores) / len(truth_scores) | |
| # Calculate percentages for frontend validation page | |
| true_percentage = max(0, min(100, avg_truth_score)) | |
| false_percentage = 100 - true_percentage | |
| # Determine validation result | |
| if avg_truth_score >= 70: | |
| validation_result = "LIKELY TRUTHFUL" | |
| elif avg_truth_score >= 50: | |
| validation_result = "INCONCLUSIVE" | |
| else: | |
| validation_result = "LIKELY DECEPTIVE" | |
| # Build interview transcript for AI analysis | |
| interview = "\n".join([ | |
| f"Detective: {r['question']}\nAccused: {r['text']}\nAssessment: {r['final']['label']} ({r['final']['truth_score']}%)" | |
| for r in responses]) | |
| # Generate AI verdict if OpenAI is available | |
| final_verdict = "Inconclusive" | |
| summary = "Analysis based on response patterns and truth indicators." | |
| if OPENAI_API_KEY: | |
| try: | |
| prompt = f""" | |
| Summarize this interrogation and decide verdict. | |
| CASE: {crime} | |
| INTERVIEW: | |
| {interview} | |
| Respond JSON: | |
| {{"final_verdict": "string", "summary": "string"}} | |
| """ | |
| response = requests.post( | |
| "https://api.openai.com/v1/chat/completions", | |
| headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}, | |
| json={"model": "gpt-3.5-turbo", "messages": [ | |
| {"role": "system", "content": "You are a detective summarizing interrogation."}, | |
| {"role": "user", "content": prompt} | |
| ], "temperature": 0.4, "max_tokens": 300}, timeout=60) | |
| import json | |
| ai_result = json.loads(response.json()["choices"][0]["message"]["content"]) | |
| final_verdict = ai_result.get("final_verdict", final_verdict) | |
| summary = ai_result.get("summary", summary) | |
| except Exception as e: | |
| print(f"AI analysis failed: {e}") | |
| # Aggregate involvement from investigative assessments if present | |
| involvement_scores = [] | |
| cue_counter = {} | |
| for r in responses: | |
| ia = r.get("investigative_assessment") | |
| if ia and isinstance(ia.get("involvement_score"), (int, float)): | |
| involvement_scores.append(float(ia["involvement_score"])) | |
| if ia and isinstance(ia.get("cues"), list): | |
| for c in ia["cues"]: | |
| cue_counter[c] = cue_counter.get(c, 0) + 1 | |
| avg_involvement = round(float(sum(involvement_scores) / len(involvement_scores)), 1) if involvement_scores else 0.0 | |
| high_risk = sum(1 for s in involvement_scores if s >= 70) | |
| moderate = sum(1 for s in involvement_scores if 40 <= s < 70) | |
| low_risk = sum(1 for s in involvement_scores if s < 40) | |
| # Return comprehensive data for frontend validation page | |
| return jsonify({ | |
| "final_verdict": final_verdict, | |
| "summary": summary, | |
| "truePercentage": round(true_percentage, 1), | |
| "falsePercentage": round(false_percentage, 1), | |
| "truthScore": round(avg_truth_score, 1), | |
| "avg_truth_score": round(avg_truth_score, 1), | |
| "validationResult": validation_result, | |
| "session_duration": f"{len(responses) * 2} minutes", # Estimate 2 minutes per Q&A | |
| "questions_answered": len(responses), | |
| "responses": responses, | |
| "case_summary": crime, | |
| "overall_involvement": { | |
| "avg_involvement_score": avg_involvement, | |
| "high_risk_count": high_risk, | |
| "moderate_count": moderate, | |
| "low_risk_count": low_risk, | |
| "top_cues": sorted(cue_counter.items(), key=lambda kv: kv[1], reverse=True)[:5] | |
| }, | |
| "detailed_analysis": { | |
| "truth_indicators": sum(1 for r in responses if r["final"]["truth_score"] > 70), | |
| "deception_indicators": sum(1 for r in responses if r["final"]["truth_score"] < 50), | |
| "neutral_responses": sum(1 for r in responses if 50 <= r["final"]["truth_score"] <= 70), | |
| "highest_truth_score": max(truth_scores) if truth_scores else 0, | |
| "lowest_truth_score": min(truth_scores) if truth_scores else 0, | |
| "consistency_rating": "High" if max(truth_scores) - min(truth_scores) < 30 else "Moderate" if max(truth_scores) - min(truth_scores) < 50 else "Low" | |
| } | |
| }) | |
| def get_validation_results(session_id): | |
| """ | |
| Endpoint specifically designed for the validation page component | |
| Returns data in the exact format expected by the frontend | |
| """ | |
| if session_id not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| session = sessions[session_id] | |
| profile = session.get("profile", {}) | |
| responses = session.get("responses", []) | |
| if not responses: | |
| return jsonify({ | |
| "truePercentage": 0, | |
| "falsePercentage": 0, | |
| "truthScore": 0, | |
| "avg_truth_score": 0, | |
| "validationResult": "Inconclusive - No responses recorded", | |
| "session_duration": "0 minutes", | |
| "questions_answered": 0, | |
| "report": { | |
| "case_summary": profile.get("brief_description", "No case details"), | |
| "total_questions": 0, | |
| "analysis_complete": False | |
| } | |
| }), 200 | |
| # Calculate comprehensive validation metrics | |
| truth_scores = [r["final"]["truth_score"] for r in responses] | |
| avg_truth_score = sum(truth_scores) / len(truth_scores) | |
| # Frontend-compatible percentages | |
| true_percentage = max(0, min(100, avg_truth_score)) | |
| false_percentage = 100 - true_percentage | |
| # Determine validation result with detailed reasoning | |
| if avg_truth_score >= 85: | |
| validation_result = "HIGHLY TRUTHFUL" | |
| elif avg_truth_score >= 70: | |
| validation_result = "LIKELY TRUTHFUL" | |
| elif avg_truth_score >= 50: | |
| validation_result = "INCONCLUSIVE" | |
| elif avg_truth_score >= 30: | |
| validation_result = "LIKELY DECEPTIVE" | |
| else: | |
| validation_result = "HIGHLY DECEPTIVE" | |
| # Calculate session duration (estimate) | |
| estimated_duration = len(responses) * 2 # 2 minutes per question | |
| session_duration = f"{estimated_duration} minutes" | |
| # Build comprehensive report object | |
| report_data = { | |
| "case_summary": profile.get("brief_description", "No case details"), | |
| "crime_type": session.get("crime_type", "Unknown"), | |
| "total_questions": len(responses), | |
| "analysis_complete": True, | |
| "truth_indicators": sum(1 for r in responses if r["final"]["truth_score"] > 70), | |
| "deception_indicators": sum(1 for r in responses if r["final"]["truth_score"] < 50), | |
| "neutral_responses": sum(1 for r in responses if 50 <= r["final"]["truth_score"] <= 70), | |
| "session_start": "Current session", | |
| "avg_truth_score": avg_truth_score, | |
| "validationResult": validation_result, | |
| "session_duration": session_duration, | |
| "questions_answered": len(responses), | |
| "detailed_responses": [ | |
| { | |
| "question": r.get("question", ""), | |
| "answer": r.get("text", ""), | |
| "truth_score": r["final"]["truth_score"], | |
| "label": r["final"]["label"], | |
| "reason": r["final"].get("reason", "") | |
| } for r in responses | |
| ] | |
| } | |
| return jsonify({ | |
| "truePercentage": round(true_percentage, 1), | |
| "falsePercentage": round(false_percentage, 1), | |
| "truthScore": round(avg_truth_score, 1), | |
| "avg_truth_score": round(avg_truth_score, 1), | |
| "validationResult": validation_result, | |
| "session_duration": session_duration, | |
| "questions_answered": len(responses), | |
| "report": report_data, | |
| "success": True | |
| }) | |
| def ask_question(): | |
| try: | |
| if not OPENAI_API_KEY: | |
| return jsonify({"error": "No API key configured"}), 500 | |
| sid = request.args.get("session_id") | |
| if not sid or sid not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| s = sessions[sid] | |
| # Update session with latest crime_type and brief_description if provided | |
| crime_type_param = request.args.get("crime_type") | |
| brief_description_param = request.args.get("brief_description") | |
| if crime_type_param: | |
| s["crime_type"] = crime_type_param | |
| if brief_description_param is not None: | |
| s.setdefault("profile", {})["brief_description"] = brief_description_param | |
| profile = s.get("profile", {}) | |
| brief_description = profile.get("brief_description", "No brief description provided.") | |
| ctype = s.get("crime_type", "Unknown") | |
| history = "\n".join([f"Detective: {r['question']}\nAccused: {r['text']}" for r in s.get("responses", [])[-3:]]) if s.get("responses") else "" | |
| # Retrieve relevant context from the reference (old/new based on case data) | |
| context = retrieve_relevant_context(f"{ctype} investigation", use_new_reference=True) | |
| prompt = f""" | |
| You are Detective Johnson investigating a {ctype.lower()}. | |
| CASE TYPE: {ctype} | |
| BRIEF DESCRIPTION: {brief_description} | |
| CONTEXT: {context} | |
| HISTORY: {history if history else 'No previous questions.'} | |
| Your task: | |
| Ask ONE short, simple, clear, and high-quality question in plain English (≤25 words) that follows up naturally on the latest answer and case context. | |
| Avoid complex language, jargon, or generic questions. Make sure the question is easy to understand and relevant to the investigation. | |
| """ | |
| # Call OpenAI API to generate a question based on the context and case details | |
| response = requests.post( | |
| "https://api.openai.com/v1/chat/completions", | |
| headers={"Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json"}, | |
| json={"model": "gpt-3.5-turbo", "messages": [ | |
| {"role": "system", "content": "You are a skilled detective conducting interrogations."}, | |
| {"role": "user", "content": prompt} | |
| ], "temperature": 0.7, "max_tokens": 80}, timeout=60) | |
| q = response.json()["choices"][0]["message"]["content"].strip() | |
| s["last_question"] = q # Store the last generated question | |
| return jsonify({"question": q}) | |
| except Exception as e: | |
| import traceback | |
| print("Error in /ask_question:", e) | |
| traceback.print_exc() | |
| return jsonify({"error": str(e)}), 500 | |
| # ------------------------------------------------------------ | |
| # REAL-TIME FACE STREAM ENDPOINTS | |
| # ------------------------------------------------------------ | |
| def face_frame(): | |
| """Receive a single frame (base64 image) and update face metrics for the session. | |
| Expected JSON: {"session_id": str, "frame": "data:image/...;base64,...."} | |
| Returns latest metrics and a recommended command. | |
| """ | |
| data = request.get_json(silent=True) or {} | |
| sid = data.get("session_id") | |
| frame_b64 = data.get("frame") | |
| if not sid or sid not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| if not frame_b64 or not isinstance(frame_b64, str): | |
| return jsonify({"error": "No frame provided"}), 400 | |
| # Strip possible data URL header | |
| if "," in frame_b64: | |
| frame_b64 = frame_b64.split(",", 1)[1] | |
| try: | |
| img_bytes = base64.b64decode(frame_b64) | |
| nparr = np.frombuffer(img_bytes, np.uint8) | |
| img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| except Exception as e: | |
| return jsonify({"error": f"Invalid image data: {e}"}), 400 | |
| face_state = sessions[sid].setdefault("face", {}) | |
| prev_metrics = face_state.get("last_metrics") | |
| metrics = analyze_frame(img, previous=prev_metrics) | |
| # Simple command logic for MTCNN (can be expanded) | |
| cmd = "Proceed with your answer." if metrics.get("face_present") else "Please position your face in the frame and face the camera." | |
| # Keep a short history and update stats | |
| ts = time.time() | |
| history = face_state.setdefault("history", []) | |
| history.append({"t": ts, **metrics}) | |
| # Limit history length (keep more frames to cover longer answers) | |
| if len(history) > 600: | |
| history.pop(0) | |
| face_state["last_metrics"] = metrics | |
| face_state["last_update"] = ts | |
| face_state["last_command"] = cmd | |
| return jsonify({"metrics": metrics, "command": cmd, "timestamp": ts}) | |
| def face_status(): | |
| sid = request.args.get("session_id") | |
| if not sid or sid not in sessions: | |
| return jsonify({"error": "Invalid session_id"}), 400 | |
| face_state = sessions[sid].get("face", {}) | |
| metrics = face_state.get("last_metrics") | |
| cmd = face_state.get("last_command") or recommend_command(metrics) | |
| updated = face_state.get("last_update") | |
| return jsonify({ | |
| "metrics": metrics, | |
| "command": cmd, | |
| "last_update": updated, | |
| "history_len": len(face_state.get("history", [])) | |
| }) | |
| # ------------------------------------------------------------ | |
| # BODY LANGUAGE QUERY FUNCTION | |
| # ------------------------------------------------------------ | |
| def query_body_language_cue(cue_text, top_k=1): | |
| """ | |
| Query the body language FAISS index with a cue or description. | |
| Returns the most relevant entry's meaning and explanation. | |
| """ | |
| if body_book_index is None or not body_book_entries: | |
| return {"error": "Body language FAISS index not loaded."} | |
| cue_vec = MODEL_BODY.encode([cue_text]).astype('float32') | |
| D, I = body_book_index.search(cue_vec, k=top_k) | |
| valid_indices = [i for i in I[0] if i < len(body_book_entries)] | |
| results = [body_book_entries[i] for i in valid_indices] | |
| return results[0] if results else {"error": "No matching body language entry found."} | |
| # ------------------------------------------------------------ | |
| # BODY LANGUAGE EXPLANATION API ENDPOINT | |
| # ------------------------------------------------------------ | |
| def body_language_explain(): | |
| """ | |
| API endpoint to get body language meaning/explanation for a detected cue. | |
| Expects JSON: {"cue": "..."} | |
| Returns: {"meaning": ..., "explanation": ...} or error | |
| """ | |
| data = request.get_json(force=True) | |
| cue = data.get("cue", "") | |
| if not cue: | |
| return jsonify({"error": "No cue provided."}), 400 | |
| result = query_body_language_cue(cue) | |
| return jsonify(result) | |
| # ------------------------------------------------------------ | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", "7860")) | |
| print(f"🚀 PyDetect Flask backend running at http://0.0.0.0:{port}") | |
| app.run(host="0.0.0.0", port=port, debug=False) | |