NexusInstruments commited on
Commit
81aedc5
·
verified ·
1 Parent(s): 28ed01f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +88 -152
app.py CHANGED
@@ -1,13 +1,12 @@
1
  import os
2
  import json
3
- import time
4
  import hashlib
5
  from datetime import datetime
6
  import gradio as gr
7
  from huggingface_hub import InferenceClient
8
 
9
  # ============================================================
10
- # CORE CONFIGURATION
11
  # ============================================================
12
 
13
  OMNISCIENT_ROOT = "/opt/omniscient"
@@ -21,14 +20,10 @@ MODEL_OPTIONS = {
21
 
22
  FALLBACK_MODEL = "HuggingFaceH4/zephyr-7b-beta"
23
 
24
- MAX_HISTORY_PAIRS = 4
25
- MAX_INPUT_CHARS = 4000
26
-
27
- # Ensure directories exist
28
  os.makedirs(EVIDENCE_ROOT, exist_ok=True)
29
 
30
  # ============================================================
31
- # AUDIT LOGGING (APPEND ONLY)
32
  # ============================================================
33
 
34
  def append_audit_log(action, details):
@@ -50,7 +45,7 @@ def append_audit_log(action, details):
50
  json.dump(data, f, indent=2)
51
 
52
  # ============================================================
53
- # HASHING + MANIFEST
54
  # ============================================================
55
 
56
  def sha256_file(path):
@@ -60,10 +55,13 @@ def sha256_file(path):
60
  h.update(chunk)
61
  return h.hexdigest()
62
 
 
 
 
63
 
64
  def register_evidence(file):
65
  if not file:
66
- return "No file provided."
67
 
68
  dest_path = os.path.join(EVIDENCE_ROOT, os.path.basename(file.name))
69
 
@@ -72,35 +70,33 @@ def register_evidence(file):
72
 
73
  file_hash = sha256_file(dest_path)
74
 
75
- manifest_entry = {
76
- "file": dest_path,
77
- "sha256": file_hash,
78
- "registered_utc": datetime.utcnow().isoformat()
79
- }
80
-
81
  manifest_path = os.path.join(EVIDENCE_ROOT, "manifest.json")
 
82
 
83
  if os.path.exists(manifest_path):
84
  with open(manifest_path, "r") as f:
85
  manifest = json.load(f)
86
- else:
87
- manifest = []
88
 
89
- manifest.append(manifest_entry)
 
 
 
 
 
 
90
 
91
  with open(manifest_path, "w") as f:
92
  json.dump(manifest, f, indent=2)
93
 
94
- append_audit_log("EVIDENCE_REGISTERED", manifest_entry)
95
-
96
- return f"Evidence Registered.\nSHA256: {file_hash}"
97
 
 
98
 
99
  def verify_manifest():
100
  manifest_path = os.path.join(EVIDENCE_ROOT, "manifest.json")
101
 
102
  if not os.path.exists(manifest_path):
103
- return "No manifest found."
104
 
105
  with open(manifest_path, "r") as f:
106
  manifest = json.load(f)
@@ -109,25 +105,25 @@ def verify_manifest():
109
 
110
  for entry in manifest:
111
  path = entry["file"]
112
- expected_hash = entry["sha256"]
113
 
114
  if not os.path.exists(path):
115
- results.append(f"MISSING: {path}")
116
  continue
117
 
118
- actual_hash = sha256_file(path)
119
 
120
- if actual_hash == expected_hash:
121
- results.append(f"VALID: {path}")
122
  else:
123
- results.append(f"HASH MISMATCH: {path}")
124
 
125
- append_audit_log("MANIFEST_VERIFICATION", {"result_count": len(results)})
126
 
127
  return "\n".join(results)
128
 
129
  # ============================================================
130
- # AI AUGMENTATION LAYER (READ-ONLY)
131
  # ============================================================
132
 
133
  def get_client(model_id):
@@ -136,7 +132,6 @@ def get_client(model_id):
136
  raise RuntimeError("HF_TOKEN not set.")
137
  return InferenceClient(model=model_id, token=token)
138
 
139
-
140
  def run_ai(model_id, messages, temperature=0.4):
141
  try:
142
  client = get_client(model_id)
@@ -148,8 +143,8 @@ def run_ai(model_id, messages, temperature=0.4):
148
  )
149
  return response.choices[0].message.content
150
  except Exception:
151
- fallback_client = get_client(FALLBACK_MODEL)
152
- response = fallback_client.chat_completion(
153
  messages=messages,
154
  max_tokens=900,
155
  temperature=temperature,
@@ -157,120 +152,63 @@ def run_ai(model_id, messages, temperature=0.4):
157
  )
158
  return "⚠️ Fallback model used.\n\n" + response.choices[0].message.content
159
 
160
-
161
- def ai_augment_analysis(message, model_id):
162
  system_prompt = """
163
- You are an AI analytical augmentation module for digital forensics.
164
-
165
- You may:
166
- - Summarize evidence
167
- - Identify technical patterns
168
- - Suggest hypotheses
169
- - Outline investigative steps
170
-
171
- You may NOT:
172
- - Modify evidence
173
- - Make legal determinations
174
- - Assert conclusions beyond observed data
175
-
176
- Structure output as:
177
 
 
178
  1. Observed Technical Artifacts
179
  2. Analytical Interpretation
180
  3. Competing Hypotheses
181
  4. Risk-Relevant Observations
182
  5. Limitations
183
  6. Recommended Human Review Steps
 
 
184
  """
185
 
186
  messages = [
187
  {"role": "system", "content": system_prompt},
188
- {"role": "user", "content": message}
189
  ]
190
 
191
- result = run_ai(model_id, messages)
192
-
193
- append_audit_log("AI_ANALYSIS", {"input_length": len(message)})
194
 
195
  return result
196
 
197
  # ============================================================
198
- # DOCUMENTATION ENGINE
199
  # ============================================================
200
 
201
- def generate_system_documentation(model_id):
202
- prompt = """
203
- Generate formal system documentation for Omniscient Investigative Infrastructure.
204
-
205
- Include:
206
- - Executive Overview
207
- - Architecture
208
- - Evidence Handling Model
209
- - Hashing & Integrity Controls
210
- - AI Governance Model
211
- - Audit Logging Controls
212
- - Court Defensibility Posture
213
- - Operational Boundaries
214
-
215
- Write in professional enterprise documentation format.
216
- """
217
 
218
- messages = [
219
- {"role": "system", "content": "You are a senior DFIR infrastructure documentation specialist."},
220
- {"role": "user", "content": prompt}
221
- ]
222
-
223
- return run_ai(model_id, messages)
224
-
225
-
226
- def generate_dfir_practice_documentation(model_id):
227
- prompt = """
228
- Generate professional digital forensics best practice documentation.
229
 
230
- Include:
231
- - Evidence Preservation
232
- - Imaging Standards
233
- - Hash Verification
234
- - Logging & Audit Trails
235
- - AI Augmentation Boundaries
236
- - Legal Safeguards
237
- - Expert Witness Preparation
238
 
239
- Write formally.
240
- """
241
 
242
  messages = [
243
- {"role": "system", "content": "You are a senior DFIR practitioner."},
244
  {"role": "user", "content": prompt}
245
  ]
246
 
247
- return run_ai(model_id, messages)
248
 
249
  # ============================================================
250
- # ROUTER
251
  # ============================================================
252
 
253
- def manager_router(message, model_label, mode, file_upload):
254
-
255
- model_id = MODEL_OPTIONS[model_label]
256
- message = (message or "")[:MAX_INPUT_CHARS]
257
-
258
- if mode == "Register Evidence":
259
- return register_evidence(file_upload)
260
-
261
- if mode == "Verify Manifest Integrity":
262
- return verify_manifest()
263
-
264
- if mode == "AI Augmentation Analysis":
265
- return ai_augment_analysis(message, model_id)
266
-
267
- if mode == "Generate System Documentation":
268
- return generate_system_documentation(model_id)
269
-
270
- if mode == "Generate DFIR Practice Documentation":
271
- return generate_dfir_practice_documentation(model_id)
272
-
273
- return "Invalid mode selected."
274
 
275
  # ============================================================
276
  # UI
@@ -278,54 +216,52 @@ def manager_router(message, model_label, mode, file_upload):
278
 
279
  with gr.Blocks(title="Omniscient Investigative Infrastructure") as demo:
280
 
281
- gr.Markdown("## Omniscient — Investigative Infrastructure & DFIR AI Augmentation")
282
-
283
- with gr.Row():
284
-
285
- with gr.Column(scale=3):
286
 
287
- output_box = gr.Textbox(lines=30, label="Output")
288
-
289
- msg = gr.Textbox(
290
- placeholder="Enter analysis request or documentation command...",
291
- label="Input"
292
- )
293
 
 
 
294
  file_upload = gr.File(label="Upload Evidence File")
 
 
295
 
296
- execute_btn = gr.Button("Execute")
297
-
298
- with gr.Column(scale=1):
 
299
 
 
 
 
300
  model_selector = gr.Dropdown(
301
  choices=list(MODEL_OPTIONS.keys()),
302
  value=list(MODEL_OPTIONS.keys())[0],
303
  label="AI Model"
304
  )
