""" Orionus AI -- Behavioral Intelligence for Security Hugging Face Spaces Demo Self-contained Streamlit app demonstrating: - Multimodal security emotion analysis (text + simulated movement) - 10 security intent categories - 5 threat level assessment - Zone-based monitoring - Fuzzy inference visualization - Cross-modal conflict detection - Alert generation Gated behind an email-verified trial system (1 use per email, 60-second session). """ from __future__ import annotations import re import time import uuid import random from dataclasses import dataclass, field from enum import Enum from typing import Any import numpy as np import plotly.graph_objects as go import streamlit as st from auth import ( CODE_EXPIRY_SEC, LIVE_SESSION_MAX_SEC, TRIAL_DURATION_SEC, VIDEO_MAX_DURATION_SEC, generate_code, is_email_used, mark_email_used, remaining_seconds, send_verification_email, session_expired, smtp_configured, ) # ============================================================================ # PAGE CONFIG & CSS # ============================================================================ st.set_page_config( page_title="Orionus AI -- Behavioral Intelligence for Security", page_icon="shield", layout="wide", initial_sidebar_state="collapsed", ) BRAND_CSS = """ """ st.markdown(BRAND_CSS, unsafe_allow_html=True) # ============================================================================ # CONSTANTS # ============================================================================ GOEMOTIONS_LABELS = [ "admiration", "amusement", "anger", "annoyance", "approval", "caring", "confusion", "curiosity", "desire", "disappointment", "disapproval", "disgust", "embarrassment", "excitement", "fear", "gratitude", "grief", "joy", "love", "nervousness", "neutral", "optimism", "pride", "realization", "relief", "remorse", "sadness", "surprise", ] NUM_GOEMOTIONS = len(GOEMOTIONS_LABELS) INTENT_LABELS = [ "normal", "agitated", "aggressive", "attack_intent", "concealment", "evasion", "loitering", "panic", "deceptive", "erratic", ] THREAT_LEVELS_LIST = ["none", "low", "elevated", "high", "critical"] SECURITY_EMOTION_CLUSTERS = { "aggression": ["anger", "annoyance", "disapproval", "disgust"], "fear_anxiety": ["fear", "nervousness"], "distress": ["sadness", "grief", "disappointment"], "deception": ["amusement", "joy", "neutral"], "agitation": ["anger", "nervousness", "confusion", "surprise"], "calm_normal": ["neutral", "approval", "optimism", "relief"], } ZONE_TYPES = [ "checkpoint", "boarding", "transit", "queue", "retail", "restricted", "entrance", "gathering", "perimeter", "parking", ] # ============================================================================ # SCHEMAS # ============================================================================ class ThreatLevel(str, Enum): NONE = "none" LOW = "low" ELEVATED = "elevated" HIGH = "high" CRITICAL = "critical" class AlertPriority(str, Enum): INFO = "info" WARNING = "warning" URGENT = "urgent" CRITICAL = "critical" @dataclass class MovementFeatures: velocity: float = 0.0 acceleration: float = 0.0 direction_changes: float = 0.0 proximity_approach: float = 0.0 proximity_avoid: float = 0.0 hand_activity: float = 0.0 body_tension: float = 0.0 gait_regularity: float = 0.0 loiter_score: float = 0.0 crowd_interaction: float = 0.0 def as_dict(self) -> dict[str, float]: return { "velocity": self.velocity, "acceleration": self.acceleration, "direction_changes": self.direction_changes, "proximity_approach": self.proximity_approach, "proximity_avoid": self.proximity_avoid, "hand_activity": self.hand_activity, "body_tension": self.body_tension, "gait_regularity": self.gait_regularity, "loiter_score": self.loiter_score, "crowd_interaction": self.crowd_interaction, } @dataclass class MovementResult: features: MovementFeatures intent_scores: dict[str, float] dominant_intent: str = "normal" confidence: float = 1.0 timestamp: float = 0.0 @dataclass class FuzzyEmotionState: emotion: str memberships: dict[str, float] dominant_level: str crisp_value: float @dataclass class CrossModalConflict: emotion: str modality_a: str modality_b: str level_a: str level_b: str severity: float interpretation: str @dataclass class ThreatAssessmentResult: threat_level: ThreatLevel threat_score: float primary_intent: str intent_scores: dict[str, float] emotion_summary: dict[str, float] movement_summary: dict[str, float] conflicts: list[CrossModalConflict] fired_rules: list[str] contributing_factors: list[str] recommended_action: str @dataclass class Alert: alert_id: str zone_id: str subject_id: str | None priority: AlertPriority threat_level: ThreatLevel title: str description: str timestamp: float contributing_signals: list[str] recommended_action: str # ============================================================================ # FUZZY MEMBERSHIP FUNCTIONS # ============================================================================ FUZZY_LEVELS = ["absent", "low", "moderate", "high", "very_high"] LEVEL_CENTROIDS = { "absent": 0.0, "low": 0.15, "moderate": 0.35, "high": 0.60, "very_high": 0.85, } def _trapezoid(x: float, a: float, b: float, c: float, d: float) -> float: if x <= a or x >= d: return 0.0 if b <= x <= c: return 1.0 if a < x < b: return (x - a) / (b - a) return (d - x) / (d - c) def _left_shoulder(x: float, a: float, b: float) -> float: if x <= a: return 1.0 if x >= b: return 0.0 return (b - x) / (b - a) def _right_shoulder(x: float, a: float, b: float) -> float: if x <= a: return 0.0 if x >= b: return 1.0 return (x - a) / (b - a) def mf_absent(x): return _left_shoulder(x, 0.05, 0.12) def mf_low(x): return _trapezoid(x, 0.05, 0.12, 0.20, 0.30) def mf_moderate(x): return _trapezoid(x, 0.20, 0.30, 0.45, 0.55) def mf_high(x): return _trapezoid(x, 0.45, 0.55, 0.70, 0.80) def mf_very_high(x): return _right_shoulder(x, 0.70, 0.80) def fuzzify(crisp_value: float) -> dict[str, float]: return { "absent": mf_absent(crisp_value), "low": mf_low(crisp_value), "moderate": mf_moderate(crisp_value), "high": mf_high(crisp_value), "very_high": mf_very_high(crisp_value), } # ============================================================================ # FUZZY RULE BASE # ============================================================================ @dataclass class FuzzyRule: rule_id: str category: str conditions: list[dict] outputs: dict[str, str] threat_boost: float intent_signal: str description: str def build_security_rules(zone_type: str | None = None) -> list[FuzzyRule]: rules = [] # Aggression rules.append(FuzzyRule("S01_rising_aggression", "aggression", [{"modality": "face", "emotion": "angry", "level": "moderate", "op": ">="}, {"modality": "voice", "emotion": "angry", "level": "moderate", "op": ">="}, {"modality": "movement", "emotion": "body_tension", "level": "moderate", "op": ">="}], {"anger": "very_high", "nervousness": "low"}, 0.6, "aggressive", "Multi-modal aggression: face angry + voice angry + tense body")) rules.append(FuzzyRule("S02_pre_attack_posture", "aggression", [{"modality": "movement", "emotion": "body_tension", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "acceleration", "level": "moderate", "op": ">="}, {"modality": "face", "emotion": "angry", "level": "low", "op": ">="}], {"anger": "high", "fear": "low"}, 0.8, "attack_intent", "Pre-attack posture: high body tension + sudden acceleration + angry face")) rules.append(FuzzyRule("S03_verbal_threat", "aggression", [{"modality": "text", "emotion": "anger", "level": "high", "op": ">="}, {"modality": "voice", "emotion": "angry", "level": "moderate", "op": ">="}], {"anger": "very_high"}, 0.5, "aggressive", "Verbal threat: angry text + angry voice")) rules.append(FuzzyRule("S04_approach_with_anger", "aggression", [{"modality": "movement", "emotion": "proximity_approach", "level": "high", "op": ">="}, {"modality": "face", "emotion": "angry", "level": "moderate", "op": ">="}], {"anger": "high"}, 0.7, "attack_intent", "Aggressive approach: rapid approach + angry face")) # Deception rules.append(FuzzyRule("S05_calm_face_stressed_voice", "deception", [{"modality": "face", "emotion": "neutral", "level": "high", "op": ">="}, {"modality": "voice", "emotion": "fearful", "level": "moderate", "op": ">="}], {"nervousness": "high", "neutral": "low"}, 0.4, "deceptive", "Deception: calm face but stressed voice -- concealing fear")) rules.append(FuzzyRule("S06_forced_calm", "deception", [{"modality": "face", "emotion": "happy", "level": "moderate", "op": ">="}, {"modality": "movement", "emotion": "body_tension", "level": "moderate", "op": ">="}, {"modality": "movement", "emotion": "hand_activity", "level": "moderate", "op": ">="}], {"nervousness": "high", "joy": "low"}, 0.35, "deceptive", "Forced calm: smiling but tense body + fidgety hands")) rules.append(FuzzyRule("S07_social_engineering", "deception", [{"modality": "face", "emotion": "happy", "level": "high", "op": ">="}, {"modality": "voice", "emotion": "happy", "level": "moderate", "op": ">="}, {"modality": "text", "emotion": "nervousness", "level": "moderate", "op": ">="}], {"nervousness": "high", "joy": "absent"}, 0.45, "deceptive", "Social engineering: overly friendly but nervous text content")) # Panic rules.append(FuzzyRule("S08_genuine_panic", "panic", [{"modality": "face", "emotion": "fear", "level": "high", "op": ">="}, {"modality": "voice", "emotion": "fearful", "level": "moderate", "op": ">="}, {"modality": "movement", "emotion": "velocity", "level": "high", "op": ">="}], {"fear": "very_high"}, 0.5, "panic", "Genuine panic: fearful face + fearful voice + running")) rules.append(FuzzyRule("S10_crowd_panic", "panic", [{"modality": "movement", "emotion": "velocity", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "crowd_interaction", "level": "high", "op": ">="}], {"fear": "high", "surprise": "high"}, 0.6, "panic", "Mass movement: high velocity + crowd interaction")) # Surveillance rules.append(FuzzyRule("S11_suspicious_loitering", "surveillance", [{"modality": "movement", "emotion": "loiter_score", "level": "moderate", "op": ">="}, {"modality": "face", "emotion": "neutral", "level": "high", "op": ">="}], {"neutral": "moderate"}, 0.25, "loitering", "Suspicious loitering: lingering + watchful face")) rules.append(FuzzyRule("S12_evasive_movement", "surveillance", [{"modality": "movement", "emotion": "direction_changes", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "proximity_avoid", "level": "moderate", "op": ">="}], {"nervousness": "high"}, 0.4, "evasion", "Evasive movement: direction changes + avoiding security areas")) rules.append(FuzzyRule("S13_concealment_behavior", "surveillance", [{"modality": "movement", "emotion": "hand_activity", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "velocity", "level": "low", "op": "<="}, {"modality": "face", "emotion": "neutral", "level": "moderate", "op": ">="}], {"nervousness": "moderate"}, 0.35, "concealment", "Concealment: high hand activity + slow movement + controlled face")) rules.append(FuzzyRule("S14_erratic_behavior", "surveillance", [{"modality": "movement", "emotion": "gait_regularity", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "direction_changes", "level": "moderate", "op": ">="}], {"confusion": "high"}, 0.3, "erratic", "Erratic behavior: irregular gait + frequent direction changes")) # De-escalation rules.append(FuzzyRule("S15_genuine_calm", "deescalation", [{"modality": "face", "emotion": "neutral", "level": "high", "op": ">="}, {"modality": "voice", "emotion": "calm", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "body_tension", "level": "absent", "op": "<="}], {"neutral": "very_high"}, -0.3, "normal", "Genuine calm: neutral face + calm voice + relaxed body")) # Zone-specific if zone_type == "checkpoint": rules.append(FuzzyRule("Z02_checkpoint_evasion", "zone", [{"modality": "movement", "emotion": "direction_changes", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "velocity", "level": "moderate", "op": ">="}], {"nervousness": "very_high"}, 0.6, "evasion", "Checkpoint evasion: sudden direction change near checkpoint")) elif zone_type == "restricted": rules.append(FuzzyRule("Z03_restricted_presence", "zone", [{"modality": "movement", "emotion": "loiter_score", "level": "low", "op": ">="}], {"nervousness": "moderate"}, 0.4, "evasion", "Restricted zone presence: any loitering in restricted area")) elif zone_type == "entrance": rules.append(FuzzyRule("Z04_entrance_reversal", "zone", [{"modality": "movement", "emotion": "direction_changes", "level": "high", "op": ">="}, {"modality": "face", "emotion": "fear", "level": "moderate", "op": ">="}], {"nervousness": "high"}, 0.5, "evasion", "Entrance reversal: turning back at entrance with fear")) elif zone_type == "gathering": rules.append(FuzzyRule("Z05_crowd_agitator", "zone", [{"modality": "voice", "emotion": "angry", "level": "high", "op": ">="}, {"modality": "movement", "emotion": "hand_activity", "level": "high", "op": ">="}], {"anger": "very_high"}, 0.5, "aggressive", "Crowd agitator: loud angry voice + aggressive gestures")) return rules # ============================================================================ # TEXT EMOTION SIMULATOR # ============================================================================ _TEXT_EMOTION_KEYWORDS = { "anger": ["angry", "furious", "hate", "rage", "kill", "destroy", "attack", "fight", "threat", "damn", "hell"], "annoyance": ["annoyed", "irritated", "frustrated", "bothered", "stupid", "idiot", "ridiculous", "unacceptable"], "fear": ["afraid", "scared", "terrified", "panic", "danger", "help", "run", "bomb", "gun", "weapon"], "nervousness": ["nervous", "anxious", "worried", "uneasy", "tense", "shaking", "sweating"], "joy": ["happy", "glad", "wonderful", "great", "love", "beautiful", "amazing", "fantastic"], "sadness": ["sad", "depressed", "crying", "miserable", "lonely", "grief", "mourn"], "surprise": ["surprised", "shocked", "unexpected", "wow", "unbelievable", "sudden"], "disgust": ["disgusting", "revolting", "vile", "sick", "horrible", "gross"], "confusion": ["confused", "lost", "unclear", "don't understand", "what", "spinning"], "neutral": ["ok", "fine", "normal", "nothing", "usual", "regular", "just"], "disapproval": ["wrong", "disagree", "bad", "terrible", "no"], "curiosity": ["curious", "wonder", "interested"], "admiration": ["admire", "respect", "impressive", "brilliant"], "approval": ["agree", "yes", "correct", "right", "good"], "caring": ["care", "worry about", "hope", "please", "safe"], "excitement": ["excited", "thrilled", "can't wait", "pumped"], "gratitude": ["thank", "grateful", "appreciate"], "optimism": ["hope", "optimistic", "better", "improve"], "embarrassment": ["embarrassed", "ashamed", "awkward"], "desire": ["want", "wish", "need"], "disappointment": ["disappointed", "letdown", "expected more", "waiting"], "love": ["love", "adore", "cherish", "kind"], "pride": ["proud", "accomplished"], "realization": ["realize", "understand now", "see", "dawned"], "relief": ["relieved", "finally", "phew"], "remorse": ["sorry", "regret", "apologize"], "grief": ["death", "gone forever"], "amusement": ["funny", "laugh", "hilarious", "joke", "haha"], } def simulate_text_emotions(text: str) -> dict[str, float]: text_lower = text.lower() scores = {label: 0.01 for label in GOEMOTIONS_LABELS} for emotion, keywords in _TEXT_EMOTION_KEYWORDS.items(): for kw in keywords: if kw in text_lower: scores[emotion] = min(scores[emotion] + 0.25, 1.0) total = sum(scores.values()) if total > 0: scores = {k: v / total for k, v in scores.items()} return scores def simulate_face_emotions(text_emotions: dict[str, float]) -> dict[str, float]: face_map = { "angry": text_emotions.get("anger", 0) * 0.7 + text_emotions.get("annoyance", 0) * 0.3, "disgust": text_emotions.get("disgust", 0) * 0.8 + text_emotions.get("disapproval", 0) * 0.2, "fear": text_emotions.get("fear", 0) * 0.7 + text_emotions.get("nervousness", 0) * 0.3, "happy": text_emotions.get("joy", 0) * 0.6 + text_emotions.get("amusement", 0) * 0.2 + text_emotions.get("excitement", 0) * 0.2, "sad": text_emotions.get("sadness", 0) * 0.7 + text_emotions.get("grief", 0) * 0.2 + text_emotions.get("disappointment", 0) * 0.1, "surprise": text_emotions.get("surprise", 0) * 0.8 + text_emotions.get("realization", 0) * 0.2, "neutral": text_emotions.get("neutral", 0) * 0.8 + text_emotions.get("approval", 0) * 0.2, } for k in face_map: face_map[k] = max(0, face_map[k] + random.gauss(0, 0.02)) total = sum(face_map.values()) if total > 0: face_map = {k: v / total for k, v in face_map.items()} return face_map def simulate_voice_emotions(text_emotions: dict[str, float]) -> dict[str, float]: voice_map = { "angry": text_emotions.get("anger", 0) * 0.6 + text_emotions.get("annoyance", 0) * 0.3, "calm": text_emotions.get("neutral", 0) * 0.5 + text_emotions.get("relief", 0) * 0.3, "disgust": text_emotions.get("disgust", 0) * 0.7, "fearful": text_emotions.get("fear", 0) * 0.7 + text_emotions.get("nervousness", 0) * 0.4, "happy": text_emotions.get("joy", 0) * 0.6 + text_emotions.get("excitement", 0) * 0.3, "neutral": text_emotions.get("neutral", 0) * 0.6, "sad": text_emotions.get("sadness", 0) * 0.7 + text_emotions.get("grief", 0) * 0.2, "surprised": text_emotions.get("surprise", 0) * 0.8, } for k in voice_map: voice_map[k] = max(0, voice_map[k] + random.gauss(0, 0.02)) total = sum(voice_map.values()) if total > 0: voice_map = {k: v / total for k, v in voice_map.items()} return voice_map # ============================================================================ # EMOTION PROJECTOR # ============================================================================ FACE_TO_GOEMOTIONS_MAP = { "angry": ["anger", "annoyance"], "disgust": ["disgust", "disapproval"], "fear": ["fear", "nervousness"], "happy": ["joy", "amusement", "excitement"], "sad": ["sadness", "grief", "disappointment"], "surprise": ["surprise", "realization"], "neutral": ["neutral"], } VOICE_TO_GOEMOTIONS_MAP = { "angry": ["anger", "annoyance"], "calm": ["neutral", "relief"], "disgust": ["disgust", "disapproval"], "fearful": ["fear", "nervousness"], "happy": ["joy", "amusement", "excitement"], "neutral": ["neutral"], "sad": ["sadness", "grief", "disappointment"], "surprised": ["surprise", "realization"], } def project_to_goemotions(source_probs: dict[str, float], mapping: dict[str, list[str]]) -> np.ndarray: vec = np.zeros(NUM_GOEMOTIONS) go_idx = {label: i for i, label in enumerate(GOEMOTIONS_LABELS)} for src_label, prob in source_probs.items(): targets = mapping.get(src_label, []) if targets: weight = prob / len(targets) for t in targets: if t in go_idx: vec[go_idx[t]] += weight total = vec.sum() if total > 0: vec = vec / total return vec def movement_to_emotion_space(intent_scores: dict[str, float]) -> np.ndarray: vec = np.zeros(NUM_GOEMOTIONS) idx = {label: i for i, label in enumerate(GOEMOTIONS_LABELS)} attack = intent_scores.get("attack_intent", 0) + intent_scores.get("aggressive", 0) * 0.8 vec[idx["anger"]] += attack * 0.6; vec[idx["disgust"]] += attack * 0.2; vec[idx["annoyance"]] += attack * 0.2 p = intent_scores.get("panic", 0) vec[idx["fear"]] += p * 0.5; vec[idx["nervousness"]] += p * 0.3; vec[idx["surprise"]] += p * 0.2 e = intent_scores.get("evasion", 0) vec[idx["nervousness"]] += e * 0.6; vec[idx["fear"]] += e * 0.3 c = intent_scores.get("concealment", 0) vec[idx["nervousness"]] += c * 0.5; vec[idx["fear"]] += c * 0.2; vec[idx["neutral"]] += c * 0.3 er = intent_scores.get("erratic", 0) vec[idx["confusion"]] += er * 0.5; vec[idx["surprise"]] += er * 0.3 ag = intent_scores.get("agitated", 0) vec[idx["anger"]] += ag * 0.3; vec[idx["nervousness"]] += ag * 0.4; vec[idx["annoyance"]] += ag * 0.3 lo = intent_scores.get("loitering", 0) vec[idx["neutral"]] += lo * 0.7; vec[idx["nervousness"]] += lo * 0.3 n = intent_scores.get("normal", 0) vec[idx["neutral"]] += n * 0.8; vec[idx["approval"]] += n * 0.1; vec[idx["optimism"]] += n * 0.1 total = vec.sum() if total > 0: vec = vec / total return vec # ============================================================================ # INTENT MAPPER # ============================================================================ def map_movement_to_intents(features: MovementFeatures) -> dict[str, float]: feat_dict = features.as_dict() scores = {intent: 0.0 for intent in INTENT_LABELS} scores["normal"] = 0.5 rules = [ {"intent": "attack_intent", "conditions": {"proximity_approach": (">=", 0.6), "body_tension": (">=", 0.5), "acceleration": (">=", 0.4)}, "weight": 1.0}, {"intent": "aggressive", "conditions": {"hand_activity": (">=", 0.5), "body_tension": (">=", 0.4), "velocity": (">=", 0.3)}, "weight": 0.9}, {"intent": "concealment", "conditions": {"hand_activity": (">=", 0.4), "body_tension": (">=", 0.3), "velocity": ("<=", 0.3)}, "weight": 0.8}, {"intent": "evasion", "conditions": {"direction_changes": (">=", 0.5), "proximity_avoid": (">=", 0.4), "velocity": (">=", 0.3)}, "weight": 0.85}, {"intent": "loitering", "conditions": {"loiter_score": (">=", 0.3), "velocity": ("<=", 0.2)}, "weight": 0.7}, {"intent": "panic", "conditions": {"velocity": (">=", 0.7), "acceleration": (">=", 0.5), "gait_regularity": (">=", 0.4)}, "weight": 0.95}, {"intent": "erratic", "conditions": {"gait_regularity": (">=", 0.5), "direction_changes": (">=", 0.4)}, "weight": 0.75}, {"intent": "agitated", "conditions": {"hand_activity": (">=", 0.3), "body_tension": (">=", 0.3)}, "weight": 0.6}, ] for rule in rules: activations = [] match = True for feature, (op, threshold) in rule["conditions"].items(): value = feat_dict.get(feature, 0.0) if op == ">=" and value >= threshold: activations.append(min((value - threshold) / (1.0 - threshold + 1e-6) + 0.5, 1.0)) elif op == "<=" and value <= threshold: activations.append(min((threshold - value) / (threshold + 1e-6) + 0.5, 1.0)) else: match = False; break if match and activations: activation = min(activations) weighted = activation * rule["weight"] scores[rule["intent"]] = max(scores[rule["intent"]], weighted) scores["normal"] *= (1 - weighted * 0.5) total = sum(scores.values()) if total > 0: scores = {k: v / total for k, v in scores.items()} return scores # ============================================================================ # CONFLICT DETECTION # ============================================================================ _LEVEL_ORDER = {"absent": 0, "low": 1, "moderate": 2, "high": 3, "very_high": 4} _CONFLICT_INTERPRETATIONS = { ("face", "voice", "joy", "sadness"): "Calm face masking stress -- possible deception", ("face", "voice", "joy", "anger"): "Feigned friendliness masking hostility", ("face", "voice", "neutral", "anger"): "Suppressed anger -- covert hostility", ("face", "voice", "neutral", "fear"): "Suppressed fear -- voice reveals anxiety", ("face", "text", "joy", "fear"): "Feigned composure -- smiling but expressing fear in words", ("face", "text", "neutral", "nervousness"): "Controlled exterior, anxious interior -- concealment", ("voice", "text", "neutral", "anger"): "Controlled voice, angry words -- measured hostility", } def detect_conflicts(modality_fuzzy, modality_names, threshold=0.3): conflicts = [] opposing = [("joy", "sadness"), ("joy", "anger"), ("joy", "fear"), ("anger", "fear"), ("neutral", "anger"), ("neutral", "fear"), ("neutral", "nervousness")] for i in range(len(modality_names)): for j in range(i + 1, len(modality_names)): mod_a, mod_b = modality_names[i], modality_names[j] states_a = modality_fuzzy.get(mod_a, {}) states_b = modality_fuzzy.get(mod_b, {}) for emo_a, emo_b in opposing: memb_a = states_a.get(emo_a, {"absent": 1.0}) memb_b = states_b.get(emo_b, {"absent": 1.0}) level_a = max(memb_a, key=memb_a.get) level_b = max(memb_b, key=memb_b.get) if _LEVEL_ORDER.get(level_a, 0) >= 2 and _LEVEL_ORDER.get(level_b, 0) >= 2: severity = min(1.0, (memb_a.get(level_a, 0) + memb_b.get(level_b, 0)) / 2.0) if severity >= threshold: key = (mod_a, mod_b, emo_a, emo_b) interp = _CONFLICT_INTERPRETATIONS.get(key, f"Cross-modal disagreement: {mod_a} shows {emo_a}({level_a}) while {mod_b} shows {emo_b}({level_b})") conflicts.append(CrossModalConflict( emotion=f"{emo_a}_vs_{emo_b}", modality_a=mod_a, modality_b=mod_b, level_a=level_a, level_b=level_b, severity=severity, interpretation=interp)) conflicts.sort(key=lambda c: c.severity, reverse=True) return conflicts # ============================================================================ # RULE EVALUATOR # ============================================================================ def evaluate_rules(rules, fuzzified_states, movement_features=None): fired = [] level_order = ["absent", "low", "moderate", "high", "very_high"] for rule in rules: activations = [] for cond in rule.conditions: modality, emotion, level = cond["modality"], cond["emotion"], cond["level"] op = cond.get("op", ">=") if modality == "movement" and movement_features: memberships = fuzzify(movement_features.get(emotion, 0.0)) elif modality in fuzzified_states and emotion in fuzzified_states[modality]: memberships = fuzzified_states[modality][emotion] else: activations.append(0.0); continue target_idx = level_order.index(level) if level in level_order else 0 if op == ">=": activation = sum(memberships.get(l, 0.0) for l in level_order[target_idx:]) elif op == "<=": activation = sum(memberships.get(l, 0.0) for l in level_order[:target_idx + 1]) else: activation = memberships.get(level, 0.0) activations.append(activation) rule_activation = min(activations) if activations else 0.0 if rule_activation > 0.01: fired.append((rule, rule_activation)) return fired # ============================================================================ # THREAT ASSESSOR # ============================================================================ def assess_threat(emotion_probs, movement_result, conflict_score, fired_rules, conflicts, zone_sensitivity=1.0): aggression = sum(emotion_probs.get(e, 0) for e in SECURITY_EMOTION_CLUSTERS["aggression"]) fear = sum(emotion_probs.get(e, 0) for e in SECURITY_EMOTION_CLUSTERS["fear_anxiety"]) agitation = sum(emotion_probs.get(e, 0) for e in SECURITY_EMOTION_CLUSTERS["agitation"]) emotion_threat = min(aggression * 0.5 + fear * 0.2 + agitation * 0.3, 1.0) movement_threat = 0.0; intent_scores = {}; primary_intent = "normal"; movement_summary = {} if movement_result: high_threat = {"attack_intent": 1.0, "aggressive": 0.8, "panic": 0.7, "evasion": 0.5, "concealment": 0.5, "erratic": 0.4, "loitering": 0.2, "agitated": 0.3, "deceptive": 0.4, "normal": 0.0} movement_threat = min(sum(movement_result.intent_scores.get(i, 0) * w for i, w in high_threat.items()), 1.0) intent_scores = movement_result.intent_scores primary_intent = movement_result.dominant_intent movement_summary = {k: v for k, v in movement_result.features.as_dict().items() if v > 0.1} conflict_threat = conflict_score * 0.3 rule_threat = max(min(sum(r.threat_boost * a for r, a in fired_rules), 1.0), 0.0) if fired_rules else 0.0 raw_score = emotion_threat * 0.25 + movement_threat * 0.35 + conflict_threat * 0.15 + rule_threat * 0.25 threat_score = min(raw_score * zone_sensitivity, 1.0) if threat_score < 0.15: threat_level = ThreatLevel.NONE elif threat_score < 0.35: threat_level = ThreatLevel.LOW elif threat_score < 0.55: threat_level = ThreatLevel.ELEVATED elif threat_score < 0.75: threat_level = ThreatLevel.HIGH else: threat_level = ThreatLevel.CRITICAL if fired_rules: rule_intents = {} for rule, activation in fired_rules: rule_intents[rule.intent_signal] = max(rule_intents.get(rule.intent_signal, 0), activation) if rule_intents: ri = max(rule_intents, key=rule_intents.get) if rule_intents[ri] > 0.3 and ri != "normal": primary_intent = ri factors = [] if emotion_threat > 0.3: top_emo = max(emotion_probs, key=emotion_probs.get) factors.append(f"Elevated emotion: {top_emo} ({emotion_probs[top_emo]:.2f})") if movement_threat > 0.3 and movement_result: factors.append(f"Movement intent: {movement_result.dominant_intent} ({movement_result.confidence:.2f})") if conflict_score > 0.3: factors.append(f"Cross-modal conflict detected (score: {conflict_score:.2f})") for conflict in conflicts[:3]: factors.append(f"Conflict: {conflict.interpretation}") for rule, activation in fired_rules: if activation > 0.3: factors.append(f"Rule {rule.rule_id}: {rule.description[:80]}") actions = { ThreatLevel.NONE: "No action required. Continue standard monitoring.", ThreatLevel.LOW: "Monitor subject. Log for review.", ThreatLevel.ELEVATED: "Increase surveillance. Alert nearby personnel.", ThreatLevel.HIGH: "Dispatch security team. Prepare for intervention.", ThreatLevel.CRITICAL: "IMMEDIATE RESPONSE. All security to location. Consider lockdown.", } action = actions.get(threat_level, "Monitor.") intent_actions = { "attack_intent": " Approach with caution. Subject may be armed.", "aggressive": " De-escalation team recommended.", "concealment": " Search may be warranted.", "evasion": " Track subject. Cover exit routes.", "panic": " Assess trigger. Crowd control may be needed.", "erratic": " Medical team on standby.", "deceptive": " Structured questioning. Second officer recommended.", } if primary_intent in intent_actions and threat_level.value in ("elevated", "high", "critical"): action += intent_actions[primary_intent] sorted_emo = sorted(emotion_probs.items(), key=lambda x: -x[1]) return ThreatAssessmentResult( threat_level=threat_level, threat_score=threat_score, primary_intent=primary_intent, intent_scores=intent_scores, emotion_summary=dict(sorted_emo[:5]), movement_summary=movement_summary, conflicts=conflicts, fired_rules=[r.rule_id for r, _ in fired_rules], contributing_factors=factors, recommended_action=action) # ============================================================================ # ALERT ENGINE # ============================================================================ def generate_alert(threat, zone_id, subject_id="SUBJ-001"): level_order = [ThreatLevel.NONE, ThreatLevel.LOW, ThreatLevel.ELEVATED, ThreatLevel.HIGH, ThreatLevel.CRITICAL] if level_order.index(threat.threat_level) < level_order.index(ThreatLevel.ELEVATED): return None priority_map = {ThreatLevel.ELEVATED: AlertPriority.WARNING, ThreatLevel.HIGH: AlertPriority.URGENT, ThreatLevel.CRITICAL: AlertPriority.CRITICAL} priority = priority_map.get(threat.threat_level, AlertPriority.INFO) intent_titles = { "attack_intent": "Potential Attack Behavior Detected", "aggressive": "Aggressive Behavior Detected", "concealment": "Suspicious Concealment Activity", "evasion": "Evasive Movement Detected", "loitering": "Suspicious Loitering", "panic": "Panic Response Detected", "erratic": "Erratic Behavior Observed", "deceptive": "Deceptive Behavior Indicators", "agitated": "Agitated Subject Detected", } title = intent_titles.get(threat.primary_intent, f"Security Alert: {threat.threat_level.value.upper()}") desc_lines = [f"Threat Score: {threat.threat_score:.2f} ({threat.threat_level.value})", f"Primary Intent: {threat.primary_intent}"] if threat.emotion_summary: top = list(threat.emotion_summary.items())[:3] desc_lines.append(f"Top Emotions: {', '.join(f'{e}: {s:.2f}' for e, s in top)}") if threat.contributing_factors: desc_lines.append("Contributing Factors:") for f in threat.contributing_factors[:5]: desc_lines.append(f" - {f}") return Alert(alert_id=str(uuid.uuid4())[:8], zone_id=zone_id, subject_id=subject_id, priority=priority, threat_level=threat.threat_level, title=title, description="\n".join(desc_lines), timestamp=time.time(), contributing_signals=threat.contributing_factors[:5], recommended_action=threat.recommended_action) # ============================================================================ # FULL FUSION PIPELINE # ============================================================================ def run_security_fusion(text, movement_features, zone_type, zone_sensitivity): text_emotions = simulate_text_emotions(text) face_emotions = simulate_face_emotions(text_emotions) voice_emotions = simulate_voice_emotions(text_emotions) face_28 = project_to_goemotions(face_emotions, FACE_TO_GOEMOTIONS_MAP) voice_28 = project_to_goemotions(voice_emotions, VOICE_TO_GOEMOTIONS_MAP) text_28 = np.array([text_emotions.get(l, 0.01) for l in GOEMOTIONS_LABELS]) t = text_28.sum() if t > 0: text_28 = text_28 / t intent_scores = map_movement_to_intents(movement_features) movement_28 = movement_to_emotion_space(intent_scores) dominant_intent = max(intent_scores, key=intent_scores.get) movement_result = MovementResult(features=movement_features, intent_scores=intent_scores, dominant_intent=dominant_intent, confidence=intent_scores[dominant_intent]) weights = {"face": 0.20, "voice": 0.25, "text": 0.20, "movement": 0.35} base_crisp = weights["face"] * face_28 + weights["voice"] * voice_28 + weights["text"] * text_28 + weights["movement"] * movement_28 modality_fuzzy = {} for mod_name, vec in [("face", face_28), ("voice", voice_28), ("text", text_28), ("movement", movement_28)]: fuzzy_states = {} for i, label in enumerate(GOEMOTIONS_LABELS): fuzzy_states[label] = fuzzify(float(vec[i])) modality_fuzzy[mod_name] = fuzzy_states rules = build_security_rules(zone_type) fired_rules = evaluate_rules(rules, modality_fuzzy, movement_features.as_dict()) conflicts = detect_conflicts(modality_fuzzy, ["face", "voice", "text", "movement"]) conflict_score = min(1.0, sum(c.severity ** 2 for c in conflicts) / max(len(conflicts), 1)) if conflicts else 0.0 # Defuzzification result = base_crisp.copy() label_idx = {label: i for i, label in enumerate(GOEMOTIONS_LABELS)} for rule, activation in fired_rules: for emotion_label, target_level in rule.outputs.items(): if emotion_label in label_idx: idx = label_idx[emotion_label] target_centroid = LEVEL_CENTROIDS.get(target_level, 0.35) effective = target_centroid * activation result[idx] = (1.0 - activation) * result[idx] + activation * effective base_fuzzy = {} for i, label in enumerate(GOEMOTIONS_LABELS): base_fuzzy[label] = fuzzify(float(base_crisp[i])) for label, memberships in base_fuzzy.items(): if label in label_idx: idx = label_idx[label] numerator = sum(mu * LEVEL_CENTROIDS.get(lev, 0) for lev, mu in memberships.items() if mu > 0) denominator = sum(mu for mu in memberships.values() if mu > 0) if denominator > 0: result[idx] = 0.7 * result[idx] + 0.3 * (numerator / denominator) result = np.maximum(result, 0) total = result.sum() if total > 0: result = result / total emotion_probs = dict(zip(GOEMOTIONS_LABELS, result.tolist())) threat = assess_threat(emotion_probs, movement_result, conflict_score, fired_rules, conflicts, zone_sensitivity) fuzzy_states = [] for i, label in enumerate(GOEMOTIONS_LABELS): memberships = base_fuzzy.get(label, {"absent": 1.0}) dominant = max(memberships, key=memberships.get) fuzzy_states.append(FuzzyEmotionState(emotion=label, memberships=memberships, dominant_level=dominant, crisp_value=float(result[i]))) alert = generate_alert(threat, f"zone_{zone_type}") return { "emotion_probs": emotion_probs, "face_emotions": face_emotions, "voice_emotions": voice_emotions, "text_emotions": text_emotions, "movement_result": movement_result, "threat": threat, "fuzzy_states": fuzzy_states, "conflicts": conflicts, "conflict_score": conflict_score, "fired_rules": fired_rules, "alert": alert, "weights": weights, } # ============================================================================ # SESSION STATE # ============================================================================ _DEFAULTS = { "auth_step": "email", "auth_email": "", "auth_code": "", "auth_code_ts": 0.0, "auth_email_pending": False, "session_start": 0.0, } for key, val in _DEFAULTS.items(): if key not in st.session_state: st.session_state[key] = val # ── Background email send (runs after rerun so UI stays responsive) ── if st.session_state.get("auth_email_pending") and st.session_state["auth_step"] == "verify": st.session_state["auth_email_pending"] = False print(f"[Auth] Attempting to send code to {st.session_state['auth_email']}...") try: sent = send_verification_email(st.session_state["auth_email"], st.session_state["auth_code"]) print(f"[Auth] send_verification_email returned: sent={sent}") if sent: st.session_state["auth_email_sent"] = True else: st.session_state["auth_email_error"] = "SMTP send failed" except Exception as _exc: print(f"[Auth] send_verification_email exception: {_exc}") st.session_state["auth_email_error"] = str(_exc) # ============================================================================ # AUTH GATE # ============================================================================ def _get_logo_b64(): import base64 from pathlib import Path logo_path = Path(__file__).parent / "logo.png" if logo_path.exists(): return base64.b64encode(logo_path.read_bytes()).decode() return None def _render_header(animated=True, size=300): _logo_b64 = _get_logo_b64() if _logo_b64: cls = 'floating-logo' if animated else '' st.markdown(f"""
""", unsafe_allow_html=True) st.markdown("""

Behavioral Intelligence for Security — Multimodal Threat Assessment & Intent Recognition System:
Real-time threat detection through AI-powered analysis of facial expressions, voice patterns, text sentiment, and body movement

""", unsafe_allow_html=True) def _valid_email(email): return bool(re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", email)) def _show_landing(): _render_header() st.markdown("""

Request Demo Access

Enter your email to receive a one-time verification code. Each email is valid for a single 60-second trial session.

""", unsafe_allow_html=True) col_l, col_c, col_r = st.columns([1, 2, 1]) with col_c: with st.form("email_form", clear_on_submit=False): email = st.text_input("Email address", key="inp_email", placeholder="you@company.com") submitted = st.form_submit_button("Send Verification Code", use_container_width=True, type="primary") if submitted: email = email.strip().lower() if not _valid_email(email): st.error("Please enter a valid email address."); return if is_email_used(email): st.error("This email has already been used. Contact info@caitcore.com for full access."); return code = generate_code() st.session_state["auth_email"] = email st.session_state["auth_code"] = code st.session_state["auth_code_ts"] = time.time() st.session_state["auth_step"] = "verify" st.session_state["auth_email_pending"] = True st.rerun() def _show_verify(): _render_header() st.markdown(f"""

Enter Verification Code

A 6-digit code was sent to {st.session_state["auth_email"]}

""", unsafe_allow_html=True) col_l, col_c, col_r = st.columns([1, 2, 1]) with col_c: with st.form("code_form", clear_on_submit=False): code_input = st.text_input("Verification code", key="inp_code", max_chars=6, placeholder="000000") verified = st.form_submit_button("Verify & Start Demo", use_container_width=True, type="primary") if verified: elapsed = time.time() - st.session_state["auth_code_ts"] if elapsed > CODE_EXPIRY_SEC: st.error("Code expired. Please request a new one.") st.session_state["auth_step"] = "email"; st.rerun(); return if code_input.strip() != st.session_state["auth_code"]: st.error("Invalid code."); return mark_email_used(st.session_state["auth_email"]) st.session_state["session_start"] = time.time() st.session_state["auth_step"] = "active" st.rerun() if st.button("Back", use_container_width=True): st.session_state["auth_step"] = "email"; st.rerun() def _show_trial_ended(): _render_header(animated=False, size=200) st.markdown("""

Trial Complete

Your 60-second demo session has ended.

Interested in the full Orionus platform?
Contact us at info@caitcore.com for enterprise access.

Orionus provides real-time multimodal threat assessment for airports, transit hubs, stadiums, and critical infrastructure.

""", unsafe_allow_html=True) def _render_timer(): secs = remaining_seconds(st.session_state["session_start"]) if secs <= 0: st.session_state["auth_step"] = "ended"; st.rerun(); return False css = "timer-green" if secs > 30 else ("timer-yellow" if secs > 10 else "timer-red") st.markdown(f'
Trial session: {secs // 60}:{secs % 60:02d} remaining
', unsafe_allow_html=True) return True # ============================================================================ # MAIN DEMO UI # ============================================================================ def _run_demo(): if session_expired(st.session_state["session_start"]): st.session_state["auth_step"] = "ended"; st.rerun(); return _render_header() if not _render_timer(): return # Sidebar with st.sidebar: st.markdown("### Zone Configuration") zone_type = st.selectbox("Monitoring Zone", ZONE_TYPES, index=0) zone_descriptions = { "checkpoint": "Security checkpoint -- mild anxiety normal, evasion critical", "boarding": "Boarding/gate area -- watch for concealment", "transit": "Corridors -- movement patterns are key", "queue": "Queuing -- agitation from waiting is common", "retail": "Commercial -- normal social behavior expected", "restricted": "Staff-only -- ANY presence suspicious", "entrance": "Entry/exit -- reversal behavior is key signal", "gathering": "Waiting/lounge -- crowd dynamics important", "perimeter": "External boundary -- loitering and evasion", "parking": "Parking -- concealment and evasion patterns", } st.caption(zone_descriptions.get(zone_type, "")) zone_sensitivity = st.slider("Zone Sensitivity", 0.5, 2.0, 1.0, 0.1) st.markdown("---") st.markdown("### Movement Simulation") velocity = st.slider("Velocity", 0.0, 1.0, 0.15, 0.05) acceleration = st.slider("Acceleration", 0.0, 1.0, 0.1, 0.05) direction_changes = st.slider("Direction Changes", 0.0, 1.0, 0.1, 0.05) proximity_approach = st.slider("Proximity Approach", 0.0, 1.0, 0.1, 0.05) proximity_avoid = st.slider("Proximity Avoid", 0.0, 1.0, 0.1, 0.05) hand_activity = st.slider("Hand Activity", 0.0, 1.0, 0.1, 0.05) body_tension = st.slider("Body Tension", 0.0, 1.0, 0.15, 0.05) gait_regularity = st.slider("Gait Irregularity", 0.0, 1.0, 0.05, 0.05) loiter_score = st.slider("Loiter Score", 0.0, 1.0, 0.0, 0.05) crowd_interaction = st.slider("Crowd Interaction", 0.0, 1.0, 0.1, 0.05) st.markdown("---") st.markdown("### Scenario Presets") preset = st.selectbox("Load Scenario", [ "-- Custom --", "Normal Passenger", "Agitated Traveler", "Aggressive Confrontation", "Suspicious Loitering", "Evasive Subject", "Concealment Behavior", "Panic Flight", "Deceptive Social Engineering", "Erratic Behavior"]) preset_texts = { "Normal Passenger": "Everything is fine. I'm just waiting for my flight.", "Agitated Traveler": "This is ridiculous! I've been waiting two hours. Frustrated and annoyed!", "Aggressive Confrontation": "I'm going to destroy you! Get out of my way! I hate this place!", "Suspicious Loitering": "Just looking around. Nothing special. Fine. Normal.", "Evasive Subject": "I don't know what you're talking about. I need to go somewhere else now.", "Concealment Behavior": "Everything is fine, nothing to worry about. Just standing here normally.", "Panic Flight": "Help! Run! There's danger! I'm scared! We need to get out!", "Deceptive Social Engineering": "Oh you're so kind! I'm so happy! Everything is wonderful! I just need past this area...", "Erratic Behavior": "What? Where am I? I don't understand anything. Why is everything spinning?", } preset_movements = { "Normal Passenger": MovementFeatures(velocity=0.15, acceleration=0.05, body_tension=0.1, gait_regularity=0.05), "Agitated Traveler": MovementFeatures(velocity=0.3, acceleration=0.2, hand_activity=0.4, body_tension=0.4, direction_changes=0.2), "Aggressive Confrontation": MovementFeatures(velocity=0.5, acceleration=0.6, proximity_approach=0.7, hand_activity=0.7, body_tension=0.8), "Suspicious Loitering": MovementFeatures(velocity=0.05, loiter_score=0.7, direction_changes=0.15), "Evasive Subject": MovementFeatures(velocity=0.5, acceleration=0.4, direction_changes=0.7, proximity_avoid=0.6, body_tension=0.3), "Concealment Behavior": MovementFeatures(velocity=0.1, hand_activity=0.7, body_tension=0.5, proximity_avoid=0.3), "Panic Flight": MovementFeatures(velocity=0.9, acceleration=0.8, gait_regularity=0.5, crowd_interaction=0.6), "Deceptive Social Engineering": MovementFeatures(velocity=0.2, hand_activity=0.3, body_tension=0.4, proximity_approach=0.3), "Erratic Behavior": MovementFeatures(velocity=0.4, acceleration=0.5, direction_changes=0.7, gait_regularity=0.8, hand_activity=0.3), } default_text = "Enter text to analyze for emotional content and security intent..." if preset != "-- Custom --": default_text = preset_texts.get(preset, default_text) st.markdown('
Subject Communication Input
', unsafe_allow_html=True) text_input = st.text_area("Text input (simulates transcribed speech)", value=default_text, height=100, label_visibility="collapsed") if preset != "-- Custom --" and preset in preset_movements: mf = preset_movements[preset] else: mf = MovementFeatures(velocity=velocity, acceleration=acceleration, direction_changes=direction_changes, proximity_approach=proximity_approach, proximity_avoid=proximity_avoid, hand_activity=hand_activity, body_tension=body_tension, gait_regularity=gait_regularity, loiter_score=loiter_score, crowd_interaction=crowd_interaction) # Re-check timer if session_expired(st.session_state["session_start"]): st.session_state["auth_step"] = "ended"; st.rerun(); return if text_input and text_input != "Enter text to analyze for emotional content and security intent...": results = run_security_fusion(text_input, mf, zone_type, zone_sensitivity) threat = results["threat"] # === THREAT LEVEL BANNER === tc_map = {"none": "#2E7D32", "low": "#4CAF50", "elevated": "#FF9800", "high": "#f44336", "critical": "#d50000"} tc = tc_map.get(threat.threat_level.value, "#666") st.markdown(f"""
THREAT LEVEL

{threat.threat_level.value.upper()}

Threat Score

{threat.threat_score:.2f}

Primary Intent

{threat.primary_intent.upper().replace('_', ' ')}

Zone

{zone_type.upper()}

""", unsafe_allow_html=True) # Recommended action if threat.threat_level.value in ("elevated", "high", "critical"): alert_class = "critical" if threat.threat_level.value == "critical" else ("urgent" if threat.threat_level.value == "high" else "warning") ac = "#d50000" if threat.threat_level.value == "critical" else ("#f44336" if threat.threat_level.value == "high" else "#FF9800") st.markdown(f"""
RECOMMENDED ACTION:
{threat.recommended_action}
""", unsafe_allow_html=True) # Metrics c1, c2, c3, c4 = st.columns(4) with c1: st.metric("Conflict Score", f"{results['conflict_score']:.2f}") with c2: st.metric("Rules Fired", str(len(results['fired_rules']))) with c3: st.metric("Conflicts", str(len(results['conflicts']))) with c4: st.metric("Modalities", "4 (F+V+T+M)") # === TABS === tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs([ "Intent Classification", "Emotion Analysis", "Fuzzy Inference", "Conflict Detection", "Movement Analysis", "Alert Log"]) with tab1: st.markdown('
Security Intent Categories (10-Class)
', unsafe_allow_html=True) intent_scores = results["movement_result"].intent_scores intent_colors = {"normal": "#4CAF50", "agitated": "#FFC107", "aggressive": "#f44336", "attack_intent": "#d50000", "concealment": "#9C27B0", "evasion": "#FF5722", "loitering": "#FF9800", "panic": "#E91E63", "deceptive": "#673AB7", "erratic": "#795548"} sorted_intents = sorted(intent_scores.items(), key=lambda x: -x[1]) fig = go.Figure(go.Bar( x=[s for _, s in sorted_intents], y=[i.replace("_", " ").title() for i, _ in sorted_intents], orientation='h', marker_color=[intent_colors.get(i, "#666") for i, _ in sorted_intents], text=[f"{s:.3f}" for _, s in sorted_intents], textposition='auto', textfont=dict(color='white', size=12))) fig.update_layout(paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(15,20,30,0.8)', font=dict(color='#e0e0e0'), height=400, margin=dict(l=150, r=20, t=30, b=30), xaxis=dict(title="Score", gridcolor='#1e293b', range=[0, max(0.5, max(s for _, s in sorted_intents) * 1.2)]), yaxis=dict(gridcolor='#1e293b')) st.plotly_chart(fig, use_container_width=True) intent_desc = {"normal": "Baseline -- no threat", "agitated": "Elevated stress / aggression building", "aggressive": "Active aggression / confrontation", "attack_intent": "Pre-attack posture / movement", "concealment": "Hiding objects / face / identity", "evasion": "Avoiding detection / cameras / security", "loitering": "Unusual lingering / surveillance", "panic": "Fleeing / mass panic", "deceptive": "Emotional masking / social engineering", "erratic": "Unpredictable / drug-influenced"} for intent, desc in intent_desc.items(): score = intent_scores.get(intent, 0) color = intent_colors.get(intent, "#666") st.markdown(f'
' f'{intent.replace("_"," ").title()}' f'
' f'
' f'{score:.3f}' f'{desc}
', unsafe_allow_html=True) with tab2: st.markdown('
Fused Emotion Distribution (28 GoEmotions)
', unsafe_allow_html=True) emotion_probs = results["emotion_probs"] sorted_emo = sorted(emotion_probs.items(), key=lambda x: -x[1])[:15] fig = go.Figure(go.Bar( x=[e for e, _ in sorted_emo], y=[s for _, s in sorted_emo], marker_color=['#00E676' if s > 0.1 else '#2E7D32' if s > 0.05 else '#1a3a1a' for _, s in sorted_emo], text=[f"{s:.3f}" for _, s in sorted_emo], textposition='auto', textfont=dict(color='white', size=10))) fig.update_layout(paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(15,20,30,0.8)', font=dict(color='#e0e0e0'), height=350, margin=dict(l=40, r=20, t=30, b=80), xaxis=dict(tickangle=-45, gridcolor='#1e293b'), yaxis=dict(title="Probability", gridcolor='#1e293b')) st.plotly_chart(fig, use_container_width=True) st.markdown('
Security Emotion Clusters
', unsafe_allow_html=True) cluster_scores = {} for cn, emotions in SECURITY_EMOTION_CLUSTERS.items(): cluster_scores[cn] = sum(emotion_probs.get(e, 0) for e in emotions) fig = go.Figure(go.Scatterpolar( r=list(cluster_scores.values()), theta=[c.replace("_", " ").title() for c in cluster_scores.keys()], fill='toself', fillcolor='rgba(0, 230, 118, 0.15)', line=dict(color='#00E676', width=2), marker=dict(color='#00E676', size=8))) fig.update_layout(polar=dict(bgcolor='rgba(15,20,30,0.8)', radialaxis=dict(visible=True, gridcolor='#1e293b', color='#64748b'), angularaxis=dict(gridcolor='#1e293b', color='#e0e0e0')), paper_bgcolor='rgba(0,0,0,0)', font=dict(color='#e0e0e0'), height=400, margin=dict(l=60, r=60, t=40, b=40), showlegend=False) st.plotly_chart(fig, use_container_width=True) with tab3: st.markdown('
Fuzzy Membership Visualization
', unsafe_allow_html=True) x_vals = np.linspace(0, 1, 200) mf_funcs = [("Absent", mf_absent, "#4CAF50"), ("Low", mf_low, "#8BC34A"), ("Moderate", mf_moderate, "#FFC107"), ("High", mf_high, "#FF9800"), ("Very High", mf_very_high, "#f44336")] fig = go.Figure() for name, func, color in mf_funcs: fig.add_trace(go.Scatter(x=x_vals, y=[func(x) for x in x_vals], mode='lines', name=name, line=dict(color=color, width=2))) fig.update_layout(title="Trapezoidal Membership Functions", paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(15,20,30,0.8)', font=dict(color='#e0e0e0'), height=300, margin=dict(l=40, r=20, t=40, b=30), xaxis=dict(title="Crisp Value", gridcolor='#1e293b'), yaxis=dict(title="Membership", gridcolor='#1e293b'), legend=dict(bgcolor='rgba(0,0,0,0.5)')) st.plotly_chart(fig, use_container_width=True) st.markdown('
Top Emotion Fuzzy States
', unsafe_allow_html=True) top_fuzzy = sorted(results["fuzzy_states"], key=lambda x: x.crisp_value, reverse=True)[:10] fuzzy_data = [] for fs in top_fuzzy: for level, mu in fs.memberships.items(): if mu > 0.01: fuzzy_data.append({"Emotion": fs.emotion, "Level": level, "Membership": mu}) if fuzzy_data: lc = {"absent": "#4CAF50", "low": "#8BC34A", "moderate": "#FFC107", "high": "#FF9800", "very_high": "#f44336"} fig = go.Figure() for level in FUZZY_LEVELS: ld = [d for d in fuzzy_data if d["Level"] == level] if ld: fig.add_trace(go.Bar(x=[d["Emotion"] for d in ld], y=[d["Membership"] for d in ld], name=level.replace("_", " ").title(), marker_color=lc.get(level, "#666"))) fig.update_layout(barmode='stack', paper_bgcolor='rgba(0,0,0,0)', plot_bgcolor='rgba(15,20,30,0.8)', font=dict(color='#e0e0e0'), height=350, margin=dict(l=40, r=20, t=30, b=80), xaxis=dict(tickangle=-45, gridcolor='#1e293b'), yaxis=dict(title="Membership", gridcolor='#1e293b'), legend=dict(bgcolor='rgba(0,0,0,0.5)')) st.plotly_chart(fig, use_container_width=True) st.markdown('
Fired Fuzzy Rules
', unsafe_allow_html=True) if results["fired_rules"]: cat_colors = {"aggression": "#f44336", "deception": "#673AB7", "panic": "#E91E63", "surveillance": "#FF5722", "zone": "#2196F3", "deescalation": "#4CAF50"} for rule, activation in results["fired_rules"]: cc = cat_colors.get(rule.category, "#888") st.markdown(f"""
{rule.rule_id} [{rule.category.upper()}] Activation: {activation:.3f} Threat: {'+' if rule.threat_boost > 0 else ''}{rule.threat_boost:.2f}
{rule.description}
""", unsafe_allow_html=True) else: st.info("No fuzzy rules fired for current input.") with tab4: st.markdown('
Cross-Modal Conflict Analysis
', unsafe_allow_html=True) st.markdown("""
Cross-modal conflicts indicate when different channels disagree about the subject's emotional state -- a key indicator of deception, concealment, or coerced behavior.
""", unsafe_allow_html=True) if results["conflicts"]: for conflict in results["conflicts"]: sc = "#4CAF50" if conflict.severity < 0.4 else ("#FF9800" if conflict.severity < 0.7 else "#f44336") st.markdown(f"""
{conflict.emotion.replace('_', ' ').title()} Severity: {conflict.severity:.2f}
{conflict.modality_a.upper()} ({conflict.level_a}) vs {conflict.modality_b.upper()} ({conflict.level_b})
{conflict.interpretation}
""", unsafe_allow_html=True) else: st.success("No significant cross-modal conflicts. Modalities are consistent.") with tab5: st.markdown('
Movement Feature Analysis
', unsafe_allow_html=True) feat_dict = results["movement_result"].features.as_dict() fig = go.Figure(go.Scatterpolar( r=list(feat_dict.values()), theta=[f.replace("_", " ").title() for f in feat_dict.keys()], fill='toself', fillcolor='rgba(0, 230, 118, 0.15)', line=dict(color='#00E676', width=2), marker=dict(color='#00E676', size=6))) fig.update_layout(polar=dict(bgcolor='rgba(15,20,30,0.8)', radialaxis=dict(visible=True, range=[0, 1], gridcolor='#1e293b', color='#64748b'), angularaxis=dict(gridcolor='#1e293b', color='#e0e0e0')), paper_bgcolor='rgba(0,0,0,0)', font=dict(color='#e0e0e0'), height=450, margin=dict(l=80, r=80, t=40, b=40), showlegend=False) st.plotly_chart(fig, use_container_width=True) feat_desc = { "velocity": "Overall movement speed", "acceleration": "Sudden speed changes", "direction_changes": "Frequency of path alterations", "proximity_approach": "Approach toward targets", "proximity_avoid": "Avoidance of security", "hand_activity": "Hand movement intensity", "body_tension": "Muscular rigidity", "gait_regularity": "Walking irregularity", "loiter_score": "Purposeless lingering", "crowd_interaction": "Movement vs crowd flow"} for feat, val in feat_dict.items(): color = "#4CAF50" if val < 0.3 else ("#FF9800" if val < 0.6 else "#f44336") st.markdown(f'
' f'{feat.replace("_"," ").title()}' f'
' f'
' f'{val:.2f}' f'{feat_desc.get(feat,"")}
', unsafe_allow_html=True) with tab6: st.markdown('
Security Alert Generation
', unsafe_allow_html=True) alert = results["alert"] if alert: pc_map = {"info": "#2196F3", "warning": "#FF9800", "urgent": "#f44336", "critical": "#d50000"} pc = pc_map.get(alert.priority.value, "#666") st.markdown(f"""
{alert.priority.value.upper()} {alert.title}
ID: {alert.alert_id}
{alert.description}
Recommended Action: {alert.recommended_action}
Zone: {alert.zone_id} | Subject: {alert.subject_id}
""", unsafe_allow_html=True) if alert.contributing_signals: st.markdown('
Contributing Signals
', unsafe_allow_html=True) for signal in alert.contributing_signals: st.markdown(f'
{signal}
', unsafe_allow_html=True) else: st.success("No alert generated. Threat level below ELEVATED threshold.") if threat.contributing_factors: st.markdown('
Contributing Factors Summary
', unsafe_allow_html=True) for factor in threat.contributing_factors: st.markdown(f'
{factor}
', unsafe_allow_html=True) else: st.markdown("""

Enter text or select a preset to begin analysis

Orionus AI performs multimodal behavioral intelligence fusion using:

28
GoEmotions Labels
10
Intent Categories
5
Threat Levels
4
Modalities Fused
16+
Fuzzy Rules
""", unsafe_allow_html=True) # Footer st.markdown("---") st.markdown("""
ORIONUS AI — Behavioral Intelligence for Security
Mamdani Fuzzy Inference • Multimodal Fusion • Real-Time Threat Assessment
Demo Mode — Text simulates multimodal analysis. Production uses live camera, microphone, and pose tracking.
""", unsafe_allow_html=True) # Auto-refresh timer time.sleep(0.1) st.rerun() # ============================================================================ # MAIN ROUTING # ============================================================================ def main(): step = st.session_state["auth_step"] if step == "active" and session_expired(st.session_state["session_start"]): st.session_state["auth_step"] = "ended" step = "ended" if step == "email": _show_landing() elif step == "verify": _show_verify() elif step == "active": _run_demo() elif step == "ended": _show_trial_ended() else: st.session_state["auth_step"] = "email"; st.rerun() if __name__ == "__main__": main()