import re
import os
import threading
import queue
import time
from typing import List, Optional, Union

import requests
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings

from api_types import ChatMessage

# NOTE(review): the <think>/</think> tag literals below were reconstructed —
# the original source had them stripped by an HTML sanitizer.  The slice
# widths (7 and 8) in remove_nested_think_tags_stack pin the exact tags.
THINK_OPEN = "<think>"
THINK_CLOSE = "</think>"


def parse_think_response(full_response: str):
    """Split a model response into (reasoning_content, content).

    The reasoning is everything inside the <think>...</think> block; the
    visible content is everything after the closing tag.  If the closing
    tag is missing (unclosed tag case), the whole tail is treated as
    reasoning and content is empty.
    """
    think_start = full_response.find(THINK_OPEN)
    think_end = full_response.find(THINK_CLOSE)
    if think_start == -1 and think_end == -1:
        # No think block at all: everything is plain content.
        return "", full_response.strip()
    if think_start == -1:
        think_start = 0
    close_len = len(THINK_CLOSE)
    if think_end == -1:
        # Unclosed tag: treat the remainder as reasoning.
        reasoning = full_response[think_start:].strip()
        content = ""
    else:
        # Include the full closing tag in the reasoning slice.
        reasoning = full_response[think_start : think_end + close_len].strip()
        content = full_response[think_end + close_len :].strip()
    # Strip the tags themselves, keep the inner reasoning text.
    reasoning_content = (
        reasoning.replace(THINK_OPEN, "").replace(THINK_CLOSE, "").strip()
    )
    return reasoning_content, content


def cleanMessages(messages: List[ChatMessage], removeThinkingContent: bool = False):
    """Render a chat history as a "Role: content" prompt string.

    Collapses blank-line runs in each message; optionally strips nested
    <think> blocks from assistant messages.
    """
    prompt_parts = []
    for message in messages:
        role = message.role.strip().lower().capitalize()
        content = re.sub(r"\n+", "\n", message.content.strip())
        if role == "Assistant" and removeThinkingContent:
            content = remove_nested_think_tags_stack(content)
        prompt_parts.append(f"{role}: {content}")
    return "\n\n".join(prompt_parts)


def remove_nested_think_tags_stack(text):
    """Remove <think>...</think> spans (nesting-aware) from *text*.

    A stack tracks open tags; characters inside any open tag are dropped.
    An unmatched closing tag is kept verbatim.
    """
    depth = 0
    out = []  # collect chars and join once (avoids quadratic +=)
    i = 0
    n = len(text)
    while i < n:
        if text[i : i + 7] == THINK_OPEN:
            depth += 1
            i += 7
        elif text[i : i + 8] == THINK_CLOSE:
            if depth > 0:
                depth -= 1
            else:
                # Unmatched close tag: preserve it in the output.
                out.append(text[i : i + 8])
            i += 8
        elif depth == 0:
            out.append(text[i])
            i += 1
        else:
            i += 1
    return "".join(out)


def format_bytes(size):
    """Format a byte count with a binary prefix, e.g. 1536 -> '1.5000KB'."""
    power = 2 ** 10
    power_labels = {0: "", 1: "K", 2: "M", 3: "G", 4: "T"}
    n = 0
    # Clamp at the largest label so huge sizes don't raise KeyError.
    while size > power and n < max(power_labels):
        size /= power
        n += 1
    return f"{size:.4f}{power_labels[n] + 'B'}"


LOGGER_QUEUE = queue.Queue(int(os.environ.get("LOGGER_QUEUE_SIZE", 100)))


def logger():
    """Background thread that POSTs queued log items to LOG_PORT.

    Uses a blocking get so the thread waits for items and doesn't spin
    when the queue is empty.  All errors are swallowed so the logger can
    never crash the process.
    """
    print("enable")
    while True:
        try:
            item = LOGGER_QUEUE.get()
        except Exception:
            # If the queue is unexpectedly broken, keep running.
            time.sleep(0.1)
            continue
        try:
            log_port = os.environ.get("LOG_PORT")
            if log_port:
                # Best-effort; ignore any network error.
                requests.post(
                    log_port,
                    headers={"Content-Type": "application/json"},
                    json=item,
                    timeout=5,
                )
        except Exception:
            # Never let log failures escape to the main thread.
            pass


if os.environ.get("LOG_PORT"):
    # Daemon thread so it won't block process exit.
    threading.Thread(target=logger, daemon=True).start()


def log(item):
    """Enqueue a log item; drops it (best effort) when the queue is full."""
    try:
        LOGGER_QUEUE.put_nowait(item)
    except queue.Full:
        # Queue full: drop the log rather than block the application.
        try:
            print("LOG DROP: queue full, dropping log item")
        except Exception:
            pass


