pythonprincess committed on
Commit
c261abc
·
verified ·
1 Parent(s): 2b242a4

Delete backend_pam.py

Browse files
Files changed (1) hide show
  1. backend_pam.py +0 -204
backend_pam.py DELETED
@@ -1,204 +0,0 @@
1
# filename: backend_pam.py (UPDATED FOR INFERENCE API)

from transformers import pipeline

# FIX: `HuggingFaceHub` is not exported by the transformers package (it comes
# from langchain), so the original unconditional import crashed the whole
# module at load time with an ImportError. Guard it and fall back to None:
# pipelines can be built from a repo id plus `token=` without any wrapper,
# and callers can test the name for None before using it.
try:
    from transformers import HuggingFaceHub  # type: ignore[attr-defined]
except ImportError:
    HuggingFaceHub = None

from datetime import datetime  # noqa: F401  — kept: present in the original file
from typing import Dict, Any, Optional
import json
import os

# --- Constants for Data Paths ---
# All data files live in a "data/" directory next to this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
LOGS_FILE = os.path.join(DATA_DIR, "logs.json")
COMPLIANCE_FILE = os.path.join(DATA_DIR, "compliance.json")

# --- Global Storage for Loaded Components ---
# Populated once by load_agent(); reused on subsequent calls to avoid
# re-downloading models.
LOADED_MODELS = None
LOADED_DATA = None
19
# --- Data Loading Helper ---
def load_json(filepath: str) -> Dict[str, Any]:
    """Read a JSON file and return its parsed contents.

    Returns {} (after printing a diagnostic) when the file is missing or its
    contents are not valid JSON, so callers never have to handle exceptions.

    Args:
        filepath: Absolute or relative path to the JSON file.

    Returns:
        The decoded JSON object, or {} on any load failure.
    """
    try:
        # FIX: explicit encoding — the platform default (e.g. cp1252 on
        # Windows) can mis-decode UTF-8 data files.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"CRITICAL: Data file not found at: {filepath}")
        return {}
    except json.JSONDecodeError as e:
        print(f"CRITICAL: Failed to decode JSON from {filepath}: {e}")
        return {}
30
-
31
# --- Agent Initialization ---
def load_agent() -> 'PAM':
    """Build (or reuse) the PAM agent: three Hugging Face pipelines plus JSON data.

    FIX: the original wrapped each repo id in ``HuggingFaceHub(...)`` and passed
    that object as ``model=`` — ``transformers.pipeline`` does not accept that
    wrapper (and transformers does not even export the class). ``pipeline()``
    takes the repo id string directly, with ``token=`` for authenticated access.

    Returns:
        A PAM instance. Models may be None inside it if loading failed; PAM's
        methods degrade to error payloads in that case.
    """
    global LOADED_MODELS, LOADED_DATA

    # Reuse previously loaded components instead of re-initializing models.
    if LOADED_MODELS is not None:
        print("PAM agent already loaded. Skipping reinit.")
        return PAM(LOADED_MODELS, LOADED_DATA)

    print("Loading PAM technical assistant models from Hugging Face Inference API...")

    HUGGING_FACE_TOKEN = os.getenv("HF_READ_TOKEN")
    if not HUGGING_FACE_TOKEN:
        # Deliberately non-fatal here: the pipeline call below will raise and
        # be caught, leaving LOADED_MODELS = None.
        print("FATAL: HF_READ_TOKEN not set in environment. PAM will fail to load.")

    try:
        models = {
            "phi_ner": pipeline(
                "ner",
                model="Jean-Baptiste/roberta-large-ner-english",
                token=HUGGING_FACE_TOKEN,
                # "simple" groups sub-word tokens into whole entities with an
                # "entity_group" key, which PAM.detect_phi relies on.
                aggregation_strategy="simple"
            ),
            "log_ner": pipeline(
                "ner",
                model="dslim/bert-base-NER",
                token=HUGGING_FACE_TOKEN,
                aggregation_strategy="simple"
            ),
            "summarizer": pipeline(
                "summarization",
                model="google/flan-t5-large",
                token=HUGGING_FACE_TOKEN
            )
        }
        print("✅ All PAM models loaded via Hugging Face Inference API.")
        LOADED_MODELS = models
    except Exception as e:
        # Degrade gracefully: PAM methods return error payloads when models
        # are None instead of crashing the app at startup.
        print(f"FATAL: Could not load inference models. {e}")
        LOADED_MODELS = None

    data = {
        "LOGS": load_json(LOGS_FILE),
        "COMPLIANCE": load_json(COMPLIANCE_FILE)
    }

    if not data["LOGS"] or not data["COMPLIANCE"]:
        print("❌ WARNING: Log or compliance data failed to load.")
    else:
        print("✅ Log & compliance data loaded.")

    LOADED_DATA = data
    return PAM(LOADED_MODELS, LOADED_DATA)
80
-
81
# --- Helper: classify severity ---
def classify_severity(entry: str) -> str:
    """Bucket a raw log entry into CRITICAL / WARNING / INFO by keyword.

    CRITICAL markers win over WARNING markers when both appear; anything
    without a recognized marker is INFO.
    """
    lowered = entry.lower()
    buckets = (
        ("CRITICAL", ("unauthorized", "failed", "attack", "port scanning", "unavailable")),
        ("WARNING", ("warning", "unexpected", "outside working hours")),
    )
    for label, markers in buckets:
        if any(marker in lowered for marker in markers):
            return label
    return "INFO"
90
-
91
# --- PAM Role ---
# Persona preamble. Every PAM response payload echoes this under its "role"
# key. Adjacent string literals are concatenated implicitly; note the
# trailing spaces that keep the sentences separated.
PAM_ROLE = (
    "I am PAM, your technical assistant and infrastructure watchdog. "
    "I summarize logs, detect risks, and support developers with clarity. "
    "I flag anomalies, monitor compliance, and hand off client-facing issues when needed. "
    "I never act on my own, always permission first, protocol always."
)
98
-
99
# --- Backend PAM Class ---
class PAM:
    """Technical-assistant agent: PHI detection, log parsing, summarization,
    canned infrastructure-log review, and compliance reporting.

    Any of the pipelines may be None when model loading failed; every public
    method then returns an error payload instead of raising.
    """

    def __init__(self, models: Optional[Dict[str, Any]], data: Dict[str, Dict]):
        # Pipelines (callables) keyed as produced by load_agent(); each may be
        # absent or None when loading failed.
        self.phi_detector = models.get("phi_ner") if models else None
        self.log_parser = models.get("log_ner") if models else None
        self.summarizer = models.get("summarizer") if models else None
        # Parsed JSON payloads; {} when the files could not be loaded.
        self.LOGS = data.get("LOGS", {})
        self.COMPLIANCE = data.get("COMPLIANCE", {})

    def _check_activation(self, component: Any) -> Optional[str]:
        """Return an error message if *component* (a pipeline) is unavailable.

        FIX: the original ignored its argument and always tested
        ``self.phi_detector``, so parse_log/summarize would crash with a
        TypeError if only their own model had failed to load.
        """
        if component is None:
            return "Fatal Error: PAM models failed to load on startup. 🛠️"
        return None

    def detect_phi(self, text: str) -> Dict[str, Any]:
        """Scan *text* for person/location/org/date entities (treated as PHI)."""
        error = self._check_activation(self.phi_detector)
        if error:
            return {"message": error, "role": PAM_ROLE}

        entities = self.phi_detector(text)
        # aggregation_strategy="simple" groups tokens under "entity_group".
        phi = [e for e in entities if e["entity_group"] in ["PER", "LOC", "ORG", "DATE"]]
        return {
            "message": "🔒 Scanning for PHI...",
            "role": PAM_ROLE,
            "has_phi": len(phi) > 0,
            "entities": phi
        }

    def parse_log(self, log_text: str) -> Dict[str, Any]:
        """Run NER over a raw log line and return the extracted entities."""
        error = self._check_activation(self.log_parser)
        if error:
            return {"message": error, "role": PAM_ROLE}

        parsed = self.log_parser(log_text)
        return {
            "message": "🕵🏽‍♀️ Parsing log entry...",
            "role": PAM_ROLE,
            "log_entities": parsed
        }

    def summarize(self, raw_text: str) -> Dict[str, Any]:
        """Summarize *raw_text* (truncated to 1024 chars to fit the model)."""
        error = self._check_activation(self.summarizer)
        if error:
            return {"message": error, "role": PAM_ROLE}

        result = self.summarizer(raw_text[:1024], max_length=150, min_length=30, do_sample=False)
        return {
            "message": "📊 Summary generated:",
            "role": PAM_ROLE,
            "summary": result[0]["summary_text"]
        }

    def get_latest_logs(self) -> Dict[str, Any]:
        """Format LOGS["latest_logs"] with severities; split out client-facing issues."""
        if "latest_logs" not in self.LOGS:
            return {"message": "No logs available. Check JSON file location and contents.", "role": PAM_ROLE}

        full_logset = []
        client_handoffs = []
        for item in self.LOGS["latest_logs"]:
            entry = item.get("entry", "")
            # Renamed from `time`: that shadowed the stdlib module name.
            timestamp = item.get("timestamp", "Unknown time")
            severity = classify_severity(entry)
            formatted = f"[{timestamp}] ({severity}) -> {entry}"
            full_logset.append(formatted)

            # Frontend-related or provider-availability issues are surfaced
            # separately for hand-off to the client-facing side.
            if "frontend" in entry.lower() or "provider unavailable" in entry.lower():
                client_handoffs.append(formatted)

        return {
            "message": "📡 Infrastructure Log Review:",
            "role": PAM_ROLE,
            "logs": full_logset,
            "handoff_to_frontend": client_handoffs
        }

    def check_compliance(self) -> Dict[str, Any]:
        """Render each COMPLIANCE flag as a human-readable pass/fail line."""
        report = []
        for item, status in self.COMPLIANCE.items():
            emoji = "✅" if status else "❌"
            report.append(f"{item.replace('_', ' ').title()}: {emoji}")
        return {
            "message": "🛡️ Compliance Status Overview:",
            "role": PAM_ROLE,
            "compliance_report": report
        }

    def process_input(self, user_input: str) -> Dict[str, Any]:
        """Dispatch a free-text command to the matching capability.

        Matching is case-insensitive substring search, checked in order;
        text after "detect phi in" / "parse log" is forwarded verbatim
        (original casing preserved).
        """
        u_input = user_input.lower().strip()

        if "check compliance" in u_input:
            return self.check_compliance()
        if "get logs" in u_input or "latest logs" in u_input:
            return self.get_latest_logs()
        if "detect phi in" in u_input:
            text_to_scan = user_input[u_input.find("detect phi in") + len("detect phi in"):].strip()
            return self.detect_phi(text_to_scan)
        if "parse log" in u_input:
            log_to_parse = user_input[u_input.find("parse log") + len("parse log"):].strip()
            return self.parse_log(log_to_parse)
        if "summarize" in u_input or "explain" in u_input:
            return self.summarize(user_input)

        # NOTE(review): this fallback uses a "response" key while every other
        # payload uses "message" — confirm what the frontend reads before
        # unifying them.
        return {
            "response": f"Hello! I am PAM. I can process your request: '{user_input}'. Try commands like 'check compliance', 'get logs', 'detect phi in [text]', or 'parse log [log text]'.",
            "role": PAM_ROLE
        }