Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python | |
| """ | |
| Parrot OSINT MCP — Gradio Frontend | |
| Modes: | |
| - "OSINT Dashboard" (multi-tool, opinionated) | |
| - "MCP Bridge" (raw tool_name + JSON args → JSON result) | |
| - "Analyst Copilot" (streaming LLM with OSINT context injection) | |
| """ | |
| import json | |
| import traceback | |
| from typing import Any, Dict | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| # --------------------------------------------------------------------- | |
| # Task registry: adapt this to your actual task API | |
| # --------------------------------------------------------------------- | |
# Maps a public tool name to the callable that implements it.
TASK_REGISTRY: Dict[str, Any] = {}


def _register_tasks() -> None:
    """Fill TASK_REGISTRY with the ``run`` callable of each ``tasks.*`` module.

    Registration is best-effort: a module that cannot be imported, or that
    exposes no callable ``run``, is skipped so the remaining tools still
    load. Each skip is logged as a warning so the reason a tool is missing
    is visible instead of being silently discarded.
    """
    # Imported locally so this block does not rely on the file-level imports.
    import importlib
    import logging

    log = logging.getLogger(__name__)

    def _try_register(name: str, module_name: str) -> None:
        """Register ``tasks.<module_name>.run`` under ``name`` when usable."""
        try:
            module = importlib.import_module(f"tasks.{module_name}")
            fn = getattr(module, "run", None)
        except Exception as exc:
            # Deliberately broad: one broken task module must not prevent
            # the application from starting with the other tools.
            log.warning(
                "Tool %r not registered: could not load tasks.%s (%s: %s)",
                name,
                module_name,
                type(exc).__name__,
                exc,
            )
            return
        if callable(fn):
            TASK_REGISTRY[name] = fn
        else:
            log.warning(
                "Tool %r not registered: tasks.%s has no callable 'run'",
                name,
                module_name,
            )

    # Every tool is published under the same name as its module.
    for task_name in (
        "lookup_ip",
        "lookup_domain",
        "lookup_hash",
        "correlate_iocs",
        "generate_report",
        "enrich_entity",
        "mitre_map",
        "quickscan",
    ):
        _try_register(task_name, task_name)


_register_tasks()
| # --------------------------------------------------------------------- | |
| # Core execution helpers | |
| # --------------------------------------------------------------------- | |
def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run a registered tool and always hand back a dict.

    Args:
        tool_name: Key to look up in TASK_REGISTRY.
        payload: Keyword arguments forwarded to the tool as ``**payload``.

    Returns:
        The tool's result if it is a dict, otherwise ``{"result": <value>}``.
        An unknown tool yields ``{"error": ...}``; an exception raised by
        the call yields ``{"error": ..., "traceback": ...}``.
    """
    handler = TASK_REGISTRY.get(tool_name)
    if not handler:
        known_tools = sorted(TASK_REGISTRY.keys())
        return {
            "error": f"Unknown tool '{tool_name}'. Registered tools: {known_tools}"
        }

    try:
        outcome = handler(**payload)
    except Exception as exc:
        # Failures are reported to the caller as data rather than raised.
        return {
            "error": f"Exception in tool '{tool_name}': {exc}",
            "traceback": traceback.format_exc(),
        }

    # Non-dict results are wrapped so callers can rely on a dict.
    return outcome if isinstance(outcome, dict) else {"result": outcome}
def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
    """Turn a task result dict into the text panels shown by the UI.

    Args:
        result: Result dict; the keys read here are ``summary``,
            ``markdown``, ``report``, ``mitre``, ``stix`` and ``sarif``.

    Returns:
        A dict with keys ``summary``, ``markdown``, ``json``, ``mitre``,
        ``stix`` and ``sarif``. ``json`` is the whole result pretty-printed;
        the last three are pretty-printed JSON, or ``""`` when the
        corresponding value is missing or falsy.
    """

    def _dump_section(key: str) -> str:
        # Falsy or absent sections render as an empty panel.
        section = result.get(key)
        if not section:
            return ""
        return json.dumps(section, indent=2, default=str)

    full_dump = json.dumps(result, indent=2, default=str)

    # Prefer explicit markdown, then a report, then a heading built from
    # the summary when one is present.
    body = result.get("markdown") or result.get("report") or ""
    if not body and "summary" in result:
        body = f"## Summary\n\n{result['summary']}"

    panels = {
        "summary": result.get("summary", ""),
        "markdown": body,
        "json": full_dump,
    }
    for section_key in ("mitre", "stix", "sarif"):
        panels[section_key] = _dump_section(section_key)
    return panels
| # --------------------------------------------------------------------- | |
| # MODE C — ANALYST COPILOT (LLM) | |
| # --------------------------------------------------------------------- | |
def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens):
    """
    Stream an LLM reply using the HuggingFace InferenceClient.

    The request is the system message, then ``history`` unchanged, then the
    new user message, so any context already placed in ``history`` is sent
    to the model as well.

    Args:
        message: Text of the new user turn.
        history: Earlier messages, forwarded as-is (assumed to be a list of
            ``{"role": ..., "content": ...}`` dicts; confirm against the
            caller). ``None`` is treated as empty.
        system_message: Content of the leading system message.
        model: Model identifier handed to InferenceClient.
        hf_token: An object exposing ``.token``, a plain token string, or
            ``None`` / empty for no explicit token.
        temperature, top_p, max_tokens: Passed through to ``chat_completion``.

    Yields:
        The reply text accumulated so far, once per non-empty streamed delta.
    """
    # Accept either an object carrying `.token` or a bare string; empty
    # values become None so the client is built without an explicit token
    # instead of raising AttributeError.
    token = getattr(hf_token, "token", hf_token) or None
    client = InferenceClient(
        token=token,
        model=model,
    )

    messages = [{"role": "system", "content": system_message}]
    messages.extend(history or [])
    messages.append({"role": "user", "content": message})

    response_text = ""
    for chunk in client.chat_completion(
        messages=messages,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stream=True,
    ):
        # Some stream chunks carry no choices; indexing [0] on them would
        # raise IndexError and abort the reply midway.
        choices = chunk.choices
        if not choices:
            continue
        delta = choices[0].delta.content
        if delta:
            response_text += delta
            yield response_text
| def inject_osint_context(history, task_result: Dict[str, Any]): | |
| """ | |
| Inject JSON + summary + MITRE mappings directly into the chat history. | |
| """ | |
| pretty = json.dumps(task_result, indent=2, default=str) | |
| blob = f""" | |
| ### OSINT Result Injected: |