#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend
Modes:
- "OSINT Dashboard" (multi-tool, opinionated)
- "MCP Bridge" (raw tool_name + JSON args β†’ JSON result)
- "Analyst Copilot" (streaming LLM with OSINT context injection)
"""
import json
import traceback
from typing import Any, Dict
import gradio as gr
from huggingface_hub import InferenceClient
# ---------------------------------------------------------------------
# Task registry: adapt this to your actual task API
# ---------------------------------------------------------------------
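# Each registered tool maps a name to a callable. _register_tasks() below expects
# a tasks/<module_name>.py module exposing a run(**kwargs) callable that returns a
# dict. A hypothetical minimal task module, for illustration only:
#
#   # tasks/lookup_ip.py
#   def run(ip: str) -> dict:
#       return {"summary": f"No enrichment available for {ip}", "ip": ip}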
TASK_REGISTRY: Dict[str, Any] = {}
def _register_tasks() -> None:
def _try_register(name: str, module_name: str):
try:
module = __import__(f"tasks.{module_name}", fromlist=["*"])
fn = getattr(module, "run", None)
if callable(fn):
TASK_REGISTRY[name] = fn
except Exception:
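            # Tools whose module is missing or fails to import are skipped
            # silently; only importable tools end up in TASK_REGISTRY.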
pass
_try_register("lookup_ip", "lookup_ip")
_try_register("lookup_domain", "lookup_domain")
_try_register("lookup_hash", "lookup_hash")
_try_register("correlate_iocs", "correlate_iocs")
_try_register("generate_report", "generate_report")
_try_register("enrich_entity", "enrich_entity")
_try_register("mitre_map", "mitre_map")
_try_register("quickscan", "quickscan")
_register_tasks()
# ---------------------------------------------------------------------
# Core execution helpers
# ---------------------------------------------------------------------
def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
fn = TASK_REGISTRY.get(tool_name)
if not fn:
return {
"error": f"Unknown tool '{tool_name}'. Registered tools: {sorted(TASK_REGISTRY.keys())}"
}
try:
result = fn(**payload)
if not isinstance(result, dict):
result = {"result": result}
return result
except Exception as exc:
return {
"error": f"Exception in tool '{tool_name}': {exc}",
"traceback": traceback.format_exc(),
}
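# Example call (assuming the lookup_ip task module is present; values illustrative):
#
#   call_task("lookup_ip", {"ip": "203.0.113.7"})
#   # -> {"summary": "...", ...} on success, or {"error": "...", "traceback": "..."}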
def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
pretty_json = json.dumps(result, indent=2, default=str)
markdown = result.get("markdown") or result.get("report") or ""
if not markdown and "summary" in result:
markdown = f"## Summary\n\n{result['summary']}"
mitre = json.dumps(result.get("mitre", ""), indent=2, default=str) if result.get("mitre") else ""
stix = json.dumps(result.get("stix", ""), indent=2, default=str) if result.get("stix") else ""
sarif = json.dumps(result.get("sarif", ""), indent=2, default=str) if result.get("sarif") else ""
return {
"summary": result.get("summary", ""),
"markdown": markdown,
"json": pretty_json,
"mitre": mitre,
"stix": stix,
"sarif": sarif,
}
# ---------------------------------------------------------------------
# MODE C – ANALYST COPILOT (LLM)
# ---------------------------------------------------------------------
def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens):
"""
Streaming LLM response using HuggingFace InferenceClient.
Supports injecting OSINT task results into the conversation.
"""
client = InferenceClient(
token=hf_token.token,
model=model,
)
messages = [{"role": "system", "content": system_message}]
messages.extend(history)
messages.append({"role": "user", "content": message})
response_text = ""
for chunk in client.chat_completion(
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
stream=True
):
delta = chunk.choices[0].delta.content
if delta:
response_text += delta
yield response_text
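# A minimal sketch of how `respond` could be wired into the UI (component names
# are illustrative assumptions, not the ones defined elsewhere in this app):
#
#   demo = gr.ChatInterface(
#       respond,
#       type="messages",
#       additional_inputs=[system_box, model_box, hf_token_state,
#                          temperature_slider, top_p_slider, max_tokens_slider],
#   )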
def inject_osint_context(history, task_result: Dict[str, Any]):
"""
Inject JSON + summary + MITRE mappings directly into the chat history.
"""
pretty = json.dumps(task_result, indent=2, default=str)
blob = f"""
### OSINT Result Injected: