"""Utility helpers: think-tag parsing, prompt assembly, logging and tool dispatch."""

import ast
import mimetypes
import operator
import os
import queue
import re
import threading
import time
import uuid
from typing import List, Optional, Union

import requests
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings

from api_types import ChatMessage

# Literal markers that delimit a model "thinking" section.
_THINK_OPEN = "<think>"
_THINK_CLOSE = "</think>"


def parse_think_response(full_response: str):
    """Split a model response into its reasoning part and its visible answer.

    Args:
        full_response: Raw model output that may contain one
            ``<think>...</think>`` section.

    Returns:
        A ``(reasoning_content, content)`` tuple. ``reasoning_content`` is
        ``None`` when the response has no opening think tag; ``content`` is
        ``""`` when the think section was opened but never closed.
    """
    think_start = full_response.find(_THINK_OPEN)
    if think_start == -1:
        return None, full_response.strip()

    body_start = think_start + len(_THINK_OPEN)
    think_end = full_response.find(_THINK_CLOSE, body_start)
    if think_end == -1:
        # Unclosed tag: everything after the opening tag is reasoning.
        reasoning = full_response[body_start:]
        content = ""
    else:
        reasoning = full_response[body_start:think_end]
        # Skip exactly the closing tag so no answer character is lost.
        content = full_response[think_end + len(_THINK_CLOSE):]

    # Drop any stray tags left inside the reasoning text.
    reasoning_content = (
        reasoning.replace(_THINK_OPEN, "").replace(_THINK_CLOSE, "").strip()
    )
    return reasoning_content, content.strip()


def cleanMessages(messages: List[ChatMessage], removeThinkingContent: bool = False):
    """Render chat messages as a ``Role: content`` prompt string.

    Each message's role is stripped, lower-cased and capitalised; runs of
    newlines in the content are collapsed to a single newline. Messages are
    joined with a blank line.

    Args:
        messages: Objects exposing ``role`` and ``content`` string attributes.
        removeThinkingContent: When true, think sections are removed from
            messages whose normalised role is ``Assistant``.

    Returns:
        The assembled prompt text.
    """
    prompt_parts = []
    for message in messages:
        role = message.role.strip().lower().capitalize()
        content = re.sub(r"\n+", "\n", message.content.strip())
        if removeThinkingContent and role == "Assistant":
            content = remove_nested_think_tags_stack(content)
        prompt_parts.append(f"{role}: {content}")
    return "\n\n".join(prompt_parts)


def remove_nested_think_tags_stack(text):
    """Remove every (possibly nested) ``<think>...</think>`` section from *text*.

    Text inside an open think section is discarded, including the remainder
    of the string when a section is never closed. A closing tag that has no
    matching opening tag is kept verbatim.

    Args:
        text: Input string.

    Returns:
        The text with all think sections removed.
    """
    kept = []
    depth = 0  # number of currently open think sections
    i = 0
    length = len(text)
    while i < length:
        if text.startswith(_THINK_OPEN, i):
            depth += 1
            i += len(_THINK_OPEN)
        elif text.startswith(_THINK_CLOSE, i):
            if depth:
                depth -= 1
            else:
                # Unmatched closing tag: preserve it as ordinary text.
                kept.append(_THINK_CLOSE)
            i += len(_THINK_CLOSE)
        else:
            if not depth:
                kept.append(text[i])
            i += 1
    return "".join(kept)


def format_bytes(size):
    """Format a byte count with a binary-scaled unit (B, KB, MB, GB, TB).

    Args:
        size: Number of bytes.

    Returns:
        The value with four decimals followed by the unit, e.g.
        ``"1.0000KB"`` for 1024. Values beyond the terabyte range stay in TB.
    """
    power = 2**10
    power_labels = ("", "K", "M", "G", "T")
    n = 0
    # ``>=`` so that exactly 1024 rolls over; the bound keeps ``n`` a valid index.
    while size >= power and n < len(power_labels) - 1:
        size /= power
        n += 1
    return f"{size:.4f}{power_labels[n]}B"


# Small bounded buffer between request handlers and the logging thread.
LOGGER_QUEUE = queue.Queue(5)


def logger():
    """Forward queued log items to the ``LOG_PORT`` endpoint forever.

    Runs as the body of the background logging thread. Each item taken from
    ``LOGGER_QUEUE`` is POSTed as JSON to the URL held in the ``LOG_PORT``
    environment variable; delivery failures are ignored.
    """
    print("enable")
    while True:
        item = LOGGER_QUEUE.get()
        try:
            LOG_PORT = os.environ.get("LOG_PORT")
            if LOG_PORT:
                requests.post(
                    LOG_PORT,
                    headers={"Content-Type": "application/json"},
                    json=item,
                    # A hung endpoint must not stall the consumer forever.
                    timeout=10,
                )
        except Exception:
            # Logging is best-effort: a failed delivery must not stop the loop.
            pass


if os.environ.get("LOG_PORT"):
    # Daemon thread so the endless loop never blocks interpreter shutdown.
    threading.Thread(target=logger, daemon=True).start()


def log(item):
    """Queue *item* for remote logging without blocking.

    The item is dropped when the queue is full, which is always the case
    once five items are pending and no logging thread is running.
    """
    try:
        LOGGER_QUEUE.put_nowait(item)
    except queue.Full:
        pass


def web_search(query: str, top_k: int = 3) -> str:
    """Perform a simple web search via DuckDuckGo HTML and return formatted results.

    This is a lightweight fallback search that does not call external model
    services: it queries a public search endpoint and parses title, snippet
    and URL of each hit so they can be included in the model's prompt context.

    Args:
        query: Search text.
        top_k: Maximum number of results to return.

    Returns:
        One ``"title - snippet - url"`` line per result, joined with
        newlines. Returns ``""`` for an empty query, a non-positive
        ``top_k``, a missing ``bs4`` package, or any request/parsing failure.
    """
    if not query or query.strip() == "":
        return ""
    if top_k <= 0:
        return ""
    try:
        from bs4 import BeautifulSoup
    except Exception:
        return ""
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
        resp = requests.get(
            "https://duckduckgo.com/html/",
            params={"q": query.strip()},
            headers=headers,
            timeout=10,
        )
        soup = BeautifulSoup(resp.text, "html.parser")
        # DuckDuckGo's html structure: results are in `div.result` containers.
        results = []
        for container in soup.find_all("div", class_="result", limit=top_k):
            link = container.find("a", class_="result__a") or container.find(
                "a", href=True
            )
            title = link.get_text(strip=True) if link else ""
            href = link.get("href") if link else ""
            snippet_node = container.find(
                "a", class_="result__snippet"
            ) or container.find("div", class_="result__snippet")
            snippet = snippet_node.get_text(strip=True) if snippet_node else ""
            results.append(f"{title} - {snippet} - {href}")
        return "\n".join(results)
    except Exception:
        # Search is optional context; any failure degrades to "no results".
        return ""


# Operators accepted by calc().
_CALC_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.BitXor: operator.xor,
    ast.Mod: operator.mod,
    ast.FloorDiv: operator.floordiv,
}
_CALC_UNARY_OPS = {
    ast.USub: operator.neg,
    ast.UAdd: operator.pos,
}
# Upper bound on the magnitude of a ``**`` exponent, to keep evaluation cheap.
_CALC_MAX_EXPONENT = 10_000


def _eval_arithmetic(node):
    """Recursively evaluate an AST node made only of numbers and allowed operators."""
    if isinstance(node, ast.Constant):
        value = node.value
        # bool is an int subclass; reject it along with strings, None, etc.
        if isinstance(value, bool) or not isinstance(value, (int, float, complex)):
            raise ValueError("Unsupported expression type")
        return value
    if isinstance(node, ast.BinOp):
        left = _eval_arithmetic(node.left)
        right = _eval_arithmetic(node.right)
        handler = _CALC_BINARY_OPS.get(type(node.op))
        if handler is None:
            raise ValueError("Unsupported operator")
        if isinstance(node.op, ast.Pow) and abs(right) > _CALC_MAX_EXPONENT:
            raise ValueError("Exponent too large")
        return handler(left, right)
    if isinstance(node, ast.UnaryOp):
        handler = _CALC_UNARY_OPS.get(type(node.op))
        if handler is None:
            raise ValueError("Unsupported unary op")
        return handler(_eval_arithmetic(node.operand))
    raise ValueError("Unsupported expression type")


def calc(expr: str) -> str:
    """Safely evaluate a simple arithmetic expression and return the result as a string.

    The expression is parsed with ``ast`` and only numeric literals and the
    operators ``+ - * / // % ** ^`` (``^`` is bitwise xor) plus unary ``+``
    and ``-`` are evaluated; names, calls and attributes are rejected.

    Args:
        expr: Expression text, e.g. ``"2 * (3 + 4)"``.

    Returns:
        ``str(result)`` on success, otherwise ``"ERROR: <reason>"``. This
        function never raises.
    """
    try:
        # ast.parse rejects leading whitespace in eval mode, so strip first.
        tree = ast.parse(str(expr).strip(), mode="eval")
        return str(_eval_arithmetic(tree.body))
    except Exception as e:
        return f"ERROR: {e}"


# Heuristic patterns used by detect_tools_and_reasoning(); all but the
# expression-candidate pattern are applied to lower-cased text.
_ARITHMETIC_RE = re.compile(r"\d+\s*[-+*/%]\s*\d+")
_CALC_KEYWORD_RE = re.compile(
    r"\b(calculate|compute|solve|evaluate|sum|add|subtract|multiply|divide)\b"
)
_EXPRESSION_CANDIDATE_RE = re.compile(r"[\d\(\)\s+\-*/%^.]+")
_WEB_SEARCH_RE = re.compile(
    r"\b(who is|who's|what is|what's|when is|where is|current|latest|news|is the president|president of|population of|capital of|how many|gdp of)\b"
)
_REASONING_RE = re.compile(
    r"\b(explain|why|because|reason|prove|derive|compare|analysis|analysis:|evaluate|argue|consequence|trade-offs)\b"
)
_UNIVERSAL_RE = re.compile(
    r"\b(use (a )?tool|execute (a )?tool|call (a )?tool|function call|run tool|do this via a tool|invoke tool|call tool)\b"
)


def _message_text(message) -> str:
    """Return the ``content`` of a dict or attribute-style message as a string."""
    if isinstance(message, dict):
        content = message.get("content")
    else:
        content = getattr(message, "content", None)
    if not content:
        return ""
    return content if isinstance(content, str) else str(content)


def _extract_expression(text: str) -> Optional[str]:
    """Return the first run of arithmetic characters holding a digit and an operator."""
    for match in _EXPRESSION_CANDIDATE_RE.finditer(text):
        candidate = match.group(0).strip()
        if (
            candidate
            and re.search(r"\d", candidate)
            and re.search(r"[-+*/%]", candidate)
        ):
            return candidate
    return None


def detect_tools_and_reasoning(text_or_messages) -> dict:
    """Detect whether web_search, calc, or reasoning are likely needed.

    Accepts either a single string prompt or a list of messages (dicts with
    a ``content`` key or objects with a ``content`` attribute).

    Returns:
        A dict with the booleans ``need_calc``, ``need_web_search``,
        ``need_reasoning`` and ``need_universal``, a ``detected_tools`` list
        of ``{"name", "args"}`` entries, and a ``confidence`` dict of
        per-tool scores.
    """
    if isinstance(text_or_messages, list):
        try:
            text = "\n\n".join(_message_text(m) for m in text_or_messages if m)
        except Exception:
            text = ""
    else:
        text = str(text_or_messages or "")
    t = text.lower()

    need_calc = False
    need_web_search = False
    detected_tools = []
    has_arithmetic = bool(_ARITHMETIC_RE.search(t))

    # Calc: an inline "number operator number", or a math keyword plus a digit.
    if has_arithmetic or (_CALC_KEYWORD_RE.search(t) and re.search(r"\d", t)):
        need_calc = True
        expr = _extract_expression(text)
        detected_tools.append(
            {
                "name": "calc",
                "args": {"expression": expr, "confidence": 0.95 if expr else 0.5},
            }
        )

    # Web search: factual question words or 'current/latest' signals; avoid math queries.
    if _WEB_SEARCH_RE.search(t) and not has_arithmetic:
        need_web_search = True
        detected_tools.append(
            {"name": "web_search", "args": {"query": text, "confidence": 0.9}}
        )

    # Reasoning: words like 'explain', 'why', 'prove', 'derive', 'compare'.
    need_reasoning = bool(_REASONING_RE.search(t))

    # Universal tool: explicit requests to use / call / run a tool.
    need_universal = bool(_UNIVERSAL_RE.search(t))

    # Fixed confidence per signal; 0.0 when the signal is absent.
    confs = {
        "calc_confidence": 0.95 if need_calc else 0.0,
        "web_search_confidence": 0.9 if need_web_search else 0.0,
        "reasoning_confidence": 0.85 if need_reasoning else 0.0,
        "universal_confidence": 0.65 if need_universal else 0.0,
    }
    return {
        "need_calc": need_calc,
        "need_web_search": need_web_search,
        "need_reasoning": need_reasoning,
        "need_universal": need_universal,
        "detected_tools": detected_tools,
        "confidence": confs,
    }


