# Rwkv-xd / utils.py — helper utilities: think-tag parsing, background logging,
# lightweight tools (search, calc, file I/O) and tool-detection heuristics.
import re, os, threading, queue, requests, time
from typing import List, Optional, Union
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
from api_types import ChatMessage
def parse_think_response(full_response: str):
    """Split a model response into (reasoning_content, content).

    Looks for a ``<think>...</think>`` section. Returns ``(None, text)`` when no
    think tag is present; when the tag is unclosed, everything from the opening
    tag onward is treated as reasoning and content is empty.
    """
    # Match the full opening tag: the old "<think" prefix search combined with
    # replace("<think", "") left a stray ">" in the extracted reasoning.
    think_start = full_response.find("<think>")
    if think_start == -1:
        return None, full_response.strip()
    think_end = full_response.find("</think>")
    if think_end == -1:  # unclosed tag: all trailing text is reasoning
        reasoning = full_response[think_start:].strip()
        content = ""
    else:
        # len("</think>") == 8, so +8 spans exactly through the closing tag
        # (the previous +9 swallowed the first character of the content).
        reasoning = full_response[think_start : think_end + 8].strip()
        content = full_response[think_end + 8 :].strip()
    # Strip the tags, keeping only the inner reasoning text.
    reasoning_content = reasoning.replace("<think>", "").replace("</think>", "").strip()
    return reasoning_content, content
def cleanMessages(messages: List[ChatMessage], removeThinkingContent: bool = False):
    """Render chat messages as a role-prefixed prompt string.

    Roles are normalized to capitalized form ("User", "Assistant", ...), runs
    of newlines are collapsed to one, and, when requested, <think> sections are
    stripped from assistant turns. Turns are joined with blank lines.
    """
    rendered = []
    for msg in messages:
        role = msg.role.strip().lower().capitalize()
        body = re.sub(r"\n+", "\n", msg.content.strip())
        # Thinking content is only ever removed from assistant turns.
        if removeThinkingContent and role == "Assistant":
            body = remove_nested_think_tags_stack(body)
        rendered.append(f"{role}: {body}")
    return "\n\n".join(rendered)
def remove_nested_think_tags_stack(text):
    """Remove <think>...</think> sections, including nested ones.

    A depth counter tracks open tags (equivalent to the original stack of
    identical tokens). Text inside any open tag is dropped; an unmatched
    closing tag outside a section is kept verbatim; an unclosed opening tag
    drops the rest of the string.
    """
    depth = 0
    kept = []
    pos = 0
    length = len(text)
    while pos < length:
        if text.startswith("<think>", pos):
            depth += 1
            pos += 7
        elif text.startswith("</think>", pos):
            if depth:
                depth -= 1
            else:
                # Closing tag with no matching opener: preserve it as-is.
                kept.append("</think>")
            pos += 8
        else:
            if depth == 0:
                kept.append(text[pos])
            pos += 1
    return "".join(kept)
def format_bytes(size):
    """Format a byte count as a human-readable string, e.g. 2048 -> '2.0000KB'.

    Exact unit boundaries roll over (1024 -> '1.0000KB'), and values of one
    tebibyte and above stay expressed in TB instead of raising a KeyError.
    """
    power = 2**10
    power_labels = {0: "", 1: "K", 2: "M", 3: "G", 4: "T"}
    n = 0
    # ">=" so exact powers advance to the next unit; cap n at the largest
    # known label so huge sizes don't index past the table.
    while size >= power and n < max(power_labels):
        size /= power
        n += 1
    return f"{size:.4f}{power_labels[n] + 'B'}"
# Bounded queue feeding the background logger thread; capacity comes from the
# LOGGER_QUEUE_SIZE env var (default 100) so a slow or unreachable log sink
# cannot grow memory without bound (log() drops items when full).
LOGGER_QUEUE = queue.Queue(int(os.environ.get('LOGGER_QUEUE_SIZE', 100)))
def logger():
    """Daemon loop: drain LOGGER_QUEUE and POST each item as JSON to LOG_PORT.

    Blocks on queue.get() so the thread sleeps while idle rather than spinning.
    Every failure — queue or network — is swallowed so logging can never crash
    the application.
    """
    print("enable")
    while True:
        try:
            entry = LOGGER_QUEUE.get()
        except Exception:
            # Unexpected queue failure: back off briefly and keep serving.
            time.sleep(0.1)
            continue
        try:
            # Re-read the env var each iteration (matches original behavior).
            endpoint = os.environ.get("LOG_PORT")
            if endpoint:
                # Best-effort delivery; any network error is ignored.
                requests.post(
                    endpoint,
                    headers={"Content-Type": "application/json"},
                    json=entry,
                    timeout=5,
                )
        except Exception:
            # Never let log failures escape to the main thread.
            pass
# Start the background logger thread only when a LOG_PORT sink is configured.
if os.environ.get("LOG_PORT"):
    # make the logger thread a daemon so it won't block process exit
    t = threading.Thread(target=logger, daemon=True)
    t.start()
def log(item):
    """Queue *item* for the background logger without ever blocking.

    When the queue is full the item is dropped (best-effort logging); a short
    console note is printed so the drop is at least visible.
    """
    try:
        LOGGER_QUEUE.put_nowait(item)
    except queue.Full:
        try:
            # Non-blocking fallback so at least something is recorded.
            print("LOG DROP: queue full, dropping log item")
        except Exception:
            pass
def web_search(query: str, top_k: int = 3) -> str:
    """Perform a simple web search via DuckDuckGo HTML and return top_k results as a combined string.

    Queries the public HTML endpoint, extracts title/snippet/url from each
    result container and returns them joined one per line. Returns "" for an
    empty query, a missing bs4 dependency, or any network/parse failure.
    """
    if not query or not query.strip():
        return ""
    try:
        from bs4 import BeautifulSoup
    except Exception:
        return ""
    try:
        resp = requests.get(
            "https://duckduckgo.com/html/",
            params={"q": query.strip()},
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"},
            timeout=10,
        )
        soup = BeautifulSoup(resp.text, "html.parser")
        # DuckDuckGo's html structure: results live in `div.result` containers.
        lines = []
        for hit in soup.find_all("div", class_="result", limit=top_k):
            anchor = hit.find("a", class_="result__a") or hit.find("a", href=True)
            title = anchor.get_text(strip=True) if anchor else ""
            href = anchor.get("href") if anchor else ""
            snip_node = hit.find("a", class_="result__snippet") or hit.find("div", class_="result__snippet")
            snippet = snip_node.get_text(strip=True) if snip_node else ""
            lines.append(f"{title} - {snippet} - {href}")
        return "\n".join(lines)
    except Exception:
        return ""
def calc(expr: str) -> str:
    """Safely evaluate a simple arithmetic expression and return the result as string.

    Parses via ast so only numeric literals, binary arithmetic and unary
    operators are permitted; anything else (names, calls, attributes, string
    literals) raises and is reported as an "ERROR: ..." string.
    """
    try:
        import ast, operator as op

        # supported operators (note: ^ is bitwise xor, not power)
        allowed_ops = {
            ast.Add: op.add,
            ast.Sub: op.sub,
            ast.Mult: op.mul,
            ast.Div: op.truediv,
            ast.Pow: op.pow,
            ast.BitXor: op.xor,
            ast.USub: op.neg,
            ast.Mod: op.mod,
            ast.FloorDiv: op.floordiv,
        }

        def _eval(node):
            # ast.Constant replaces the deprecated ast.Num (slated for removal
            # after 3.12); accept only real numbers, not strings/bools/None.
            if isinstance(node, ast.Constant):
                if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                    return node.value
                raise ValueError("Unsupported constant")
            elif isinstance(node, ast.BinOp):
                left = _eval(node.left)
                right = _eval(node.right)
                op_type = type(node.op)
                if op_type in allowed_ops:
                    return allowed_ops[op_type](left, right)
                raise ValueError("Unsupported operator")
            elif isinstance(node, ast.UnaryOp):
                operand = _eval(node.operand)
                op_type = type(node.op)
                if op_type in allowed_ops:
                    return allowed_ops[op_type](operand)
                raise ValueError("Unsupported unary op")
            else:
                raise ValueError("Unsupported expression type")

        node = ast.parse(expr, mode='eval')
        result = _eval(node.body)
        return str(result)
    except Exception as e:
        return f"ERROR: {e}"
def detect_tools_and_reasoning(text_or_messages) -> dict:
    """Detects whether web_search, calc, or reasoning are likely needed based on heuristics.

    Accepts either a single string prompt or a list of ChatMessage (or dicts
    with a 'content' key). Returns a dict of need_* booleans, the list of
    detected tool invocations, and a per-tool confidence summary.
    """
    # Normalize the input into one text blob.
    if isinstance(text_or_messages, list):
        try:
            text = "\n\n".join([m.get('content', '') if isinstance(m, dict) else (getattr(m, 'content', '') or '') for m in text_or_messages if m])
        except Exception:
            text = ""
    else:
        text = str(text_or_messages or "")
    t = text.lower()
    # Simple heuristics
    need_calc = False
    need_web_search = False
    need_reasoning = False
    need_universal = False
    need_fetch_url = False
    need_summarize = False
    need_keywords = False
    need_sentiment = False
    need_translate = False
    need_spell_check = False
    need_format_code = False
    need_explain_code = False
    detected_tools = []
    # Heuristic for calc: operators AND numbers, OR 'calculate/compute'-style
    # keywords plus numeric tokens.
    if (re.search(r"\d+\s*[-+*/%]\s*\d+", t) or (re.search(r"\b(calculate|compute|solve|evaluate|sum|add|subtract|multiply|divide)\b", t) and re.search(r"\d", t))):
        need_calc = True
        # Try to extract a most-likely arithmetic expression from the text:
        # accept digits, parentheses and operators.
        m = re.search(r"([\d\(\)\s+\-*/%^.]+)", text)
        expr = m.group(0).strip() if m else None
        # only keep if it includes an operator
        if expr and not re.search(r"[-+*/%]", expr):
            expr = None
        detected_tools.append({"name": "calc", "args": {"expression": expr, "confidence": 0.95 if expr else 0.5}})
    # Heuristic for web search: question words + facts or 'current/latest'
    # signals; avoid math queries. NOTE: matched against the lowercased text,
    # so every token must be lowercase (the old "GDP of" could never match).
    if (
        re.search(r"\b(who is|who's|what is|what's|when is|where is|current|latest|news|is the president|president of|population of|capital of|how many|gdp of)\b", t)
        and not re.search(r"\d+\s*[-+*/%]\s*\d+", t)
    ):
        need_web_search = True
        detected_tools.append({"name": "web_search", "args": {"query": text, "confidence": 0.9}})
    # Heuristic for reasoning: words like 'explain', 'why', 'reason', 'prove', 'derive', 'compare'
    if re.search(r"\b(explain|why|because|reason|prove|derive|compare|analysis|analysis:|evaluate|argue|consequence|trade-offs)\b", t):
        need_reasoning = True
    # Heuristic for universal tool: requests to "use tool", "execute tool", or generic function-call language
    if re.search(r"\b(use (a )?tool|execute (a )?tool|call (a )?tool|function call|run tool|do this via a tool|invoke tool|call tool)\b", t):
        need_universal = True
    # detect fetch_url: a URL string or request to 'open' the link
    if re.search(r"https?://\S+", t) or re.search(r"\b(open|visit)\s+(https?://|www\.)", t):
        need_fetch_url = True
        m_url = re.search(r'https?://\S+', text)
        url_val = m_url.group(0) if m_url else text
        detected_tools.append({"name": "fetch_url", "args": {"url": url_val, "confidence": 0.85}})
    # detect translate requests: 'translate to es' or 'traducir a español'
    if re.search(r"\btranslate\b.*to\s+([a-z]{2,})|\btraducir\b.*a\s+([a-z]{2,})", t):
        need_translate = True
        m = re.search(r"\btranslate\b.*to\s+([a-z]{2,})|\btraducir\b.*a\s+([a-z]{2,})", t)
        tgt = (m.group(1) if m and m.group(1) else (m.group(2) if m and len(m.groups()) > 1 else 'en'))
        detected_tools.append({"name": "translate", "args": {"text": text, "target_lang": tgt, "confidence": 0.85}})
    # detect summarize requests ('summarize', 'tl;dr', 'summarise')
    if re.search(r"\b(summarize|summarise|tl;dr|tl;dr:)\b", t):
        need_summarize = True
        detected_tools.append({"name": "summarize", "args": {"text": text, "max_sentences": 3, "confidence": 0.8}})
    # detect keyword extraction requests
    if re.search(r"\b(keywords|key words|key terms|extract keywords)\b", t):
        need_keywords = True
        detected_tools.append({"name": "keywords", "args": {"text": text, "top_k": 5, "confidence": 0.78}})
    # detect sentiment analysis requests (single check: an identical duplicate
    # of this block used to append the sentiment tool twice)
    if re.search(r"\b(sentiment|tone|is this positive|is this negative|what is the sentiment)\b", t):
        need_sentiment = True
        detected_tools.append({"name": "sentiment", "args": {"text": text, "confidence": 0.8}})
    # detect code-format and explain: '```', 'explain code', 'what does this function do'
    if re.search(r"```[a-zA-Z]*|format code|format this code|pretty print code", t):
        need_format_code = True
        detected_tools.append({"name": "format_code", "args": {"code": text, "language": "python", "confidence": 0.8}})
    if re.search(r"\bexplain( this)? code\b|what does this (function|method|snippet) do", t):
        need_explain_code = True
        detected_tools.append({"name": "explain_code", "args": {"code": text, "language": "python", "confidence": 0.75}})
    # detect spellcheck requests
    if re.search(r"\b(spell check|spellcheck|check spelling|corregir ortografía|revisar ortografía)\b", t):
        need_spell_check = True
        detected_tools.append({"name": "spell_check", "args": {"text": text, "confidence": 0.6}})
    # compute confidence summary: fixed per-tool confidences when triggered
    confs = {
        "calc_confidence": 0.95 if need_calc else 0.0,
        "web_search_confidence": 0.9 if need_web_search else 0.0,
        "reasoning_confidence": 0.85 if need_reasoning else 0.0,
        "universal_confidence": 0.65 if need_universal else 0.0,
        "translate_confidence": 0.85 if need_translate else 0.0,
        "spell_check_confidence": 0.6 if need_spell_check else 0.0,
        "format_code_confidence": 0.7 if need_format_code else 0.0,
        "explain_code_confidence": 0.7 if need_explain_code else 0.0,
    }
    return {
        "need_calc": need_calc,
        "need_web_search": need_web_search,
        "need_reasoning": need_reasoning,
        "need_universal": need_universal,
        "need_fetch_url": need_fetch_url,
        "need_summarize": need_summarize,
        "need_keywords": need_keywords,
        "need_sentiment": need_sentiment,
        "need_translate": need_translate,
        "need_spell_check": need_spell_check,
        "need_format_code": need_format_code,
        "need_explain_code": need_explain_code,
        "detected_tools": detected_tools,
        "confidence": confs,
    }
def fetch_url(url: str, max_chars: int = 20000) -> str:
    """Fetch the content of a URL and return cleaned text (strip HTML tags).

    Prefers BeautifulSoup (dropping script/style nodes); falls back to a naive
    regex tag strip when bs4 is unavailable. Returns a plain-text string
    truncated to `max_chars`, or "" on any error / non-2xx response.
    """
    if not url:
        return ""
    try:
        resp = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"},
            timeout=10,
        )
        if not resp.ok:
            return ""
        raw = resp.text
        try:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(raw, "html.parser")
            # Drop non-content nodes before extracting visible text.
            for node in soup(["script", "style"]):
                node.decompose()
            body_text = soup.get_text(separator=" \n ")
            return re.sub(r"\s+", " ", body_text).strip()[:max_chars]
        except Exception:
            # fallback: naive tag strip
            stripped = re.sub(r"<[^>]+>", "", raw)
            stripped = re.sub(r"\s+", " ", stripped)
            return stripped[:max_chars]
    except Exception:
        return ""
def summarize_text(text: str, max_sentences: int = 3) -> str:
    """Naive extractive summary: return the first `max_sentences` sentences.

    Sentences are split on whitespace that follows '.', '!' or '?'. Empty or
    non-string input yields "".
    """
    if not text or not isinstance(text, str):
        return ""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    head = sentences if len(sentences) <= max_sentences else sentences[:max_sentences]
    return " ".join(head).strip()
def extract_keywords(text: str, top_k: int = 5) -> List[str]:
    """Return top_k frequent non-stopword tokens from text (naive extraction).

    Tokens shorter than 3 characters and common English stopwords are ignored;
    ties keep first-seen order. Returns [] on empty input or any error.
    """
    if not text:
        return []
    try:
        stopwords = {"the", "and", "is", "in", "to", "a", "an", "of", "for", "with", "on", "that", "this", "it", "as", "are"}
        counts = {}
        for token in re.findall(r"\w+", text.lower()):
            if len(token) > 2 and token not in stopwords:
                counts[token] = counts.get(token, 0) + 1
        # Stable sort by descending frequency preserves insertion (first-seen) order.
        ranked = sorted(counts.items(), key=lambda kv: -kv[1])
        return [word for word, _ in ranked[:top_k]]
    except Exception:
        return []
def sentiment_analysis(text: str) -> dict:
    """Very basic lexicon-based sentiment analysis.

    Returns {"sentiment": 'positive'/'neutral'/'negative', "score": float},
    where score is (+1 per positive word, -1 per negative word).
    """
    if not text:
        return {"sentiment": "neutral", "score": 0.0}
    positive = {"good", "great", "excellent", "positive", "success", "love", "like", "happy", "best"}
    negative = {"bad", "horrible", "poor", "negative", "hate", "dislike", "sad", "worst", "angry"}
    words = re.findall(r"\w+", text.lower())
    score = sum((w in positive) - (w in negative) for w in words)
    if score == 0:
        return {"sentiment": "neutral", "score": 0.0}
    label = "positive" if score > 0 else "negative"
    return {"sentiment": label, "score": float(score)}
# removed earlier naive duplicates in favor of featureful versions below
def translate_text(text: str, target_lang: str = 'en') -> dict:
    """Translate text to target language using `googletrans` if available; otherwise
    return an annotated fallback indicating translation was not performed.

    Returns {"action": "translate", "result": ..., "metadata": {"lang", "confidence"}}.
    Confidence is 0.9 when a real translation happened, 0.0 otherwise.
    """
    if not text:
        return {"action": "translate", "result": "", "metadata": {"lang": target_lang, "confidence": 0.0}}
    try:
        import importlib
        import importlib.util
        # Single availability check (the old code ran find_spec twice).
        if importlib.util.find_spec("googletrans") is not None:
            try:
                googletrans = importlib.import_module("googletrans")
                Translator = getattr(googletrans, 'Translator', None)
                if Translator:
                    res = Translator().translate(text, dest=target_lang)
                    return {"action": "translate", "result": res.text, "metadata": {"lang": target_lang, "confidence": 0.9}}
            except Exception:
                pass
        # Fallback: annotate that translation was requested but not performed.
        return {"action": "translate", "result": f"[translated to {target_lang}]: {text}", "metadata": {"lang": target_lang, "confidence": 0.0}}
    except Exception:
        return {"action": "translate", "result": f"[translated to {target_lang}]: {text}", "metadata": {"lang": target_lang, "confidence": 0.0}}
def spell_check_text(text: str) -> dict:
    """Naive spell check: uses TextBlob's corrector when installed, otherwise
    returns the input unchanged with no suggestions.

    Returns {"action": "spell_check", "result": ..., "metadata": {"suggestions", "confidence"}}.
    """
    if not text:
        return {"action": "spell_check", "result": text, "metadata": {"suggestions": [], "confidence": 0.0}}
    try:
        import importlib.util
        if importlib.util.find_spec("textblob") is not None:
            try:
                textblob = importlib.import_module("textblob")
                TextBlob = getattr(textblob, "TextBlob", None)
                if TextBlob is not None:
                    corrected = str(TextBlob(text).correct())
                    # Only report a correction when the text actually changed.
                    if corrected != text:
                        return {"action": "spell_check", "result": corrected, "metadata": {"suggestions": [corrected], "confidence": 0.9}}
            except Exception:
                pass
    except Exception:
        pass
    return {"action": "spell_check", "result": text, "metadata": {"suggestions": [], "confidence": 0.0}}
def format_code_text(code: str, lang: str = 'python') -> dict:
    """Simple code formatting: runs `black` when installed, otherwise strips
    trailing whitespace from each line as a naive fallback.

    Returns {"action": "format_code", "result": ..., "metadata": {"lang", "confidence"}}.
    The old implementation nested three try blocks with duplicated fallback
    code; this flattens it to one success path and one fallback path.
    """
    if not code:
        return {"action": "format_code", "result": code, "metadata": {"lang": lang, "confidence": 0.0}}
    try:
        import importlib
        import importlib.util
        black = None
        if importlib.util.find_spec("black") is not None:
            black = importlib.import_module("black")
        if black is not None:
            formatted = black.format_str(code, mode=black.Mode())
            return {"action": "format_code", "result": formatted, "metadata": {"lang": lang, "confidence": 0.95}}
        # fallback: naive trailing-whitespace strip per line
        cleaned = '\n'.join([ln.rstrip() for ln in code.splitlines()])
        return {"action": "format_code", "result": cleaned, "metadata": {"lang": lang, "confidence": 0.0}}
    except Exception:
        # black import or formatting failed: try the naive fallback, then give
        # the code back untouched as a last resort.
        try:
            cleaned = '\n'.join([ln.rstrip() for ln in code.splitlines()])
            return {"action": "format_code", "result": cleaned, "metadata": {"lang": lang, "confidence": 0.0}}
        except Exception:
            return {"action": "format_code", "result": code, "metadata": {"lang": lang, "confidence": 0.0}}
def explain_code_text(code: str, lang: str = 'python') -> dict:
    """Return a basic explanation by summarizing comments and high level function names.

    Intentionally naive; a future version could delegate to an LLM or a real
    parser. Returns {"action": "explain_code", "result": ..., "metadata": ...}.
    """
    if not code:
        return {"action": "explain_code", "result": "", "metadata": {"lang": lang}}
    try:
        parts = []
        # Top-level structure: function names and leading '#' comments.
        func_names = re.findall(r"def\s+(\w+)\s*\(", code)
        if func_names:
            parts.append(f"Functions: {', '.join(func_names)}")
        inline_comments = re.findall(r"#(.+)", code)
        if inline_comments:
            parts.append("Comments: " + "; ".join([c.strip() for c in inline_comments[:3]]))
        if not parts:
            # fallback: first non-empty line of the snippet
            nonblank = [ln.strip() for ln in code.splitlines() if ln.strip()]
            parts.append(nonblank[0] if nonblank else "No content")
        return {"action": "explain_code", "result": " | ".join(parts), "metadata": {"lang": lang, "confidence": 0.6}}
    except Exception:
        return {"action": "explain_code", "result": "", "metadata": {"lang": lang, "confidence": 0.0}}
def ensure_upload_dir():
    """Create CONFIG.UPLOAD_DIR if it does not exist (best-effort)."""
    from config import CONFIG
    try:
        os.makedirs(CONFIG.UPLOAD_DIR, exist_ok=True)
    except Exception:
        # Directory creation is best-effort; write failures surface at the caller.
        pass
from typing import Optional
def save_bytes_to_upload(filename: Optional[str], data: bytes) -> dict:
    """Persist raw bytes into CONFIG.UPLOAD_DIR under a uuid-prefixed name.

    Returns a metadata dict (file_id, filename, path, mime_type, size,
    uploaded_at) on success, or {'error': str} on failure.
    """
    from config import CONFIG
    import hashlib, time, uuid
    ensure_upload_dir()
    file_id = str(uuid.uuid4())
    # Prefix with the uuid and basename the input to avoid path traversal.
    stored_name = f"{file_id}_{os.path.basename(str(filename or 'uploaded_file'))}"
    dest = os.path.join(CONFIG.UPLOAD_DIR, stored_name)
    try:
        with open(dest, 'wb') as fh:
            fh.write(data)
        import mimetypes
        return {
            'file_id': file_id,
            'filename': filename,
            'path': dest,
            'mime_type': mimetypes.guess_type(dest)[0],
            'size': os.path.getsize(dest),
            'uploaded_at': int(time.time()),
        }
    except Exception as exc:
        return {'error': str(exc)}
def file_read_from_path(path: str, max_bytes: int = 100000) -> str:
    """Read up to max_bytes from a file and return it as UTF-8 text.

    Undecodable bytes are replaced; a missing path or any I/O error yields "".
    """
    try:
        if not path or not os.path.exists(path):
            return ""
        with open(path, 'rb') as fh:
            raw = fh.read(max_bytes)
        try:
            return raw.decode('utf-8', errors='replace')
        except Exception:
            # Extremely defensive: decode with errors='replace' shouldn't raise.
            return str(raw)
    except Exception:
        return ""
def universal_tool(args: dict, allow_web_search: bool = True, allow_tools: bool = True, allow_file_tool: bool = True) -> dict:
    """Universal tool: if 'action' is provided, call the corresponding tool; otherwise autodetect using heuristics.

    Supported explicit actions: calc, web_search, file_read, file_upload,
    fetch_url, summarize, keywords, sentiment, translate, spell_check,
    format_code, explain_code. The allow_* flags gate whole tool groups.
    Returns {"action", "result", "metadata"} or {"error": ...}.
    """
    if not isinstance(args, dict):
        return {"error": "ERROR: invalid args for universal tool"}
    action = args.get("action")
    query = args.get("query")
    # --- explicit actions -------------------------------------------------
    if action == "calc":
        if not allow_tools:
            return {"action": "calc", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        expr = args.get("expression") or query
        if not expr:
            return {"action": "calc", "result": None, "metadata": {"error": "no expression provided", "confidence": 0.0}}
        res = calc(str(expr))
        return {"action": "calc", "result": str(res), "metadata": {"expression": expr, "confidence": 0.98}}
    if action == "web_search":
        if not allow_web_search:
            return {"action": "web_search", "result": "", "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        q = args.get("query") or query
        if not q:
            return {"action": "web_search", "result": "", "metadata": {"confidence": 0.0}}
        res = web_search(str(q), int(args.get("top_k") or 3))
        return {"action": "web_search", "result": str(res), "metadata": {"query": q, "top_k": int(args.get("top_k") or 3), "confidence": 0.9}}
    if action == 'file_read':
        if not allow_file_tool:
            return {"action": "file_read", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        fpath = args.get('path') or args.get('file_path')
        if not fpath and args.get('file_id'):
            # Resolve a bare file_id against the configured upload directory.
            from config import CONFIG
            fid = args.get('file_id')
            if fid:
                candidate = os.path.join(CONFIG.UPLOAD_DIR, os.path.basename(str(fid)))
            else:
                candidate = None
            if candidate and os.path.exists(candidate):
                fpath = candidate
        if not fpath:
            return {"action": "file_read", "result": None, "metadata": {"error": "no_path_or_id", "confidence": 0.0}}
        content = file_read_from_path(fpath, int(args.get('max_bytes') or 100000))
        return {"action": "file_read", "result": str(content), "metadata": {"path": fpath, "confidence": 0.9}}
    if action == 'file_upload':
        if not allow_file_tool:
            return {"action": "file_upload", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        # Expect either base64 content or raw bytes/text in args.
        import base64
        fname = args.get('filename') or args.get('name') or 'uploaded_file'
        content_b64 = args.get('content_base64') or args.get('content')
        if not content_b64:
            return {"action": "file_upload", "result": None, "metadata": {"error": "no_content", "confidence": 0.0}}
        # Decode as strict base64 when possible; otherwise treat as plaintext.
        try:
            if isinstance(content_b64, str):
                b = None
                try:
                    b = base64.b64decode(content_b64, validate=True)
                except Exception:
                    b = str(content_b64).encode('utf-8')
            else:
                b = content_b64 if isinstance(content_b64, (bytes, bytearray)) else str(content_b64).encode('utf-8')
        except Exception:
            return {"action": "file_upload", "result": None, "metadata": {"error": "invalid_content", "confidence": 0.0}}
        # Check size against configuration.
        try:
            from config import CONFIG
            if len(b) > getattr(CONFIG, 'MAX_UPLOAD_SIZE_BYTES', 10 * 1024 * 1024):
                return {"action": "file_upload", "result": None, "metadata": {"error": "file_too_large", "confidence": 0.0}}
        except Exception:
            pass
        # Save file.
        meta = None
        try:
            # Prefer the app's internal upload API so validation happens in one place.
            import importlib
            app_module = importlib.import_module('app')
            if hasattr(app_module, 'upload_file_internal'):
                try:
                    meta = app_module.upload_file_internal(b, filename=fname)
                except Exception:
                    meta = save_bytes_to_upload(fname, b)
                # Best-effort registration in the app's upload registry.
                try:
                    if hasattr(app_module, 'UPLOADED_FILES') and isinstance(app_module.UPLOADED_FILES, dict):
                        app_module.UPLOADED_FILES[meta['file_id']] = meta
                except Exception:
                    pass
            else:
                meta = save_bytes_to_upload(fname, b)
                try:
                    if hasattr(app_module, 'UPLOADED_FILES') and isinstance(app_module.UPLOADED_FILES, dict):
                        app_module.UPLOADED_FILES[meta['file_id']] = meta
                except Exception:
                    pass
        except Exception:
            # fallback to local save and skip register
            meta = save_bytes_to_upload(fname, b)
        return {"action": "file_upload", "result": meta, "metadata": {"filename": fname, "file_id": meta.get('file_id'), "confidence": 0.9}}
    if action == 'fetch_url':
        if not allow_web_search:
            return {"action": "fetch_url", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        url = args.get('url') or query
        if not url:
            return {"action": "fetch_url", "result": None, "metadata": {"error": "no_url_provided", "confidence": 0.0}}
        content = fetch_url(str(url), int(args.get('max_chars') or 20000))
        return {"action": "fetch_url", "result": str(content), "metadata": {"url": url, "confidence": 0.9}}
    if action == 'summarize':
        if not allow_tools:
            return {"action": "summarize", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or ''
        if not txt and args.get('url'):
            try:
                txt = fetch_url(str(args.get('url')))
            except Exception:
                txt = ''
        if not txt and query:
            txt = query
        if not txt:
            return {"action": "summarize", "result": None, "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0}}
        s = summarize_text(str(txt), int(args.get('max_sentences') or 3))
        return {"action": "summarize", "result": s, "metadata": {"confidence": 0.85}}
    if action == 'keywords' or action == 'keyword_extraction':
        if not allow_tools:
            return {"action": "keywords", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or ''
        if not txt and args.get('url'):
            try:
                txt = fetch_url(str(args.get('url')))
            except Exception:
                txt = ''
        if not txt and query:
            txt = query
        if not txt:
            return {"action": "keywords", "result": None, "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0}}
        kws = extract_keywords(str(txt), int(args.get('top_k') or 5))
        return {"action": "keywords", "result": kws, "metadata": {"confidence": 0.85}}
    if action == 'sentiment':
        if not allow_tools:
            return {"action": "sentiment", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or ''
        if not txt and args.get('url'):
            try:
                txt = fetch_url(str(args.get('url')))
            except Exception:
                txt = ''
        if not txt and query:
            txt = query
        if not txt:
            return {"action": "sentiment", "result": None, "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0}}
        res = sentiment_analysis(str(txt))
        return {"action": "sentiment", "result": res, "metadata": {"confidence": 0.85}}
    if action == 'translate':
        if not allow_tools:
            return {"action": "translate", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        target = args.get('target') or 'en'
        res = translate_text(str(txt), str(target))
        # translate_text nests lang under 'metadata'; the old res.get('lang')
        # always produced None.
        meta = res.get('metadata') or {}
        return {"action": "translate", "result": res.get('result'), "metadata": {"lang": meta.get('lang'), "note": meta.get('note'), "confidence": 0.5}}
    if action == 'spell_check' or action == 'spellcheck':
        if not allow_tools:
            return {"action": "spell_check", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        res = spell_check_text(str(txt))
        # spell_check_text reports corrections under metadata['suggestions'];
        # the old res.get('corrections') key never existed.
        meta = res.get('metadata') or {}
        return {"action": "spell_check", "result": res.get('result'), "metadata": {"corrections": meta.get('suggestions'), "confidence": 0.5}}
    if action == 'format_code' or action == 'format':
        if not allow_tools:
            return {"action": "format_code", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        lang = args.get('language') or args.get('lang') or 'python'
        res = format_code_text(txt, lang)
        # format_code_text has no top-level 'note'; read from its metadata.
        meta = res.get('metadata') or {}
        return {"action": "format_code", "result": res.get('result'), "metadata": {"note": meta.get('note'), "confidence": 0.6}}
    if action == 'explain_code' or action == 'explain':
        if not allow_tools:
            return {"action": "explain_code", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        txt = args.get('text') or query or ''
        lang = args.get('language') or args.get('lang') or 'python'
        res = explain_code_text(txt, lang)
        # explain_code_text returns its summary under 'result'; the old lookup
        # of 'docstrings'/'explanation' keys always produced ''.
        if isinstance(res, dict):
            expl = res.get('result') or ''
        else:
            expl = str(res)
        return {"action": "explain_code", "result": expl, "metadata": {"confidence": 0.6}}
    # --- auto-detect based on query content -------------------------------
    if query:
        # if expression - use calc
        if re.search(r"\d+\s*[-+*/%]\s*\d+", str(query)):
            if not allow_tools:
                return {"action": "calc", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            res = calc(str(query))
            return {"action": "calc", "result": str(res), "metadata": {"expression": str(query), "confidence": 0.95}}
        # fetch_url auto-detect when a URL present
        if re.search(r"https?://\S+", str(query)):
            if not allow_web_search:
                return {"action": "fetch_url", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            content = fetch_url(str(query), int(args.get('max_chars') or 20000))
            return {"action": "fetch_url", "result": str(content), "metadata": {"url": str(query), "confidence": 0.9}}
        # translate detect: e.g., 'translate to spanish: <text>'
        m = re.search(r"\btranslate\b.*to\s+([a-z]{2,})", str(query).lower())
        if m:
            tgt = m.group(1)
            if not allow_tools:
                return {"action": "translate", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            return translate_text(str(query), tgt)
        # format_code auto-detect: presence of ``` or 'format code' text
        if re.search(r"```[a-zA-Z]*|format code|format this code|pretty print code", str(query).lower()):
            if not allow_tools:
                return {"action": "format_code", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            return format_code_text(str(query))
        # summarize auto-detect
        if re.search(r"\b(summarize|summarise|tl;dr)\b", str(query).lower()):
            if not allow_tools:
                return {"action": "summarize", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            s = summarize_text(str(query))
            return {"action": "summarize", "result": s, "metadata": {"confidence": 0.85}}
        # keywords auto-detect
        if re.search(r"\b(keywords|key terms|extract keywords)\b", str(query).lower()):
            if not allow_tools:
                return {"action": "keywords", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            kws = extract_keywords(str(query))
            return {"action": "keywords", "result": kws, "metadata": {"confidence": 0.78}}
        # sentiment auto-detect
        if re.search(r"\b(sentiment|tone|is this positive|is this negative|what is the sentiment)\b", str(query).lower()):
            if not allow_tools:
                return {"action": "sentiment", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            res = sentiment_analysis(str(query))
            return {"action": "sentiment", "result": res, "metadata": {"confidence": 0.8}}
        # default: web search
        if not allow_web_search:
            return {"action": "web_search", "result": "", "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        res = web_search(str(query), int(args.get("top_k") or 3))
        return {"action": "web_search", "result": str(res), "metadata": {"query": str(query), "top_k": int(args.get("top_k") or 3), "confidence": 0.9}}
    return {"error": "ERROR: could not determine action for universal tool"}
def bias_mitigation(text: str) -> dict:
"""A light-weight bias mitigation helper.
The goal: detect and neutralize potentially biased, stereotyping, or discriminatory statements.
It's intentionally conservative (favoring suppression) and returns sanitized content and a flag.
"""
import re
if not text or not isinstance(text, str):
return {"sanitized": text, "suppressed": False, "reason": None}
t = text.strip()
# Simple checks for sweeping generalizations towards protected groups
# This is a naive approach and can be adapted with an ML classifier.
protected_terms = [
r"\b(race|religion|ethnicity|gender|sexual orientation|disability)\b",
r"\b(black|white|asian|hispanic|muslim|christian|jewish|gay|lesbian|transgender)\b",
]
sweeping_patterns = [
r"\b(all|always|never|every|none)\b[^.?!]{0,60}\b(is|are|will|should|must)\b",
r"\b(\w+)s?\b[^.?!]{0,60}\b(are|is)\b[^.?!]{0,80}\b(inferior|superior|stupid|lazy|criminal)\b",
]
# Simple profanity or slurs (non-exhaustive) - block
slurs = [r"\b(slur1|slur2)\b"] # placeholder; real app should use a curated list
for pattern in sweeping_patterns:
if re.search(pattern, t, flags=re.I):
# ensure it references a protected group before suppressing
for pt in protected_terms:
if re.search(pt, t, flags=re.I):
return {"sanitized": "[content suppressed due to potential bias]", "suppressed": True, "reason": "sweeping_generalization_protected_group"}
# If contains slurs -> suppress
for s in slurs:
if re.search(s, t, flags=re.I):
return {"sanitized": "[content suppressed due to policy]", "suppressed": True, "reason": "profanity_or_slur"}
# For political content with strong claims, favor neutralization
if re.search(r"\b(president|prime minister|dictator|election|vote|politician)\b", t, flags=re.I) and re.search(r"\b(is|are|will|should)\b[^.?!]{0,80}\b(incompetent|corrupt|traitor|criminal)\b", t, flags=re.I):
# return a neutral paraphrase where we avoid strong unfounded claims
sanitized = re.sub(r"\b(is|are|will|should)\b[^.?!]{0,80}\b(incompetent|corrupt|traitor|criminal)\b", "may have actions that deserve scrutiny", t, flags=re.I)
return {"sanitized": sanitized, "suppressed": False, "reason": "political_neutralization"}
return {"sanitized": text, "suppressed": False, "reason": None}