#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend

Modes:
- "OSINT Dashboard" (multi-tool, opinionated)
- "MCP Bridge" (raw tool_name + JSON args → JSON result)
- "Analyst Copilot" (streaming LLM with OSINT context injection)
"""

import importlib
import json
import traceback
from typing import Any, Dict

import gradio as gr
from huggingface_hub import InferenceClient

# ---------------------------------------------------------------------
# Task registry: adapt this to your actual task API
# ---------------------------------------------------------------------

TASK_REGISTRY: Dict[str, Any] = {}

def _register_tasks() -> None:
    def _try_register(name: str, module_name: str) -> None:
        try:
            module = importlib.import_module(f"tasks.{module_name}")
            fn = getattr(module, "run", None)
            if callable(fn):
                TASK_REGISTRY[name] = fn
        except Exception:
            # Missing or broken task modules are skipped so the UI still
            # loads with whatever tools are available.
            pass

    _try_register("lookup_ip", "lookup_ip")
    _try_register("lookup_domain", "lookup_domain")
    _try_register("lookup_hash", "lookup_hash")
    _try_register("correlate_iocs", "correlate_iocs")
    _try_register("generate_report", "generate_report")
    _try_register("enrich_entity", "enrich_entity")
    _try_register("mitre_map", "mitre_map")
    _try_register("quickscan", "quickscan")

_register_tasks()
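
# Illustrative sketch of the contract _register_tasks() assumes (a hypothetical
# tasks/lookup_ip.py; the real task modules may differ):
#
#     def run(ip: str) -> dict:
#         # Keys like "summary", "mitre", "stix" feed the UI panes below.
#         return {"summary": f"Lookup results for {ip}", "ip": ip}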


# ---------------------------------------------------------------------
# Core execution helpers
# ---------------------------------------------------------------------

def call_task(tool_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    fn = TASK_REGISTRY.get(tool_name)
    if not fn:
        return {
            "error": f"Unknown tool '{tool_name}'. Registered tools: {sorted(TASK_REGISTRY.keys())}"
        }

    try:
        result = fn(**payload)
        if not isinstance(result, dict):
            result = {"result": result}
        return result
    except Exception as exc:
        return {
            "error": f"Exception in tool '{tool_name}': {exc}",
            "traceback": traceback.format_exc(),
        }
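
# Example round-trip (hypothetical: assumes a registered "lookup_ip" task that
# accepts an `ip` keyword; 198.51.100.7 is a TEST-NET-2 documentation address):
#
#     call_task("lookup_ip", {"ip": "198.51.100.7"})
#     # -> {"summary": ...} on success
#     # -> {"error": "...", "traceback": "..."} if the task raises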


def format_result_for_ui(result: Dict[str, Any]) -> Dict[str, str]:
    pretty_json = json.dumps(result, indent=2, default=str)

    markdown = result.get("markdown") or result.get("report") or ""
    if not markdown and "summary" in result:
        markdown = f"## Summary\n\n{result['summary']}"

    mitre = json.dumps(result["mitre"], indent=2, default=str) if result.get("mitre") else ""
    stix = json.dumps(result["stix"], indent=2, default=str) if result.get("stix") else ""
    sarif = json.dumps(result["sarif"], indent=2, default=str) if result.get("sarif") else ""

    return {
        "summary": result.get("summary", ""),
        "markdown": markdown,
        "json": pretty_json,
        "mitre": mitre,
        "stix": stix,
        "sarif": sarif,
    }
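
# Example (values illustrative): a result like {"summary": "2 IOCs matched"}
# carries no markdown/report key, so the summary is promoted to the markdown
# pane, with "mitre", "stix", and "sarif" left empty:
#
#     ui = format_result_for_ui({"summary": "2 IOCs matched"})
#     assert ui["markdown"].startswith("## Summary")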


# ---------------------------------------------------------------------
# MODE C — ANALYST COPILOT (LLM)
# ---------------------------------------------------------------------

def respond(message, history, system_message, model, hf_token: gr.OAuthToken, temperature, top_p, max_tokens):
    """
    Streaming LLM response using the Hugging Face InferenceClient.

    `history` is expected in messages format (a list of role/content dicts);
    `hf_token` is annotated as gr.OAuthToken so Gradio injects the signed-in
    user's token. Supports injecting OSINT task results into the conversation
    via inject_osint_context() below.
    """
    client = InferenceClient(
        token=hf_token.token,
        model=model,
    )

    messages = [{"role": "system", "content": system_message}]
    messages.extend(history)
    messages.append({"role": "user", "content": message})

    response_text = ""

    for chunk in client.chat_completion(
        messages=messages,
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        stream=True
    ):
        # Some providers emit keep-alive chunks with no choices; guard first.
        delta = chunk.choices[0].delta.content if chunk.choices else None
        if delta:
            response_text += delta
            yield response_text
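
# Sketch of wiring `respond` into the UI (component names illustrative; assumes
# Gradio's ChatInterface with messages-format history, with the OAuth token
# injected via the gr.OAuthToken annotation rather than additional_inputs):
#
#     demo = gr.ChatInterface(
#         respond,
#         type="messages",
#         additional_inputs=[system_box, model_dropdown, temperature_slider,
#                            top_p_slider, max_tokens_slider],
#     )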


def inject_osint_context(history, task_result: Dict[str, Any]):
    """
    Inject JSON + summary + MITRE mappings directly into the chat history.
    """
    pretty = json.dumps(task_result, indent=2, default=str)
    blob = f"""
### OSINT Result Injected: