# robin/llm.py
# Commit e65fffa (pjpjq): Support custom OPENAI_BASE_URL and document HF secret
import re
import openai
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from llm_utils import _common_llm_params, resolve_model_config, get_model_choices
from config import (
OPENAI_API_KEY,
OPENAI_BASE_URL,
ANTHROPIC_API_KEY,
GOOGLE_API_KEY,
OPENROUTER_API_KEY,
)
import logging
import re
import warnings
warnings.filterwarnings("ignore")
def get_llm(model_choice):
    """Build and return a LangChain chat-model client for *model_choice*.

    Resolves the model's configuration (cloud provider or local Ollama),
    verifies that the required credentials are present, and instantiates
    the configured chat class with shared + model-specific parameters.

    Raises:
        ValueError: if the model is unknown or its credentials are missing.
    """
    config = resolve_model_config(model_choice)
    if config is None:  # Extra error check
        supported_models = get_model_choices()
        raise ValueError(
            f"Unsupported LLM model: '{model_choice}'. "
            f"Supported models (case-insensitive match) are: {', '.join(supported_models)}"
        )
    llm_class = config["class"]
    model_specific_params = config["constructor_params"]
    # Model-specific parameters take precedence over the shared defaults
    # whenever the same key appears in both.
    params = dict(_common_llm_params)
    params.update(model_specific_params)
    # Fail fast with a clear message before any network call is attempted.
    _ensure_credentials(model_choice, llm_class, model_specific_params)
    return llm_class(**params)
def _ensure_credentials(model_choice: str, llm_class, model_params: dict) -> None:
"""Raise a clear error if the user selects a hosted model without a key."""
def _is_configured(value):
return bool(value and str(value).strip() and "your_" not in str(value).lower())
def _require(key_value, env_var, provider_name):
if _is_configured(key_value):
return
raise ValueError(
f"{provider_name} model '{model_choice}' selected but `{env_var}` is not set.\n"
"Add it to your .env file or export it before running the app."
)
class_name = getattr(llm_class, "__name__", str(llm_class))
if "ChatAnthropic" in class_name:
_require(ANTHROPIC_API_KEY, "ANTHROPIC_API_KEY", "Anthropic")
elif "ChatGoogleGenerativeAI" in class_name:
_require(GOOGLE_API_KEY, "GOOGLE_API_KEY", "Google Gemini")
elif "ChatOpenAI" in class_name:
base_url = str((model_params or {}).get("base_url", "")).lower()
if "openrouter" in base_url:
_require(OPENROUTER_API_KEY, "OPENROUTER_API_KEY", "OpenRouter")
return
if _is_configured(OPENAI_API_KEY) or _is_configured(OPENAI_BASE_URL):
return
raise ValueError(
f"OpenAI model '{model_choice}' selected but neither `OPENAI_API_KEY` nor `OPENAI_BASE_URL` is configured.\n"
"Set `OPENAI_API_KEY` for OpenAI cloud, or set `OPENAI_BASE_URL` for an OpenAI-compatible endpoint."
)
def refine_query(llm, user_input):
    """Condense *user_input* into a short query suited to dark-web search engines.

    The LLM is instructed to drop logical operators and keep the refined
    query to at most five words; the raw refined query string is returned.
    """
    system_prompt = """
You are a Cybercrime Threat Intelligence Expert. Your task is to refine the provided user query that needs to be sent to darkweb search engines.
Rules:
1. Analyze the user query and think about how it can be improved to use as search engine query
2. Refine the user query by adding or removing words so that it returns the best result from dark web search engines
3. Don't use any logical operators (AND, OR, etc.)
4. Keep the final refined query limited to 5 words or less
5. Output just the user query and nothing else
INPUT:
"""
    prompt = ChatPromptTemplate.from_messages(
        [("system", system_prompt), ("user", "{query}")]
    )
    pipeline = prompt | llm | StrOutputParser()
    return pipeline.invoke({"query": user_input})