def web_search(query: str, top_k: int = 3) -> str:
    """Perform a simple DuckDuckGo HTML search and return top_k results.

    Lightweight fallback search: queries the public HTML endpoint, parses
    titles/snippets/urls and returns them as formatted text suitable for
    injection into a model's prompt context.  Returns "" on any failure.
    """
    if not query or query.strip() == "":
        return ""
    try:
        from bs4 import BeautifulSoup
    except Exception:
        return ""
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
        resp = requests.get(
            "https://duckduckgo.com/html/",
            params={"q": query.strip()},
            headers=headers,
            timeout=10,
        )
        soup = BeautifulSoup(resp.text, "html.parser")
        # DuckDuckGo's html structure: results are in `div.result` containers.
        results = []
        for r in soup.find_all("div", class_="result", limit=top_k):
            a = r.find("a", class_="result__a") or r.find("a", href=True)
            title = a.get_text(strip=True) if a else ""
            href = a.get("href") if a else ""
            s = r.find("a", class_="result__snippet") or r.find(
                "div", class_="result__snippet"
            )
            snippet = s.get_text(strip=True) if s else ""
            results.append(f"{title} - {snippet} - {href}")
        return "\n".join(results)
    except Exception:
        return ""


def calc(expr: str) -> str:
    """Safely evaluate a simple arithmetic expression, returning str(result).

    Uses ast parsing to disallow attributes/calls and only permit a fixed
    set of arithmetic operators on numeric literals.  On error, returns a
    string starting with "ERROR:".
    """
    try:
        import ast
        import operator as op

        allowed_ops = {
            ast.Add: op.add,
            ast.Sub: op.sub,
            ast.Mult: op.mul,
            ast.Div: op.truediv,
            ast.Pow: op.pow,
            ast.BitXor: op.xor,
            ast.USub: op.neg,
            ast.Mod: op.mod,
            ast.FloorDiv: op.floordiv,
        }

        def _eval(node):
            # ast.Constant replaces the deprecated ast.Num; restrict to
            # real numbers so strings/None literals are rejected.
            if isinstance(node, ast.Constant):
                if isinstance(node.value, (int, float)) and not isinstance(
                    node.value, bool
                ):
                    return node.value
                raise ValueError("Unsupported constant")
            if isinstance(node, ast.BinOp):
                fn = allowed_ops.get(type(node.op))
                if fn is None:
                    raise ValueError("Unsupported operator")
                return fn(_eval(node.left), _eval(node.right))
            if isinstance(node, ast.UnaryOp):
                fn = allowed_ops.get(type(node.op))
                if fn is None:
                    raise ValueError("Unsupported unary op")
                return fn(_eval(node.operand))
            raise ValueError("Unsupported expression type")

        node = ast.parse(expr, mode="eval")
        return str(_eval(node.body))
    except Exception as e:
        return f"ERROR: {e}"