305
-
306
- mode_selector = gr.Dropdown(
307
- choices=[
308
- "Register Evidence",
309
- "Verify Manifest Integrity",
310
- "AI Augmentation Analysis",
311
- "Generate System Documentation",
312
- "Generate DFIR Practice Documentation",
313
- ],
314
- value="AI Augmentation Analysis",
315
- label="Operation Mode"
316
  )
317
-
318
- execute_btn.click(
319
- manager_router,
320
- inputs=[msg, model_selector, mode_selector, file_upload],
321
- outputs=output_box,
322
- )
323
-
324
- msg.submit(
325
- manager_router,
326
- inputs=[msg, model_selector, mode_selector, file_upload],
327
- outputs=output_box,
328
- )
 
329
 
330
  if __name__ == "__main__":
331
  demo.queue()
 
1
  import os
2
  import json
 
3
  import hashlib
4
  from datetime import datetime
5
  import gradio as gr
6
  from huggingface_hub import InferenceClient
7
 
8
  # ============================================================
9
+ # CONFIG
10
  # ============================================================
11
 
12
  OMNISCIENT_ROOT = "/opt/omniscient"
 
20
 
21
  FALLBACK_MODEL = "HuggingFaceH4/zephyr-7b-beta"
22
 
 
 
 
 
23
  os.makedirs(EVIDENCE_ROOT, exist_ok=True)
24
 
25
  # ============================================================
26
+ # AUDIT LOGGING
27
  # ============================================================
28
 
29
  def append_audit_log(action, details):
 
45
  json.dump(data, f, indent=2)
46
 
47
  # ============================================================
48
+ # HASHING
49
  # ============================================================
50
 
51
  def sha256_file(path):
 
55
  h.update(chunk)
56
  return h.hexdigest()
57
 
58
+ # ============================================================
59
+ # EVIDENCE
60
+ # ============================================================
61
 
62
  def register_evidence(file):
63
  if not file:
64
+ return "⚠️ No file uploaded."
65
 
66
  dest_path = os.path.join(EVIDENCE_ROOT, os.path.basename(file.name))
67
 
 
70
 
71
  file_hash = sha256_file(dest_path)
72
 
 
 
 
 
 
 
73
  manifest_path = os.path.join(EVIDENCE_ROOT, "manifest.json")
74
+ manifest = []
75
 
76
  if os.path.exists(manifest_path):
77
  with open(manifest_path, "r") as f:
78
  manifest = json.load(f)
 
 
79
 
80
+ entry = {
81
+ "file": dest_path,
82
+ "sha256": file_hash,
83
+ "registered_utc": datetime.utcnow().isoformat()
84
+ }
85
+
86
+ manifest.append(entry)
87
 
88
  with open(manifest_path, "w") as f:
89
  json.dump(manifest, f, indent=2)
90
 
91
+ append_audit_log("EVIDENCE_REGISTERED", entry)
 
 
92
 
93
+ return f"✅ Evidence Registered\n\nFile: {dest_path}\nSHA256: {file_hash}"
94
 
95
  def verify_manifest():
96
  manifest_path = os.path.join(EVIDENCE_ROOT, "manifest.json")
97
 
98
  if not os.path.exists(manifest_path):
99
+ return "⚠️ No manifest found."
100
 
101
  with open(manifest_path, "r") as f:
102
  manifest = json.load(f)
 
105
 
106
  for entry in manifest:
107
  path = entry["file"]
108
+ expected = entry["sha256"]
109
 
110
  if not os.path.exists(path):
111
+ results.append(f"MISSING: {path}")
112
  continue
113
 
114
+ actual = sha256_file(path)
115
 
116
+ if actual == expected:
117
+ results.append(f"VALID: {path}")
118
  else:
119
+ results.append(f"⚠️ HASH MISMATCH: {path}")
120
 
121
+ append_audit_log("MANIFEST_VERIFIED", {"entries": len(results)})
122
 
123
  return "\n".join(results)
124
 
125
  # ============================================================
126
+ # AI LAYER
127
  # ============================================================
128
 
129
  def get_client(model_id):
 
132
  raise RuntimeError("HF_TOKEN not set.")
133
  return InferenceClient(model=model_id, token=token)
134
 
 
135
  def run_ai(model_id, messages, temperature=0.4):
136
  try:
137
  client = get_client(model_id)
 
143
  )
144
  return response.choices[0].message.content
145
  except Exception:
146
+ fallback = get_client(FALLBACK_MODEL)
147
+ response = fallback.chat_completion(
148
  messages=messages,
149
  max_tokens=900,
150
  temperature=temperature,
 
152
  )
153
  return "⚠️ Fallback model used.\n\n" + response.choices[0].message.content
154
 
155
+ def ai_analysis(input_text, model_label):
 
156
  system_prompt = """
157
+ You are an AI augmentation module for digital forensics.
 
 
 
 
 
 
 
 
 
 
 
 
 
158
 
159
+ Structure:
160
  1. Observed Technical Artifacts
161
  2. Analytical Interpretation
162
  3. Competing Hypotheses
163
  4. Risk-Relevant Observations
164
  5. Limitations
165
  6. Recommended Human Review Steps
166
+
167
+ AI is advisory only.
168
  """
169
 
170
  messages = [
171
  {"role": "system", "content": system_prompt},
172
+ {"role": "user", "content": input_text}
173
  ]
174
 
175
+ result = run_ai(MODEL_OPTIONS[model_label], messages)
176
+ append_audit_log("AI_ANALYSIS", {"length": len(input_text)})
 
177
 
178
  return result
179
 
180
  # ============================================================
181
+ # DOCUMENTATION
182
  # ============================================================
183
 
184
+ def generate_documentation(doc_type, model_label):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
186
+ if doc_type == "System Architecture":
187
+ prompt = "Generate formal Omniscient system architecture documentation."
 
 
 
 
 
 
 
 
 
188
 
189
+ elif doc_type == "DFIR Best Practices":
190
+ prompt = "Generate professional digital forensics best practice documentation."
 
 
 
 
 
 
191
 
192
+ else:
193
+ return "Invalid documentation type."
194
 
195
  messages = [
196
+ {"role": "system", "content": "You are a professional DFIR documentation specialist."},
197
  {"role": "user", "content": prompt}
198
  ]
199
 
200
+ return run_ai(MODEL_OPTIONS[model_label], messages)
201
 
202
  # ============================================================
203
+ # SYSTEM STATUS
204
  # ============================================================
205
 
206
+ def system_status():
207
+ return f"""
208
+ Omniscient Root: {OMNISCIENT_ROOT}
209
+ Evidence Directory Exists: {os.path.exists(EVIDENCE_ROOT)}
210
+ Audit Log Exists: {os.path.exists(LOG_FILE)}
211
+ """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
212
 
213
  # ============================================================
214
  # UI
 
216
 
217
  with gr.Blocks(title="Omniscient Investigative Infrastructure") as demo:
218
 
219
+ gr.Markdown("## Omniscient — Investigative Infrastructure Control Panel")
 
 
 
 
220
 
221
+ with gr.Tabs():
 
 
 
 
 
222
 
223
+ # Evidence Tab
224
+ with gr.Tab("Evidence Management"):
225
  file_upload = gr.File(label="Upload Evidence File")
226
+ register_btn = gr.Button("Register Evidence")
227
+ evidence_output = gr.Textbox(lines=10)
228
 
229
+ # Integrity Tab
230
+ with gr.Tab("Integrity Verification"):
231
+ verify_btn = gr.Button("Verify Manifest Integrity")
232
+ integrity_output = gr.Textbox(lines=15)
233
 
234
+ # AI Tab
235
+ with gr.Tab("AI Augmentation"):
236
+ ai_input = gr.Textbox(lines=8, label="Analysis Input")
237
  model_selector = gr.Dropdown(
238
  choices=list(MODEL_OPTIONS.keys()),
239
  value=list(MODEL_OPTIONS.keys())[0],
240
  label="AI Model"
241
  )
242
+ ai_btn = gr.Button("Run AI Analysis")
243
+ ai_output = gr.Textbox(lines=20)
244
+
245
+ # Documentation Tab
246
+ with gr.Tab("Documentation"):
247
+ doc_type = gr.Dropdown(
248
+ choices=["System Architecture", "DFIR Best Practices"],
249
+ value="System Architecture",
250
+ label="Documentation Type"
 
 
251
  )
252
+ doc_btn = gr.Button("Generate Documentation")
253
+ doc_output = gr.Textbox(lines=20)
254
+
255
+ # Status Tab
256
+ with gr.Tab("System Status"):
257
+ status_btn = gr.Button("Check System Status")
258
+ status_output = gr.Textbox(lines=8)
259
+
260
+ register_btn.click(register_evidence, inputs=file_upload, outputs=evidence_output)
261
+ verify_btn.click(verify_manifest, outputs=integrity_output)
262
+ ai_btn.click(ai_analysis, inputs=[ai_input, model_selector], outputs=ai_output)
263
+ doc_btn.click(generate_documentation, inputs=[doc_type, model_selector], outputs=doc_output)
264
+ status_btn.click(system_status, outputs=status_output)
265
 
266
  if __name__ == "__main__":
267
  demo.queue()