def ensure_upload_dir():
    """Create ``CONFIG.UPLOAD_DIR`` if it does not exist; creation errors are ignored."""
    from config import CONFIG

    try:
        os.makedirs(CONFIG.UPLOAD_DIR, exist_ok=True)
    except OSError:
        # Best-effort: a later write reports the real error to the caller.
        pass


def save_bytes_to_upload(filename: Optional[str], data: bytes) -> dict:
    """Store *data* in the upload directory under a unique name.

    The file is written as ``<uuid>_<basename of filename>``.

    Args:
        filename: Client-supplied name; only its final path component is
            used. Defaults to ``uploaded_file`` when empty.
        data: Bytes to write.

    Returns:
        A dict with ``file_id``, ``filename``, ``path``, ``mime_type``,
        ``size`` and ``uploaded_at`` (Unix seconds), or ``{"error": msg}``
        when the write fails.
    """
    from config import CONFIG

    ensure_upload_dir()
    _id = str(uuid.uuid4())
    # Treat backslashes as separators too so Windows-style paths are reduced
    # to their final component on every platform.
    raw_name = str(filename or "uploaded_file").replace("\\", "/")
    safe_name = f"{_id}_{os.path.basename(raw_name)}"
    path = os.path.join(CONFIG.UPLOAD_DIR, safe_name)
    try:
        with open(path, "wb") as f:
            f.write(data)
        return {
            "file_id": _id,
            "filename": filename,
            "path": path,
            "mime_type": mimetypes.guess_type(path)[0],
            "size": os.path.getsize(path),
            "uploaded_at": int(time.time()),
        }
    except Exception as e:
        # Callers receive failures as data rather than as exceptions.
        return {"error": str(e)}


def file_read_from_path(path: str, max_bytes: int = 100000) -> str:
    """Read up to *max_bytes* from *path* and return them decoded as UTF-8.

    Undecodable bytes are replaced instead of raising.

    Returns:
        The decoded text, or ``""`` when the path is empty, missing, or
        cannot be read.
    """
    try:
        if not path or not os.path.exists(path):
            return ""
        with open(path, "rb") as f:
            raw = f.read(max_bytes)
    except (OSError, TypeError, ValueError):
        return ""
    return raw.decode("utf-8", errors="replace")


def _positive_int(value, default: int) -> int:
    """Coerce *value* to a positive int, falling back to *default*."""
    try:
        number = int(value) if value else default
    except (TypeError, ValueError):
        return default
    return number if number > 0 else default


def _policy_denied(action: str, empty_result) -> dict:
    """Build the response returned when *action* is disabled by the caller."""
    return {
        "action": action,
        "result": empty_result,
        "metadata": {"error": "disabled_by_policy", "confidence": 0.0},
    }


