#!/usr/bin/env python """ Parrot OSINT MCP – Gradio Frontend Modes: - "OSINT Dashboard" (multi-tool, opinionated) - "MCP Bridge" (raw tool_name + JSON args → JSON result) - "Analyst Copilot" (streaming LLM with OSINT context injection) """ import json import traceback from typing import Any, Dict import gradio as gr from huggingface_hub import InferenceClient # --------------------------------------------------------------------- # Task registry: adapt this to your actual task API # --------------------------------------------------------------------- TASK_REGISTRY: Dict[str, Any] = {} def _register_tasks() -> None: def _try_register(name: str, module_name: str): try: module = __import__(f"tasks.{module_name}", fromlist=["*"]) fn = getattr(module, "run", None) if callable(fn): TASK_REGISTRY[name] = fn except Exception: pass _try_register("lookup_ip", "lookup_ip") _try_register("lookup_domain", "lookup_domain") _try_register("lookup_hash", "lookup_hash") _try_register("correlate_iocs", "correlate_iocs") _try_register("generate_report", "generate_report") _try_register("enrich_entity", "enrich_entity") _try_register("mitre_map", "mitre_map") _try_register("quickscan", "quickscan") _register_tasks() # --------------------------------------------------------------------- # Core execution helpers # --------------------------------------------------------------------- def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]: fn = TASK_REGISTRY.get(tool_name) if not fn: return { "error": f"Unknown tool '{tool_name}'. Registered tools: {sorted(TASK_REGISTRY.keys())}" } try: result = fn(**payload) if not isinstance(result, dict): result = {"result": result} return result except Exception as exc: return { "error": f"Exception in tool '{tool_name}': {exc}", "traceback": traceback.format_exc(), } def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]: pretty_json = json.dumps(result, indent=2, default=str) markdown = result.get("markdown") or result.get("report") or "" if not markdown and "summary" in result: markdown = f"## Summary\n\n{result['summary']}" mitre = json.dumps(result.get("mitre", ""), indent=2, default=str) if result.get("mitre") else "" stix = json.dumps(result.get("stix", ""), indent=2, default=str) if result.get("stix") else "" sarif = json.dumps(result.get("sarif", ""), indent=2, default=str) if result.get("sarif") else "" return { "summary": result.get("summary", ""), "markdown": markdown, "json": pretty_json, "mitre": mitre, "stix": stix, "sarif": sarif, } # --------------------------------------------------------------------- # MODE C — ANALYST COPILOT (LLM) # --------------------------------------------------------------------- def respond(message, history, system_message, model, hf_token, temperature, top_p, max_tokens): """ Streaming LLM response using HuggingFace InferenceClient. Supports injecting OSINT task results into the conversation. """ client = InferenceClient( token=hf_token.token, model=model, ) messages = [{"role": "system", "content": system_message}] messages.extend(history) messages.append({"role": "user", "content": message}) response_text = "" for chunk in client.chat_completion( messages=messages, temperature=temperature, top_p=top_p, max_tokens=max_tokens, stream=True ): delta = chunk.choices[0].delta.content if delta: response_text += delta yield response_text def inject_osint_context(history, task_result: Dict[str, Any]): """ Inject JSON + summary + MITRE mappings directly into the chat history. """ pretty = json.dumps(task_result, indent=2, default=str) blob = f""" ### OSINT Result Injected: