# NOTE: "Spaces: Paused" is a Hugging Face Spaces status banner that was
# captured along with this source; kept as a comment so the file stays valid Python.
# filename: backend_pam.py (UPDATED FOR INFERENCE API)
# BUG FIX: `HuggingFaceHub` is a LangChain class and has never been exported by
# `transformers`, so the original `from transformers import pipeline, HuggingFaceHub`
# raised ImportError before anything else could run. Only `pipeline` is needed:
# it accepts a repo id plus `token=` directly (see load_agent below).
from transformers import pipeline
from datetime import datetime
from typing import Dict, Any, Optional
import json
import os
# --- Constants for Data Paths ---
# All data files live in a "data/" directory next to this module, so the app
# works regardless of the process's current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # directory containing this file
DATA_DIR = os.path.join(BASE_DIR, "data")
LOGS_FILE = os.path.join(DATA_DIR, "logs.json")          # consumed by PAM.get_latest_logs()
COMPLIANCE_FILE = os.path.join(DATA_DIR, "compliance.json")  # consumed by PAM.check_compliance()
# --- Global Storage for Loaded Components ---
# Module-level caches populated by load_agent(); both stay None until the first
# successful load so repeated load_agent() calls do not re-download models.
LOADED_MODELS = None
LOADED_DATA = None
| # --- Data Loading Helper --- | |
def load_json(filepath: str) -> Dict[str, Any]:
    """Load a JSON file and return its contents.

    Args:
        filepath: Absolute or relative path to a JSON document.

    Returns:
        The parsed object, or an empty dict (after printing a diagnostic)
        when the file is missing or contains invalid JSON, so callers can
        degrade gracefully instead of crashing at startup.
    """
    try:
        # Explicit encoding: JSON data files are written as UTF-8; relying on
        # the platform default encoding breaks on Windows (cp1252).
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"CRITICAL: Data file not found at: {filepath}")
        return {}
    except json.JSONDecodeError as e:
        print(f"CRITICAL: Failed to decode JSON from {filepath}: {e}")
        return {}
| # --- Agent Initialization --- | |
def load_agent() -> 'PAM':
    """Build (or reuse) the PAM agent: Hugging Face pipelines plus on-disk JSON data.

    Results are cached in the module-level LOADED_MODELS / LOADED_DATA globals,
    so repeated calls skip re-initialization. If model loading fails, a PAM is
    still returned in degraded mode (its methods answer with a fatal-error
    message via PAM._check_activation).
    """
    global LOADED_MODELS, LOADED_DATA
    if LOADED_MODELS is not None:
        print("PAM agent already loaded. Skipping reinit.")
        return PAM(LOADED_MODELS, LOADED_DATA)

    print("Loading PAM technical assistant models from Hugging Face Inference API...")
    HUGGING_FACE_TOKEN = os.getenv("HF_READ_TOKEN")
    if not HUGGING_FACE_TOKEN:
        # Deliberately non-fatal here: pipeline() below may still succeed for
        # public models; the message warns the operator either way.
        print("FATAL: HF_READ_TOKEN not set in environment. PAM will fail to load.")

    try:
        # BUG FIX: pipeline(model=...) expects a repo-id string (or an already
        # loaded model object), NOT a HuggingFaceHub wrapper -- that class comes
        # from LangChain and is not part of transformers at all. Pass the repo
        # id and the auth token straight to pipeline(), which handles
        # authenticated downloads itself.
        models = {
            "phi_ner": pipeline(
                "ner",
                model="Jean-Baptiste/roberta-large-ner-english",
                token=HUGGING_FACE_TOKEN,
                aggregation_strategy="simple",
            ),
            "log_ner": pipeline(
                "ner",
                model="dslim/bert-base-NER",
                token=HUGGING_FACE_TOKEN,
                aggregation_strategy="simple",
            ),
            "summarizer": pipeline(
                "summarization",
                model="google/flan-t5-large",
                token=HUGGING_FACE_TOKEN,
            ),
        }
        print("β All PAM models loaded via Hugging Face Inference API.")
        LOADED_MODELS = models
    except Exception as e:
        # Broad catch is deliberate: download/auth can fail in many ways and
        # the app should still start in data-only degraded mode.
        print(f"FATAL: Could not load inference models. {e}")
        LOADED_MODELS = None

    data = {
        "LOGS": load_json(LOGS_FILE),
        "COMPLIANCE": load_json(COMPLIANCE_FILE),
    }
    if not data["LOGS"] or not data["COMPLIANCE"]:
        print("β WARNING: Log or compliance data failed to load.")
    else:
        print("β Log & compliance data loaded.")
    LOADED_DATA = data

    return PAM(LOADED_MODELS, LOADED_DATA)
| # --- Helper: classify severity --- | |
def classify_severity(entry: str) -> str:
    """Bucket a raw log line into CRITICAL / WARNING / INFO via keyword scan.

    Matching is case-insensitive; critical markers win over warning markers.
    """
    text = entry.lower()
    critical_markers = ("unauthorized", "failed", "attack", "port scanning", "unavailable")
    warning_markers = ("warning", "unexpected", "outside working hours")
    for marker in critical_markers:
        if marker in text:
            return "CRITICAL"
    for marker in warning_markers:
        if marker in text:
            return "WARNING"
    return "INFO"
