|
|
import re, os, threading, queue, requests, time
|
|
|
from typing import List, Optional, Union
|
|
|
from pydantic import BaseModel, Field
|
|
|
from pydantic_settings import BaseSettings
|
|
|
|
|
|
from api_types import ChatMessage
|
|
|
|
|
|
|
|
|
def parse_think_response(full_response: str):
    """Split a model response into (reasoning_content, content).

    Responses may embed chain-of-thought inside a ``<think>...</think>`` block.

    Returns:
        (None, stripped_response) when no think block is present.
        (reasoning_without_tags, content_after_block) when the block is closed.
        (reasoning_without_tags, "") when the opening tag is never closed.

    Fixes over the previous version: the close-tag slice used ``think_end + 9``
    although ``len("</think>") == 8``, which leaked the first character of the
    content into the reasoning and dropped it from the content; and searching /
    replacing the bare prefix ``"<think"`` left a stray ``>`` in the cleaned
    reasoning text.
    """
    OPEN, CLOSE = "<think>", "</think>"

    think_start = full_response.find(OPEN)
    if think_start == -1:
        return None, full_response.strip()

    think_end = full_response.find(CLOSE)
    if think_end == -1:
        # Unterminated block: treat everything after <think> as reasoning.
        reasoning = full_response[think_start:].strip()
        content = ""
    else:
        after_close = think_end + len(CLOSE)
        reasoning = full_response[think_start:after_close].strip()
        content = full_response[after_close:].strip()

    reasoning_content = reasoning.replace(OPEN, "").replace(CLOSE, "").strip()
    return reasoning_content, content
|
|
|
|
|
|
|
|
|
def cleanMessages(messages: List[ChatMessage], removeThinkingContent: bool = False):
    """Render chat messages into a single prompt string.

    Each message becomes ``"Role: content"`` (role normalized to
    capitalized form, e.g. "User"/"Assistant"); runs of newlines in the
    content are collapsed to one, and messages are joined with blank lines.

    Args:
        messages: sequence of objects exposing ``.role`` and ``.content``.
        removeThinkingContent: when True, strip ``<think>...</think>`` blocks
            from Assistant messages via ``remove_nested_think_tags_stack``.

    Returns:
        The joined prompt string ("" for an empty message list).

    The previous version recomputed the normalized role twice inside a single
    unreadable conditional f-string; behavior is unchanged.
    """
    promptStrList = []
    for message in messages:
        content = re.sub(r"\n+", "\n", message.content.strip())
        role = message.role.strip().lower().capitalize()
        if role == "Assistant" and removeThinkingContent:
            content = remove_nested_think_tags_stack(content)
        promptStrList.append(f"{role}: {content}")
    return "\n\n".join(promptStrList)
|
|
|
|
|
|
|
|
|
def remove_nested_think_tags_stack(text):
    """Strip ``<think>...</think>`` regions, including nested ones, from text.

    A closing tag with no matching open tag is kept verbatim; text inside an
    unclosed ``<think>`` is dropped through to the end of the string.
    """
    depth = 0          # how many <think> blocks we are currently inside
    kept = []
    pos = 0
    length = len(text)
    while pos < length:
        if text[pos : pos + 7] == "<think>":
            depth += 1
            pos += 7
        elif text[pos : pos + 8] == "</think>":
            if depth > 0:
                depth -= 1
            else:
                # Unmatched closer outside any block: preserve it as-is.
                kept.append("</think>")
            pos += 8
        else:
            if depth == 0:
                kept.append(text[pos])
            pos += 1
    return "".join(kept)
|
|
|
|
|
|
|
|
|
def format_bytes(size):
    """Format a byte count with binary (1024-based) units, e.g. ``2.0000KB``.

    Fix: the previous version indexed ``power_labels[n]`` without bounding
    ``n``, so any size of 1024 TB or more raised KeyError. Sizes beyond the
    largest known unit are now clamped to 'T'.
    """
    power = 2 ** 10
    power_labels = {0: "", 1: "K", 2: "M", 3: "G", 4: "T"}
    n = 0
    # Stop either when the value fits the current unit or the largest unit
    # is reached (clamp instead of KeyError).
    while size > power and n < max(power_labels):
        size /= power
        n += 1
    return f"{size:.4f}{power_labels[n] + 'B'}"
|
|
|
|
|
|
|
|
|
# Bounded queue feeding the background logger thread; capacity configurable
# via the LOGGER_QUEUE_SIZE environment variable (default 100). `log()` drops
# items rather than block when this fills up.
LOGGER_QUEUE = queue.Queue(int(os.environ.get('LOGGER_QUEUE_SIZE', 100)))
|
|
|
|
|
|
|
|
|
def logger():
    """Background worker: drain LOGGER_QUEUE and POST each item to LOG_PORT.

    Runs forever. A blocking ``get`` keeps the thread idle while the queue is
    empty; every error is swallowed so the logging thread can never crash the
    process. LOG_PORT is re-read from the environment on each item, so it can
    change at runtime.
    """
    print("enable")  # visible marker that the logging thread started
    while True:
        try:
            entry = LOGGER_QUEUE.get()
        except Exception:
            # Extremely defensive: a blocking get should not raise, but if it
            # somehow does, back off briefly instead of spinning.
            time.sleep(0.1)
            continue
        try:
            endpoint = os.environ.get("LOG_PORT")
            if endpoint:
                requests.post(
                    endpoint,
                    headers={"Content-Type": "application/json"},
                    json=entry,
                    timeout=5,
                )
        except Exception:
            # Best-effort delivery: drop the item on any network/serialization error.
            pass
|
|
|
|
|
|
|
|
|
# Start the background logging thread only when LOG_PORT is configured.
# daemon=True so the thread never blocks interpreter shutdown.
if os.environ.get("LOG_PORT"):
    t = threading.Thread(target=logger, daemon=True)
    t.start()
|
|
|
|
|
|
|
|
|
def log(item):
    """Enqueue an item for the background logger without ever blocking.

    When LOGGER_QUEUE is full the item is dropped and a console note is
    printed (best-effort — even that print is guarded so logging can never
    raise into the caller).
    """
    try:
        LOGGER_QUEUE.put_nowait(item)
    except queue.Full:
        try:
            print("LOG DROP: queue full, dropping log item")
        except Exception:
            pass
|
|
|
|
|
|
|
|
|
def web_search(query: str, top_k: int = 3) -> str:
    """Search DuckDuckGo's HTML endpoint and return up to top_k results as text.

    Each result line is ``"title - snippet - url"``. Returns "" for a blank
    query, when BeautifulSoup is unavailable, or on any network/parse error —
    this helper never raises.
    """
    if not query or not query.strip():
        return ""
    try:
        from bs4 import BeautifulSoup
    except Exception:
        return ""
    try:
        resp = requests.get(
            "https://duckduckgo.com/html/",
            params={"q": query.strip()},
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"},
            timeout=10,
        )
        soup = BeautifulSoup(resp.text, "html.parser")
        lines = []
        for block in soup.find_all("div", class_="result", limit=top_k):
            anchor = block.find("a", class_="result__a") or block.find("a", href=True)
            title = anchor.get_text(strip=True) if anchor else ""
            href = anchor.get("href") if anchor else ""
            snippet_el = block.find("a", class_="result__snippet") or block.find("div", class_="result__snippet")
            snippet = snippet_el.get_text(strip=True) if snippet_el else ""
            lines.append(f"{title} - {snippet} - {href}")
        return "\n".join(lines)
    except Exception:
        return ""
|
|
|
|
|
|
|
|
|
def calc(expr: str) -> str:
    """Safely evaluate a simple arithmetic expression and return the result as a string.

    The expression is parsed with ``ast`` (never ``eval``); only numeric
    constants plus a whitelist of binary/unary operators are permitted, so
    attribute access, calls, names, etc. are all rejected. Any failure —
    unsupported syntax, division by zero — yields ``"ERROR: <reason>"``.

    Fix: ``ast.Num`` (deprecated since Python 3.8, slated for removal) is
    replaced by ``ast.Constant`` with an explicit numeric-type check.
    """
    try:
        import ast, operator as op

        allowed_ops = {
            ast.Add: op.add,
            ast.Sub: op.sub,
            ast.Mult: op.mul,
            ast.Div: op.truediv,
            ast.Pow: op.pow,
            ast.BitXor: op.xor,
            ast.USub: op.neg,
            ast.Mod: op.mod,
            ast.FloorDiv: op.floordiv,
        }

        def _eval(node):
            # Recursively evaluate the whitelisted AST.
            if isinstance(node, ast.Constant):
                if isinstance(node.value, (int, float, complex)):
                    return node.value
                raise ValueError("Unsupported constant")
            elif isinstance(node, ast.BinOp):
                left = _eval(node.left)
                right = _eval(node.right)
                op_type = type(node.op)
                if op_type in allowed_ops:
                    return allowed_ops[op_type](left, right)
                raise ValueError("Unsupported operator")
            elif isinstance(node, ast.UnaryOp):
                operand = _eval(node.operand)
                op_type = type(node.op)
                if op_type in allowed_ops:
                    return allowed_ops[op_type](operand)
                raise ValueError("Unsupported unary op")
            else:
                raise ValueError("Unsupported expression type")

        node = ast.parse(expr, mode='eval')
        result = _eval(node.body)
        return str(result)
    except Exception as e:
        return f"ERROR: {e}"
|
|
|
|
|
|
|
|
|
def detect_tools_and_reasoning(text_or_messages) -> dict:
    """Heuristically decide which tools (and whether reasoning) a prompt needs.

    Accepts either a plain string or a list of ChatMessage-like items (dicts
    with a 'content' key or objects with a ``.content`` attribute).

    Returns a dict with per-tool ``need_*`` booleans, a ``detected_tools``
    list of ``{"name", "args"}`` suggestions in detection order, and a
    ``confidence`` map.

    Fixes over the previous version: the sentiment check was duplicated, so a
    sentiment-style prompt produced two identical 'sentiment' entries in
    ``detected_tools`` — it is now detected exactly once. The confidence map
    also gained the previously missing fetch_url/summarize/keywords/sentiment
    keys (a backward-compatible addition).
    """
    # Flatten a message list into one text blob; fall back to str() otherwise.
    if isinstance(text_or_messages, list):
        try:
            text = "\n\n".join(
                [m.get('content', '') if isinstance(m, dict) else (getattr(m, 'content', '') or '')
                 for m in text_or_messages if m]
            )
        except Exception:
            text = ""
    else:
        text = str(text_or_messages or "")

    t = text.lower()

    need_calc = False
    need_web_search = False
    need_reasoning = False
    need_universal = False
    need_fetch_url = False
    need_summarize = False
    need_keywords = False
    need_sentiment = False
    need_translate = False
    need_spell_check = False
    need_format_code = False
    need_explain_code = False
    detected_tools = []

    # --- calc: explicit "<num> op <num>" or an arithmetic verb plus a digit.
    if (re.search(r"\d+\s*[-+*/%]\s*\d+", t)
            or (re.search(r"\b(calculate|compute|solve|evaluate|sum|add|subtract|multiply|divide)\b", t)
                and re.search(r"\d", t))):
        need_calc = True
        # Pull the first run of digits/operators as a candidate expression.
        m = re.search(r"([\d\(\)\s+\-*/%^.]+)", text)
        expr = m.group(0).strip() if m else None
        # A bare number with no operator is not a usable expression.
        if expr and not re.search(r"[-+*/%]", expr):
            expr = None
        detected_tools.append({"name": "calc", "args": {"expression": expr, "confidence": 0.95 if expr else 0.5}})

    # --- web_search: factual-lookup phrasing, unless it's really arithmetic.
    if (re.search(r"\b(who is|who's|what is|what's|when is|where is|current|latest|news|is the president|president of|population of|capital of|how many|GDP of)\b", t)
            and not re.search(r"\d+\s*[-+*/%]\s*\d+", t)):
        need_web_search = True
        detected_tools.append({"name": "web_search", "args": {"query": text, "confidence": 0.9}})

    # --- reasoning: analytical verbs suggest chain-of-thought is useful.
    if re.search(r"\b(explain|why|because|reason|prove|derive|compare|analysis|analysis:|evaluate|argue|consequence|trade-offs)\b", t):
        need_reasoning = True

    # --- universal: the user explicitly asks for a tool / function call.
    if re.search(r"\b(use (a )?tool|execute (a )?tool|call (a )?tool|function call|run tool|do this via a tool|invoke tool|call tool)\b", t):
        need_universal = True

    # --- fetch_url: a literal URL, or "open/visit <url>".
    if re.search(r"https?://\S+", t) or re.search(r"\b(open|visit)\s+(https?://|www\.)", t):
        need_fetch_url = True
        m_url = re.search(r'https?://\S+', text)
        url_val = m_url.group(0) if m_url else text
        detected_tools.append({"name": "fetch_url", "args": {"url": url_val, "confidence": 0.85}})

    # --- translate: English "translate ... to xx" or Spanish "traducir ... a xx".
    if re.search(r"\btranslate\b.*to\s+([a-z]{2,})|\btraducir\b.*a\s+([a-z]{2,})", t):
        need_translate = True
        m = re.search(r"\btranslate\b.*to\s+([a-z]{2,})|\btraducir\b.*a\s+([a-z]{2,})", t)
        tgt = (m.group(1) if m and m.group(1) else (m.group(2) if m and len(m.groups()) > 1 else 'en'))
        detected_tools.append({"name": "translate", "args": {"text": text, "target_lang": tgt, "confidence": 0.85}})

    if re.search(r"\b(summarize|summarise|tl;dr|tl;dr:)\b", t):
        need_summarize = True
        detected_tools.append({"name": "summarize", "args": {"text": text, "max_sentences": 3, "confidence": 0.8}})

    if re.search(r"\b(keywords|key words|key terms|extract keywords)\b", t):
        need_keywords = True
        detected_tools.append({"name": "keywords", "args": {"text": text, "top_k": 5, "confidence": 0.78}})

    # --- sentiment: checked exactly once (the old duplicate block is gone).
    if re.search(r"\b(sentiment|tone|is this positive|is this negative|what is the sentiment)\b", t):
        need_sentiment = True
        detected_tools.append({"name": "sentiment", "args": {"text": text, "confidence": 0.8}})

    # --- code tools: fenced blocks or explicit formatting/explanation requests.
    if re.search(r"```[a-zA-Z]*|format code|format this code|pretty print code", t):
        need_format_code = True
        detected_tools.append({"name": "format_code", "args": {"code": text, "language": "python", "confidence": 0.8}})
    if re.search(r"\bexplain( this)? code\b|what does this (function|method|snippet) do", t):
        need_explain_code = True
        detected_tools.append({"name": "explain_code", "args": {"code": text, "language": "python", "confidence": 0.75}})

    if re.search(r"\b(spell check|spellcheck|check spelling|corregir ortografía|revisar ortografía)\b", t):
        need_spell_check = True
        detected_tools.append({"name": "spell_check", "args": {"text": text, "confidence": 0.6}})

    confs = {
        "calc_confidence": 0.95 if need_calc else 0.0,
        "web_search_confidence": 0.9 if need_web_search else 0.0,
        "reasoning_confidence": 0.85 if need_reasoning else 0.0,
        "universal_confidence": 0.65 if need_universal else 0.0,
        "fetch_url_confidence": 0.85 if need_fetch_url else 0.0,
        "summarize_confidence": 0.8 if need_summarize else 0.0,
        "keywords_confidence": 0.78 if need_keywords else 0.0,
        "sentiment_confidence": 0.8 if need_sentiment else 0.0,
        "translate_confidence": 0.85 if need_translate else 0.0,
        "spell_check_confidence": 0.6 if need_spell_check else 0.0,
        "format_code_confidence": 0.7 if need_format_code else 0.0,
        "explain_code_confidence": 0.7 if need_explain_code else 0.0,
    }
    return {
        "need_calc": need_calc,
        "need_web_search": need_web_search,
        "need_reasoning": need_reasoning,
        "need_universal": need_universal,
        "need_fetch_url": need_fetch_url,
        "need_summarize": need_summarize,
        "need_keywords": need_keywords,
        "need_sentiment": need_sentiment,
        "need_translate": need_translate,
        "need_spell_check": need_spell_check,
        "need_format_code": need_format_code,
        "need_explain_code": need_explain_code,
        "detected_tools": detected_tools,
        "confidence": confs,
    }