def detect_tools_and_reasoning(text_or_messages) -> dict:
    """Heuristically detect whether tools or reasoning are likely needed.

    Accepts either a single string prompt or a list of ChatMessage / dicts.
    Returns a dict with need_* booleans, a detected_tools list and a
    confidence summary.
    """
    if isinstance(text_or_messages, list):
        try:
            text = "\n\n".join(
                m.get("content", "")
                if isinstance(m, dict)
                else (getattr(m, "content", "") or "")
                for m in text_or_messages
                if m
            )
        except Exception:
            text = ""
    else:
        text = str(text_or_messages or "")
    t = text.lower()

    need_calc = need_web_search = need_reasoning = need_universal = False
    need_fetch_url = need_summarize = need_keywords = need_sentiment = False
    need_translate = need_spell_check = False
    need_format_code = need_explain_code = False
    detected_tools = []

    # calc: operators between numbers, or calc keywords plus a digit.
    if re.search(r"\d+\s*[-+*/%]\s*\d+", t) or (
        re.search(
            r"\b(calculate|compute|solve|evaluate|sum|add|subtract|multiply|divide)\b",
            t,
        )
        and re.search(r"\d", t)
    ):
        need_calc = True
        # Extract the most-likely arithmetic expression (digits, parens,
        # operators); discard it if it contains no operator.
        m = re.search(r"([\d\(\)\s+\-*/%^.]+)", text)
        expr = m.group(0).strip() if m else None
        if expr and not re.search(r"[-+*/%]", expr):
            expr = None
        detected_tools.append(
            {
                "name": "calc",
                "args": {"expression": expr, "confidence": 0.95 if expr else 0.5},
            }
        )

    # web search: question words / currency signals; avoid pure math queries.
    if re.search(
        r"\b(who is|who's|what is|what's|when is|where is|current|latest|news|is the president|president of|population of|capital of|how many|GDP of)\b",
        t,
    ) and not re.search(r"\d+\s*[-+*/%]\s*\d+", t):
        need_web_search = True
        detected_tools.append(
            {"name": "web_search", "args": {"query": text, "confidence": 0.9}}
        )

    # reasoning: explain/why/prove/derive/compare style requests.
    if re.search(
        r"\b(explain|why|because|reason|prove|derive|compare|analysis|analysis:|evaluate|argue|consequence|trade-offs)\b",
        t,
    ):
        need_reasoning = True

    # universal tool: explicit "use a tool" / function-call language.
    if re.search(
        r"\b(use (a )?tool|execute (a )?tool|call (a )?tool|function call|run tool|do this via a tool|invoke tool|call tool)\b",
        t,
    ):
        need_universal = True

    # fetch_url: a URL string, or a request to open/visit a link.
    if re.search(r"https?://\S+", t) or re.search(
        r"\b(open|visit)\s+(https?://|www\.)", t
    ):
        need_fetch_url = True
        m_url = re.search(r"https?://\S+", text)
        url_val = m_url.group(0) if m_url else text
        detected_tools.append(
            {"name": "fetch_url", "args": {"url": url_val, "confidence": 0.85}}
        )

    # translate: 'translate ... to xx' or 'traducir ... a xx'.
    m = re.search(
        r"\btranslate\b.*to\s+([a-z]{2,})|\btraducir\b.*a\s+([a-z]{2,})", t
    )
    if m:
        need_translate = True
        tgt = m.group(1) or m.group(2) or "en"
        detected_tools.append(
            {
                "name": "translate",
                "args": {"text": text, "target_lang": tgt, "confidence": 0.85},
            }
        )

    # summarize: 'summarize', 'summarise', 'tl;dr'.
    if re.search(r"\b(summarize|summarise|tl;dr|tl;dr:)\b", t):
        need_summarize = True
        detected_tools.append(
            {
                "name": "summarize",
                "args": {"text": text, "max_sentences": 3, "confidence": 0.8},
            }
        )

    # keyword extraction.
    if re.search(r"\b(keywords|key words|key terms|extract keywords)\b", t):
        need_keywords = True
        detected_tools.append(
            {"name": "keywords", "args": {"text": text, "top_k": 5, "confidence": 0.78}}
        )

    # sentiment analysis (checked once; the original appended this twice).
    if re.search(
        r"\b(sentiment|tone|is this positive|is this negative|what is the sentiment)\b",
        t,
    ):
        need_sentiment = True
        detected_tools.append(
            {"name": "sentiment", "args": {"text": text, "confidence": 0.8}}
        )

    # code formatting: fenced code blocks or 'format code' requests.
    if re.search(r"```[a-zA-Z]*|format code|format this code|pretty print code", t):
        need_format_code = True
        detected_tools.append(
            {
                "name": "format_code",
                "args": {"code": text, "language": "python", "confidence": 0.8},
            }
        )

    # code explanation requests.
    if re.search(
        r"\bexplain( this)? code\b|what does this (function|method|snippet) do", t
    ):
        need_explain_code = True
        detected_tools.append(
            {
                "name": "explain_code",
                "args": {"code": text, "language": "python", "confidence": 0.75},
            }
        )

    # spellcheck requests (English + Spanish phrasings).
    if re.search(
        r"\b(spell check|spellcheck|check spelling|corregir ortografía|revisar ortografía)\b",
        t,
    ):
        need_spell_check = True
        detected_tools.append(
            {"name": "spell_check", "args": {"text": text, "confidence": 0.6}}
        )

    # Confidence summary: fixed per-tool heuristic scores.
    confs = {
        "calc_confidence": 0.95 if need_calc else 0.0,
        "web_search_confidence": 0.9 if need_web_search else 0.0,
        "reasoning_confidence": 0.85 if need_reasoning else 0.0,
        "universal_confidence": 0.65 if need_universal else 0.0,
        "translate_confidence": 0.85 if need_translate else 0.0,
        "spell_check_confidence": 0.6 if need_spell_check else 0.0,
        "format_code_confidence": 0.7 if need_format_code else 0.0,
        "explain_code_confidence": 0.7 if need_explain_code else 0.0,
    }
    return {
        "need_calc": need_calc,
        "need_web_search": need_web_search,
        "need_reasoning": need_reasoning,
        "need_universal": need_universal,
        "need_fetch_url": need_fetch_url,
        "need_summarize": need_summarize,
        "need_keywords": need_keywords,
        "need_sentiment": need_sentiment,
        "need_translate": need_translate,
        "need_spell_check": need_spell_check,
        "need_format_code": need_format_code,
        "need_explain_code": need_explain_code,
        "detected_tools": detected_tools,
        "confidence": confs,
    }


def fetch_url(url: str, max_chars: int = 20000) -> str:
    """Fetch a URL and return cleaned plain text (HTML stripped).

    Returns a truncated string of up to *max_chars* characters; "" on any
    failure or non-OK response.
    """
    if not url:
        return ""
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
        resp = requests.get(url, headers=headers, timeout=10)
        if not resp.ok:
            return ""
        text = resp.text
        try:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(text, "html.parser")
            for s in soup(["script", "style"]):
                s.decompose()
            body = soup.get_text(separator=" \n ")
            cleaned = re.sub(r"\s+", " ", body).strip()
            return cleaned[:max_chars]
        except Exception:
            # Fallback: naive tag strip when bs4 is unavailable.
            cleaned = re.sub(r"<[^>]+>", "", text)
            cleaned = re.sub(r"\s+", " ", cleaned)
            return cleaned[:max_chars]
    except Exception:
        return ""


def summarize_text(text: str, max_sentences: int = 3) -> str:
    """Naive extractive summary: return the leading *max_sentences* sentences.

    Intentionally simple to avoid heavy dependencies.
    """
    if not text or not isinstance(text, str):
        return ""
    sents = re.split(r"(?<=[.!?])\s+", text.strip())
    if len(sents) <= max_sentences:
        return " ".join(sents).strip()
    return " ".join(sents[:max_sentences]).strip()


def extract_keywords(text: str, top_k: int = 5) -> List[str]:
    """Return top_k frequent non-stopword tokens from text (naive extraction)."""
    if not text:
        return []
    try:
        tokens = re.findall(r"\w+", text.lower())
        stopwords = {
            "the", "and", "is", "in", "to", "a", "an", "of",
            "for", "with", "on", "that", "this", "it", "as", "are",
        }
        freq = {}
        for tok in tokens:
            if tok not in stopwords and len(tok) > 2:
                freq[tok] = freq.get(tok, 0) + 1
        ranked = sorted(freq.items(), key=lambda kv: -kv[1])[:top_k]
        return [k for k, _ in ranked]
    except Exception:
        return []


def sentiment_analysis(text: str) -> dict:
    """Very basic lexicon-based sentiment analysis.

    Returns {"sentiment": 'positive'/'neutral'/'negative', "score": float}.
    """
    if not text:
        return {"sentiment": "neutral", "score": 0.0}
    pos = {
        "good", "great", "excellent", "positive", "success",
        "love", "like", "happy", "best",
    }
    neg = {
        "bad", "horrible", "poor", "negative", "hate",
        "dislike", "sad", "worst", "angry",
    }
    score = 0
    for tok in re.findall(r"\w+", text.lower()):
        if tok in pos:
            score += 1
        elif tok in neg:
            score -= 1
    if score > 0:
        return {"sentiment": "positive", "score": float(score)}
    if score < 0:
        return {"sentiment": "negative", "score": float(score)}
    return {"sentiment": "neutral", "score": 0.0}


def translate_text(text: str, target_lang: str = "en") -> dict:
    """Translate text via `googletrans` if installed; otherwise a no-op.

    Conservative fallback: when translation is unavailable the result is
    the original text with an annotated prefix and confidence 0.0.
    """
    if not text:
        return {
            "action": "translate",
            "result": "",
            "metadata": {"lang": target_lang, "confidence": 0.0},
        }
    try:
        import importlib
        import importlib.util

        # Only attempt the import if googletrans is actually installed.
        if importlib.util.find_spec("googletrans") is not None:
            try:
                googletrans = importlib.import_module("googletrans")
                Translator = getattr(googletrans, "Translator", None)
                if Translator:
                    res = Translator().translate(text, dest=target_lang)
                    return {
                        "action": "translate",
                        "result": res.text,
                        "metadata": {"lang": target_lang, "confidence": 0.9},
                    }
            except Exception:
                pass
    except Exception:
        pass
    # Fallback: annotate that translation was requested but not performed.
    return {
        "action": "translate",
        "result": f"[translated to {target_lang}]: {text}",
        "metadata": {"lang": target_lang, "confidence": 0.0},
    }


def spell_check_text(text: str) -> dict:
    """Spell check via `textblob` if installed; otherwise identity no-op."""
    if not text:
        return {
            "action": "spell_check",
            "result": text,
            "metadata": {"suggestions": [], "confidence": 0.0},
        }
    try:
        import importlib
        import importlib.util

        if importlib.util.find_spec("textblob") is not None:
            try:
                textblob = importlib.import_module("textblob")
                TextBlob = getattr(textblob, "TextBlob", None)
                if TextBlob is not None:
                    corrected = str(TextBlob(text).correct())
                    if corrected != text:
                        return {
                            "action": "spell_check",
                            "result": corrected,
                            "metadata": {
                                "suggestions": [corrected],
                                "confidence": 0.9,
                            },
                        }
            except Exception:
                pass
    except Exception:
        pass
    return {
        "action": "spell_check",
        "result": text,
        "metadata": {"suggestions": [], "confidence": 0.0},
    }


def format_code_text(code: str, lang: str = "python") -> dict:
    """Format code with `black` if available; otherwise strip trailing space."""
    if not code:
        return {
            "action": "format_code",
            "result": code,
            "metadata": {"lang": lang, "confidence": 0.0},
        }
    try:
        import importlib
        import importlib.util

        black = None
        try:
            if importlib.util.find_spec("black") is not None:
                black = importlib.import_module("black")
        except ImportError:
            black = None
        if black is not None:
            formatted = black.format_str(code, mode=black.Mode())
            return {
                "action": "format_code",
                "result": formatted,
                "metadata": {"lang": lang, "confidence": 0.95},
            }
        # Fallback: naive cleanup — strip trailing whitespace per line.
        cleaned = "\n".join(ln.rstrip() for ln in code.splitlines())
        return {
            "action": "format_code",
            "result": cleaned,
            "metadata": {"lang": lang, "confidence": 0.0},
        }
    except Exception:
        try:
            cleaned = "\n".join(ln.rstrip() for ln in code.splitlines())
            return {
                "action": "format_code",
                "result": cleaned,
                "metadata": {"lang": lang, "confidence": 0.0},
            }
        except Exception:
            return {
                "action": "format_code",
                "result": code,
                "metadata": {"lang": lang, "confidence": 0.0},
            }


def explain_code_text(code: str, lang: str = "python") -> dict:
    """Basic code explanation from function names and top-level comments.

    Intentionally naive; future improvement: pass to an LLM or parser.
    """
    if not code:
        return {"action": "explain_code", "result": "", "metadata": {"lang": lang}}
    try:
        funcs = re.findall(r"def\s+(\w+)\s*\(", code)
        comments = re.findall(r"#(.+)", code)
        summary = []
        if funcs:
            summary.append(f"Functions: {', '.join(funcs)}")
        if comments:
            summary.append(
                "Comments: " + "; ".join(c.strip() for c in comments[:3])
            )
        if not summary:
            # Fallback: first non-empty line.
            lines = [l.strip() for l in code.splitlines() if l.strip()]
            summary.append(lines[0] if lines else "No content")
        return {
            "action": "explain_code",
            "result": " | ".join(summary),
            "metadata": {"lang": lang, "confidence": 0.6},
        }
    except Exception:
        return {
            "action": "explain_code",
            "result": "",
            "metadata": {"lang": lang, "confidence": 0.0},
        }


def ensure_upload_dir():
    """Create CONFIG.UPLOAD_DIR if missing (best effort)."""
    from config import CONFIG

    try:
        os.makedirs(CONFIG.UPLOAD_DIR, exist_ok=True)
    except Exception:
        pass


def save_bytes_to_upload(filename: Optional[str], data: bytes) -> dict:
    """Write *data* into the upload dir under a uuid-prefixed safe name.

    Returns metadata (file_id, path, mime type, size, timestamp) or an
    {'error': ...} dict on failure.
    """
    from config import CONFIG
    import time
    import uuid

    ensure_upload_dir()
    _id = str(uuid.uuid4())
    # basename() guards against path traversal in the supplied filename.
    safe_name = f"{_id}_{os.path.basename(str(filename or 'uploaded_file'))}"
    path = os.path.join(CONFIG.UPLOAD_DIR, safe_name)
    try:
        with open(path, "wb") as f:
            f.write(data)
        size = os.path.getsize(path)
        import mimetypes

        mime_type = mimetypes.guess_type(path)[0]
        return {
            "file_id": _id,
            "filename": filename,
            "path": path,
            "mime_type": mime_type,
            "size": size,
            "uploaded_at": int(time.time()),
        }
    except Exception as e:
        return {"error": str(e)}


def file_read_from_path(path: str, max_bytes: int = 100000) -> str:
    """Read up to *max_bytes* from *path*, decoded as UTF-8 (lossy)."""
    try:
        if not path or not os.path.exists(path):
            return ""
        with open(path, "rb") as f:
            b = f.read(max_bytes)
        try:
            return b.decode("utf-8", errors="replace")
        except Exception:
            return str(b)
    except Exception:
        return ""


def universal_tool(
    args: dict,
    allow_web_search: bool = True,
    allow_tools: bool = True,
    allow_file_tool: bool = True,
) -> dict:
    """Universal tool dispatcher.

    If 'action' is provided, call the corresponding tool; otherwise
    auto-detect from the 'query' content.  Supported actions: calc,
    web_search, file_upload, file_read, fetch_url, summarize, keywords,
    sentiment, translate, spell_check, format_code, explain_code.
    Returns a dict with action/result/metadata for prompt injection.
    """
    if not isinstance(args, dict):
        return {"error": "ERROR: invalid args for universal tool"}
    action = args.get("action")
    query = args.get("query")

    # ---- explicit actions -------------------------------------------------
    if action == "calc":
        if not allow_tools:
            return {
                "action": "calc",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        expr = args.get("expression") or query
        if not expr:
            return {
                "action": "calc",
                "result": None,
                "metadata": {"error": "no expression provided", "confidence": 0.0},
            }
        res = calc(str(expr))
        return {
            "action": "calc",
            "result": str(res),
            "metadata": {"expression": expr, "confidence": 0.98},
        }

    if action == "web_search":
        if not allow_web_search:
            return {
                "action": "web_search",
                "result": "",
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        q = args.get("query") or query
        if not q:
            return {"action": "web_search", "result": "", "metadata": {"confidence": 0.0}}
        top_k = int(args.get("top_k") or 3)
        res = web_search(str(q), top_k)
        return {
            "action": "web_search",
            "result": str(res),
            "metadata": {"query": q, "top_k": top_k, "confidence": 0.9},
        }

    if action == "file_read":
        if not allow_file_tool:
            return {
                "action": "file_read",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        fpath = args.get("path") or args.get("file_path")
        if not fpath and args.get("file_id"):
            # Resolve a bare file_id against the upload directory.
            from config import CONFIG

            fid = args.get("file_id")
            candidate = (
                os.path.join(CONFIG.UPLOAD_DIR, os.path.basename(str(fid)))
                if fid
                else None
            )
            if candidate and os.path.exists(candidate):
                fpath = candidate
        if not fpath:
            return {
                "action": "file_read",
                "result": None,
                "metadata": {"error": "no_path_or_id", "confidence": 0.0},
            }
        content = file_read_from_path(fpath, int(args.get("max_bytes") or 100000))
        return {
            "action": "file_read",
            "result": str(content),
            "metadata": {"path": fpath, "confidence": 0.9},
        }

    if action == "file_upload":
        if not allow_file_tool:
            return {
                "action": "file_upload",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        # Expect either base64 content or raw bytes/text in args.
        import base64

        fname = args.get("filename") or args.get("name") or "uploaded_file"
        content_b64 = args.get("content_base64") or args.get("content")
        if not content_b64:
            return {
                "action": "file_upload",
                "result": None,
                "metadata": {"error": "no_content", "confidence": 0.0},
            }
        # If the content decodes as strict base64, use that; else treat as text.
        try:
            if isinstance(content_b64, str):
                try:
                    b = base64.b64decode(content_b64, validate=True)
                except Exception:
                    b = str(content_b64).encode("utf-8")
            elif isinstance(content_b64, (bytes, bytearray)):
                b = content_b64
            else:
                b = str(content_b64).encode("utf-8")
        except Exception:
            return {
                "action": "file_upload",
                "result": None,
                "metadata": {"error": "invalid_content", "confidence": 0.0},
            }
        # Check size against configuration.
        try:
            from config import CONFIG

            if len(b) > getattr(CONFIG, "MAX_UPLOAD_SIZE_BYTES", 10 * 1024 * 1024):
                return {
                    "action": "file_upload",
                    "result": None,
                    "metadata": {"error": "file_too_large", "confidence": 0.0},
                }
        except Exception:
            pass
        # Save: prefer the app's internal upload API so checks live in one place.
        meta = None
        try:
            import importlib

            app_module = importlib.import_module("app")
            if hasattr(app_module, "upload_file_internal"):
                try:
                    meta = app_module.upload_file_internal(b, filename=fname)
                except Exception:
                    meta = save_bytes_to_upload(fname, b)
            else:
                meta = save_bytes_to_upload(fname, b)
            # Best-effort registration in the app's UPLOADED_FILES registry.
            try:
                if hasattr(app_module, "UPLOADED_FILES") and isinstance(
                    app_module.UPLOADED_FILES, dict
                ):
                    app_module.UPLOADED_FILES[meta["file_id"]] = meta
            except Exception:
                pass
        except Exception:
            # Fallback to local save, skip registration.
            meta = save_bytes_to_upload(fname, b)
        return {
            "action": "file_upload",
            "result": meta,
            "metadata": {
                "filename": fname,
                "file_id": meta.get("file_id"),
                "confidence": 0.9,
            },
        }

    if action == "fetch_url":
        if not allow_web_search:
            return {
                "action": "fetch_url",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        url = args.get("url") or query
        if not url:
            return {
                "action": "fetch_url",
                "result": None,
                "metadata": {"error": "no_url_provided", "confidence": 0.0},
            }
        content = fetch_url(str(url), int(args.get("max_chars") or 20000))
        return {
            "action": "fetch_url",
            "result": str(content),
            "metadata": {"url": url, "confidence": 0.9},
        }

    if action == "summarize":
        if not allow_tools:
            return {
                "action": "summarize",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or ""
        if not txt and args.get("url"):
            try:
                txt = fetch_url(str(args.get("url")))
            except Exception:
                txt = ""
        if not txt and query:
            txt = query
        if not txt:
            return {
                "action": "summarize",
                "result": None,
                "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0},
            }
        s = summarize_text(str(txt), int(args.get("max_sentences") or 3))
        return {"action": "summarize", "result": s, "metadata": {"confidence": 0.85}}

    if action in ("keywords", "keyword_extraction"):
        if not allow_tools:
            return {
                "action": "keywords",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or ""
        if not txt and args.get("url"):
            try:
                txt = fetch_url(str(args.get("url")))
            except Exception:
                txt = ""
        if not txt and query:
            txt = query
        if not txt:
            return {
                "action": "keywords",
                "result": None,
                "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0},
            }
        kws = extract_keywords(str(txt), int(args.get("top_k") or 5))
        return {"action": "keywords", "result": kws, "metadata": {"confidence": 0.85}}

    if action == "sentiment":
        if not allow_tools:
            return {
                "action": "sentiment",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or ""
        if not txt and args.get("url"):
            try:
                txt = fetch_url(str(args.get("url")))
            except Exception:
                txt = ""
        if not txt and query:
            txt = query
        if not txt:
            return {
                "action": "sentiment",
                "result": None,
                "metadata": {"error": "no_text_or_url_provided", "confidence": 0.0},
            }
        res = sentiment_analysis(str(txt))
        return {"action": "sentiment", "result": res, "metadata": {"confidence": 0.85}}

    if action == "translate":
        if not allow_tools:
            return {
                "action": "translate",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or query or ""
        target = args.get("target") or "en"
        res = translate_text(str(txt), str(target))
        # translate_text nests lang/confidence under 'metadata' (the old
        # code read res.get('lang'), which was always None).
        res_meta = res.get("metadata") or {}
        return {
            "action": "translate",
            "result": res.get("result"),
            "metadata": {"lang": res_meta.get("lang"), "confidence": 0.5},
        }

    if action in ("spell_check", "spellcheck"):
        if not allow_tools:
            return {
                "action": "spell_check",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or query or ""
        res = spell_check_text(str(txt))
        # spell_check_text nests suggestions under 'metadata'.
        res_meta = res.get("metadata") or {}
        return {
            "action": "spell_check",
            "result": res.get("result"),
            "metadata": {
                "corrections": res_meta.get("suggestions"),
                "confidence": 0.5,
            },
        }

    if action in ("format_code", "format"):
        if not allow_tools:
            return {
                "action": "format_code",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or query or ""
        lang = args.get("language") or args.get("lang") or "python"
        res = format_code_text(txt, lang)
        return {
            "action": "format_code",
            "result": res.get("result"),
            "metadata": {"lang": lang, "confidence": 0.6},
        }

    if action in ("explain_code", "explain"):
        if not allow_tools:
            return {
                "action": "explain_code",
                "result": None,
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        txt = args.get("text") or query or ""
        lang = args.get("language") or args.get("lang") or "python"
        res = explain_code_text(txt, lang)
        # explain_code_text returns its summary under 'result' (the old
        # code looked for 'docstrings'/'explanation' and always got '').
        expl = res.get("result") if isinstance(res, dict) else str(res)
        return {
            "action": "explain_code",
            "result": expl,
            "metadata": {"lang": lang, "confidence": 0.6},
        }

    # ---- auto-detect based on query content -------------------------------
    if query:
        q_str = str(query)
        q_low = q_str.lower()

        # Arithmetic expression -> calc.
        if re.search(r"\d+\s*[-+*/%]\s*\d+", q_str):
            if not allow_tools:
                return {
                    "action": "calc",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            res = calc(q_str)
            return {
                "action": "calc",
                "result": str(res),
                "metadata": {"expression": q_str, "confidence": 0.95},
            }

        # URL present -> fetch_url.
        if re.search(r"https?://\S+", q_str):
            if not allow_web_search:
                return {
                    "action": "fetch_url",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            content = fetch_url(q_str, int(args.get("max_chars") or 20000))
            return {
                "action": "fetch_url",
                "result": str(content),
                "metadata": {"url": q_str, "confidence": 0.9},
            }

        # 'translate ... to <lang>' -> translate.
        m = re.search(r"\btranslate\b.*to\s+([a-z]{2,})", q_low)
        if m:
            if not allow_tools:
                return {
                    "action": "translate",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            return translate_text(q_str, m.group(1))

        # Fenced code or 'format code' -> format_code.
        if re.search(
            r"```[a-zA-Z]*|format code|format this code|pretty print code", q_low
        ):
            if not allow_tools:
                return {
                    "action": "format_code",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            return format_code_text(q_str)

        # Summarize request.
        if re.search(r"\b(summarize|summarise|tl;dr)\b", q_low):
            if not allow_tools:
                return {
                    "action": "summarize",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            s = summarize_text(q_str)
            return {"action": "summarize", "result": s, "metadata": {"confidence": 0.85}}

        # Keyword extraction request.
        if re.search(r"\b(keywords|key terms|extract keywords)\b", q_low):
            if not allow_tools:
                return {
                    "action": "keywords",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            kws = extract_keywords(q_str)
            return {"action": "keywords", "result": kws, "metadata": {"confidence": 0.78}}

        # Sentiment request.
        if re.search(
            r"\b(sentiment|tone|is this positive|is this negative|what is the sentiment)\b",
            q_low,
        ):
            if not allow_tools:
                return {
                    "action": "sentiment",
                    "result": None,
                    "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
                }
            res = sentiment_analysis(q_str)
            return {"action": "sentiment", "result": res, "metadata": {"confidence": 0.8}}

        # Default: web search.
        if not allow_web_search:
            return {
                "action": "web_search",
                "result": "",
                "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
            }
        top_k = int(args.get("top_k") or 3)
        res = web_search(q_str, top_k)
        return {
            "action": "web_search",
            "result": str(res),
            "metadata": {"query": q_str, "top_k": top_k, "confidence": 0.9},
        }

    return {"error": "ERROR: could not determine action for universal tool"}


def bias_mitigation(text: str) -> dict:
    """Light-weight bias mitigation helper.

    Detects and neutralizes potentially biased, stereotyping, or
    discriminatory statements.  Intentionally conservative (favors
    suppression); returns sanitized content plus a suppressed flag.
    """
    if not text or not isinstance(text, str):
        return {"sanitized": text, "suppressed": False, "reason": None}
    t = text.strip()
    # Naive keyword lists — could be replaced by an ML classifier.
    protected_terms = [
        r"\b(race|religion|ethnicity|gender|sexual orientation|disability)\b",
        r"\b(black|white|asian|hispanic|muslim|christian|jewish|gay|lesbian|transgender)\b",
    ]
    sweeping_patterns = [
        r"\b(all|always|never|every|none)\b[^.?!]{0,60}\b(is|are|will|should|must)\b",
        r"\b(\w+)s?\b[^.?!]{0,60}\b(are|is)\b[^.?!]{0,80}\b(inferior|superior|stupid|lazy|criminal)\b",
    ]
    # Placeholder slur list; a real app should use a curated list.
    slurs = [r"\b(slur1|slur2)\b"]

    # Sweeping generalization about a protected group -> suppress.
    for pattern in sweeping_patterns:
        if re.search(pattern, t, flags=re.I):
            for pt in protected_terms:
                if re.search(pt, t, flags=re.I):
                    return {
                        "sanitized": "[content suppressed due to potential bias]",
                        "suppressed": True,
                        "reason": "sweeping_generalization_protected_group",
                    }
    # Slurs -> suppress.
    for s in slurs:
        if re.search(s, t, flags=re.I):
            return {
                "sanitized": "[content suppressed due to policy]",
                "suppressed": True,
                "reason": "profanity_or_slur",
            }
    # Strong unfounded political claims -> neutralize wording.
    if re.search(
        r"\b(president|prime minister|dictator|election|vote|politician)\b",
        t,
        flags=re.I,
    ) and re.search(
        r"\b(is|are|will|should)\b[^.?!]{0,80}\b(incompetent|corrupt|traitor|criminal)\b",
        t,
        flags=re.I,
    ):
        sanitized = re.sub(
            r"\b(is|are|will|should)\b[^.?!]{0,80}\b(incompetent|corrupt|traitor|criminal)\b",
            "may have actions that deserve scrutiny",
            t,
            flags=re.I,
        )
        return {
            "sanitized": sanitized,
            "suppressed": False,
            "reason": "political_neutralization",
        }
    return {"sanitized": text, "suppressed": False, "reason": None}