def filter_results(llm, query, results):
    """Ask the LLM to pick the (at most) top-20 results relevant to *query*.

    Sends an indexed listing of *results* to the LLM and maps the returned
    indices back to the original result dicts. Retries once with a
    title-only truncated listing on a rate-limit error, and falls back to
    the first 20 results when the LLM reply yields no usable indices.
    """
    if not results:
        return []
    system_prompt = """
You are a Cybercrime Threat Intelligence Expert. You are given a dark web search query and a list of search results in the form of index, link and title.
Your task is select the Top 20 relevant results that best match the search query for user to investigate more.
Rule:
1. Output ONLY atmost top 20 indices (comma-separated list) no more than that that best match the input query
Search Query: {query}
Search Results:
"""
    prompt = ChatPromptTemplate(
        [("system", system_prompt), ("user", "{results}")]
    )
    chain = prompt | llm | StrOutputParser()
    listing = _generate_final_string(results)
    try:
        raw_reply = chain.invoke({"query": query, "results": listing})
    except openai.RateLimitError as e:
        print(
            f"Rate limit error: {e} \n Truncating to Web titles only with 30 characters"
        )
        raw_reply = chain.invoke(
            {"query": query, "results": _generate_final_string(results, truncate=True)}
        )
    # Map every number in the reply back to a valid 1-based result index;
    # `\d+` guarantees int() cannot fail, so no try/except is needed here.
    numbers = (int(m) for m in re.findall(r"\d+", raw_reply))
    in_range = [i for i in numbers if 1 <= i <= len(results)]
    # dict.fromkeys de-duplicates while preserving first-seen order.
    ordered = list(dict.fromkeys(in_range))
    if not ordered:
        logging.warning(
            "Unable to interpret LLM result selection ('%s'). "
            "Defaulting to the top %s results.",
            raw_reply,
            min(len(results), 20),
        )
        ordered = list(range(1, min(len(results), 20) + 1))
    # Select from the original (non-truncated) results, capped at 20.
    return [results[i - 1] for i in ordered[:20]]
def _generate_final_string(results, truncate=False):
"""
Generate a formatted string from the search results for LLM processing.
"""
if truncate:
# Use only the first 35 characters of the title
max_title_length = 30
# Do not use link at all
max_link_length = 0
final_str = []
for i, res in enumerate(results):
# Truncate link at .onion for display
truncated_link = re.sub(r"(?<=\.onion).*", "", res["link"])
title = re.sub(r"[^0-9a-zA-Z\-\.]", " ", res["title"])
if truncated_link == "" and title == "":
continue
if truncate:
# Truncate title to max_title_length characters
title = (
title[:max_title_length] + "..."
if len(title) > max_title_length
else title
)
# Truncate link to max_link_length characters
truncated_link = (
truncated_link[:max_link_length] + "..."
if len(truncated_link) > max_link_length
else truncated_link
)
final_str.append(f"{i+1}. {truncated_link} - {title}")
return "\n".join(s for s in final_str)
def generate_summary(llm, query, content):
    """Produce a structured threat-intelligence analysis of scraped page content.

    Feeds the original *query* and the collected page *content* to the LLM
    and returns its report (source links, artifacts, key insights, and
    suggested next steps) as a single string.
    """
    system_prompt = """
You are an Cybercrime Threat Intelligence Expert tasked with generating context-based technical investigative insights from dark web osint search engine results.
Rules:
1. Analyze the Darkweb OSINT data provided using links and their raw text.
2. Output the Source Links referenced for the analysis.
3. Provide a detailed, contextual, evidence-based technical analysis of the data.
4. Provide intellgience artifacts along with their context visible in the data.
5. The artifacts can include indicators like name, email, phone, cryptocurrency addresses, domains, darkweb markets, forum names, threat actor information, malware names, TTPs, etc.
6. Generate 3-5 key insights based on the data.
7. Each insight should be specific, actionable, context-based, and data-driven.
8. Include suggested next steps and queries for investigating more on the topic.
9. Be objective and analytical in your assessment.
10. Ignore not safe for work texts from the analysis
Output Format:
1. Input Query: {query}
2. Source Links Referenced for Analysis - this heading will include all source links used for the analysis
3. Investigation Artifacts - this heading will include all technical artifacts identified including name, email, phone, cryptocurrency addresses, domains, darkweb markets, forum names, threat actor information, malware names, etc.
4. Key Insights
5. Next Steps - this includes next investigative steps including search queries to search more on a specific artifacts for example or any other topic.
Format your response in a structured way with clear section headings.
INPUT:
"""
    prompt = ChatPromptTemplate.from_messages(
        [("system", system_prompt), ("user", "{content}")]
    )
    pipeline = prompt | llm | StrOutputParser()
    return pipeline.invoke({"query": query, "content": content})