| import re
|
| import openai
|
| from langchain_core.prompts import ChatPromptTemplate
|
| from langchain_core.output_parsers import StrOutputParser
|
| from llm_utils import _common_llm_params, resolve_model_config, get_model_choices
|
| from config import ( |
| OPENAI_API_KEY, |
| OPENAI_BASE_URL, |
| ANTHROPIC_API_KEY, |
| GOOGLE_API_KEY, |
| OPENROUTER_API_KEY, |
| ) |
| import logging
|
| import re
|
|
|
| import warnings
|
|
|
# Globally silence ALL warnings (e.g. noisy deprecation output from the LLM
# client libraries). NOTE(review): this also hides warnings from our own code.
warnings.filterwarnings("ignore")
|
|
|
|
|
def get_llm(model_choice):
    """Build and return a LangChain chat-model instance for *model_choice*.

    Resolves the model's configuration via ``resolve_model_config``, merges
    the shared LLM parameters with the model-specific constructor parameters,
    checks that the provider's credentials are configured, and instantiates
    the LLM class.

    Raises:
        ValueError: if *model_choice* is not a supported model, or (via
            ``_ensure_credentials``) if the provider's API key is missing.
    """
    config = resolve_model_config(model_choice)
    if config is None:
        supported_models = get_model_choices()
        raise ValueError(
            f"Unsupported LLM model: '{model_choice}'. "
            f"Supported models (case-insensitive match) are: {', '.join(supported_models)}"
        )

    provider_class = config["class"]
    constructor_params = config["constructor_params"]

    # Model-specific parameters override the shared defaults on key clashes.
    merged_params = {**_common_llm_params, **constructor_params}

    # Fail fast with a readable message before attempting instantiation.
    _ensure_credentials(model_choice, provider_class, constructor_params)

    return provider_class(**merged_params)
|
|
|
|
|
def _ensure_credentials(model_choice: str, llm_class, model_params: dict) -> None:
    """Raise a clear error if the user selects a hosted model without a key.

    Dispatches on the LLM class name: Anthropic and Google models require
    their respective API keys; OpenAI-compatible models require either an
    OpenRouter key (when the base URL points at OpenRouter) or one of
    `OPENAI_API_KEY` / `OPENAI_BASE_URL`.  Unknown classes pass silently.
    """

    def _has_real_value(candidate):
        # A key counts as configured only when it is non-empty, not just
        # whitespace, and not a "your_..." placeholder from a template .env.
        if not candidate:
            return False
        text = str(candidate)
        return bool(text.strip()) and "your_" not in text.lower()

    def _demand(key_value, env_var, provider_name):
        if not _has_real_value(key_value):
            raise ValueError(
                f"{provider_name} model '{model_choice}' selected but `{env_var}` is not set.\n"
                "Add it to your .env file or export it before running the app."
            )

    class_name = getattr(llm_class, "__name__", str(llm_class))

    if "ChatAnthropic" in class_name:
        _demand(ANTHROPIC_API_KEY, "ANTHROPIC_API_KEY", "Anthropic")
    elif "ChatGoogleGenerativeAI" in class_name:
        _demand(GOOGLE_API_KEY, "GOOGLE_API_KEY", "Google Gemini")
    elif "ChatOpenAI" in class_name:
        endpoint = str((model_params or {}).get("base_url", "")).lower()
        if "openrouter" in endpoint:
            _demand(OPENROUTER_API_KEY, "OPENROUTER_API_KEY", "OpenRouter")
        elif not (_has_real_value(OPENAI_API_KEY) or _has_real_value(OPENAI_BASE_URL)):
            raise ValueError(
                f"OpenAI model '{model_choice}' selected but neither `OPENAI_API_KEY` nor `OPENAI_BASE_URL` is configured.\n"
                "Set `OPENAI_API_KEY` for OpenAI cloud, or set `OPENAI_BASE_URL` for an OpenAI-compatible endpoint."
            )
|
|
|
|
def refine_query(llm, user_input):
    """Rewrite *user_input* into a short dark-web search-engine query.

    Runs the raw input through the LLM with a refinement prompt and returns
    the model's plain-text answer (the prompt asks for at most 5 words with
    no logical operators).
    """
    refine_instructions = """
You are a Cybercrime Threat Intelligence Expert. Your task is to refine the provided user query that needs to be sent to darkweb search engines.

Rules:
1. Analyze the user query and think about how it can be improved to use as search engine query
2. Refine the user query by adding or removing words so that it returns the best result from dark web search engines
3. Don't use any logical operators (AND, OR, etc.)
4. Keep the final refined query limited to 5 words or less
5. Output just the user query and nothing else

INPUT:
"""
    refine_chain = (
        ChatPromptTemplate([("system", refine_instructions), ("user", "{query}")])
        | llm
        | StrOutputParser()
    )
    return refine_chain.invoke({"query": user_input})
|
|
|
|
|
def filter_results(llm, query, results):
    """Ask the LLM to select the (at most) top-20 results relevant to *query*.

    Renders *results* into an indexed "index. link - title" listing, sends it
    to the LLM, and parses the comma-separated indices in the reply back into
    the corresponding result dicts.  On an OpenAI rate-limit error the call is
    retried once with a truncated listing (titles only, 30 chars).  When no
    usable indices come back, the first ``min(len(results), 20)`` results are
    returned as a fallback.

    Args:
        llm: a LangChain-compatible chat model.
        query: the search query the results were fetched for.
        results: list of dicts with at least "link" and "title" keys.

    Returns:
        A list of at most 20 result dicts, in the order the LLM ranked them.
    """
    if not results:
        return []

    system_prompt = """
You are a Cybercrime Threat Intelligence Expert. You are given a dark web search query and a list of search results in the form of index, link and title.
Your task is select the Top 20 relevant results that best match the search query for user to investigate more.
Rule:
1. Output ONLY atmost top 20 indices (comma-separated list) no more than that that best match the input query

Search Query: {query}
Search Results:
"""

    final_str = _generate_final_string(results)

    prompt_template = ChatPromptTemplate(
        [("system", system_prompt), ("user", "{results}")]
    )
    chain = prompt_template | llm | StrOutputParser()
    try:
        result_indices = chain.invoke({"query": query, "results": final_str})
    except openai.RateLimitError as e:
        print(
            f"Rate limit error: {e} \n Truncating to Web titles only with 30 characters"
        )
        final_str = _generate_final_string(results, truncate=True)
        result_indices = chain.invoke({"query": query, "results": final_str})

    # Every "\d+" match is a valid int, so no ValueError handling is needed
    # (the original try/except around int() was unreachable).  Keep only
    # 1-based indices that fall inside the result list.
    in_range = [
        idx
        for idx in (int(m) for m in re.findall(r"\d+", result_indices))
        if 1 <= idx <= len(results)
    ]
    # Deduplicate while preserving the LLM's ranking order.
    parsed_indices = list(dict.fromkeys(in_range))

    if not parsed_indices:
        logging.warning(
            "Unable to interpret LLM result selection ('%s'). "
            "Defaulting to the top %s results.",
            result_indices,
            min(len(results), 20),
        )
        parsed_indices = list(range(1, min(len(results), 20) + 1))

    return [results[i - 1] for i in parsed_indices[:20]]
|
|
|
|
|
def _generate_final_string(results, truncate=False):
    """
    Generate a formatted string from the search results for LLM processing.

    Each entry is rendered as "<1-based position>. <link> - <title>", where
    the link is cut off right after its ".onion" host and every character of
    the title outside [0-9a-zA-Z-.] is replaced by a space.  Entries whose
    cleaned link AND title are both empty are skipped, but their position is
    still consumed so the numbering always maps back into *results*.

    When *truncate* is True (used after a rate-limit error) titles are capped
    at 30 characters and any non-empty link is collapsed to "..." to shrink
    the payload.
    """
    title_cap = 30  # applies only when truncate=True
    rendered = []
    for position, entry in enumerate(results, start=1):
        link = re.sub(r"(?<=\.onion).*", "", entry["link"])
        title = re.sub(r"[^0-9a-zA-Z\-\.]", " ", entry["title"])
        if not link and not title:
            continue
        if truncate:
            if len(title) > title_cap:
                title = title[:title_cap] + "..."
            if link:
                link = "..."
        rendered.append(f"{position}. {link} - {title}")
    return "\n".join(rendered)
|
|
|
|
|
def generate_summary(llm, query, content):
    """Produce a structured threat-intelligence report for scraped content.

    Sends *content* (raw text gathered from dark-web links) through the LLM
    with an analyst prompt and returns the model's structured summary —
    source links, investigation artifacts, key insights, and next steps —
    as plain text.
    """
    analyst_prompt = """
You are an Cybercrime Threat Intelligence Expert tasked with generating context-based technical investigative insights from dark web osint search engine results.

Rules:
1. Analyze the Darkweb OSINT data provided using links and their raw text.
2. Output the Source Links referenced for the analysis.
3. Provide a detailed, contextual, evidence-based technical analysis of the data.
4. Provide intellgience artifacts along with their context visible in the data.
5. The artifacts can include indicators like name, email, phone, cryptocurrency addresses, domains, darkweb markets, forum names, threat actor information, malware names, TTPs, etc.
6. Generate 3-5 key insights based on the data.
7. Each insight should be specific, actionable, context-based, and data-driven.
8. Include suggested next steps and queries for investigating more on the topic.
9. Be objective and analytical in your assessment.
10. Ignore not safe for work texts from the analysis

Output Format:
1. Input Query: {query}
2. Source Links Referenced for Analysis - this heading will include all source links used for the analysis
3. Investigation Artifacts - this heading will include all technical artifacts identified including name, email, phone, cryptocurrency addresses, domains, darkweb markets, forum names, threat actor information, malware names, etc.
4. Key Insights
5. Next Steps - this includes next investigative steps including search queries to search more on a specific artifacts for example or any other topic.

Format your response in a structured way with clear section headings.

INPUT:
"""
    report_chain = (
        ChatPromptTemplate([("system", analyst_prompt), ("user", "{content}")])
        | llm
        | StrOutputParser()
    )
    return report_chain.invoke({"query": query, "content": content})
|
|
|