Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Parrot OSINT MCP — Gradio Frontend | |
| Modes: | |
| - "OSINT Dashboard" (multi-tool, opinionated) | |
| - "MCP Bridge" (raw tool_name + JSON args → JSON result) | |
| - "Analyst Copilot" (streaming LLM with OSINT context injection) | |
| """ | |
| import json | |
| import traceback | |
| from typing import Any, Dict | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| # --------------------------------------------------------------------- | |
| # Task registry: adapt this to your actual task API | |
| # --------------------------------------------------------------------- | |
# Maps a tool name to the callable that implements it.
TASK_REGISTRY: Dict[str, Any] = {}


def _register_tasks() -> None:
    """Populate ``TASK_REGISTRY`` from the optional ``tasks.<name>`` modules.

    Each module is imported on a best-effort basis and its ``run`` attribute
    is registered under the task name when it is callable.  A module that is
    missing, fails to import, or has no callable ``run`` is skipped; the
    reason is logged as a warning instead of being silently discarded.
    """
    # Imported locally so this block does not rely on the file's import header.
    import importlib
    import logging

    logger = logging.getLogger(__name__)

    # Every task is registered under the same name as its module.
    task_names = (
        "lookup_ip",
        "lookup_domain",
        "lookup_hash",
        "correlate_iocs",
        "generate_report",
        "enrich_entity",
        "mitre_map",
        "quickscan",
    )

    for name in task_names:
        try:
            module = importlib.import_module(f"tasks.{name}")
            run = getattr(module, "run", None)
        except Exception as exc:
            # Best-effort: one broken or absent task must not stop the app
            # from starting, but the reason should be visible in the logs.
            logger.warning(
                "Task %r not registered: importing tasks.%s failed (%s: %s)",
                name, name, type(exc).__name__, exc,
            )
            continue

        if callable(run):
            TASK_REGISTRY[name] = run
        else:
            logger.warning(
                "Task %r not registered: tasks.%s has no callable 'run'",
                name, name,
            )


_register_tasks()
| # --------------------------------------------------------------------- | |
| # Core execution helpers | |
| # --------------------------------------------------------------------- | |
def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run the registered tool ``tool_name`` with ``payload`` as keyword args.

    Args:
        tool_name: Key to look up in ``TASK_REGISTRY``.
        payload: Mapping expanded into keyword arguments for the tool.

    Returns:
        The tool's result when it is a dict, ``{"result": <value>}`` for any
        other return type, or a dict with an ``"error"`` key (plus a
        ``"traceback"`` key when the tool raised) on failure.  Exceptions
        raised by the tool are reported in the result, not propagated.
    """
    handler = TASK_REGISTRY.get(tool_name)
    if not handler:
        known_tools = sorted(TASK_REGISTRY)
        return {
            "error": f"Unknown tool '{tool_name}'. Registered tools: {known_tools}"
        }

    try:
        outcome = handler(**payload)
    except Exception as exc:
        # Report the failure to the caller instead of crashing the UI callback.
        return {
            "error": f"Exception in tool '{tool_name}': {exc}",
            "traceback": traceback.format_exc(),
        }

    # Callers always receive a dict, so wrap bare values.
    return outcome if isinstance(outcome, dict) else {"result": outcome}
def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
    """Turn a raw task result into the strings the UI panels display.

    Args:
        result: Task result dict (as produced by ``call_task``).

    Returns:
        A dict with the keys ``summary``, ``markdown``, ``json``, ``mitre``,
        ``stix`` and ``sarif``.  ``json`` is the whole result pretty-printed;
        ``mitre``/``stix``/``sarif`` are pretty-printed when present and
        truthy, otherwise empty strings.
    """

    def _dump(value: Any) -> str:
        # default=str keeps non-JSON-native values from raising.
        return json.dumps(value, indent=2, default=str)

    whole_result = _dump(result)

    # Prefer explicit markdown, then a report, then a heading built from the summary.
    rendered = result.get("markdown") or result.get("report") or ""
    if not rendered and "summary" in result:
        rendered = f"## Summary\n\n{result['summary']}"

    view = {
        "summary": result.get("summary", ""),
        "markdown": rendered,
        "json": whole_result,
    }
    for section in ("mitre", "stix", "sarif"):
        section_value = result.get(section)
        view[section] = _dump(section_value) if section_value else ""
    return view
| # --------------------------------------------------------------------- | |
| # MODE C — ANALYST COPILOT (LLM) | |
| # --------------------------------------------------------------------- | |
def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens):
    """Stream a chat completion from a Hugging Face ``InferenceClient``.

    Args:
        message: The new user message.
        history: Earlier chat messages as ``{"role": ..., "content": ...}``
            dicts; they are sent between the system prompt and ``message``.
        system_message: Content of the leading system message.
        model: Model identifier passed to ``InferenceClient``.
        hf_token: Either an object exposing a ``.token`` attribute, a raw
            token string, or ``None``.
        temperature: Sampling temperature forwarded to ``chat_completion``.
        top_p: Nucleus-sampling value forwarded to ``chat_completion``.
        max_tokens: Token limit forwarded to ``chat_completion``.

    Yields:
        The response text accumulated so far, once per non-empty delta.
    """
    # Accept an OAuth-style object, a plain string, or None without raising
    # AttributeError.  With None the client applies its own default token
    # resolution (see the InferenceClient documentation).
    token = getattr(hf_token, "token", hf_token)
    client = InferenceClient(
        token=token,
        model=model,
    )

    messages = [{"role": "system", "content": system_message}]
    messages.extend(history)
    messages.append({"role": "user", "content": message})

    response_text = ""
    for chunk in client.chat_completion(
        messages=messages,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stream=True,
    ):
        # A streamed chunk may carry no choices at all; skip it rather than
        # failing on choices[0].
        choices = chunk.choices
        if not choices:
            continue
        delta = choices[0].delta.content
        if delta:
            response_text += delta
            yield response_text
def inject_osint_context(history, task_result: Dict[str, Any]):
    """Append a task result to the chat history as a system message.

    The result is pretty-printed as JSON under an "OSINT Result Injected"
    heading.  ``history`` is modified in place and the same list is returned.
    """
    serialized = json.dumps(task_result, indent=2, default=str)
    injected_message = {
        "role": "system",
        "content": f"\n### OSINT Result Injected:\n{serialized}\n",
    }
    history.append(injected_message)
    return history
| # --------------------------------------------------------------------- | |
| # Dashboard callbacks (Mode B) | |
| # --------------------------------------------------------------------- | |
def ui_lookup_ip(ip, enrich, mitre):
    """Dashboard callback: run the ``lookup_ip`` task and unpack it for the UI.

    Returns:
        A 6-tuple of summary, markdown, JSON text, MITRE text, STIX text and
        the raw task result dict.
    """
    request = {"ip": ip, "enrich": enrich, "map_mitre": mitre}
    raw = call_task("lookup_ip", request)
    view = format_result_for_ui(raw)
    panel_fields = ("summary", "markdown", "json", "mitre", "stix")
    return (*(view[field] for field in panel_fields), raw)
def ui_lookup_domain(domain, enrich, mitre):
    """Dashboard callback: run the ``lookup_domain`` task and unpack it for the UI.

    Returns:
        A 6-tuple of summary, markdown, JSON text, MITRE text, STIX text and
        the raw task result dict.
    """
    request = {"domain": domain, "enrich": enrich, "map_mitre": mitre}
    raw = call_task("lookup_domain", request)
    view = format_result_for_ui(raw)
    panel_fields = ("summary", "markdown", "json", "mitre", "stix")
    return (*(view[field] for field in panel_fields), raw)
def ui_lookup_hash(h, ht, enrich, mitre):
    """Dashboard callback: run the ``lookup_hash`` task and unpack it for the UI.

    ``h`` is sent as ``hash`` and ``ht`` as ``hash_type``.

    Returns:
        A 6-tuple of summary, markdown, JSON text, MITRE text, STIX text and
        the raw task result dict.
    """
    request = {
        "hash": h,
        "hash_type": ht,
        "enrich": enrich,
        "map_mitre": mitre,
    }
    raw = call_task("lookup_hash", request)
    view = format_result_for_ui(raw)
    panel_fields = ("summary", "markdown", "json", "mitre", "stix")
    return (*(view[field] for field in panel_fields), raw)
def ui_correlate_iocs(iocs):
    """Dashboard callback: run ``correlate_iocs`` on a newline-separated list.

    Each line of ``iocs`` is stripped; blank lines are dropped.

    Returns:
        A 5-tuple of summary, markdown, JSON text, MITRE text and the raw
        task result dict.
    """
    stripped_lines = (line.strip() for line in iocs.splitlines())
    indicators = [entry for entry in stripped_lines if entry]
    raw = call_task("correlate_iocs", {"iocs": indicators})
    view = format_result_for_ui(raw)
    panel_fields = ("summary", "markdown", "json", "mitre")
    return (*(view[field] for field in panel_fields), raw)
def ui_quickscan(target):
    """Dashboard callback: run the ``quickscan`` task and unpack it for the UI.

    Returns:
        A 4-tuple of summary, markdown, JSON text and the raw task result dict.
    """
    raw = call_task("quickscan", {"target": target})
    view = format_result_for_ui(raw)
    panel_fields = ("summary", "markdown", "json")
    return (*(view[field] for field in panel_fields), raw)
| # --------------------------------------------------------------------- | |
| # MCP Bridge (Mode D) | |
| # --------------------------------------------------------------------- | |
def ui_mcp_bridge(tool, args_json):
    """Call a registered tool with arguments supplied as JSON text.

    Args:
        tool: Name of the tool to call.
        args_json: JSON text that must decode to an object; its members become
            the tool's keyword arguments.  Blank or missing text is treated as
            an empty object so tools can be called without arguments.

    Returns:
        A 3-tuple of pretty-printed JSON text, markdown text and the raw
        result dict.  When the arguments cannot be used, the tool is not
        called and the result dict holds a single ``"error"`` key.
    """

    def _rejected(message):
        # Same triple shape as a successful call, with empty markdown.
        err = {"error": message}
        return json.dumps(err, indent=2), "", err

    text = args_json.strip() if isinstance(args_json, str) else args_json
    try:
        payload = json.loads(text) if text else {}
    except Exception as exc:
        # UI boundary: any parse failure is reported, never raised.
        return _rejected(f"Invalid JSON: {exc}")

    # Valid JSON that is not an object cannot be expanded into keyword
    # arguments; reject it here with a clear message instead of letting it
    # surface as an internal traceback from the tool call.
    if not isinstance(payload, dict):
        return _rejected(
            f"Tool arguments must be a JSON object, got {type(payload).__name__}"
        )

    raw = call_task(tool, payload)
    normal = format_result_for_ui(raw)
    return normal["json"], normal["markdown"], raw