|
|
|
|
|
|
|
|
|
def fetch_url(url: str, max_chars: int = 20000) -> str:
    """Fetch a URL and return its visible text with HTML stripped.

    Prefers BeautifulSoup (dropping <script>/<style> content); falls back to a
    crude regex tag strip when bs4 is unavailable. The result is whitespace-
    collapsed and truncated to max_chars. Returns "" for a missing URL, a
    non-OK response, or any error — this helper never raises.
    """
    if not url:
        return ""
    try:
        resp = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"},
            timeout=10,
        )
        if not resp.ok:
            return ""
        raw_html = resp.text
        try:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(raw_html, "html.parser")
            for tag in soup(["script", "style"]):
                tag.decompose()
            plain = re.sub(r"\s+", " ", soup.get_text(separator=" \n ")).strip()
            return plain[:max_chars]
        except Exception:
            # bs4 missing or parse failure: regex-based tag removal.
            plain = re.sub(r"\s+", " ", re.sub(r"<[^>]+>", "", raw_html))
            return plain[:max_chars]
    except Exception:
        return ""
|
|
|
|
|
|
|
|
|
def summarize_text(text: str, max_sentences: int = 3) -> str:
    """Extractive "summary": return the first max_sentences sentences of text.

    Sentences are split on whitespace following ., ! or ?. Deliberately naive
    to avoid heavy dependencies; returns "" for empty or non-string input.
    """
    if not text or not isinstance(text, str):
        return ""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    selected = sentences if len(sentences) <= max_sentences else sentences[:max_sentences]
    return " ".join(selected).strip()
|
|
|
|
|
|
|
|
|
def extract_keywords(text: str, top_k: int = 5) -> List[str]:
    """Return the top_k most frequent non-stopword tokens (length > 2).

    Counting is case-insensitive; frequency ties keep first-occurrence order
    (the sort is stable). Returns [] for empty input or on any error.
    """
    if not text:
        return []
    try:
        stop = {"the", "and", "is", "in", "to", "a", "an", "of", "for", "with",
                "on", "that", "this", "it", "as", "are"}
        counts = {}
        for tok in re.findall(r"\w+", text.lower()):
            if len(tok) > 2 and tok not in stop:
                counts[tok] = counts.get(tok, 0) + 1
        ranked = sorted(counts.items(), key=lambda kv: -kv[1])
        return [word for word, _ in ranked[:top_k]]
    except Exception:
        return []
|
|
|
|
|
|
|
|
|
def sentiment_analysis(text: str) -> dict:
    """Very basic lexicon-based sentiment analysis.

    Scores +1 per positive-lexicon word and -1 per negative-lexicon word,
    then maps the balance to positive/negative/neutral.

    Returns: {"sentiment": "positive"|"neutral"|"negative", "score": float}.
    """
    if not text:
        return {"sentiment": "neutral", "score": 0.0}
    positive = {"good", "great", "excellent", "positive", "success", "love",
                "like", "happy", "best"}
    negative = {"bad", "horrible", "poor", "negative", "hate", "dislike",
                "sad", "worst", "angry"}
    # The lexicons are disjoint, so bool subtraction gives +1/0/-1 per word.
    balance = sum((w in positive) - (w in negative) for w in re.findall(r"\w+", text.lower()))
    if balance > 0:
        return {"sentiment": "positive", "score": float(balance)}
    if balance < 0:
        return {"sentiment": "negative", "score": float(balance)}
    return {"sentiment": "neutral", "score": 0.0}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def translate_text(text: str, target_lang: str = 'en') -> dict:
    """Translate text to target_lang via ``googletrans`` when installed.

    Without googletrans (or on any translation error) a tagged fallback string
    ``"[translated to <lang>]: <text>"`` is returned with confidence 0.0, so
    callers always get the same dict shape:
    {"action": "translate", "result": str, "metadata": {"lang", "confidence"}}.

    Fix: the previous version performed the ``find_spec("googletrans")``
    lookup twice, once redundantly nested inside the first positive check.
    """
    if not text:
        return {"action": "translate", "result": "", "metadata": {"lang": target_lang, "confidence": 0.0}}
    try:
        import importlib
        import importlib.util

        if importlib.util.find_spec("googletrans") is not None:
            try:
                googletrans = importlib.import_module("googletrans")
                Translator = getattr(googletrans, 'Translator', None)
                if Translator:
                    t = Translator()
                    res = t.translate(text, dest=target_lang)
                    return {"action": "translate", "result": res.text, "metadata": {"lang": target_lang, "confidence": 0.9}}
            except Exception:
                # Fall through to the no-op fallback below.
                pass
        return {"action": "translate", "result": f"[translated to {target_lang}]: {text}", "metadata": {"lang": target_lang, "confidence": 0.0}}
    except Exception:
        return {"action": "translate", "result": f"[translated to {target_lang}]: {text}", "metadata": {"lang": target_lang, "confidence": 0.0}}