| # --- PAM Role --- | |
# Persona / self-description string. Every response dict produced by the PAM
# class echoes this verbatim under its "role" key, so UI layers can display
# a consistent agent identity.
PAM_ROLE = (
    "I am PAM, your technical assistant and infrastructure watchdog. "
    "I summarize logs, detect risks, and support developers with clarity. "
    "I flag anomalies, monitor compliance, and hand off client-facing issues when needed. "
    "I never act on my own, always permission first, protocol always."
)
| # --- Backend PAM Class --- | |
# --- Backend PAM Class ---
class PAM:
    """Backend agent wrapping NER/summarization pipelines plus log & compliance data.

    Every public method returns a JSON-serializable dict that always carries a
    "message" (or "response") plus the PAM_ROLE persona under "role".
    """

    def __init__(self, models: Optional[Dict[str, Any]], data: Dict[str, Dict]):
        # Tolerate a failed model load (models is None/empty): the pipeline
        # attributes stay None and _check_activation reports it per request.
        available = models or {}
        self.phi_detector = available.get("phi_ner")
        self.log_parser = available.get("log_ner")
        self.summarizer = available.get("summarizer")
        self.LOGS = data.get("LOGS", {})
        self.COMPLIANCE = data.get("COMPLIANCE", {})

    def _check_activation(self, text: str) -> Optional[str]:
        """Return a fatal-error message when models are unavailable, else None."""
        if self.phi_detector:
            return None
        return "Fatal Error: PAM models failed to load on startup. π οΈ"

    def detect_phi(self, text: str) -> Dict[str, Any]:
        """Run NER over *text* and report person/location/org/date entities."""
        failure = self._check_activation(text)
        if failure:
            return {"message": failure, "role": PAM_ROLE}
        sensitive_groups = ("PER", "LOC", "ORG", "DATE")
        hits = [ent for ent in self.phi_detector(text) if ent["entity_group"] in sensitive_groups]
        return {
            "message": "π Scanning for PHI...",
            "role": PAM_ROLE,
            "has_phi": bool(hits),
            "entities": hits,
        }

    def parse_log(self, log_text: str) -> Dict[str, Any]:
        """Extract named entities from a single log line."""
        failure = self._check_activation(log_text)
        if failure:
            return {"message": failure, "role": PAM_ROLE}
        return {
            "message": "π΅π½ββοΈ Parsing log entry...",
            "role": PAM_ROLE,
            "log_entities": self.log_parser(log_text),
        }

    def summarize(self, raw_text: str) -> Dict[str, Any]:
        """Deterministically summarize (the first 1024 chars of) *raw_text*."""
        failure = self._check_activation(raw_text)
        if failure:
            return {"message": failure, "role": PAM_ROLE}
        generated = self.summarizer(
            raw_text[:1024], max_length=150, min_length=30, do_sample=False
        )
        return {
            "message": "π Summary generated:",
            "role": PAM_ROLE,
            "summary": generated[0]["summary_text"],
        }

    def get_latest_logs(self) -> Dict[str, Any]:
        """Format stored log entries with severity tags; collect client-facing ones."""
        if "latest_logs" not in self.LOGS:
            return {"message": "No logs available. Check JSON file location and contents.", "role": PAM_ROLE}
        formatted_logs = []
        handoffs = []
        for record in self.LOGS["latest_logs"]:
            text = record.get("entry", "")
            stamp = record.get("timestamp", "Unknown time")
            line = f"[{stamp}] ({classify_severity(text)}) -> {text}"
            formatted_logs.append(line)
            lowered = text.lower()
            # Anything touching the frontend or a provider outage is client-facing.
            if "frontend" in lowered or "provider unavailable" in lowered:
                handoffs.append(line)
        return {
            "message": "π‘ Infrastructure Log Review:",
            "role": PAM_ROLE,
            "logs": formatted_logs,
            "handoff_to_frontend": handoffs,
        }

    def check_compliance(self) -> Dict[str, Any]:
        """Render each compliance flag as 'Pretty Name: <pass/fail mark>'."""
        report = [
            f"{key.replace('_', ' ').title()}: {'β ' if passed else 'β'}"
            for key, passed in self.COMPLIANCE.items()
        ]
        return {
            "message": "π‘οΈ Compliance Status Overview:",
            "role": PAM_ROLE,
            "compliance_report": report,
        }

    def process_input(self, user_input: str) -> Dict[str, Any]:
        """Route a free-text command to the matching handler (match order matters)."""
        lowered = user_input.lower().strip()
        if "check compliance" in lowered:
            return self.check_compliance()
        if "get logs" in lowered or "latest logs" in lowered:
            return self.get_latest_logs()
        for command, handler in (("detect phi in", self.detect_phi), ("parse log", self.parse_log)):
            if command in lowered:
                # NOTE(review): index is computed on the stripped+lowered copy but
                # applied to the raw input, so leading whitespace shifts the slice
                # -- preserved as-is from the original; confirm intended.
                payload_start = lowered.find(command) + len(command)
                return handler(user_input[payload_start:].strip())
        if "summarize" in lowered or "explain" in lowered:
            return self.summarize(user_input)
        return {
            "response": f"Hello! I am PAM. I can process your request: '{user_input}'. Try commands like 'check compliance', 'get logs', 'detect phi in [text]', or 'parse log [log text]'.",
            "role": PAM_ROLE,
        }