| # --------------------------------------------------------------------- | |
| # UI — Now with Analyst Copilot | |
| # --------------------------------------------------------------------- | |
def build_interface() -> gr.Blocks:
    """Assemble the Gradio Blocks app with its three top-level tabs.

    Tabs:
        * "OSINT Dashboard" - an IP lookup form wired to ``ui_lookup_ip``.
        * "MCP Bridge" - pick a registered tool and pass raw JSON arguments
          through ``ui_mcp_bridge``.
        * "Analyst Copilot" - a streaming chat backed by ``respond``, plus a
          button that appends the latest OSINT result to the chat history.

    The dashboard and the bridge both write their raw task result into one
    shared ``gr.State`` so the Copilot tab can inject it.

    Returns:
        The constructed (not yet launched) ``gr.Blocks`` instance.
    """

    def _respond_with_login(
        message,
        history,
        system_message,
        model,
        temperature,
        top_p,
        max_tokens,
        hf_token: gr.OAuthToken,
    ):
        # gr.OAuthToken is a token type, not a UI component: Gradio supplies
        # the signed-in user's token for a parameter annotated with it, so it
        # must not be listed in additional_inputs.
        yield from respond(
            message, history, system_message, model,
            hf_token, temperature, top_p, max_tokens,
        )

    def _inject_and_sync(history, task_result):
        # Return the updated history twice: once for the visible chat and once
        # for the stored history, so the two stay in step.
        updated = inject_osint_context(history, task_result)
        return updated, updated

    with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
        gr.Markdown("# Parrot OSINT MCP Console")

        # Store OSINT task results for injection into the Copilot
        osint_result_state = gr.State([])

        # ------------------------------------------
        # MODE B — Dashboard
        # ------------------------------------------
        with gr.Tab("OSINT Dashboard"):
            with gr.Tab("IP Lookup"):
                ip = gr.Textbox(label="IP Address")
                enrich = gr.Checkbox(value=True, label="Enrichment")
                mitre = gr.Checkbox(value=True, label="MITRE mapping")
                btn = gr.Button("Run")
                summary = gr.Textbox(label="Summary")
                md = gr.Markdown()
                js = gr.Code(language="json")
                mt = gr.Code(language="json")
                st = gr.Code(language="json")
                btn.click(
                    ui_lookup_ip,
                    inputs=[ip, enrich, mitre],
                    outputs=[summary, md, js, mt, st, osint_result_state],
                )
            # You already know: similar tabs for domain, hash, correlation, quickscan
            # (keeping focus on Copilot integration)

        # ------------------------------------------
        # MODE D — MCP Bridge
        # ------------------------------------------
        with gr.Tab("MCP Bridge"):
            tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()))
            args = gr.Code(label="Args JSON")
            out_js = gr.Code(language="json")
            out_md = gr.Markdown()
            bridge_btn = gr.Button("Call Tool")
            bridge_btn.click(
                ui_mcp_bridge,
                inputs=[tool, args],
                outputs=[out_js, out_md, osint_result_state],
            )

        # ------------------------------------------
        # MODE C — Analyst Copilot
        # ------------------------------------------
        with gr.Tab("Analyst Copilot"):
            gr.Markdown("### Streaming TI Assistant with OSINT Context Injection")

            # Sign-in control that provides the token handed to the chat
            # function.  NOTE(review): on Spaces this presumably also needs
            # `hf_oauth: true` in the Space metadata - confirm.
            gr.LoginButton()

            system_msg = gr.Textbox(
                label="System Prompt",
                value=("You are a threat intelligence analyst. "
                       "You think slowly, explain clearly, identify TTPs, "
                       "and recommend next investigative steps."),
            )
            model = gr.Textbox(
                label="HF Model (e.g., openai/gpt-oss-20b)",
                value="openai/gpt-oss-20b",
            )
            copilot = gr.ChatInterface(
                _respond_with_login,
                additional_inputs=[
                    system_msg,
                    model,
                    gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
                    gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
                    gr.Slider(1, 2048, value=512, step=1, label="Max Tokens"),
                ],
                type="messages",
            )

            inject_btn = gr.Button("Inject Latest OSINT Result")
            # NOTE(review): `chatbot` (the display component) and
            # `chatbot_state` (the stored history) are the ChatInterface
            # attribute names as far as I know; the previous `_chatbot_state`
            # does not appear to exist.  Confirm against the pinned Gradio
            # version.
            inject_btn.click(
                _inject_and_sync,
                inputs=[copilot.chatbot_state, osint_result_state],
                outputs=[copilot.chatbot, copilot.chatbot_state],
            )

    return demo
if __name__ == "__main__":
    # Script entry point: assemble the UI, then start the Gradio server.
    build_interface().launch()