|
|
|
|
|
|
|
|
|
def spell_check_text(text: str) -> dict:
    """Spell-check text with TextBlob when installed; otherwise identity.

    Returns {"action": "spell_check", "result": str,
             "metadata": {"suggestions": list, "confidence": float}}.
    A correction is only reported when it actually differs from the input;
    every failure path degrades to returning the text unchanged.
    """
    if not text:
        return {"action": "spell_check", "result": text, "metadata": {"suggestions": [], "confidence": 0.0}}
    try:
        import importlib.util

        if importlib.util.find_spec("textblob") is not None:
            try:
                blob_mod = importlib.import_module("textblob")
                TextBlob = getattr(blob_mod, "TextBlob", None)
                if TextBlob is not None:
                    corrected = str(TextBlob(text).correct())
                    if corrected != text:
                        return {"action": "spell_check", "result": corrected, "metadata": {"suggestions": [corrected], "confidence": 0.9}}
            except Exception:
                pass
    except Exception:
        pass
    # Fallback / no-change path: identity result with no suggestions.
    return {"action": "spell_check", "result": text, "metadata": {"suggestions": [], "confidence": 0.0}}
|
|
|
|
|
|
|
|
|
def format_code_text(code: str, lang: str = 'python') -> dict:
    """Format code with ``black`` when available; else strip trailing spaces.

    Returns {"action": "format_code", "result": str,
             "metadata": {"lang", "confidence"}} — confidence 0.95 when black
    formatted the code, 0.0 for the whitespace-only fallback or on error.

    Fix: the previous version had a triply-nested try with the rstrip
    fallback duplicated in two branches; the logic is now flat with a single
    fallback, preserving behavior (black path → formatted; black missing or
    failing → per-line rstrip; unexpected error → code unchanged).
    """
    if not code:
        return {"action": "format_code", "result": code, "metadata": {"lang": lang, "confidence": 0.0}}
    try:
        import importlib
        import importlib.util

        black = None
        try:
            if importlib.util.find_spec("black") is not None:
                black = importlib.import_module("black")
        except ImportError:
            black = None
        if black is not None:
            try:
                formatted = black.format_str(code, mode=black.Mode())
                return {"action": "format_code", "result": formatted, "metadata": {"lang": lang, "confidence": 0.95}}
            except Exception:
                pass  # black rejected the snippet; use the whitespace fallback
        cleaned = '\n'.join(ln.rstrip() for ln in code.splitlines())
        return {"action": "format_code", "result": cleaned, "metadata": {"lang": lang, "confidence": 0.0}}
    except Exception:
        return {"action": "format_code", "result": code, "metadata": {"lang": lang, "confidence": 0.0}}
|
|
|
|
|
|
|
|
|
def explain_code_text(code: str, lang: str = 'python') -> dict:
    """Produce a naive code "explanation": function names plus leading comments.

    Lists ``def``-style function names and up to three ``#`` comments; when
    neither exists, falls back to the first non-blank line. Intentionally
    simple — a future improvement would delegate to an LLM or real parser.
    """
    if not code:
        return {"action": "explain_code", "result": "", "metadata": {"lang": lang}}
    try:
        parts = []
        func_names = re.findall(r"def\s+(\w+)\s*\(", code)
        if func_names:
            parts.append(f"Functions: {', '.join(func_names)}")
        comment_bodies = re.findall(r"#(.+)", code)
        if comment_bodies:
            parts.append("Comments: " + "; ".join(c.strip() for c in comment_bodies[:3]))
        if not parts:
            nonblank = [ln.strip() for ln in code.splitlines() if ln.strip()]
            parts.append(nonblank[0] if nonblank else "No content")
        return {"action": "explain_code", "result": " | ".join(parts), "metadata": {"lang": lang, "confidence": 0.6}}
    except Exception:
        return {"action": "explain_code", "result": "", "metadata": {"lang": lang, "confidence": 0.0}}
|
|
|
|
|
|
|
|
|
def ensure_upload_dir():
    """Create CONFIG.UPLOAD_DIR if it does not exist (best-effort, never raises)."""
    # Local import — presumably to avoid an import cycle at module load; confirm.
    from config import CONFIG
    try:
        os.makedirs(CONFIG.UPLOAD_DIR, exist_ok=True)
    except Exception:
        # Best-effort: callers will surface the error when they actually write.
        pass
|
|
|
|
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
def save_bytes_to_upload(filename: Optional[str], data: bytes) -> dict:
    """Persist raw bytes into CONFIG.UPLOAD_DIR under a UUID-prefixed name.

    Args:
        filename: client-supplied name (may be None → 'uploaded_file').
        data: raw file contents.

    Returns:
        On success: {'file_id', 'filename', 'path', 'mime_type', 'size',
        'uploaded_at'} (mime_type guessed from the stored name, may be None).
        On failure: {'error': str}.

    Fix: dropped the unused ``hashlib`` import.
    """
    from config import CONFIG
    import time, uuid

    ensure_upload_dir()
    _id = str(uuid.uuid4())
    # basename() guards against path traversal in the client-supplied filename.
    safe_name = f"{_id}_{os.path.basename(str(filename or 'uploaded_file'))}"
    path = os.path.join(CONFIG.UPLOAD_DIR, safe_name)
    try:
        with open(path, 'wb') as f:
            f.write(data)
        size = os.path.getsize(path)
        import mimetypes
        mime_type = mimetypes.guess_type(path)[0]
        return {
            'file_id': _id,
            'filename': filename,
            'path': path,
            'mime_type': mime_type,
            'size': size,
            'uploaded_at': int(time.time()),
        }
    except Exception as e:
        return {'error': str(e)}
|
|
|
|
|
|
|
|
|
def file_read_from_path(path: str, max_bytes: int = 100000) -> str:
    """Read up to max_bytes from a file and decode as UTF-8.

    Undecodable bytes are replaced; a missing/empty path or any I/O error
    yields "" — this helper never raises.
    """
    try:
        if not path or not os.path.exists(path):
            return ""
        with open(path, 'rb') as fh:
            raw = fh.read(max_bytes)
        try:
            return raw.decode('utf-8', errors='replace')
        except Exception:
            return str(raw)
    except Exception:
        return ""
|
|
|
|
|
|
|
|
|
def universal_tool(args: dict, allow_web_search: bool = True, allow_tools: bool = True, allow_file_tool: bool = True) -> dict:
    """Dispatch a tool invocation described by ``args`` to the matching helper.

    Args:
        args: dict with an optional 'action' key — one of 'calc', 'web_search',
            'file_read', 'file_upload', 'fetch_url', 'summarize',
            'keywords'/'keyword_extraction', 'sentiment', 'translate',
            'spell_check'/'spellcheck', 'format_code'/'format',
            'explain_code'/'explain' — plus action-specific arguments. When
            'action' is absent, regex heuristics on args['query'] pick a tool,
            falling back to web_search.
        allow_web_search / allow_tools / allow_file_tool: policy switches; a
            disabled action returns a result dict whose metadata carries
            "disabled_by_policy" instead of raising.

    Returns:
        A dict shaped {"action": ..., "result": ..., "metadata": {...}} on
        success, or {"error": ...} when args are invalid or no action can be
        determined.
    """
    if not isinstance(args, dict):
        return {"error": "ERROR: invalid args for universal tool"}

    action = args.get("action")
    query = args.get("query")

    # --- explicit actions -------------------------------------------------
    if action == "calc":
        if not allow_tools:
            return {"action": "calc", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        expr = args.get("expression") or query
        if not expr:
            return {"action": "calc", "result": None, "metadata": {"error": "no expression provided", "confidence": 0.0}}
        res = calc(str(expr))
        return {"action": "calc", "result": str(res), "metadata": {"expression": expr, "confidence": 0.98}}
    if action == "web_search":
        if not allow_web_search:
            return {"action": "web_search", "result": "", "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        q = args.get("query") or query
        if not q:
            return {"action": "web_search", "result": "", "metadata": {"confidence": 0.0}}
        res = web_search(str(q), int(args.get("top_k") or 3))
        return {"action": "web_search", "result": str(res), "metadata": {"query": q, "top_k": int(args.get("top_k") or 3), "confidence": 0.9}}
    if action == 'file_read':
        if not allow_file_tool:
            return {"action": "file_read", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        fpath = args.get('path') or args.get('file_path')
        # Resolve a bare file_id against the upload dir (basename guards traversal).
        if not fpath and args.get('file_id'):
            from config import CONFIG
            fid = args.get('file_id')
            if fid:
                candidate = os.path.join(CONFIG.UPLOAD_DIR, os.path.basename(str(fid)))
            else:
                candidate = None
            if candidate and os.path.exists(candidate):
                fpath = candidate
        if not fpath:
            return {"action": "file_read", "result": None, "metadata": {"error": "no_path_or_id", "confidence": 0.0}}
        content = file_read_from_path(fpath, int(args.get('max_bytes') or 100000))
        return {"action": "file_read", "result": str(content), "metadata": {"path": fpath, "confidence": 0.9}}
    if action == 'file_upload':
        if not allow_file_tool:
            return {"action": "file_upload", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        import base64
        fname = args.get('filename') or args.get('name') or 'uploaded_file'
        content_b64 = args.get('content_base64') or args.get('content')
        if not content_b64:
            return {"action": "file_upload", "result": None, "metadata": {"error": "no_content", "confidence": 0.0}}
        # Decode: prefer strict base64; fall back to treating the string as raw UTF-8.
        try:
            if isinstance(content_b64, str):
                b = None
                try:
                    b = base64.b64decode(content_b64, validate=True)
                except Exception:
                    b = str(content_b64).encode('utf-8')
            else:
                b = content_b64 if isinstance(content_b64, (bytes, bytearray)) else str(content_b64).encode('utf-8')
        except Exception:
            return {"action": "file_upload", "result": None, "metadata": {"error": "invalid_content", "confidence": 0.0}}
        # Size cap from project config; 10 MiB default when unset/unavailable.
        try:
            from config import CONFIG
            if len(b) > getattr(CONFIG, 'MAX_UPLOAD_SIZE_BYTES', 10 * 1024 * 1024):
                return {"action": "file_upload", "result": None, "metadata": {"error": "file_too_large", "confidence": 0.0}}
        except Exception:
            pass
        meta = None
        # Prefer the app module's uploader (and its UPLOADED_FILES registry)
        # when importable; otherwise store locally via save_bytes_to_upload.
        try:
            import importlib
            app_module = importlib.import_module('app')
            if hasattr(app_module, 'upload_file_internal'):
                try:
                    meta = app_module.upload_file_internal(b, filename=fname)
                except Exception:
                    meta = save_bytes_to_upload(fname, b)
                try:
                    if hasattr(app_module, 'UPLOADED_FILES') and isinstance(app_module.UPLOADED_FILES, dict):
                        app_module.UPLOADED_FILES[meta['file_id']] = meta
                except Exception:
                    pass
            else:
                meta = save_bytes_to_upload(fname, b)
                try:
                    if hasattr(app_module, 'UPLOADED_FILES') and isinstance(app_module.UPLOADED_FILES, dict):
                        app_module.UPLOADED_FILES[meta['file_id']] = meta
                except Exception:
                    pass
        except Exception:
            meta = save_bytes_to_upload(fname, b)
        return {"action": "file_upload", "result": meta, "metadata": {"filename": fname, "file_id": meta.get('file_id'), "confidence": 0.9}}
    if action == 'fetch_url':
        if not allow_web_search:
            return {"action": "fetch_url", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        url = args.get('url') or query
        if not url:
            return {"action": "fetch_url", "result": None, "metadata": {"error": "no_url_provided", "confidence": 0.0}}
        content = fetch_url(str(url), int(args.get('max_chars') or 20000))
        return {"action": "fetch_url", "result": str(content), "metadata": {"url": url, "confidence": 0.9}}
    if action == 'summarize':
        if not allow_tools:
            return {"action": "summarize", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        # Text source priority: explicit text > fetched URL > query.
        txt = args.get('text') or ''
        if not txt and args.get('url'):
            try:
                txt = fetch_url(str(args.get('url')))
            except Exception:
                txt = ''
        if not txt and query:
            txt = query
        if not txt:
            return {"action": "summarize", "result": None, "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0}}
        s = summarize_text(str(txt), int(args.get('max_sentences') or 3))
        return {"action": "summarize", "result": s, "metadata": {"confidence": 0.85}}
    if action == 'keywords' or action == 'keyword_extraction':
        if not allow_tools:
            return {"action": "keywords", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or ''
        if not txt and args.get('url'):
            try:
                txt = fetch_url(str(args.get('url')))
            except Exception:
                txt = ''
        if not txt and query:
            txt = query
        if not txt:
            return {"action": "keywords", "result": None, "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0}}
        kws = extract_keywords(str(txt), int(args.get('top_k') or 5))
        return {"action": "keywords", "result": kws, "metadata": {"confidence": 0.85}}
    if action == 'sentiment':
        if not allow_tools:
            return {"action": "sentiment", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or ''
        if not txt and args.get('url'):
            try:
                txt = fetch_url(str(args.get('url')))
            except Exception:
                txt = ''
        if not txt and query:
            txt = query
        if not txt:
            return {"action": "sentiment", "result": None, "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0}}
        res = sentiment_analysis(str(txt))
        return {"action": "sentiment", "result": res, "metadata": {"confidence": 0.85}}
    if action == 'translate':
        if not allow_tools:
            return {"action": "translate", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        target = args.get('target') or 'en'
        res = translate_text(str(txt), str(target))
        # NOTE(review): translate_text nests 'lang' under res['metadata'] and has
        # no top-level 'lang'/'note' keys, so both fields below are always None.
        return {"action": "translate", "result": res.get('result'), "metadata": {"lang": res.get('lang'), "note": res.get('note'), "confidence": 0.5}}
    if action == 'spell_check' or action == 'spellcheck':
        if not allow_tools:
            return {"action": "spell_check", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        res = spell_check_text(str(txt))
        # NOTE(review): spell_check_text exposes 'suggestions' under
        # res['metadata'], not a top-level 'corrections' — this is always None.
        return {"action": "spell_check", "result": res.get('result'), "metadata": {"corrections": res.get('corrections'), "confidence": 0.5}}
    if action == 'format_code' or action == 'format':
        if not allow_tools:
            return {"action": "format_code", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        lang = args.get('language') or args.get('lang') or 'python'
        res = format_code_text(txt, lang)
        # NOTE(review): format_code_text has no top-level 'note' key — always None.
        return {"action": "format_code", "result": res.get('result'), "metadata": {"note": res.get('note'), "confidence": 0.6}}
    if action == 'explain_code' or action == 'explain':
        if not allow_tools:
            return {"action": "explain_code", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        lang = args.get('language') or args.get('lang') or 'python'
        res = explain_code_text(txt, lang)
        # NOTE(review): explain_code_text returns neither 'docstrings' nor
        # 'explanation', so expl falls back to '' here unless the helper changes.
        if isinstance(res, dict):
            ds = res.get('docstrings') or []
            expl = res.get('explanation') or (ds[0] if isinstance(ds, list) and len(ds) > 0 else '')
        else:
            expl = str(res)
        return {"action": "explain_code", "result": expl, "metadata": {"docstrings": res.get('docstrings'), "confidence": 0.6}}

    # --- no explicit action: regex heuristics on the query ----------------
    if query:
        # Arithmetic expression → calc.
        if re.search(r"\d+\s*[-+*/%]\s*\d+", str(query)):
            if not allow_tools:
                return {"action": "calc", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            res = calc(str(query))
            return {"action": "calc", "result": str(res), "metadata": {"expression": str(query), "confidence": 0.95}}
        # Literal URL → fetch_url.
        if re.search(r"https?://\S+", str(query)):
            if not allow_web_search:
                return {"action": "fetch_url", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            content = fetch_url(str(query), int(args.get('max_chars') or 20000))
            return {"action": "fetch_url", "result": str(content), "metadata": {"url": str(query), "confidence": 0.9}}
        # "translate ... to xx" → translate (whole query is passed as the text).
        if re.search(r"\btranslate\b.*to\s+([a-z]{2,})", str(query).lower()):
            import re as _re  # NOTE(review): redundant — `re` is already imported at module level
            m = _re.search(r"\btranslate\b.*to\s+([a-z]{2,})", str(query).lower())
            tgt = m.group(1) if m else 'en'
            if not allow_tools:
                return {"action": "translate", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            res = translate_text(str(query), tgt)
            return res
        # Code fence or formatting request → format_code.
        if re.search(r"```[a-zA-Z]*|format code|format this code|pretty print code", str(query).lower()):
            if not allow_tools:
                return {"action": "format_code", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            code = str(query)
            res = format_code_text(code)
            return res
        if re.search(r"\b(summarize|summarise|tl;dr)\b", str(query).lower()):
            if not allow_tools:
                return {"action": "summarize", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            s = summarize_text(str(query))
            return {"action": "summarize", "result": s, "metadata": {"confidence": 0.85}}
        if re.search(r"\b(keywords|key terms|extract keywords)\b", str(query).lower()):
            if not allow_tools:
                return {"action": "keywords", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            kws = extract_keywords(str(query))
            return {"action": "keywords", "result": kws, "metadata": {"confidence": 0.78}}
        if re.search(r"\b(sentiment|tone|is this positive|is this negative|what is the sentiment)\b", str(query).lower()):
            if not allow_tools:
                return {"action": "sentiment", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            res = sentiment_analysis(str(query))
            return {"action": "sentiment", "result": res, "metadata": {"confidence": 0.8}}
        # Nothing matched: default to a web search of the query.
        if not allow_web_search:
            return {"action": "web_search", "result": "", "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        res = web_search(str(query), int(args.get("top_k") or 3))
        return {"action": "web_search", "result": str(res), "metadata": {"query": str(query), "top_k": int(args.get("top_k") or 3), "confidence": 0.9}}

    return {"error": "ERROR: could not determine action for universal tool"}
|
|
|
|
|
|
|
|
|
def bias_mitigation(text: str) -> dict:
    """Conservative bias-mitigation filter.

    Suppresses sweeping generalizations that mention protected groups and any
    known slur, and softens hostile claims about political figures; everything
    else passes through untouched.

    Returns: {"sanitized": str, "suppressed": bool, "reason": str | None}.
    """
    import re

    if not text or not isinstance(text, str):
        return {"sanitized": text, "suppressed": False, "reason": None}

    stripped = text.strip()

    # Regexes naming protected attributes / groups.
    protected_terms = [
        r"\b(race|religion|ethnicity|gender|sexual orientation|disability)\b",
        r"\b(black|white|asian|hispanic|muslim|christian|jewish|gay|lesbian|transgender)\b",
    ]
    # "all/always/never ... are/is/..." style absolute statements.
    sweeping_patterns = [
        r"\b(all|always|never|every|none)\b[^.?!]{0,60}\b(is|are|will|should|must)\b",
        r"\b(\w+)s?\b[^.?!]{0,60}\b(are|is)\b[^.?!]{0,80}\b(inferior|superior|stupid|lazy|criminal)\b",
    ]
    # Placeholder slur list (populated elsewhere / in deployment).
    slurs = [r"\b(slur1|slur2)\b"]

    # A sweeping generalization only triggers suppression when a protected
    # term also appears anywhere in the text.
    if any(re.search(p, stripped, flags=re.I) for p in sweeping_patterns):
        if any(re.search(pt, stripped, flags=re.I) for pt in protected_terms):
            return {"sanitized": "[content suppressed due to potential bias]", "suppressed": True, "reason": "sweeping_generalization_protected_group"}

    if any(re.search(s, stripped, flags=re.I) for s in slurs):
        return {"sanitized": "[content suppressed due to policy]", "suppressed": True, "reason": "profanity_or_slur"}

    # Hostile claims about political figures are softened, not suppressed.
    if re.search(r"\b(president|prime minister|dictator|election|vote|politician)\b", stripped, flags=re.I) and re.search(r"\b(is|are|will|should)\b[^.?!]{0,80}\b(incompetent|corrupt|traitor|criminal)\b", stripped, flags=re.I):
        sanitized = re.sub(r"\b(is|are|will|should)\b[^.?!]{0,80}\b(incompetent|corrupt|traitor|criminal)\b", "may have actions that deserve scrutiny", stripped, flags=re.I)
        return {"sanitized": sanitized, "suppressed": False, "reason": "political_neutralization"}

    return {"sanitized": text, "suppressed": False, "reason": None}
|
|
|
|