"""
Orionus AI -- Behavioral Intelligence for Security
Hugging Face Spaces Demo

Self-contained Streamlit app demonstrating:
- Multimodal security emotion analysis (text + simulated movement)
- 10 security intent categories
- 5 threat level assessment
- Zone-based monitoring
- Fuzzy inference visualization
- Cross-modal conflict detection
- Alert generation

Gated behind an email-verified trial system (1 use per email, 60-second session).
"""

from __future__ import annotations

import re
import time
import uuid
import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

import numpy as np
import plotly.graph_objects as go
import streamlit as st

from auth import (
    CODE_EXPIRY_SEC,
    LIVE_SESSION_MAX_SEC,
    TRIAL_DURATION_SEC,
    VIDEO_MAX_DURATION_SEC,
    generate_code,
    is_email_used,
    mark_email_used,
    remaining_seconds,
    send_verification_email,
    session_expired,
    smtp_configured,
)

# ============================================================================
# PAGE CONFIG & CSS
# ============================================================================

# Page-level Streamlit settings: title, icon, wide layout, collapsed sidebar.
st.set_page_config(
    page_title="Orionus AI -- Behavioral Intelligence for Security",
    page_icon="shield",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# Brand styling; rendered below as raw HTML (unsafe_allow_html=True).
BRAND_CSS = """ """
st.markdown(BRAND_CSS, unsafe_allow_html=True)

# ============================================================================
# CONSTANTS
# ============================================================================

# Shared emotion vocabulary (28 labels). The position of a label in this list
# is its index in every NUM_GOEMOTIONS-long vector built in this file.
GOEMOTIONS_LABELS = [
    "admiration", "amusement", "anger", "annoyance", "approval", "caring",
    "confusion", "curiosity", "desire", "disappointment", "disapproval",
    "disgust", "embarrassment", "excitement", "fear", "gratitude", "grief",
    "joy", "love", "nervousness", "neutral", "optimism", "pride",
    "realization", "relief", "remorse", "sadness", "surprise",
]
NUM_GOEMOTIONS = len(GOEMOTIONS_LABELS)

# The ten behavioural intent categories scored from movement features.
INTENT_LABELS = [
    "normal", "agitated", "aggressive", "attack_intent", "concealment",
    "evasion", "loitering", "panic", "deceptive", "erratic",
]

# Threat level names, listed from lowest to highest; the same strings are the
# values of the ThreatLevel enum below.
THREAT_LEVELS_LIST = ["none", "low", "elevated", "high", "critical"]

# Named groups of emotion labels. Clusters overlap: e.g. "anger" is listed
# under both "aggression" and "agitation".
SECURITY_EMOTION_CLUSTERS = {
    "aggression": ["anger", "annoyance", "disapproval", "disgust"],
    "fear_anxiety": ["fear", "nervousness"],
    "distress": ["sadness", "grief", "disappointment"],
    "deception": ["amusement", "joy", "neutral"],
    "agitation": ["anger", "nervousness", "confusion", "surprise"],
    "calm_normal": ["neutral", "approval", "optimism", "relief"],
}

# Zone type identifiers.
ZONE_TYPES = [
    "checkpoint", "boarding", "transit", "queue", "retail",
    "restricted", "entrance", "gathering", "perimeter", "parking",
]


# ============================================================================
# SCHEMAS
# ============================================================================

class ThreatLevel(str, Enum):
    """Threat level; a ``str`` enum whose values match THREAT_LEVELS_LIST."""

    NONE = "none"
    LOW = "low"
    ELEVATED = "elevated"
    HIGH = "high"
    CRITICAL = "critical"


class AlertPriority(str, Enum):
    """Priority attached to a generated alert; a ``str`` enum."""

    INFO = "info"
    WARNING = "warning"
    URGENT = "urgent"
    CRITICAL = "critical"


@dataclass
class MovementFeatures:
    """Ten float movement descriptors, each defaulting to 0.0."""

    # NOTE(review): values look like they are meant to be normalised to
    # [0, 1] -- confirm against whatever populates these fields.
    velocity: float = 0.0
    acceleration: float = 0.0
    direction_changes: float = 0.0
    proximity_approach: float = 0.0
    proximity_avoid: float = 0.0
    hand_activity: float = 0.0
    body_tension: float = 0.0
    gait_regularity: float = 0.0
    loiter_score: float = 0.0
    crowd_interaction: float = 0.0

    def as_dict(self) -> dict[str, float]:
        """Return the ten features as a ``{field name: value}`` dict."""
        return {
            "velocity": self.velocity,
            "acceleration": self.acceleration,
            "direction_changes": self.direction_changes,
            "proximity_approach": self.proximity_approach,
            "proximity_avoid": self.proximity_avoid,
            "hand_activity": self.hand_activity,
            "body_tension": self.body_tension,
            "gait_regularity": self.gait_regularity,
            "loiter_score": self.loiter_score,
            "crowd_interaction": self.crowd_interaction,
        }


@dataclass
class MovementResult:
    """Movement features together with the intent scores derived from them."""

    features: MovementFeatures
    intent_scores: dict[str, float]
    dominant_intent: str = "normal"
    confidence: float = 1.0
    timestamp: float = 0.0


@dataclass
class FuzzyEmotionState:
    """Fuzzy description of one emotion: memberships, dominant level, crisp value."""

    emotion: str
    memberships: dict[str, float]
    dominant_level: str
    crisp_value: float
@dataclass
class CrossModalConflict:
    """One disagreement between two modalities about an opposing emotion pair."""

    emotion: str
    modality_a: str
    modality_b: str
    level_a: str
    level_b: str
    severity: float
    interpretation: str


@dataclass
class ThreatAssessmentResult:
    """Outcome of a threat assessment: level, score, intent and explanations."""

    threat_level: ThreatLevel
    threat_score: float
    primary_intent: str
    intent_scores: dict[str, float]
    emotion_summary: dict[str, float]
    movement_summary: dict[str, float]
    conflicts: list[CrossModalConflict]
    fired_rules: list[str]
    contributing_factors: list[str]
    recommended_action: str


@dataclass
class Alert:
    """A raised alert for a zone (and optionally a subject)."""

    alert_id: str
    zone_id: str
    subject_id: str | None
    priority: AlertPriority
    threat_level: ThreatLevel
    title: str
    description: str
    timestamp: float
    contributing_signals: list[str]
    recommended_action: str


# ============================================================================
# FUZZY MEMBERSHIP FUNCTIONS
# ============================================================================

# Linguistic levels, ordered from weakest to strongest.
FUZZY_LEVELS = ["absent", "low", "moderate", "high", "very_high"]

# Representative crisp value of each level, used when defuzzifying.
LEVEL_CENTROIDS = {
    "absent": 0.0,
    "low": 0.15,
    "moderate": 0.35,
    "high": 0.60,
    "very_high": 0.85,
}


def _trapezoid(x: float, a: float, b: float, c: float, d: float) -> float:
    """Trapezoidal membership: 0 outside (a, d), 1 on [b, c], linear between."""
    if x <= a or x >= d:
        return 0.0
    if x < b:
        # Rising edge between a and b.
        return (x - a) / (b - a)
    if x <= c:
        return 1.0
    # Falling edge between c and d.
    return (d - x) / (d - c)


def _left_shoulder(x: float, a: float, b: float) -> float:
    """Membership that is 1 up to ``a`` and falls linearly to 0 at ``b``."""
    if x <= a:
        return 1.0
    return 0.0 if x >= b else (b - x) / (b - a)


def _right_shoulder(x: float, a: float, b: float) -> float:
    """Membership that is 0 up to ``a`` and rises linearly to 1 at ``b``."""
    if x <= a:
        return 0.0
    return 1.0 if x >= b else (x - a) / (b - a)


def mf_absent(x):
    """Membership of ``x`` in the "absent" level."""
    return _left_shoulder(x, 0.05, 0.12)


def mf_low(x):
    """Membership of ``x`` in the "low" level."""
    return _trapezoid(x, 0.05, 0.12, 0.20, 0.30)


def mf_moderate(x):
    """Membership of ``x`` in the "moderate" level."""
    return _trapezoid(x, 0.20, 0.30, 0.45, 0.55)


def mf_high(x):
    """Membership of ``x`` in the "high" level."""
    return _trapezoid(x, 0.45, 0.55, 0.70, 0.80)


def mf_very_high(x):
    """Membership of ``x`` in the "very_high" level."""
    return _right_shoulder(x, 0.70, 0.80)


# Level name -> membership function, in FUZZY_LEVELS order.
_MEMBERSHIP_FUNCTIONS = (
    ("absent", mf_absent),
    ("low", mf_low),
    ("moderate", mf_moderate),
    ("high", mf_high),
    ("very_high", mf_very_high),
)


def fuzzify(crisp_value: float) -> dict[str, float]:
    """Return the membership of ``crisp_value`` in each of the five levels."""
    return {level: membership(crisp_value) for level, membership in _MEMBERSHIP_FUNCTIONS}
# ============================================================================
# FUZZY RULE BASE
# ============================================================================

@dataclass
class FuzzyRule:
    """One fuzzy security rule.

    ``conditions`` is a list of dicts with keys ``modality``, ``emotion``,
    ``level`` and ``op``; ``outputs`` maps an emotion label to the fuzzy level
    the rule pushes it towards; ``threat_boost`` may be negative
    (de-escalation).
    """

    rule_id: str
    category: str
    conditions: list[dict]
    outputs: dict[str, str]
    threat_boost: float
    intent_signal: str
    description: str


def _cond(modality: str, emotion: str, level: str, op: str = ">=") -> dict:
    """Build one rule-condition dict."""
    return {"modality": modality, "emotion": emotion, "level": level, "op": op}


def build_security_rules(zone_type: str | None = None) -> list[FuzzyRule]:
    """Return the base rule set plus, where defined, one zone-specific rule.

    Args:
        zone_type: Zone identifier. "checkpoint", "restricted", "entrance" and
            "gathering" each add one extra rule; any other value adds none.

    Returns:
        A freshly built list of ``FuzzyRule`` objects (14 base rules, then the
        zone rule if any).
    """
    rules = [
        # Aggression
        FuzzyRule(
            "S01_rising_aggression", "aggression",
            [_cond("face", "angry", "moderate"),
             _cond("voice", "angry", "moderate"),
             _cond("movement", "body_tension", "moderate")],
            {"anger": "very_high", "nervousness": "low"}, 0.6, "aggressive",
            "Multi-modal aggression: face angry + voice angry + tense body"),
        FuzzyRule(
            "S02_pre_attack_posture", "aggression",
            [_cond("movement", "body_tension", "high"),
             _cond("movement", "acceleration", "moderate"),
             _cond("face", "angry", "low")],
            {"anger": "high", "fear": "low"}, 0.8, "attack_intent",
            "Pre-attack posture: high body tension + sudden acceleration + angry face"),
        FuzzyRule(
            "S03_verbal_threat", "aggression",
            [_cond("text", "anger", "high"),
             _cond("voice", "angry", "moderate")],
            {"anger": "very_high"}, 0.5, "aggressive",
            "Verbal threat: angry text + angry voice"),
        FuzzyRule(
            "S04_approach_with_anger", "aggression",
            [_cond("movement", "proximity_approach", "high"),
             _cond("face", "angry", "moderate")],
            {"anger": "high"}, 0.7, "attack_intent",
            "Aggressive approach: rapid approach + angry face"),
        # Deception
        FuzzyRule(
            "S05_calm_face_stressed_voice", "deception",
            [_cond("face", "neutral", "high"),
             _cond("voice", "fearful", "moderate")],
            {"nervousness": "high", "neutral": "low"}, 0.4, "deceptive",
            "Deception: calm face but stressed voice -- concealing fear"),
        FuzzyRule(
            "S06_forced_calm", "deception",
            [_cond("face", "happy", "moderate"),
             _cond("movement", "body_tension", "moderate"),
             _cond("movement", "hand_activity", "moderate")],
            {"nervousness": "high", "joy": "low"}, 0.35, "deceptive",
            "Forced calm: smiling but tense body + fidgety hands"),
        FuzzyRule(
            "S07_social_engineering", "deception",
            [_cond("face", "happy", "high"),
             _cond("voice", "happy", "moderate"),
             _cond("text", "nervousness", "moderate")],
            {"nervousness": "high", "joy": "absent"}, 0.45, "deceptive",
            "Social engineering: overly friendly but nervous text content"),
        # Panic
        FuzzyRule(
            "S08_genuine_panic", "panic",
            [_cond("face", "fear", "high"),
             _cond("voice", "fearful", "moderate"),
             _cond("movement", "velocity", "high")],
            {"fear": "very_high"}, 0.5, "panic",
            "Genuine panic: fearful face + fearful voice + running"),
        FuzzyRule(
            "S10_crowd_panic", "panic",
            [_cond("movement", "velocity", "high"),
             _cond("movement", "crowd_interaction", "high")],
            {"fear": "high", "surprise": "high"}, 0.6, "panic",
            "Mass movement: high velocity + crowd interaction"),
        # Surveillance
        FuzzyRule(
            "S11_suspicious_loitering", "surveillance",
            [_cond("movement", "loiter_score", "moderate"),
             _cond("face", "neutral", "high")],
            {"neutral": "moderate"}, 0.25, "loitering",
            "Suspicious loitering: lingering + watchful face"),
        FuzzyRule(
            "S12_evasive_movement", "surveillance",
            [_cond("movement", "direction_changes", "high"),
             _cond("movement", "proximity_avoid", "moderate")],
            {"nervousness": "high"}, 0.4, "evasion",
            "Evasive movement: direction changes + avoiding security areas"),
        FuzzyRule(
            "S13_concealment_behavior", "surveillance",
            [_cond("movement", "hand_activity", "high"),
             _cond("movement", "velocity", "low", "<="),
             _cond("face", "neutral", "moderate")],
            {"nervousness": "moderate"}, 0.35, "concealment",
            "Concealment: high hand activity + slow movement + controlled face"),
        FuzzyRule(
            "S14_erratic_behavior", "surveillance",
            [_cond("movement", "gait_regularity", "high"),
             _cond("movement", "direction_changes", "moderate")],
            {"confusion": "high"}, 0.3, "erratic",
            "Erratic behavior: irregular gait + frequent direction changes"),
        # De-escalation
        FuzzyRule(
            "S15_genuine_calm", "deescalation",
            [_cond("face", "neutral", "high"),
             _cond("voice", "calm", "high"),
             _cond("movement", "body_tension", "absent", "<=")],
            {"neutral": "very_high"}, -0.3, "normal",
            "Genuine calm: neutral face + calm voice + relaxed body"),
    ]

    # Zone-specific
    if zone_type == "checkpoint":
        rules.append(FuzzyRule(
            "Z02_checkpoint_evasion", "zone",
            [_cond("movement", "direction_changes", "high"),
             _cond("movement", "velocity", "moderate")],
            {"nervousness": "very_high"}, 0.6, "evasion",
            "Checkpoint evasion: sudden direction change near checkpoint"))
    elif zone_type == "restricted":
        rules.append(FuzzyRule(
            "Z03_restricted_presence", "zone",
            [_cond("movement", "loiter_score", "low")],
            {"nervousness": "moderate"}, 0.4, "evasion",
            "Restricted zone presence: any loitering in restricted area"))
    elif zone_type == "entrance":
        rules.append(FuzzyRule(
            "Z04_entrance_reversal", "zone",
            [_cond("movement", "direction_changes", "high"),
             _cond("face", "fear", "moderate")],
            {"nervousness": "high"}, 0.5, "evasion",
            "Entrance reversal: turning back at entrance with fear"))
    elif zone_type == "gathering":
        rules.append(FuzzyRule(
            "Z05_crowd_agitator", "zone",
            [_cond("voice", "angry", "high"),
             _cond("movement", "hand_activity", "high")],
            {"anger": "very_high"}, 0.5, "aggressive",
            "Crowd agitator: loud angry voice + aggressive gestures"))
    return rules


# ============================================================================
# TEXT EMOTION SIMULATOR
# ============================================================================

_TEXT_EMOTION_KEYWORDS = {
    "anger": ["angry", "furious", "hate", "rage", "kill", "destroy", "attack", "fight", "threat", "damn", "hell"],
    "annoyance": ["annoyed", "irritated", "frustrated", "bothered", "stupid", "idiot", "ridiculous", "unacceptable"],
    "fear": ["afraid", "scared", "terrified", "panic", "danger", "help", "run", "bomb", "gun", "weapon"],
    "nervousness": ["nervous", "anxious", "worried", "uneasy", "tense", "shaking", "sweating"],
    "joy": ["happy", "glad", "wonderful", "great", "love", "beautiful", "amazing", "fantastic"],
    "sadness": ["sad", "depressed", "crying", "miserable", "lonely", "grief", "mourn"],
    "surprise": ["surprised", "shocked", "unexpected", "wow", "unbelievable", "sudden"],
    "disgust": ["disgusting", "revolting", "vile", "sick", "horrible", "gross"],
    "confusion": ["confused", "lost", "unclear", "don't understand", "what", "spinning"],
    "neutral": ["ok", "fine", "normal", "nothing", "usual", "regular", "just"],
    "disapproval": ["wrong", "disagree", "bad", "terrible", "no"],
    "curiosity": ["curious", "wonder", "interested"],
    "admiration": ["admire", "respect", "impressive", "brilliant"],
    "approval": ["agree", "yes", "correct", "right", "good"],
    "caring": ["care", "worry about", "hope", "please", "safe"],
    "excitement": ["excited", "thrilled", "can't wait", "pumped"],
    "gratitude": ["thank", "grateful", "appreciate"],
    "optimism": ["hope", "optimistic", "better", "improve"],
    "embarrassment": ["embarrassed", "ashamed", "awkward"],
    "desire": ["want", "wish", "need"],
    "disappointment": ["disappointed", "letdown", "expected more", "waiting"],
    "love": ["love", "adore", "cherish", "kind"],
    "pride": ["proud", "accomplished"],
    "realization": ["realize", "understand now", "see", "dawned"],
    "relief": ["relieved", "finally", "phew"],
    "remorse": ["sorry", "regret", "apologize"],
    "grief": ["death", "gone forever"],
    "amusement": ["funny", "laugh", "hilarious", "joke", "haha"],
}

# One compiled pattern per keyword, anchored at a word start. Anchoring stops
# keywords from firing on fragments buried inside unrelated words ("care" in
# "scared", "agree" in "disagree", "no" in "know") while still letting stems
# match their inflections ("thank" -> "thanks", "run" -> "running").
_TEXT_EMOTION_PATTERNS = {
    emotion: [re.compile(r"\b" + re.escape(keyword)) for keyword in keywords]
    for emotion, keywords in _TEXT_EMOTION_KEYWORDS.items()
}


def simulate_text_emotions(text: str) -> dict[str, float]:
    """Score ``text`` against the keyword table and return a distribution.

    Every label in GOEMOTIONS_LABELS starts at 0.01; each distinct keyword
    found (case-insensitively, at the start of a word) adds 0.25 to its
    emotion, capped at 1.0. The scores are then normalised to sum to 1.

    Args:
        text: Free text to analyse.

    Returns:
        Mapping of every GoEmotions label to its normalised score.
    """
    text_lower = text.lower()
    scores = {label: 0.01 for label in GOEMOTIONS_LABELS}
    for emotion, patterns in _TEXT_EMOTION_PATTERNS.items():
        for pattern in patterns:
            if pattern.search(text_lower):
                scores[emotion] = min(scores[emotion] + 0.25, 1.0)
    total = sum(scores.values())
    if total > 0:
        scores = {k: v / total for k, v in scores.items()}
    return scores


def simulate_face_emotions(text_emotions: dict[str, float]) -> dict[str, float]:
    """Derive a simulated 7-class face distribution from text emotion scores.

    Each face class is a fixed weighted blend of text emotions, perturbed
    with Gaussian noise (sigma 0.02), clamped at zero and normalised.

    Args:
        text_emotions: Emotion label -> score; missing labels count as 0.

    Returns:
        Mapping over "angry", "disgust", "fear", "happy", "sad", "surprise"
        and "neutral". Non-deterministic unless ``random`` is seeded.
    """
    face_map = {
        "angry": text_emotions.get("anger", 0) * 0.7 + text_emotions.get("annoyance", 0) * 0.3,
        "disgust": text_emotions.get("disgust", 0) * 0.8 + text_emotions.get("disapproval", 0) * 0.2,
        "fear": text_emotions.get("fear", 0) * 0.7 + text_emotions.get("nervousness", 0) * 0.3,
        "happy": (text_emotions.get("joy", 0) * 0.6
                  + text_emotions.get("amusement", 0) * 0.2
                  + text_emotions.get("excitement", 0) * 0.2),
        "sad": (text_emotions.get("sadness", 0) * 0.7
                + text_emotions.get("grief", 0) * 0.2
                + text_emotions.get("disappointment", 0) * 0.1),
        "surprise": text_emotions.get("surprise", 0) * 0.8 + text_emotions.get("realization", 0) * 0.2,
        "neutral": text_emotions.get("neutral", 0) * 0.8 + text_emotions.get("approval", 0) * 0.2,
    }
    for k in face_map:
        # Small jitter so the simulated face is not a pure function of text.
        face_map[k] = max(0, face_map[k] + random.gauss(0, 0.02))
    total = sum(face_map.values())
    if total > 0:
        face_map = {k: v / total for k, v in face_map.items()}
    return face_map
def simulate_voice_emotions(text_emotions: dict[str, float]) -> dict[str, float]:
    """Derive a simulated 8-class voice distribution from text emotion scores.

    Each voice class is a fixed weighted blend of text emotions plus Gaussian
    noise (sigma 0.02), clamped at zero; the result is normalised to sum to 1.
    Non-deterministic unless ``random`` is seeded.
    """
    # (voice class, ((text emotion, weight), ...)) in output order.
    recipe = (
        ("angry", (("anger", 0.6), ("annoyance", 0.3))),
        ("calm", (("neutral", 0.5), ("relief", 0.3))),
        ("disgust", (("disgust", 0.7),)),
        ("fearful", (("fear", 0.7), ("nervousness", 0.4))),
        ("happy", (("joy", 0.6), ("excitement", 0.3))),
        ("neutral", (("neutral", 0.6),)),
        ("sad", (("sadness", 0.7), ("grief", 0.2))),
        ("surprised", (("surprise", 0.8),)),
    )
    voice = {}
    for voice_label, parts in recipe:
        blended = sum(text_emotions.get(source, 0) * weight for source, weight in parts)
        voice[voice_label] = max(0, blended + random.gauss(0, 0.02))

    mass = sum(voice.values())
    if mass > 0:
        return {label: value / mass for label, value in voice.items()}
    return voice


# ============================================================================
# EMOTION PROJECTOR
# ============================================================================

# Face class -> GoEmotions labels that share its probability mass.
FACE_TO_GOEMOTIONS_MAP = {
    "angry": ["anger", "annoyance"],
    "disgust": ["disgust", "disapproval"],
    "fear": ["fear", "nervousness"],
    "happy": ["joy", "amusement", "excitement"],
    "sad": ["sadness", "grief", "disappointment"],
    "surprise": ["surprise", "realization"],
    "neutral": ["neutral"],
}

# Voice class -> GoEmotions labels that share its probability mass.
VOICE_TO_GOEMOTIONS_MAP = {
    "angry": ["anger", "annoyance"],
    "calm": ["neutral", "relief"],
    "disgust": ["disgust", "disapproval"],
    "fearful": ["fear", "nervousness"],
    "happy": ["joy", "amusement", "excitement"],
    "neutral": ["neutral"],
    "sad": ["sadness", "grief", "disappointment"],
    "surprised": ["surprise", "realization"],
}


def project_to_goemotions(source_probs: dict[str, float], mapping: dict[str, list[str]]) -> np.ndarray:
    """Spread a native distribution over the GoEmotions label space.

    Each source probability is split equally among its mapped target labels;
    source labels without a mapping, and targets that are not GoEmotions
    labels, are ignored. The vector is normalised unless it is all zeros.
    """
    position = {label: i for i, label in enumerate(GOEMOTIONS_LABELS)}
    projected = np.zeros(NUM_GOEMOTIONS)
    for src_label, prob in source_probs.items():
        targets = mapping.get(src_label, [])
        if not targets:
            continue
        share = prob / len(targets)
        for target in targets:
            slot = position.get(target)
            if slot is not None:
                projected[slot] += share
    mass = projected.sum()
    return projected / mass if mass > 0 else projected


def movement_to_emotion_space(intent_scores: dict[str, float]) -> np.ndarray:
    """Translate movement intent scores into a GoEmotions-space vector.

    Each intent contributes fixed fractions of its score to a few emotion
    labels ("aggressive" is folded into the attack contribution at 0.8).
    The vector is normalised unless it is all zeros.
    """
    slot = {label: i for i, label in enumerate(GOEMOTIONS_LABELS)}

    def score(intent: str) -> float:
        return intent_scores.get(intent, 0)

    # (intent strength, ((emotion label, fraction), ...)), applied in order.
    plan = (
        (score("attack_intent") + score("aggressive") * 0.8,
         (("anger", 0.6), ("disgust", 0.2), ("annoyance", 0.2))),
        (score("panic"), (("fear", 0.5), ("nervousness", 0.3), ("surprise", 0.2))),
        (score("evasion"), (("nervousness", 0.6), ("fear", 0.3))),
        (score("concealment"), (("nervousness", 0.5), ("fear", 0.2), ("neutral", 0.3))),
        (score("erratic"), (("confusion", 0.5), ("surprise", 0.3))),
        (score("agitated"), (("anger", 0.3), ("nervousness", 0.4), ("annoyance", 0.3))),
        (score("loitering"), (("neutral", 0.7), ("nervousness", 0.3))),
        (score("normal"), (("neutral", 0.8), ("approval", 0.1), ("optimism", 0.1))),
    )
    emotion_vec = np.zeros(NUM_GOEMOTIONS)
    for strength, spread in plan:
        for label, fraction in spread:
            emotion_vec[slot[label]] += strength * fraction

    mass = emotion_vec.sum()
    if mass > 0:
        emotion_vec = emotion_vec / mass
    return emotion_vec
# ============================================================================
# INTENT MAPPER
# ============================================================================

def map_movement_to_intents(features: MovementFeatures) -> dict[str, float]:
    """Score every intent in INTENT_LABELS from threshold rules on movement.

    "normal" starts at 0.5. A rule fires only when all of its conditions
    hold; its activation is the weakest condition strength (0.5 at the
    threshold, growing to 1.0), scaled by the rule weight. A fired rule
    raises its intent score and damps "normal". Scores are normalised to
    sum to 1.
    """
    values = features.as_dict()

    def strength(feature: str, op: str, threshold: float):
        """Return the condition strength, or None when the condition fails."""
        value = values.get(feature, 0.0)
        if op == ">=" and value >= threshold:
            return min((value - threshold) / (1.0 - threshold + 1e-6) + 0.5, 1.0)
        if op == "<=" and value <= threshold:
            return min((threshold - value) / (threshold + 1e-6) + 0.5, 1.0)
        return None

    # (intent, weight, ((feature, op, threshold), ...)) in evaluation order.
    table = (
        ("attack_intent", 1.0,
         (("proximity_approach", ">=", 0.6), ("body_tension", ">=", 0.5), ("acceleration", ">=", 0.4))),
        ("aggressive", 0.9,
         (("hand_activity", ">=", 0.5), ("body_tension", ">=", 0.4), ("velocity", ">=", 0.3))),
        ("concealment", 0.8,
         (("hand_activity", ">=", 0.4), ("body_tension", ">=", 0.3), ("velocity", "<=", 0.3))),
        ("evasion", 0.85,
         (("direction_changes", ">=", 0.5), ("proximity_avoid", ">=", 0.4), ("velocity", ">=", 0.3))),
        ("loitering", 0.7,
         (("loiter_score", ">=", 0.3), ("velocity", "<=", 0.2))),
        ("panic", 0.95,
         (("velocity", ">=", 0.7), ("acceleration", ">=", 0.5), ("gait_regularity", ">=", 0.4))),
        ("erratic", 0.75,
         (("gait_regularity", ">=", 0.5), ("direction_changes", ">=", 0.4))),
        ("agitated", 0.6,
         (("hand_activity", ">=", 0.3), ("body_tension", ">=", 0.3))),
    )

    scores = dict.fromkeys(INTENT_LABELS, 0.0)
    scores["normal"] = 0.5

    for intent, weight, conditions in table:
        strengths = []
        for feature, op, threshold in conditions:
            current = strength(feature, op, threshold)
            if current is None:
                # One failed condition disqualifies the whole rule.
                strengths = None
                break
            strengths.append(current)
        if not strengths:
            continue
        weighted = min(strengths) * weight
        scores[intent] = max(scores[intent], weighted)
        scores["normal"] *= (1 - weighted * 0.5)

    total = sum(scores.values())
    if total > 0:
        scores = {intent: value / total for intent, value in scores.items()}
    return scores
# ============================================================================
# CONFLICT DETECTION
# ============================================================================

# Rank of each fuzzy level; rank 2 ("moderate") and above counts as present.
_LEVEL_ORDER = {"absent": 0, "low": 1, "moderate": 2, "high": 3, "very_high": 4}

# (modality_a, modality_b, emotion_a, emotion_b) -> human-readable reading.
_CONFLICT_INTERPRETATIONS = {
    ("face", "voice", "joy", "sadness"): "Calm face masking stress -- possible deception",
    ("face", "voice", "joy", "anger"): "Feigned friendliness masking hostility",
    ("face", "voice", "neutral", "anger"): "Suppressed anger -- covert hostility",
    ("face", "voice", "neutral", "fear"): "Suppressed fear -- voice reveals anxiety",
    ("face", "text", "joy", "fear"): "Feigned composure -- smiling but expressing fear in words",
    ("face", "text", "neutral", "nervousness"): "Controlled exterior, anxious interior -- concealment",
    ("voice", "text", "neutral", "anger"): "Controlled voice, angry words -- measured hostility",
}


def detect_conflicts(modality_fuzzy, modality_names, threshold=0.3):
    """Find opposing emotions expressed at the same time by two modalities.

    For every ordered pair of modalities (earlier name first) and every
    opposing emotion pair, a conflict is recorded when the first modality's
    dominant level for the first emotion and the second modality's dominant
    level for the second emotion are both at least "moderate", and the mean
    of the two dominant memberships reaches ``threshold``.

    Args:
        modality_fuzzy: modality -> emotion -> {level: membership}.
        modality_names: Modalities to compare, in order.
        threshold: Minimum severity for a conflict to be reported.

    Returns:
        ``CrossModalConflict`` objects, most severe first.
    """
    opposing_pairs = (
        ("joy", "sadness"), ("joy", "anger"), ("joy", "fear"), ("anger", "fear"),
        ("neutral", "anger"), ("neutral", "fear"), ("neutral", "nervousness"),
    )
    found = []
    for position, mod_a in enumerate(modality_names):
        states_a = modality_fuzzy.get(mod_a, {})
        for mod_b in modality_names[position + 1:]:
            states_b = modality_fuzzy.get(mod_b, {})
            for emo_a, emo_b in opposing_pairs:
                # A missing emotion is treated as fully "absent".
                memb_a = states_a.get(emo_a, {"absent": 1.0})
                memb_b = states_b.get(emo_b, {"absent": 1.0})
                level_a = max(memb_a, key=memb_a.get)
                level_b = max(memb_b, key=memb_b.get)
                if _LEVEL_ORDER.get(level_a, 0) < 2 or _LEVEL_ORDER.get(level_b, 0) < 2:
                    continue
                severity = min(1.0, (memb_a.get(level_a, 0) + memb_b.get(level_b, 0)) / 2.0)
                if severity < threshold:
                    continue
                lookup = (mod_a, mod_b, emo_a, emo_b)
                if lookup in _CONFLICT_INTERPRETATIONS:
                    reading = _CONFLICT_INTERPRETATIONS[lookup]
                else:
                    reading = (f"Cross-modal disagreement: {mod_a} shows {emo_a}({level_a}) "
                               f"while {mod_b} shows {emo_b}({level_b})")
                found.append(CrossModalConflict(
                    emotion=f"{emo_a}_vs_{emo_b}",
                    modality_a=mod_a,
                    modality_b=mod_b,
                    level_a=level_a,
                    level_b=level_b,
                    severity=severity,
                    interpretation=reading,
                ))
    found.sort(key=lambda conflict: conflict.severity, reverse=True)
    return found
# ============================================================================
# RULE EVALUATOR
# ============================================================================

def evaluate_rules(rules, fuzzified_states, movement_features=None):
    """Return ``(rule, activation)`` for every rule whose activation exceeds 0.01.

    A rule's activation is the minimum over its conditions. A "movement"
    condition is fuzzified from ``movement_features`` when that dict is
    non-empty; other conditions read ``fuzzified_states[modality][emotion]``.
    A condition that cannot be resolved contributes 0. ``>=`` sums memberships
    from the target level upwards, ``<=`` from the target level downwards, any
    other operator reads the target level only.
    """
    levels = ["absent", "low", "moderate", "high", "very_high"]
    fired = []
    for rule in rules:
        strengths = []
        for cond in rule.conditions:
            modality = cond["modality"]
            emotion = cond["emotion"]
            level = cond["level"]
            op = cond.get("op", ">=")

            if modality == "movement" and movement_features:
                memberships = fuzzify(movement_features.get(emotion, 0.0))
            elif modality in fuzzified_states and emotion in fuzzified_states[modality]:
                memberships = fuzzified_states[modality][emotion]
            else:
                strengths.append(0.0)
                continue

            # Unknown level names fall back to the lowest level.
            cut = levels.index(level) if level in levels else 0
            if op == ">=":
                window = levels[cut:]
            elif op == "<=":
                window = levels[:cut + 1]
            else:
                window = None

            if window is None:
                strengths.append(memberships.get(level, 0.0))
            else:
                strengths.append(sum(memberships.get(name, 0.0) for name in window))

        rule_activation = min(strengths) if strengths else 0.0
        if rule_activation > 0.01:
            fired.append((rule, rule_activation))
    return fired


# ============================================================================
# THREAT ASSESSOR
# ============================================================================

def assess_threat(emotion_probs, movement_result, conflict_score, fired_rules, conflicts, zone_sensitivity=1.0):
    """Combine emotion, movement, conflict and rule evidence into a threat result.

    The raw score is 0.25 * emotion threat + 0.35 * movement threat
    + 0.15 * conflict threat + 0.25 * rule threat; it is multiplied by
    ``zone_sensitivity`` and capped at 1.0, then mapped to a ``ThreatLevel``
    with cut-offs 0.15 / 0.35 / 0.55 / 0.75.

    Args:
        emotion_probs: Emotion label -> probability.
        movement_result: ``MovementResult`` or a falsy value when unavailable.
        conflict_score: Aggregate cross-modal conflict score.
        fired_rules: ``(rule, activation)`` pairs.
        conflicts: Detected conflicts; the first three may be listed as factors.
        zone_sensitivity: Multiplier applied to the raw score.

    Returns:
        A populated ``ThreatAssessmentResult``.
    """

    def cluster_mass(cluster_name):
        return sum(emotion_probs.get(e, 0) for e in SECURITY_EMOTION_CLUSTERS[cluster_name])

    aggression = cluster_mass("aggression")
    fear = cluster_mass("fear_anxiety")
    agitation = cluster_mass("agitation")
    emotion_threat = min(aggression * 0.5 + fear * 0.2 + agitation * 0.3, 1.0)

    movement_threat = 0.0
    intent_scores = {}
    primary_intent = "normal"
    movement_summary = {}
    if movement_result:
        # How threatening each movement intent is considered to be.
        intent_weights = {
            "attack_intent": 1.0, "aggressive": 0.8, "panic": 0.7, "evasion": 0.5,
            "concealment": 0.5, "erratic": 0.4, "loitering": 0.2, "agitated": 0.3,
            "deceptive": 0.4, "normal": 0.0,
        }
        weighted_sum = sum(
            movement_result.intent_scores.get(intent, 0) * weight
            for intent, weight in intent_weights.items()
        )
        movement_threat = min(weighted_sum, 1.0)
        intent_scores = movement_result.intent_scores
        primary_intent = movement_result.dominant_intent
        movement_summary = {
            name: value
            for name, value in movement_result.features.as_dict().items()
            if value > 0.1
        }

    conflict_threat = conflict_score * 0.3

    if fired_rules:
        boost = sum(rule.threat_boost * activation for rule, activation in fired_rules)
        rule_threat = max(min(boost, 1.0), 0.0)
    else:
        rule_threat = 0.0

    raw_score = (emotion_threat * 0.25 + movement_threat * 0.35
                 + conflict_threat * 0.15 + rule_threat * 0.25)
    threat_score = min(raw_score * zone_sensitivity, 1.0)

    threat_level = ThreatLevel.CRITICAL
    for upper_bound, level in (
        (0.15, ThreatLevel.NONE),
        (0.35, ThreatLevel.LOW),
        (0.55, ThreatLevel.ELEVATED),
        (0.75, ThreatLevel.HIGH),
    ):
        if threat_score < upper_bound:
            threat_level = level
            break

    if fired_rules:
        # Strongest activation seen per intent signal; a sufficiently strong
        # non-"normal" signal overrides the movement-derived intent.
        strongest = {}
        for rule, activation in fired_rules:
            strongest[rule.intent_signal] = max(strongest.get(rule.intent_signal, 0), activation)
        if strongest:
            leading = max(strongest, key=strongest.get)
            if strongest[leading] > 0.3 and leading != "normal":
                primary_intent = leading

    factors = []
    if emotion_threat > 0.3:
        top_emo = max(emotion_probs, key=emotion_probs.get)
        factors.append(f"Elevated emotion: {top_emo} ({emotion_probs[top_emo]:.2f})")
    if movement_threat > 0.3 and movement_result:
        factors.append(
            f"Movement intent: {movement_result.dominant_intent} ({movement_result.confidence:.2f})"
        )
    if conflict_score > 0.3:
        factors.append(f"Cross-modal conflict detected (score: {conflict_score:.2f})")
        factors.extend(f"Conflict: {conflict.interpretation}" for conflict in conflicts[:3])
    factors.extend(
        f"Rule {rule.rule_id}: {rule.description[:80]}"
        for rule, activation in fired_rules
        if activation > 0.3
    )

    level_actions = {
        ThreatLevel.NONE: "No action required. Continue standard monitoring.",
        ThreatLevel.LOW: "Monitor subject. Log for review.",
        ThreatLevel.ELEVATED: "Increase surveillance. Alert nearby personnel.",
        ThreatLevel.HIGH: "Dispatch security team. Prepare for intervention.",
        ThreatLevel.CRITICAL: "IMMEDIATE RESPONSE. All security to location. Consider lockdown.",
    }
    intent_suffixes = {
        "attack_intent": " Approach with caution. Subject may be armed.",
        "aggressive": " De-escalation team recommended.",
        "concealment": " Search may be warranted.",
        "evasion": " Track subject. Cover exit routes.",
        "panic": " Assess trigger. Crowd control may be needed.",
        "erratic": " Medical team on standby.",
        "deceptive": " Structured questioning. Second officer recommended.",
    }
    action = level_actions.get(threat_level, "Monitor.")
    # Intent-specific advice is only appended from "elevated" upwards.
    if primary_intent in intent_suffixes and threat_level.value in ("elevated", "high", "critical"):
        action += intent_suffixes[primary_intent]

    ranked_emotions = sorted(emotion_probs.items(), key=lambda item: -item[1])
    return ThreatAssessmentResult(
        threat_level=threat_level,
        threat_score=threat_score,
        primary_intent=primary_intent,
        intent_scores=intent_scores,
        emotion_summary=dict(ranked_emotions[:5]),
        movement_summary=movement_summary,
        conflicts=conflicts,
        fired_rules=[rule.rule_id for rule, _ in fired_rules],
        contributing_factors=factors,
        recommended_action=action,
    )
# ============================================================================
# ALERT ENGINE
# ============================================================================

def generate_alert(threat, zone_id, subject_id="SUBJ-001"):
    """Build an ``Alert`` for ``threat``, or return None below "elevated".

    Args:
        threat: A ``ThreatAssessmentResult``.
        zone_id: Identifier of the zone the alert belongs to.
        subject_id: Identifier of the subject.

    Returns:
        An ``Alert`` with a fresh 8-character id and the current timestamp,
        or None when the threat level is "none" or "low".
    """
    severity_ladder = [
        ThreatLevel.NONE, ThreatLevel.LOW, ThreatLevel.ELEVATED,
        ThreatLevel.HIGH, ThreatLevel.CRITICAL,
    ]
    current_rank = severity_ladder.index(threat.threat_level)
    alert_floor = severity_ladder.index(ThreatLevel.ELEVATED)
    if current_rank < alert_floor:
        return None

    priority = {
        ThreatLevel.ELEVATED: AlertPriority.WARNING,
        ThreatLevel.HIGH: AlertPriority.URGENT,
        ThreatLevel.CRITICAL: AlertPriority.CRITICAL,
    }.get(threat.threat_level, AlertPriority.INFO)

    titles_by_intent = {
        "attack_intent": "Potential Attack Behavior Detected",
        "aggressive": "Aggressive Behavior Detected",
        "concealment": "Suspicious Concealment Activity",
        "evasion": "Evasive Movement Detected",
        "loitering": "Suspicious Loitering",
        "panic": "Panic Response Detected",
        "erratic": "Erratic Behavior Observed",
        "deceptive": "Deceptive Behavior Indicators",
        "agitated": "Agitated Subject Detected",
    }
    if threat.primary_intent in titles_by_intent:
        title = titles_by_intent[threat.primary_intent]
    else:
        title = f"Security Alert: {threat.threat_level.value.upper()}"

    description = [
        f"Threat Score: {threat.threat_score:.2f} ({threat.threat_level.value})",
        f"Primary Intent: {threat.primary_intent}",
    ]
    if threat.emotion_summary:
        leading = list(threat.emotion_summary.items())[:3]
        emotion_text = ", ".join(f"{name}: {score:.2f}" for name, score in leading)
        description.append(f"Top Emotions: {emotion_text}")
    if threat.contributing_factors:
        description.append("Contributing Factors:")
        description.extend(f" - {factor}" for factor in threat.contributing_factors[:5])

    return Alert(
        alert_id=str(uuid.uuid4())[:8],
        zone_id=zone_id,
        subject_id=subject_id,
        priority=priority,
        threat_level=threat.threat_level,
        title=title,
        description="\n".join(description),
        timestamp=time.time(),
        contributing_signals=threat.contributing_factors[:5],
        recommended_action=threat.recommended_action,
    )
# ============================================================================
# FULL FUSION PIPELINE
# ============================================================================

def run_security_fusion(text, movement_features, zone_type, zone_sensitivity):
    """Run the full text + simulated face/voice + movement fusion pipeline.

    Steps: simulate modality emotions from ``text``; project each modality
    into the GoEmotions space; map movement to intents; fuse the modalities
    with fixed weights; fuzzify; evaluate the security rules; detect
    cross-modal conflicts; defuzzify; assess the threat; and raise an alert
    where warranted.

    Args:
        text: Free text attributed to the subject.
        movement_features: ``MovementFeatures`` for the subject.
        zone_type: Zone identifier (selects zone-specific rules and the
            alert's ``zone_<type>`` id).
        zone_sensitivity: Multiplier applied to the raw threat score.

    Returns:
        Dict with keys "emotion_probs", "face_emotions", "voice_emotions",
        "text_emotions", "movement_result", "threat", "fuzzy_states",
        "conflicts", "conflict_score", "fired_rules", "alert" and "weights".
    """
    text_emotions = simulate_text_emotions(text)
    face_emotions = simulate_face_emotions(text_emotions)
    voice_emotions = simulate_voice_emotions(text_emotions)

    face_28 = project_to_goemotions(face_emotions, FACE_TO_GOEMOTIONS_MAP)
    voice_28 = project_to_goemotions(voice_emotions, VOICE_TO_GOEMOTIONS_MAP)
    text_28 = np.array([text_emotions.get(l, 0.01) for l in GOEMOTIONS_LABELS])
    t = text_28.sum()
    if t > 0:
        text_28 = text_28 / t

    intent_scores = map_movement_to_intents(movement_features)
    movement_28 = movement_to_emotion_space(intent_scores)
    dominant_intent = max(intent_scores, key=intent_scores.get)
    movement_result = MovementResult(
        features=movement_features,
        intent_scores=intent_scores,
        dominant_intent=dominant_intent,
        confidence=intent_scores[dominant_intent],
    )

    weights = {"face": 0.20, "voice": 0.25, "text": 0.20, "movement": 0.35}
    base_crisp = (weights["face"] * face_28 + weights["voice"] * voice_28
                  + weights["text"] * text_28 + weights["movement"] * movement_28)

    # Per-modality fuzzy memberships in the shared GoEmotions label space.
    modality_fuzzy = {}
    for mod_name, vec in [("face", face_28), ("voice", voice_28),
                          ("text", text_28), ("movement", movement_28)]:
        modality_fuzzy[mod_name] = {
            label: fuzzify(float(vec[i])) for i, label in enumerate(GOEMOTIONS_LABELS)
        }

    # The rule base addresses face and voice by their native class names
    # ("angry", "happy", "fearful", "calm", ...). Those names are not
    # GoEmotions labels, so without them every such condition evaluates to 0
    # and the rules built on them can never fire. Add the native memberships
    # for rule evaluation only; setdefault keeps the projected memberships
    # for names that exist in both vocabularies, so conditions that already
    # resolved are unaffected.
    rule_states = {mod_name: dict(states) for mod_name, states in modality_fuzzy.items()}
    for mod_name, native_probs in (("face", face_emotions), ("voice", voice_emotions)):
        for native_label, prob in native_probs.items():
            rule_states[mod_name].setdefault(native_label, fuzzify(float(prob)))

    rules = build_security_rules(zone_type)
    fired_rules = evaluate_rules(rules, rule_states, movement_features.as_dict())
    conflicts = detect_conflicts(modality_fuzzy, ["face", "voice", "text", "movement"])
    # Mean squared severity, capped at 1.
    conflict_score = (
        min(1.0, sum(c.severity ** 2 for c in conflicts) / max(len(conflicts), 1))
        if conflicts else 0.0
    )

    # Defuzzification
    result = base_crisp.copy()
    label_idx = {label: i for i, label in enumerate(GOEMOTIONS_LABELS)}
    for rule, activation in fired_rules:
        for emotion_label, target_level in rule.outputs.items():
            if emotion_label in label_idx:
                idx = label_idx[emotion_label]
                target_centroid = LEVEL_CENTROIDS.get(target_level, 0.35)
                effective = target_centroid * activation
                # Blend towards the rule's target in proportion to activation.
                result[idx] = (1.0 - activation) * result[idx] + activation * effective

    base_fuzzy = {
        label: fuzzify(float(base_crisp[i])) for i, label in enumerate(GOEMOTIONS_LABELS)
    }
    for label, memberships in base_fuzzy.items():
        if label in label_idx:
            idx = label_idx[label]
            # Centroid of the fused (pre-rule) memberships.
            numerator = sum(mu * LEVEL_CENTROIDS.get(lev, 0) for lev, mu in memberships.items() if mu > 0)
            denominator = sum(mu for mu in memberships.values() if mu > 0)
            if denominator > 0:
                result[idx] = 0.7 * result[idx] + 0.3 * (numerator / denominator)

    result = np.maximum(result, 0)
    total = result.sum()
    if total > 0:
        result = result / total
    emotion_probs = dict(zip(GOEMOTIONS_LABELS, result.tolist()))

    threat = assess_threat(emotion_probs, movement_result, conflict_score,
                           fired_rules, conflicts, zone_sensitivity)

    # Named separately from the per-modality dicts above, which the original
    # shadowed by reusing one variable name for both a dict and a list.
    emotion_states = []
    for i, label in enumerate(GOEMOTIONS_LABELS):
        memberships = base_fuzzy.get(label, {"absent": 1.0})
        dominant = max(memberships, key=memberships.get)
        emotion_states.append(FuzzyEmotionState(
            emotion=label,
            memberships=memberships,
            dominant_level=dominant,
            crisp_value=float(result[i]),
        ))

    alert = generate_alert(threat, f"zone_{zone_type}")

    return {
        "emotion_probs": emotion_probs,
        "face_emotions": face_emotions,
        "voice_emotions": voice_emotions,
        "text_emotions": text_emotions,
        "movement_result": movement_result,
        "threat": threat,
        "fuzzy_states": emotion_states,
        "conflicts": conflicts,
        "conflict_score": conflict_score,
        "fired_rules": fired_rules,
        "alert": alert,
        "weights": weights,
    }
movement_result, "threat": threat, "fuzzy_states": fuzzy_states, "conflicts": conflicts, "conflict_score": conflict_score, "fired_rules": fired_rules, "alert": alert, "weights": weights, } # ============================================================================ # SESSION STATE # ============================================================================ _DEFAULTS = { "auth_step": "email", "auth_email": "", "auth_code": "", "auth_code_ts": 0.0, "auth_email_pending": False, "session_start": 0.0, } for key, val in _DEFAULTS.items(): if key not in st.session_state: st.session_state[key] = val # ── Background email send (runs after rerun so UI stays responsive) ── if st.session_state.get("auth_email_pending") and st.session_state["auth_step"] == "verify": st.session_state["auth_email_pending"] = False print(f"[Auth] Attempting to send code to {st.session_state['auth_email']}...") try: sent = send_verification_email(st.session_state["auth_email"], st.session_state["auth_code"]) print(f"[Auth] send_verification_email returned: sent={sent}") if sent: st.session_state["auth_email_sent"] = True else: st.session_state["auth_email_error"] = "SMTP send failed" except Exception as _exc: print(f"[Auth] send_verification_email exception: {_exc}") st.session_state["auth_email_error"] = str(_exc) # ============================================================================ # AUTH GATE # ============================================================================ def _get_logo_b64(): import base64 from pathlib import Path logo_path = Path(__file__).parent / "logo.png" if logo_path.exists(): return base64.b64encode(logo_path.read_bytes()).decode() return None def _render_header(animated=True, size=300): _logo_b64 = _get_logo_b64() if _logo_b64: cls = 'floating-logo' if animated else '' st.markdown(f"""
Behavioral Intelligence for Security — Multimodal Threat Assessment & Intent Recognition System:
Real-time threat detection through AI-powered analysis of facial expressions, voice patterns, text sentiment, and body movement
Enter your email to receive a one-time verification code. Each email is valid for a single 60-second trial session.
A 6-digit code was sent to {st.session_state["auth_email"]}
Your 60-second demo session has ended.
Interested in the full Orionus platform?
Contact us at info@caitcore.com for enterprise access.
Orionus provides real-time multimodal threat assessment for airports, transit hubs, stadiums, and critical infrastructure.
Orionus AI performs multimodal behavioral intelligence fusion using: