# PAM-UmiNur / backend_pam.py
# (Hugging Face page metadata: uploaded by pythonprincess — "Upload 6 files",
#  commit 9423397 verified, raw / history / blame, 7.83 kB)
# filename: backend_pam.py (UPDATED FOR INFERENCE API)
from transformers import pipeline, HuggingFaceHub
from datetime import datetime
from typing import Dict, Any, Optional
import json
import os
# --- Constants for Data Paths ---
# Data files are resolved relative to this module's directory (not the CWD),
# so the app works regardless of where the process was launched from.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
LOGS_FILE = os.path.join(DATA_DIR, "logs.json")  # infrastructure log entries
COMPLIANCE_FILE = os.path.join(DATA_DIR, "compliance.json")  # compliance status flags
# --- Global Storage for Loaded Components ---
# Module-level caches populated by load_agent(); both remain None until the
# first successful load so repeated calls skip model re-initialization.
LOADED_MODELS = None
LOADED_DATA = None
# --- Data Loading Helper ---
def load_json(filepath: str) -> Dict[str, Any]:
    """Load a JSON file and return its parsed contents.

    Failures are non-fatal by design: a missing file or malformed JSON is
    reported on stdout and an empty dict is returned, so callers can degrade
    gracefully instead of crashing at startup.

    Args:
        filepath: Path to the JSON file to read.

    Returns:
        The parsed JSON object, or {} on any load failure.
    """
    try:
        # Explicit encoding avoids the platform-dependent default
        # (e.g. cp1252 on Windows), which can corrupt non-ASCII JSON.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"CRITICAL: Data file not found at: {filepath}")
        return {}
    except json.JSONDecodeError as e:
        print(f"CRITICAL: Failed to decode JSON from {filepath}: {e}")
        return {}
# --- Agent Initialization ---
def load_agent() -> 'PAM':
    """Initialize (or reuse) the PAM agent: Hub-backed pipelines plus JSON data.

    Models and data are cached in module globals, so repeated calls return a
    fresh PAM wrapper around the already-loaded components. If model loading
    fails, PAM is constructed with models=None and its model-backed methods
    surface a fatal-error payload instead of raising.

    Returns:
        A PAM instance (possibly with models=None on load failure).
    """
    global LOADED_MODELS, LOADED_DATA
    if LOADED_MODELS is not None:
        print("PAM agent already loaded. Skipping reinit.")
        return PAM(LOADED_MODELS, LOADED_DATA)
    print("Loading PAM technical assistant models from Hugging Face Inference API...")
    HUGGING_FACE_TOKEN = os.getenv("HF_READ_TOKEN")
    if not HUGGING_FACE_TOKEN:
        # Deliberately non-fatal here: the try block below reports the failure.
        print("FATAL: HF_READ_TOKEN not set in environment. PAM will fail to load.")
    try:
        # BUGFIX: `transformers` does not export `HuggingFaceHub`, so the old
        # `pipeline(model=HuggingFaceHub(repo_id=..., token=...))` call could
        # never work. The supported API is to pass the Hub repo id as a string
        # and the access token via the `token=` keyword.
        models = {
            "phi_ner": pipeline(
                "ner",
                model="Jean-Baptiste/roberta-large-ner-english",
                token=HUGGING_FACE_TOKEN,
                aggregation_strategy="simple",
            ),
            "log_ner": pipeline(
                "ner",
                model="dslim/bert-base-NER",
                token=HUGGING_FACE_TOKEN,
                aggregation_strategy="simple",
            ),
            "summarizer": pipeline(
                "summarization",
                model="google/flan-t5-large",
                token=HUGGING_FACE_TOKEN,
            ),
        }
        print("βœ… All PAM models loaded via Hugging Face Inference API.")
        LOADED_MODELS = models
    except Exception as e:
        # Best-effort startup: keep the app alive; PAM methods report the
        # failure to users via _check_activation().
        print(f"FATAL: Could not load inference models. {e}")
        LOADED_MODELS = None
    data = {
        "LOGS": load_json(LOGS_FILE),
        "COMPLIANCE": load_json(COMPLIANCE_FILE),
    }
    if not data["LOGS"] or not data["COMPLIANCE"]:
        print("❌ WARNING: Log or compliance data failed to load.")
    else:
        print("βœ… Log & compliance data loaded.")
    LOADED_DATA = data
    return PAM(LOADED_MODELS, LOADED_DATA)
# --- Helper: classify severity ---
def classify_severity(entry: str) -> str:
    """Bucket a raw log line into CRITICAL / WARNING / INFO via keyword scan.

    Matching is case-insensitive and CRITICAL markers take precedence over
    WARNING markers; anything with no marker at all is INFO.
    """
    text = entry.lower()
    critical_markers = ("unauthorized", "failed", "attack", "port scanning", "unavailable")
    warning_markers = ("warning", "unexpected", "outside working hours")
    for marker in critical_markers:
        if marker in text:
            return "CRITICAL"
    for marker in warning_markers:
        if marker in text:
            return "WARNING"
    return "INFO"
# --- PAM Role ---
# Persona/identity string echoed back under the "role" key of every response
# payload produced by the PAM class below.
PAM_ROLE = (
    "I am PAM, your technical assistant and infrastructure watchdog. "
    "I summarize logs, detect risks, and support developers with clarity. "
    "I flag anomalies, monitor compliance, and hand off client-facing issues when needed. "
    "I never act on my own, always permission first, protocol always."
)
# --- Backend PAM Class ---
class PAM:
    """Backend agent: PHI detection, log NER, summarization, and log/compliance review.

    `models` may be None (model loading failed upstream); in that case every
    model-backed method returns a fatal-error payload instead of raising.
    All responses are dicts that include the PAM_ROLE persona string.
    """

    def __init__(self, models: Optional[Dict[str, Any]], data: Dict[str, Dict]):
        # Each pipeline is None when models failed to load (all-or-nothing upstream).
        self.phi_detector = models.get("phi_ner") if models else None
        self.log_parser = models.get("log_ner") if models else None
        self.summarizer = models.get("summarizer") if models else None
        self.LOGS = data.get("LOGS", {})
        self.COMPLIANCE = data.get("COMPLIANCE", {})

    def _check_activation(self, text: str) -> Optional[str]:
        """Return a fatal-error message if models are unavailable, else None.

        NOTE(review): only phi_detector is probed as a proxy for "all models
        loaded"; this holds because load_agent sets the models all-or-nothing.
        """
        if not self.phi_detector:
            return "Fatal Error: PAM models failed to load on startup. πŸ› οΈ"
        return None

    def detect_phi(self, text: str) -> Dict[str, Any]:
        """Run NER over `text` and report person/location/org/date entities as PHI."""
        error = self._check_activation(text)
        if error:
            return {"message": error, "role": PAM_ROLE}
        entities = self.phi_detector(text)
        # Only these entity groups are treated as potential PHI.
        phi = [e for e in entities if e["entity_group"] in ["PER", "LOC", "ORG", "DATE"]]
        return {
            "message": "πŸ”’ Scanning for PHI...",
            "role": PAM_ROLE,
            "has_phi": len(phi) > 0,
            "entities": phi
        }

    def parse_log(self, log_text: str) -> Dict[str, Any]:
        """Extract named entities from a single log entry."""
        error = self._check_activation(log_text)
        if error:
            return {"message": error, "role": PAM_ROLE}
        parsed = self.log_parser(log_text)
        return {
            "message": "πŸ•΅πŸ½β€β™€οΈ Parsing log entry...",
            "role": PAM_ROLE,
            "log_entities": parsed
        }

    def summarize(self, raw_text: str) -> Dict[str, Any]:
        """Summarize `raw_text` (truncated to 1024 chars to fit the model input)."""
        error = self._check_activation(raw_text)
        if error:
            return {"message": error, "role": PAM_ROLE}
        result = self.summarizer(raw_text[:1024], max_length=150, min_length=30, do_sample=False)
        return {
            "message": "πŸ“Š Summary generated:",
            "role": PAM_ROLE,
            "summary": result[0]["summary_text"]
        }

    def get_latest_logs(self) -> Dict[str, Any]:
        """Format stored logs with severity tags; collect client-facing entries for handoff."""
        if "latest_logs" not in self.LOGS:
            return {"message": "No logs available. Check JSON file location and contents.", "role": PAM_ROLE}
        full_logset = []
        client_handoffs = []
        for item in self.LOGS["latest_logs"]:
            entry = item.get("entry", "")
            time = item.get("timestamp", "Unknown time")
            severity = classify_severity(entry)
            formatted = f"[{time}] ({severity}) -> {entry}"
            full_logset.append(formatted)
            # Frontend/provider issues get flagged for client-facing handoff.
            if "frontend" in entry.lower() or "provider unavailable" in entry.lower():
                client_handoffs.append(formatted)
        return {
            "message": "πŸ“‘ Infrastructure Log Review:",
            "role": PAM_ROLE,
            "logs": full_logset,
            "handoff_to_frontend": client_handoffs
        }

    def check_compliance(self) -> Dict[str, Any]:
        """Render each compliance flag as a human-readable pass/fail line."""
        report = []
        for item, status in self.COMPLIANCE.items():
            emoji = "βœ…" if status else "❌"
            report.append(f"{item.replace('_', ' ').title()}: {emoji}")
        return {
            "message": "πŸ›‘οΈ Compliance Status Overview:",
            "role": PAM_ROLE,
            "compliance_report": report
        }

    def process_input(self, user_input: str) -> Dict[str, Any]:
        """Route a free-text command to the matching PAM capability.

        Recognized commands: 'check compliance', 'get logs'/'latest logs',
        'detect phi in [text]', 'parse log [log text]', 'summarize'/'explain'.
        Anything else gets a help-style fallback response.
        """
        # BUGFIX: slice indices are computed on the lowered+stripped copy, so
        # they must be applied to a stripped original too — previously the raw
        # user_input was sliced, and any leading whitespace shifted the payload
        # extracted for 'detect phi in' / 'parse log'.
        stripped = user_input.strip()
        u_input = stripped.lower()
        if "check compliance" in u_input:
            return self.check_compliance()
        if "get logs" in u_input or "latest logs" in u_input:
            return self.get_latest_logs()
        if "detect phi in" in u_input:
            text_to_scan = stripped[u_input.find("detect phi in") + len("detect phi in"):].strip()
            return self.detect_phi(text_to_scan)
        if "parse log" in u_input:
            log_to_parse = stripped[u_input.find("parse log") + len("parse log"):].strip()
            return self.parse_log(log_to_parse)
        if "summarize" in u_input or "explain" in u_input:
            return self.summarize(user_input)
        return {
            "response": f"Hello! I am PAM. I can process your request: '{user_input}'. Try commands like 'check compliance', 'get logs', 'detect phi in [text]', or 'parse log [log text]'.",
            "role": PAM_ROLE
        }