Spaces:
Paused
Paused
| # filename: backend_pam.py (ENHANCED FOR HF SPACES + NERDY LAB ASSISTANT PERSONALITY) | |
| import os | |
| import json | |
| import requests | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional, List | |
# --- Constants for Data Paths ---
# Both data files live in a "data" directory located next to this module.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
DATA_DIR = os.path.join(BASE_DIR, "data")
LOGS_FILE, COMPLIANCE_FILE = (
    os.path.join(DATA_DIR, filename)
    for filename in ("logs.json", "compliance.json")
)
# --- HuggingFace Inference API Setup ---
# The token is read once, at import time. When it is missing (or empty) a
# warning is printed and HF_HEADERS is left empty.
HF_API_TOKEN = os.getenv("HF_READ_TOKEN")
if not HF_API_TOKEN:
    print("⚠️ WARNING: HF_READ_TOKEN not found. Backend PAM will run in limited mode.")
HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}

# Optimized models for CPU inference on HF Spaces
# Updated to use router.huggingface.co (api-inference.huggingface.co is deprecated).
# On the router the provider name is part of the path: the serverless
# inference provider is addressed as /hf-inference/models/<model id>.
HF_ENDPOINTS = {
    "phi_ner": "https://router.huggingface.co/hf-inference/models/dslim/bert-base-NER",
    "log_ner": "https://router.huggingface.co/hf-inference/models/dslim/bert-base-NER",
    "summarizer": "https://router.huggingface.co/hf-inference/models/facebook/bart-large-cnn",
    "classifier": "https://router.huggingface.co/hf-inference/models/facebook/bart-large-mnli",
}
# --- Global Storage for Loaded Data ---
# Module-level cache: None until load_agent() runs for the first time, after
# which it holds the {"LOGS": ..., "COMPLIANCE": ...} dict that call built.
LOADED_DATA: Optional[Dict[str, Dict[str, Any]]] = None
| # --- Data Loading Helper --- | |
def load_json(filepath: str) -> Dict[str, Any]:
    """Load a JSON object from *filepath* without ever raising.

    The file is read as UTF-8. Any failure is reported with a printed warning
    and turned into an empty dict, so callers can simply test the result for
    truthiness.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing, cannot be
        read or decoded, or its top-level value is not a JSON object.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"⚠️ Data file not found: {filepath}")
        return {}
    except json.JSONDecodeError as e:
        print(f"⚠️ Failed to decode JSON from {filepath}: {e}")
        return {}
    except Exception as e:
        # Deliberate best effort: a broken data file must not stop the agent.
        print(f"⚠️ Unexpected error loading {filepath}: {e}")
        return {}

    # Honour the declared return type: callers use dict methods (.get, .items)
    # on the result, which would fail on a top-level list or scalar.
    if not isinstance(data, dict):
        print(f"⚠️ Expected a JSON object in {filepath}, got {type(data).__name__}")
        return {}
    return data
| # --- Inference API Call Helper with Retry Logic --- | |
def _decode_json(response: Any) -> Any:
    """Return the JSON-decoded body of *response*, or None when it is not JSON."""
    try:
        return response.json()
    except ValueError:
        # Response.json() signals an undecodable body with a ValueError subclass.
        return None


def hf_infer(task: str, payload: Any, max_retries: int = 3) -> Any:
    """Call the HuggingFace Inference API, retrying while the model loads.

    Args:
        task: Key into HF_ENDPOINTS selecting the model endpoint to call.
        payload: JSON-serialisable request body sent with the POST.
        max_retries: Maximum number of POST attempts.

    Returns:
        The decoded JSON body on HTTP 200. In every other case a dict of the
        form ``{"error": "<description>"}``: unknown task, HTTP 410, any other
        non-200 status, an unexpected exception, or all attempts used up by
        timeouts, connection errors or "model loading" responses.
    """
    url = HF_ENDPOINTS.get(task)
    if not url:
        return {"error": f"Invalid task: {task}"}

    for attempt in range(max_retries):
        # Sleeping after the final attempt would only delay the error return.
        has_attempts_left = attempt < max_retries - 1
        try:
            response = requests.post(url, headers=HF_HEADERS, json=payload, timeout=30)

            if response.status_code == 200:
                return response.json()

            # Error bodies are usually {"error": ...} but may be non-JSON or
            # not a dict at all, so decode defensively.
            body = _decode_json(response)
            has_error_field = isinstance(body, dict) and "error" in body
            api_error = body["error"] if has_error_field else None

            # Handle deprecated endpoint (410) - should not happen with new router endpoint
            if response.status_code == 410:
                print(f"❌ Deprecated endpoint error (410): {response.text}")
                if "router.huggingface.co" in str(api_error or ""):
                    print("⚠️ Endpoint already updated but still getting 410. Check HF API token permissions.")
                return {"error": "API endpoint deprecated. Please verify the router endpoint is correctly configured."}

            # Handle model loading state
            if response.status_code == 503 and "loading" in str(api_error or "").lower():
                wait_time = body.get("estimated_time", 20)
                # time.sleep() rejects non-numeric and negative values.
                is_number = isinstance(wait_time, (int, float)) and not isinstance(wait_time, bool)
                if not is_number or wait_time < 0:
                    wait_time = 20
                print(f"⏳ Model loading... waiting {wait_time}s (attempt {attempt + 1}/{max_retries})")
                if has_attempts_left:
                    time.sleep(wait_time)
                continue

            # Any other status is a hard failure: report it without retrying.
            error_text = response.text[:500]  # Limit error text length
            print(f"⚠️ HF API Error ({response.status_code}): {error_text}")
            if has_error_field:
                return {"error": f"API Error {response.status_code}: {api_error}"}
            return {"error": f"API Error {response.status_code}: {error_text[:100]}"}

        except requests.exceptions.Timeout:
            print(f"⏱️ Request timeout (attempt {attempt + 1}/{max_retries})")
            if has_attempts_left:
                time.sleep(5)
        except requests.exceptions.RequestException as e:
            print(f"⚠️ Request exception: {e}")
            if has_attempts_left:
                time.sleep(2)
        except Exception as e:
            print(f"⚠️ Unexpected error: {e}")
            return {"error": str(e)}

    return {"error": "Max retries reached. Please check your connection and try again."}
| # --- Agent Initialization --- | |
def load_agent() -> 'PAM':
    """Build a PAM instance, reading the data files only on the first call.

    The first call loads the log and compliance JSON files, prints a status
    line for each, and stores the result in the module-level LOADED_DATA.
    Later calls reuse that stored data. Every call returns a new PAM object
    wrapping the same data dict.
    """
    global LOADED_DATA

    if LOADED_DATA is None:
        print("🤓 Loading PAM technical assistant (Nerdy Lab Assistant mode)...")
        # Read both files before reporting, so load warnings come first.
        logs = load_json(LOGS_FILE)
        compliance = load_json(COMPLIANCE_FILE)

        print(
            "✅ Log data loaded successfully."
            if logs
            else "⚠️ Warning: Log data not loaded. PAM will have limited log analysis capabilities."
        )
        print(
            "✅ Compliance data loaded successfully."
            if compliance
            else "⚠️ Warning: Compliance data not loaded. PAM will have limited compliance features."
        )

        LOADED_DATA = {"LOGS": logs, "COMPLIANCE": compliance}
    else:
        print("🔬 PAM technical assistant already loaded. Using cached data.")

    return PAM(LOADED_DATA)
| # --- Helper: Classify Severity --- | |
def classify_severity(entry: str) -> str:
    """Map a log entry to "CRITICAL", "WARNING" or "INFO" by keyword match.

    Matching is a case-insensitive substring search. Critical keywords are
    checked first, so an entry containing both a critical and a warning
    keyword is reported as "CRITICAL". Entries matching nothing are "INFO".
    """
    text = entry.lower()

    # Ordered from most to least severe; the first level that matches wins.
    severity_rules = (
        (
            "CRITICAL",
            (
                "unauthorized", "failed login", "attack", "breach",
                "port scanning", "unavailable", "critical", "error",
                "denied", "blocked", "malicious",
            ),
        ),
        (
            "WARNING",
            (
                "warning", "unexpected", "unusual", "outside working hours",
                "retry", "slow", "timeout", "deprecated",
            ),
        ),
    )

    for level, keywords in severity_rules:
        for keyword in keywords:
            if keyword in text:
                return level
    return "INFO"
# --- PAM's Nerdy Lab Assistant Personality ---
# Persona description; the PAM class attaches it to every response under the "role" key.
PAM_ROLE = (
    "You are PAM, a knowledgeable and enthusiastic lab assistant in the infrastructure monitoring center.\n"
    "You're the nerdy, proactive team member who gets genuinely excited about finding patterns in logs and keeping systems secure.\n"
    "You explain technical findings clearly and encouragingly, like a helpful colleague who wants everyone to understand.\n"
    "You're informative but never condescending - you want to empower the team with knowledge.\n"
    "You use casual tech terminology but always explain what things mean.\n"
    "You're proactive about flagging issues and offering insights before being asked."
)
# Nerdy expressions for Backend PAM
# Each list below is a pool of interchangeable phrases; the PAM class picks
# one entry with random.choice() to vary the wording of its replies.

# Openers that lead into a finding.
NERDY_INTROS: List[str] = [
    "Ooh, interesting finding here!",
    "Okay so here's what I discovered:",
    "Alright, I ran the analysis and",
    "Hey, you're gonna want to see this:",
    "So I was digging through the data and",
    "Quick heads up on what I found:"
]
# Positive acknowledgements of the user's request.
ENCOURAGEMENT: List[str] = [
    "Great catch asking about this!",
    "Good thinking checking on this!",
    "Smart move looking into this!",
    "You're on the right track!",
    "Excellent question!",
    "Love that you're being proactive!"
]
# Lead-ins for an extra, unrequested observation appended to a reply.
PROACTIVE_PHRASES: List[str] = [
    "I also noticed something else while I was at it",
    "Quick side note -",
    "Oh, and while we're here",
    "By the way, related to this",
    "Just flagging this too",
    "Something else to keep an eye on"
]
| import random | |
| # --- Backend PAM Class --- | |
class PAM:
    """Backend PAM - Nerdy, Proactive Lab Assistant.

    Wraps the loaded log and compliance data and answers text commands.
    Every public method returns a dict containing at least a human-readable
    "message" and the persona text under "role".
    """

    def __init__(self, data: Dict[str, Dict]):
        """Store the "LOGS" and "COMPLIANCE" sections of *data* (default: empty)."""
        self.LOGS = data.get("LOGS", {})
        self.COMPLIANCE = data.get("COMPLIANCE", {})
        # Track findings for proactive suggestions
        # (no method of this class currently writes to it).
        self.recent_findings = []

    def _get_nerdy_intro(self) -> str:
        """Get a random nerdy introduction"""
        return random.choice(NERDY_INTROS)

    def _get_encouragement(self) -> str:
        """Get a random encouraging phrase"""
        return random.choice(ENCOURAGEMENT)

    def _get_proactive_phrase(self) -> str:
        """Get a random proactive phrase"""
        return random.choice(PROACTIVE_PHRASES)

    def _check_api_health(self) -> bool:
        """Report whether an API token is configured.

        No request is made. An empty token counts as missing, matching the
        module-level check that builds the request headers.
        """
        return bool(HF_API_TOKEN)

    @staticmethod
    def _text_after(user_input: str, *markers: str) -> str:
        """Return the stripped text following the first marker found.

        Markers are tried in the order given and matched case-insensitively;
        the returned text keeps its original casing. Returns "" when no
        marker occurs in *user_input*.
        """
        lowered = user_input.lower()
        for marker in markers:
            position = lowered.find(marker)
            # find() returns -1 on a miss; slicing from there would cut the
            # input at an arbitrary offset.
            if position != -1:
                return user_input[position + len(marker):].strip()
        return ""

    def detect_phi(self, text: str) -> Dict[str, Any]:
        """Detect Protected Health Information (PHI) using NER.

        Returns a dict with "message", "role", "has_phi" and "entities".
        "has_phi" is None whenever no verdict could be reached: blank text,
        no API token, an API error, or an unrecognised response. A
        "recommendation" key is added when a verdict was reached.
        """
        intro = self._get_nerdy_intro()

        if not text or not text.strip():
            # Nothing to scan: don't spend an API call on empty input.
            return {
                "message": "🤔 I didn't get any text to scan. Try: detect phi in [text]",
                "role": PAM_ROLE,
                "has_phi": None,
                "entities": []
            }

        if not self._check_api_health():
            return {
                "message": "⚠️ Hmm, I'm having trouble connecting to the analysis models right now. Let me flag this text for manual review instead!",
                "role": PAM_ROLE,
                "has_phi": None,
                "entities": []
            }

        # Call NER model
        result = hf_infer("phi_ner", {"inputs": text})

        if not isinstance(result, list):
            # Only a list of entities is a usable answer. Anything else must
            # not be reported as "no PHI detected".
            if isinstance(result, dict) and "error" in result:
                problem = result['error']
            else:
                problem = "unexpected response format"
            return {
                "message": f"🔍 I tried to scan for PHI, but hit a snag: {problem}. I'd recommend a manual review just to be safe!",
                "role": PAM_ROLE,
                "has_phi": None,
                "entities": []
            }

        # Filter for PHI-relevant entities
        phi_entities = [
            e for e in result
            if isinstance(e, dict)
            and e.get("entity_group") in ["PER", "LOC", "ORG", "DATE"]
            and e.get("score", 0) > 0.7
        ]
        has_phi = len(phi_entities) > 0

        if has_phi:
            # Only the first three entities are named in the message.
            entities_summary = ", ".join(
                f"{e.get('word', '')} ({e['entity_group']})" for e in phi_entities[:3]
            )
            message = f"🔒 {intro} I detected {len(phi_entities)} potential PHI entities in this text: {entities_summary}{'...' if len(phi_entities) > 3 else ''}. Definitely want to redact these before storing or sharing!"
            # Proactive suggestion
            message += f" {self._get_proactive_phrase()} - if you're logging this anywhere, make sure those logs are encrypted and access-controlled."
        else:
            message = f"✅ {intro} This text looks clean - no PHI detected! Safe to proceed with normal handling."

        return {
            "message": message,
            "role": PAM_ROLE,
            "has_phi": has_phi,
            "entities": phi_entities,
            "recommendation": "Redact PHI before storage" if has_phi else "No action needed"
        }

    def parse_log(self, log_text: str) -> Dict[str, Any]:
        """Parse and analyze log entries for security relevance.

        Severity always comes from the local keyword classifier. The NER
        model only contributes "log_entities", which stays empty when the
        API is unavailable or returns anything other than a list.
        """
        intro = self._get_nerdy_intro()
        severity = classify_severity(log_text)

        if not self._check_api_health():
            return {
                "message": "⚠️ Can't connect to the log parser right now. I'll do a quick manual analysis instead!",
                "role": PAM_ROLE,
                "severity": severity,
                "log_entities": []
            }

        # Call NER model for log parsing
        result = hf_infer("log_ner", {"inputs": log_text})
        parsed_entities = []
        if isinstance(result, list):
            parsed_entities = [
                e for e in result
                if isinstance(e, dict) and e.get("score", 0) > 0.6
            ]

        # Build informative response
        severity_emoji = {"CRITICAL": "🚨", "WARNING": "⚠️", "INFO": "ℹ️"}
        emoji = severity_emoji.get(severity, "📝")
        message = f"{emoji} {intro} This log entry is classified as **{severity}** priority."
        if severity == "CRITICAL":
            message += " This needs immediate attention! I'd recommend investigating ASAP and documenting the incident."
        elif severity == "WARNING":
            message += " Worth keeping an eye on this - might escalate if we see more like it."
        else:
            message += " Just routine activity, but good to have it logged for the audit trail."

        # Add entity details if found
        if parsed_entities:
            message += f" I extracted {len(parsed_entities)} key entities from the log."

        return {
            "message": message,
            "role": PAM_ROLE,
            "severity": severity,
            "log_entities": parsed_entities,
            "timestamp": datetime.now().isoformat()
        }

    def summarize(self, raw_text: str) -> Dict[str, Any]:
        """Generate technical summary of text (great for long logs or reports).

        Only the first 1024 characters of *raw_text* are sent to the model.
        "summary" is None when the API is unavailable or returned an error,
        and "" when the response held no summary text.
        """
        encouragement = self._get_encouragement()

        if not self._check_api_health():
            return {
                "message": f"⚠️ {encouragement} But I can't access the summarization model right now. Can you share a bit more context on what you need?",
                "role": PAM_ROLE,
                "summary": None
            }

        # Truncate for model limits (BART handles ~1024 tokens well)
        truncated_text = raw_text[:1024]
        result = hf_infer("summarizer", {
            "inputs": truncated_text,
            "parameters": {
                "max_length": 130,
                "min_length": 30,
                "do_sample": False
            }
        })

        if isinstance(result, dict) and "error" in result:
            return {
                "message": f"🤔 {encouragement} I tried to summarize this but hit a technical issue. Could you break it into smaller chunks?",
                "role": PAM_ROLE,
                "summary": None
            }

        # An empty list or a non-dict first element yields an empty summary
        # instead of raising.
        first_item = result[0] if isinstance(result, list) and result else None
        summary_text = first_item.get("summary_text", "") if isinstance(first_item, dict) else ""

        return {
            "message": f"📊 {encouragement} Here's the TL;DR of what you shared:",
            "role": PAM_ROLE,
            "summary": summary_text,
            "original_length": len(raw_text),
            "summary_length": len(summary_text)
        }

    def get_latest_logs(self) -> Dict[str, Any]:
        """Retrieve and analyze recent system logs.

        Each entry under LOGS["latest_logs"] is classified and formatted as
        "[timestamp] (SEVERITY) entry". Entries mentioning a client-facing
        keyword are also listed under "handoff_to_frontend".
        """
        intro = self._get_nerdy_intro()
        latest_logs = self.LOGS.get("latest_logs")

        if not latest_logs:
            return {
                "message": "🤔 Hmm, I'm not seeing any logs in the system right now. Either nothing's being logged, or there's a data loading issue. Want me to check the log file paths?",
                "role": PAM_ROLE,
                "logs": [],
                "handoff_to_frontend": []
            }

        full_logset = []
        client_handoffs = []
        critical_count = 0
        warning_count = 0

        for item in latest_logs:
            # A JSON null entry would otherwise reach .lower() as None.
            entry = item.get("entry") or ""
            timestamp = item.get("timestamp", "Unknown time")
            severity = classify_severity(entry)

            # Count severity levels
            if severity == "CRITICAL":
                critical_count += 1
            elif severity == "WARNING":
                warning_count += 1

            formatted = f"[{timestamp}] ({severity}) {entry}"
            full_logset.append(formatted)

            # Identify client-facing issues that Frontend PAM should handle
            if any(keyword in entry.lower() for keyword in ["frontend", "provider unavailable", "user", "client"]):
                client_handoffs.append(formatted)

        # Build proactive, informative response
        total = len(full_logset)
        message = f"📡 {intro} I reviewed {total} recent log entries. "
        if critical_count > 0:
            message += f"**Heads up:** {critical_count} critical issues detected that need immediate action! "
        if warning_count > 0:
            message += f"{warning_count} warnings worth monitoring. "
        if critical_count == 0 and warning_count == 0:
            message += "Everything looks stable - no major issues! "
        if client_handoffs:
            message += f"\n\n{self._get_proactive_phrase()} - {len(client_handoffs)} of these are client-facing issues. I'll pass those to Frontend PAM to handle with users."

        return {
            "message": message,
            "role": PAM_ROLE,
            "logs": full_logset,
            "summary": {
                "total": total,
                "critical": critical_count,
                "warnings": warning_count,
                "info": total - critical_count - warning_count
            },
            "handoff_to_frontend": client_handoffs
        }

    def check_compliance(self) -> Dict[str, Any]:
        """Run compliance status check and provide recommendations.

        Each key of COMPLIANCE is one check and its value is read as a
        pass/fail flag by truthiness. Returns the per-check report lines,
        the pass rate in percent and the names of failing checks.
        """
        encouragement = self._get_encouragement()

        if not self.COMPLIANCE:
            return {
                "message": f"🤔 {encouragement} But I don't have access to the compliance data right now. Let me know if you need me to check the data file setup!",
                "role": PAM_ROLE,
                "compliance_report": []
            }

        report = []
        compliant_count = 0
        non_compliant_items = []

        for item, status in self.COMPLIANCE.items():
            emoji = "✅" if status else "❌"
            readable_item = item.replace('_', ' ').title()
            report.append(f"{emoji} {readable_item}")
            if status:
                compliant_count += 1
            else:
                non_compliant_items.append(readable_item)

        total = len(self.COMPLIANCE)
        compliance_rate = (compliant_count / total * 100) if total > 0 else 0

        # Build informative, proactive response
        message = f"🛡️ {encouragement} Here's the compliance status:\n\n"
        message += f"**Overall:** {compliant_count}/{total} checks passed ({compliance_rate:.1f}%)\n\n"
        if non_compliant_items:
            message += f"**Action needed:** We have {len(non_compliant_items)} items out of compliance:\n"
            for item in non_compliant_items:
                message += f" • {item}\n"
            message += f"\n{self._get_proactive_phrase()} - I can help you prioritize these if you want to tackle them systematically!"
        else:
            message += "🎉 Everything's in compliance! Great work keeping things locked down."

        return {
            "message": message,
            "role": PAM_ROLE,
            "compliance_report": report,
            "compliance_rate": compliance_rate,
            "non_compliant": non_compliant_items
        }

    def process_input(self, user_input: str) -> Dict[str, Any]:
        """Main input processor - proactive and informative.

        Routes *user_input* to a handler by case-insensitive phrase match,
        checked in this order: compliance, logs, PHI detection, log parsing,
        summarisation. Unrecognised input gets a help message listing the
        supported commands.
        """
        # Match and slice the same stripped string so offsets stay aligned.
        cleaned_input = user_input.strip()
        u_input = cleaned_input.lower()

        # Command routing with personality
        if "check compliance" in u_input or "compliance status" in u_input:
            return self.check_compliance()

        if "get logs" in u_input or "latest logs" in u_input or "show logs" in u_input:
            return self.get_latest_logs()

        if "detect phi" in u_input:
            # Prefer the documented "detect phi in <text>" form.
            return self.detect_phi(self._text_after(cleaned_input, "detect phi in", "detect phi"))

        if "parse log" in u_input:
            return self.parse_log(self._text_after(cleaned_input, "parse log"))

        if "summarize" in u_input or "explain" in u_input:
            # The full input, command word included, is what gets summarised.
            return self.summarize(user_input)

        # Helpful default response with encouragement
        encouragement = self._get_encouragement()
        return {
            "message": f"👋 Hey! {encouragement} I'm PAM, your backend technical assistant. I can help you with:\n\n"
                       "• **check compliance** - Review compliance status\n"
                       "• **get logs** - Pull latest system logs\n"
                       "• **detect phi in [text]** - Scan for protected health info\n"
                       "• **parse log [entry]** - Analyze a specific log\n"
                       "• **summarize [text]** - Generate a technical summary\n\n"
                       "What would you like me to look into?",
            "role": PAM_ROLE
        }
| # --- Quick Test --- | |
if __name__ == "__main__":
    # Manual smoke test: run a few representative commands through the
    # router and print the reply text for each.
    print("🤓 Testing Backend PAM (Nerdy Lab Assistant)...\n")
    pam = load_agent()

    test_commands = [
        "check compliance",
        "get logs",
        "detect phi in Patient John Doe visited on 2024-03-15 at Memorial Hospital"
    ]
    divider = "=" * 60

    for cmd in test_commands:
        print(f"\n{divider}")
        print(f"COMMAND: {cmd}")
        print(divider)
        response = pam.process_input(cmd)
        # Fall back to the whole response if it carries no "message" key.
        print(response.get("message", response))