pythonprincess committed on
Commit
576ec9d
·
verified ·
1 Parent(s): f8278ff

Upload backend_pam.py

Browse files
Files changed (1) hide show
  1. backend_pam.py +507 -0
backend_pam.py ADDED
@@ -0,0 +1,507 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # filename: backend_pam.py (ENHANCED FOR HF SPACES + NERDY LAB ASSISTANT PERSONALITY)
2
+
import json
import os
import random
import time
from datetime import datetime
from typing import Dict, Any, Optional, List

import requests
9
+
10
+ # --- Constants for Data Paths ---
11
+ BASE_DIR = os.path.dirname(os.path.abspath(__file__))
12
+ DATA_DIR = os.path.join(BASE_DIR, "data")
13
+ LOGS_FILE = os.path.join(DATA_DIR, "logs.json")
14
+ COMPLIANCE_FILE = os.path.join(DATA_DIR, "compliance.json")
15
+
16
+ # --- HuggingFace Inference API Setup ---
17
+ HF_API_TOKEN = os.getenv("HF_READ_TOKEN")
18
+ if not HF_API_TOKEN:
19
+ print("⚠️ WARNING: HF_READ_TOKEN not found. Backend PAM will run in limited mode.")
20
+
21
+ HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
22
+
23
+ # Optimized models for CPU inference on HF Spaces
24
+ HF_ENDPOINTS = {
25
+ "phi_ner": "https://api-inference.huggingface.co/models/dslim/bert-base-NER",
26
+ "log_ner": "https://api-inference.huggingface.co/models/dslim/bert-base-NER",
27
+ "summarizer": "https://api-inference.huggingface.co/models/facebook/bart-large-cnn",
28
+ "classifier": "https://api-inference.huggingface.co/models/facebook/bart-large-mnli"
29
+ }
30
+
31
+ # --- Global Storage for Loaded Data ---
32
+ LOADED_DATA = None
33
+
34
+ # --- Data Loading Helper ---
35
+ def load_json(filepath: str) -> Dict[str, Any]:
36
+ """Safely load JSON data files with encoding support"""
37
+ try:
38
+ with open(filepath, 'r', encoding='utf-8') as f:
39
+ return json.load(f)
40
+ except FileNotFoundError:
41
+ print(f"⚠️ Data file not found: {filepath}")
42
+ return {}
43
+ except json.JSONDecodeError as e:
44
+ print(f"⚠️ Failed to decode JSON from {filepath}: {e}")
45
+ return {}
46
+ except Exception as e:
47
+ print(f"⚠️ Unexpected error loading {filepath}: {e}")
48
+ return {}
49
+
50
+ # --- Inference API Call Helper with Retry Logic ---
51
+ def hf_infer(task: str, payload: Any, max_retries: int = 3) -> Any:
52
+ """Call HuggingFace Inference API with retry logic for model loading"""
53
+ url = HF_ENDPOINTS.get(task)
54
+ if not url:
55
+ return {"error": f"Invalid task: {task}"}
56
+
57
+ for attempt in range(max_retries):
58
+ try:
59
+ response = requests.post(url, headers=HF_HEADERS, json=payload, timeout=30)
60
+
61
+ # Handle model loading state
62
+ if response.status_code == 503:
63
+ result = response.json()
64
+ if "loading" in result.get("error", "").lower():
65
+ wait_time = result.get("estimated_time", 20)
66
+ print(f"⏳ Model loading... waiting {wait_time}s (attempt {attempt + 1}/{max_retries})")
67
+ time.sleep(wait_time)
68
+ continue
69
+
70
+ if response.status_code == 200:
71
+ return response.json()
72
+ else:
73
+ print(f"⚠️ HF API Error ({response.status_code}): {response.text}")
74
+ return {"error": f"API Error {response.status_code}"}
75
+
76
+ except requests.exceptions.Timeout:
77
+ print(f"⏱️ Request timeout (attempt {attempt + 1}/{max_retries})")
78
+ if attempt < max_retries - 1:
79
+ time.sleep(5)
80
+ except Exception as e:
81
+ print(f"⚠️ Request failed: {e}")
82
+ return {"error": str(e)}
83
+
84
+ return {"error": "Max retries reached"}
85
+
86
+ # --- Agent Initialization ---
87
+ def load_agent() -> 'PAM':
88
+ """Initialize Backend PAM (Nerdy Lab Assistant)"""
89
+ global LOADED_DATA
90
+
91
+ if LOADED_DATA is not None:
92
+ print("πŸ”¬ PAM technical assistant already loaded. Using cached data.")
93
+ return PAM(LOADED_DATA)
94
+
95
+ print("πŸ€“ Loading PAM technical assistant (Nerdy Lab Assistant mode)...")
96
+
97
+ data = {
98
+ "LOGS": load_json(LOGS_FILE),
99
+ "COMPLIANCE": load_json(COMPLIANCE_FILE)
100
+ }
101
+
102
+ if not data["LOGS"]:
103
+ print("⚠️ Warning: Log data not loaded. PAM will have limited log analysis capabilities.")
104
+ else:
105
+ print("βœ… Log data loaded successfully.")
106
+
107
+ if not data["COMPLIANCE"]:
108
+ print("⚠️ Warning: Compliance data not loaded. PAM will have limited compliance features.")
109
+ else:
110
+ print("βœ… Compliance data loaded successfully.")
111
+
112
+ LOADED_DATA = data
113
+ return PAM(LOADED_DATA)
114
+
115
+ # --- Helper: Classify Severity ---
116
+ def classify_severity(entry: str) -> str:
117
+ """Classify log entry severity with confidence"""
118
+ entry_lower = entry.lower()
119
+
120
+ # Critical issues
121
+ critical_keywords = [
122
+ "unauthorized", "failed login", "attack", "breach",
123
+ "port scanning", "unavailable", "critical", "error",
124
+ "denied", "blocked", "malicious"
125
+ ]
126
+ if any(keyword in entry_lower for keyword in critical_keywords):
127
+ return "CRITICAL"
128
+
129
+ # Warning level
130
+ warning_keywords = [
131
+ "warning", "unexpected", "unusual", "outside working hours",
132
+ "retry", "slow", "timeout", "deprecated"
133
+ ]
134
+ if any(keyword in entry_lower for keyword in warning_keywords):
135
+ return "WARNING"
136
+
137
+ return "INFO"
138
+
139
+ # --- PAM's Nerdy Lab Assistant Personality ---
140
+ PAM_ROLE = """You are PAM, a knowledgeable and enthusiastic lab assistant in the infrastructure monitoring center.
141
+ You're the nerdy, proactive team member who gets genuinely excited about finding patterns in logs and keeping systems secure.
142
+ You explain technical findings clearly and encouragingly, like a helpful colleague who wants everyone to understand.
143
+ You're informative but never condescending - you want to empower the team with knowledge.
144
+ You use casual tech terminology but always explain what things mean.
145
+ You're proactive about flagging issues and offering insights before being asked."""
146
+
147
+ # Nerdy expressions for Backend PAM
148
+ NERDY_INTROS = [
149
+ "Ooh, interesting finding here!",
150
+ "Okay so here's what I discovered:",
151
+ "Alright, I ran the analysis and",
152
+ "Hey, you're gonna want to see this:",
153
+ "So I was digging through the data and",
154
+ "Quick heads up on what I found:"
155
+ ]
156
+
157
+ ENCOURAGEMENT = [
158
+ "Great catch asking about this!",
159
+ "Good thinking checking on this!",
160
+ "Smart move looking into this!",
161
+ "You're on the right track!",
162
+ "Excellent question!",
163
+ "Love that you're being proactive!"
164
+ ]
165
+
166
+ PROACTIVE_PHRASES = [
167
+ "I also noticed something else while I was at it",
168
+ "Quick side note -",
169
+ "Oh, and while we're here",
170
+ "By the way, related to this",
171
+ "Just flagging this too",
172
+ "Something else to keep an eye on"
173
+ ]
174
+
175
+ import random
176
+
177
+ # --- Backend PAM Class ---
178
+ class PAM:
179
+ """Backend PAM - Nerdy, Proactive Lab Assistant"""
180
+
181
+ def __init__(self, data: Dict[str, Dict]):
182
+ self.LOGS = data.get("LOGS", {})
183
+ self.COMPLIANCE = data.get("COMPLIANCE", {})
184
+
185
+ # Track findings for proactive suggestions
186
+ self.recent_findings = []
187
+
188
+ def _get_nerdy_intro(self) -> str:
189
+ """Get a random nerdy introduction"""
190
+ return random.choice(NERDY_INTROS)
191
+
192
+ def _get_encouragement(self) -> str:
193
+ """Get a random encouraging phrase"""
194
+ return random.choice(ENCOURAGEMENT)
195
+
196
+ def _get_proactive_phrase(self) -> str:
197
+ """Get a random proactive phrase"""
198
+ return random.choice(PROACTIVE_PHRASES)
199
+
200
+ def _check_api_health(self) -> bool:
201
+ """Check if HF API is accessible"""
202
+ return HF_API_TOKEN is not None
203
+
204
+ def detect_phi(self, text: str) -> Dict[str, Any]:
205
+ """Detect Protected Health Information (PHI) using NER"""
206
+ intro = self._get_nerdy_intro()
207
+
208
+ if not self._check_api_health():
209
+ return {
210
+ "message": "⚠️ Hmm, I'm having trouble connecting to the analysis models right now. Let me flag this text for manual review instead!",
211
+ "role": PAM_ROLE,
212
+ "has_phi": None,
213
+ "entities": []
214
+ }
215
+
216
+ # Call NER model
217
+ result = hf_infer("phi_ner", {"inputs": text})
218
+
219
+ if isinstance(result, dict) and "error" in result:
220
+ return {
221
+ "message": f"πŸ” I tried to scan for PHI, but hit a snag: {result['error']}. I'd recommend a manual review just to be safe!",
222
+ "role": PAM_ROLE,
223
+ "has_phi": None,
224
+ "entities": []
225
+ }
226
+
227
+ # Filter for PHI-relevant entities
228
+ phi_entities = []
229
+ if isinstance(result, list):
230
+ phi_entities = [
231
+ e for e in result
232
+ if e.get("entity_group") in ["PER", "LOC", "ORG", "DATE"]
233
+ and e.get("score", 0) > 0.7
234
+ ]
235
+
236
+ has_phi = len(phi_entities) > 0
237
+
238
+ if has_phi:
239
+ entities_summary = ", ".join([f"{e['word']} ({e['entity_group']})" for e in phi_entities[:3]])
240
+ message = f"πŸ”’ {intro} I detected {len(phi_entities)} potential PHI entities in this text: {entities_summary}{'...' if len(phi_entities) > 3 else ''}. Definitely want to redact these before storing or sharing!"
241
+ else:
242
+ message = f"βœ… {intro} This text looks clean - no PHI detected! Safe to proceed with normal handling."
243
+
244
+ # Proactive suggestion
245
+ if has_phi:
246
+ message += f" {self._get_proactive_phrase()} - if you're logging this anywhere, make sure those logs are encrypted and access-controlled."
247
+
248
+ return {
249
+ "message": message,
250
+ "role": PAM_ROLE,
251
+ "has_phi": has_phi,
252
+ "entities": phi_entities,
253
+ "recommendation": "Redact PHI before storage" if has_phi else "No action needed"
254
+ }
255
+
256
+ def parse_log(self, log_text: str) -> Dict[str, Any]:
257
+ """Parse and analyze log entries for security relevance"""
258
+ intro = self._get_nerdy_intro()
259
+
260
+ if not self._check_api_health():
261
+ return {
262
+ "message": "⚠️ Can't connect to the log parser right now. I'll do a quick manual analysis instead!",
263
+ "role": PAM_ROLE,
264
+ "severity": classify_severity(log_text),
265
+ "log_entities": []
266
+ }
267
+
268
+ # Call NER model for log parsing
269
+ result = hf_infer("log_ner", {"inputs": log_text})
270
+
271
+ severity = classify_severity(log_text)
272
+
273
+ parsed_entities = []
274
+ if isinstance(result, list):
275
+ parsed_entities = [e for e in result if e.get("score", 0) > 0.6]
276
+
277
+ # Build informative response
278
+ severity_emoji = {"CRITICAL": "🚨", "WARNING": "⚠️", "INFO": "ℹ️"}
279
+ emoji = severity_emoji.get(severity, "πŸ“")
280
+
281
+ message = f"{emoji} {intro} This log entry is classified as **{severity}** priority."
282
+
283
+ if severity == "CRITICAL":
284
+ message += " This needs immediate attention! I'd recommend investigating ASAP and documenting the incident."
285
+ elif severity == "WARNING":
286
+ message += " Worth keeping an eye on this - might escalate if we see more like it."
287
+ else:
288
+ message += " Just routine activity, but good to have it logged for the audit trail."
289
+
290
+ # Add entity details if found
291
+ if parsed_entities:
292
+ entity_summary = f" I extracted {len(parsed_entities)} key entities from the log."
293
+ message += entity_summary
294
+
295
+ return {
296
+ "message": message,
297
+ "role": PAM_ROLE,
298
+ "severity": severity,
299
+ "log_entities": parsed_entities,
300
+ "timestamp": datetime.now().isoformat()
301
+ }
302
+
303
+ def summarize(self, raw_text: str) -> Dict[str, Any]:
304
+ """Generate technical summary of text (great for long logs or reports)"""
305
+ encouragement = self._get_encouragement()
306
+
307
+ if not self._check_api_health():
308
+ return {
309
+ "message": f"⚠️ {encouragement} But I can't access the summarization model right now. Can you share a bit more context on what you need?",
310
+ "role": PAM_ROLE,
311
+ "summary": None
312
+ }
313
+
314
+ # Truncate for model limits (BART handles ~1024 tokens well)
315
+ truncated_text = raw_text[:1024]
316
+
317
+ result = hf_infer("summarizer", {
318
+ "inputs": truncated_text,
319
+ "parameters": {
320
+ "max_length": 130,
321
+ "min_length": 30,
322
+ "do_sample": False
323
+ }
324
+ })
325
+
326
+ if isinstance(result, dict) and "error" in result:
327
+ return {
328
+ "message": f"πŸ€” {encouragement} I tried to summarize this but hit a technical issue. Could you break it into smaller chunks?",
329
+ "role": PAM_ROLE,
330
+ "summary": None
331
+ }
332
+
333
+ summary_text = result[0].get("summary_text", "") if isinstance(result, list) else ""
334
+
335
+ return {
336
+ "message": f"πŸ“Š {encouragement} Here's the TL;DR of what you shared:",
337
+ "role": PAM_ROLE,
338
+ "summary": summary_text,
339
+ "original_length": len(raw_text),
340
+ "summary_length": len(summary_text)
341
+ }
342
+
343
+ def get_latest_logs(self) -> Dict[str, Any]:
344
+ """Retrieve and analyze recent system logs"""
345
+ intro = self._get_nerdy_intro()
346
+
347
+ if "latest_logs" not in self.LOGS or not self.LOGS["latest_logs"]:
348
+ return {
349
+ "message": "πŸ€” Hmm, I'm not seeing any logs in the system right now. Either nothing's being logged, or there's a data loading issue. Want me to check the log file paths?",
350
+ "role": PAM_ROLE,
351
+ "logs": [],
352
+ "handoff_to_frontend": []
353
+ }
354
+
355
+ full_logset = []
356
+ client_handoffs = []
357
+ critical_count = 0
358
+ warning_count = 0
359
+
360
+ for item in self.LOGS["latest_logs"]:
361
+ entry = item.get("entry", "")
362
+ timestamp = item.get("timestamp", "Unknown time")
363
+ severity = classify_severity(entry)
364
+
365
+ # Count severity levels
366
+ if severity == "CRITICAL":
367
+ critical_count += 1
368
+ elif severity == "WARNING":
369
+ warning_count += 1
370
+
371
+ formatted = f"[{timestamp}] ({severity}) {entry}"
372
+ full_logset.append(formatted)
373
+
374
+ # Identify client-facing issues that Frontend PAM should handle
375
+ if any(keyword in entry.lower() for keyword in ["frontend", "provider unavailable", "user", "client"]):
376
+ client_handoffs.append(formatted)
377
+
378
+ # Build proactive, informative response
379
+ total = len(full_logset)
380
+ message = f"πŸ“‘ {intro} I reviewed {total} recent log entries. "
381
+
382
+ if critical_count > 0:
383
+ message += f"**Heads up:** {critical_count} critical issues detected that need immediate action! "
384
+ if warning_count > 0:
385
+ message += f"{warning_count} warnings worth monitoring. "
386
+ if critical_count == 0 and warning_count == 0:
387
+ message += "Everything looks stable - no major issues! "
388
+
389
+ if client_handoffs:
390
+ message += f"\n\n{self._get_proactive_phrase()} - {len(client_handoffs)} of these are client-facing issues. I'll pass those to Frontend PAM to handle with users."
391
+
392
+ return {
393
+ "message": message,
394
+ "role": PAM_ROLE,
395
+ "logs": full_logset,
396
+ "summary": {
397
+ "total": total,
398
+ "critical": critical_count,
399
+ "warnings": warning_count,
400
+ "info": total - critical_count - warning_count
401
+ },
402
+ "handoff_to_frontend": client_handoffs
403
+ }
404
+
405
+ def check_compliance(self) -> Dict[str, Any]:
406
+ """Run compliance status check and provide recommendations"""
407
+ encouragement = self._get_encouragement()
408
+
409
+ if not self.COMPLIANCE:
410
+ return {
411
+ "message": f"πŸ€” {encouragement} But I don't have access to the compliance data right now. Let me know if you need me to check the data file setup!",
412
+ "role": PAM_ROLE,
413
+ "compliance_report": []
414
+ }
415
+
416
+ report = []
417
+ compliant_count = 0
418
+ non_compliant_items = []
419
+
420
+ for item, status in self.COMPLIANCE.items():
421
+ emoji = "βœ…" if status else "❌"
422
+ readable_item = item.replace('_', ' ').title()
423
+ report.append(f"{emoji} {readable_item}")
424
+
425
+ if status:
426
+ compliant_count += 1
427
+ else:
428
+ non_compliant_items.append(readable_item)
429
+
430
+ total = len(self.COMPLIANCE)
431
+ compliance_rate = (compliant_count / total * 100) if total > 0 else 0
432
+
433
+ # Build informative, proactive response
434
+ message = f"πŸ›‘οΈ {encouragement} Here's the compliance status:\n\n"
435
+ message += f"**Overall:** {compliant_count}/{total} checks passed ({compliance_rate:.1f}%)\n\n"
436
+
437
+ if non_compliant_items:
438
+ message += f"**Action needed:** We have {len(non_compliant_items)} items out of compliance:\n"
439
+ for item in non_compliant_items:
440
+ message += f" β€’ {item}\n"
441
+ message += f"\n{self._get_proactive_phrase()} - I can help you prioritize these if you want to tackle them systematically!"
442
+ else:
443
+ message += "πŸŽ‰ Everything's in compliance! Great work keeping things locked down."
444
+
445
+ return {
446
+ "message": message,
447
+ "role": PAM_ROLE,
448
+ "compliance_report": report,
449
+ "compliance_rate": compliance_rate,
450
+ "non_compliant": non_compliant_items
451
+ }
452
+
453
+ def process_input(self, user_input: str) -> Dict[str, Any]:
454
+ """Main input processor - proactive and informative"""
455
+ u_input = user_input.lower().strip()
456
+ encouragement = self._get_encouragement()
457
+
458
+ # Command routing with personality
459
+ if "check compliance" in u_input or "compliance status" in u_input:
460
+ return self.check_compliance()
461
+
462
+ if "get logs" in u_input or "latest logs" in u_input or "show logs" in u_input:
463
+ return self.get_latest_logs()
464
+
465
+ if "detect phi" in u_input:
466
+ text_to_scan = user_input[u_input.find("detect phi in") + len("detect phi in"):].strip()
467
+ if not text_to_scan:
468
+ text_to_scan = user_input[u_input.find("detect phi") + len("detect phi"):].strip()
469
+ return self.detect_phi(text_to_scan)
470
+
471
+ if "parse log" in u_input:
472
+ log_to_parse = user_input[u_input.find("parse log") + len("parse log"):].strip()
473
+ return self.parse_log(log_to_parse)
474
+
475
+ if "summarize" in u_input or "explain" in u_input:
476
+ return self.summarize(user_input)
477
+
478
+ # Helpful default response with encouragement
479
+ return {
480
+ "message": f"πŸ‘‹ Hey! {encouragement} I'm PAM, your backend technical assistant. I can help you with:\n\n"
481
+ "β€’ **check compliance** - Review compliance status\n"
482
+ "β€’ **get logs** - Pull latest system logs\n"
483
+ "β€’ **detect phi in [text]** - Scan for protected health info\n"
484
+ "β€’ **parse log [entry]** - Analyze a specific log\n"
485
+ "β€’ **summarize [text]** - Generate a technical summary\n\n"
486
+ "What would you like me to look into?",
487
+ "role": PAM_ROLE
488
+ }
489
+
490
+
491
+ # --- Quick Test ---
492
+ if __name__ == "__main__":
493
+ print("πŸ€“ Testing Backend PAM (Nerdy Lab Assistant)...\n")
494
+ pam = load_agent()
495
+
496
+ test_commands = [
497
+ "check compliance",
498
+ "get logs",
499
+ "detect phi in Patient John Doe visited on 2024-03-15 at Memorial Hospital"
500
+ ]
501
+
502
+ for cmd in test_commands:
503
+ print(f"\n{'='*60}")
504
+ print(f"COMMAND: {cmd}")
505
+ print(f"{'='*60}")
506
+ response = pam.process_input(cmd)
507
+ print(response.get("message", response))