def _resolve_uploaded_file(file_id) -> Optional[str]:
    """Return the path of the upload matching *file_id*, or ``None``.

    save_bytes_to_upload() stores files as ``<file_id>_<name>``, so a prefix
    match is tried after the exact name.
    """
    from config import CONFIG

    token = os.path.basename(str(file_id))
    if not token:
        return None
    exact = os.path.join(CONFIG.UPLOAD_DIR, token)
    if os.path.exists(exact):
        return exact
    try:
        names = sorted(os.listdir(CONFIG.UPLOAD_DIR))
    except OSError:
        return None
    prefix = f"{token}_"
    for name in names:
        if name.startswith(prefix):
            return os.path.join(CONFIG.UPLOAD_DIR, name)
    return None


def universal_tool(
    args: dict,
    allow_web_search: bool = True,
    allow_tools: bool = True,
    allow_file_tool: bool = True,
) -> dict:
    """Dispatch to calc, web_search or file_read, or auto-detect from the query.

    Supported ``action`` values are ``'calc'``, ``'web_search'`` and
    ``'file_read'``. When no action is given and ``query`` is set, a query
    containing ``number operator number`` goes to calc and anything else to
    web_search.

    Args:
        args: Tool arguments. Keys read: ``action``, ``query``,
            ``expression``, ``top_k``, ``path``, ``file_path``, ``file_id``,
            ``max_bytes``.
        allow_web_search: When false, web_search requests are refused.
        allow_tools: When false, calc requests are refused.
        allow_file_tool: When false, file_read requests are refused.

    Returns:
        ``{"action", "result", "metadata"}`` for a handled request, where
        ``metadata`` carries a ``confidence`` and, on refusal or missing
        input, an ``error``. Returns ``{"error": ...}`` when *args* is not a
        dict or no action can be determined.
    """
    if not isinstance(args, dict):
        return {"error": "ERROR: invalid args for universal tool"}

    action = args.get("action")
    query = args.get("query")
    top_k = _positive_int(args.get("top_k"), 3)

    if action == "calc":
        if not allow_tools:
            return _policy_denied("calc", None)
        expr = args.get("expression") or query
        if not expr:
            return {
                "action": "calc",
                "result": None,
                "metadata": {"error": "no expression provided", "confidence": 0.0},
            }
        return {
            "action": "calc",
            "result": str(calc(str(expr))),
            "metadata": {"expression": expr, "confidence": 0.98},
        }

    if action == "web_search":
        if not allow_web_search:
            return _policy_denied("web_search", "")
        if not query:
            return {
                "action": "web_search",
                "result": "",
                "metadata": {"confidence": 0.0},
            }
        return {
            "action": "web_search",
            "result": str(web_search(str(query), top_k)),
            "metadata": {"query": query, "top_k": top_k, "confidence": 0.9},
        }

    if action == "file_read":
        if not allow_file_tool:
            return _policy_denied("file_read", None)
        # NOTE(review): 'path' / 'file_path' are read as given, so a caller
        # can read any file this process can access. Confirm that args never
        # come from untrusted input, or restrict reads to the upload dir.
        fpath = args.get("path") or args.get("file_path")
        if not fpath and args.get("file_id"):
            fpath = _resolve_uploaded_file(args.get("file_id"))
        if not fpath:
            return {
                "action": "file_read",
                "result": None,
                "metadata": {"error": "no_path_or_id", "confidence": 0.0},
            }
        max_bytes = _positive_int(args.get("max_bytes"), 100000)
        return {
            "action": "file_read",
            "result": str(file_read_from_path(fpath, max_bytes)),
            "metadata": {"path": fpath, "confidence": 0.9},
        }

    # auto-detect based on query content
    if query:
        query_text = str(query)
        # An inline arithmetic expression goes to calc.
        if _ARITHMETIC_RE.search(query_text):
            if not allow_tools:
                return _policy_denied("calc", None)
            return {
                "action": "calc",
                "result": str(calc(query_text)),
                "metadata": {"expression": query_text, "confidence": 0.95},
            }
        # Everything else goes to web_search.
        if not allow_web_search:
            return _policy_denied("web_search", "")
        return {
            "action": "web_search",
            "result": str(web_search(query_text, top_k)),
            "metadata": {"query": query_text, "top_k": top_k, "confidence": 0.9},
        }

    return {"error": "ERROR: could not determine action for universal tool"}