Hhh / utils.py
Ksjsjjdj's picture
Upload 42 files
aed88a2 verified
import re, os, threading, queue, requests
from typing import List, Optional, Union
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
from api_types import ChatMessage
def parse_think_response(full_response: str):
    """Split a model response into its reasoning text and its visible answer.

    The reasoning is whatever sits between the first ``<think`` marker and
    the following ``</think>`` tag; the answer is whatever comes after that
    closing tag. Text that precedes the opening marker is discarded.

    Args:
        full_response: Raw text produced by the model.

    Returns:
        A ``(reasoning, content)`` tuple. ``reasoning`` is ``None`` when the
        response contains no ``<think`` marker at all. When the block is
        never closed, everything after the opening tag is treated as
        reasoning and ``content`` is the empty string.
    """
    open_marker = "<think"
    close_tag = "</think>"

    think_start = full_response.find(open_marker)
    if think_start == -1:
        return None, full_response.strip()

    # Step over the opening marker and, when present, the ">" that closes it,
    # so the bracket does not leak into the reasoning text.
    body_start = think_start + len(open_marker)
    if full_response.startswith(">", body_start):
        body_start += 1

    think_end = full_response.find(close_tag, body_start)
    if think_end == -1:
        # Unclosed block: the model is still "thinking", there is no answer yet.
        reasoning = full_response[body_start:]
        content = ""
    else:
        reasoning = full_response[body_start:think_end]
        # Use the real tag length so the first character of the answer is kept.
        content = full_response[think_end + len(close_tag):]

    # Drop any nested opening tags so only the reasoning text itself remains.
    reasoning = reasoning.replace("<think>", "")
    return reasoning.strip(), content.strip()
def cleanMessages(messages: List[ChatMessage], removeThinkingContent: bool = False):
    """Render a chat history as a single prompt string.

    Each message becomes ``"<Role>: <content>"`` where the role is trimmed
    and capitalised, and the content is trimmed with runs of newlines
    collapsed to one. Messages are separated by a blank line.

    Args:
        messages: Chat messages exposing ``role`` and ``content`` attributes.
        removeThinkingContent: When true, ``<think>`` blocks are removed from
            messages whose normalised role is ``Assistant``.

    Returns:
        The joined prompt text.
    """
    rendered = []
    for message in messages:
        body = re.sub(r"\n+", "\n", message.content.strip())
        speaker = message.role.strip().lower().capitalize()
        # Only assistant turns can carry reasoning blocks worth stripping.
        if removeThinkingContent and speaker == "Assistant":
            body = remove_nested_think_tags_stack(body)
        rendered.append(f"{speaker}: {body}")
    return "\n\n".join(rendered)
def remove_nested_think_tags_stack(text):
    """Remove every ``<think>...</think>`` block, including nested ones.

    Characters are kept only while no ``<think>`` block is open. A closing
    tag that has no matching opening tag is copied to the output unchanged.
    Text after an opening tag that is never closed is dropped.

    Args:
        text: The string to clean.

    Returns:
        ``text`` with all think blocks removed.
    """
    open_tag = "<think>"
    close_tag = "</think>"

    depth = 0  # number of currently open <think> blocks
    kept = []
    pos = 0
    total = len(text)
    while pos < total:
        if text.startswith(open_tag, pos):
            depth += 1
            pos += len(open_tag)
            continue
        if text.startswith(close_tag, pos):
            if depth:
                depth -= 1
            else:
                # Unbalanced closing tag: preserve it verbatim.
                kept.append(close_tag)
            pos += len(close_tag)
            continue
        if depth == 0:
            kept.append(text[pos])
        pos += 1
    return "".join(kept)
def format_bytes(size):
    """Format a byte count using binary (1024-based) units.

    Args:
        size: Number of bytes.

    Returns:
        The value with four decimals followed by ``B``, ``KB``, ``MB``,
        ``GB`` or ``TB``. Values too large for ``TB`` are still expressed
        in ``TB`` instead of failing.
    """
    step = 2**10
    labels = ("", "K", "M", "G", "T")
    index = 0
    # ">=" so that exactly 1024 becomes 1 KB; the index bound keeps very large
    # sizes on the last known unit instead of running past the label table.
    while size >= step and index < len(labels) - 1:
        size /= step
        index += 1
    return f"{size:.4f}{labels[index]}B"
# Bounded hand-off buffer between log() producers and the logger() worker;
# it holds at most five pending items.
LOGGER_QUEUE = queue.Queue(maxsize=5)
def logger():
    """Worker loop that forwards queued log items over HTTP.

    Blocks on ``LOGGER_QUEUE`` forever. Each item is POSTed as JSON to the
    URL held in the ``LOG_PORT`` environment variable, which is re-read for
    every item. Items are dropped when the variable is unset. Delivery is
    best effort: failures are ignored so the loop never dies.
    """
    print("enable")
    while True:
        item = LOGGER_QUEUE.get()
        endpoint = os.environ.get("LOG_PORT")
        if not endpoint:
            continue
        try:
            requests.post(
                endpoint,
                headers={"Content-Type": "application/json"},
                json=item,
                # Without a timeout one stalled endpoint would block this
                # (only) worker forever and the queue would never drain.
                timeout=10,
            )
        except Exception:
            # Deliberate best effort: logging must never take the worker down.
            pass
# Start the background log-shipping worker only when a LOG_PORT endpoint is
# configured. It is a daemon thread because logger() loops forever; a
# non-daemon thread would keep the interpreter from exiting.
if os.environ.get("LOG_PORT"):
    threading.Thread(target=logger, daemon=True).start()
def log(item):
    """Queue ``item`` for the background logger without blocking.

    Logging is best effort: when ``LOGGER_QUEUE`` is full (for example
    because no worker is running to drain it) the item is dropped instead
    of raising ``queue.Full`` into the caller.

    Args:
        item: JSON-serialisable payload to ship.
    """
    try:
        LOGGER_QUEUE.put_nowait(item)
    except queue.Full:
        # Dropping a log entry is preferable to failing the request that
        # produced it.
        pass
def web_search(query: str, top_k: int = 3) -> str:
    """Search DuckDuckGo's HTML endpoint and return the top results as text.

    The result page is fetched with ``requests`` and parsed with
    BeautifulSoup. Each hit is rendered as ``"title - snippet - url"`` and
    the hits are joined with newlines so they can be placed in a prompt.

    Args:
        query: Search terms. Empty or whitespace-only input yields ``""``.
        top_k: Limit passed to the result lookup.

    Returns:
        The formatted results, or ``""`` when BeautifulSoup is unavailable
        or anything goes wrong while fetching or parsing.
    """
    if not query or query.strip() == "":
        return ""

    # bs4 is an optional dependency; without it the search is simply skipped.
    try:
        from bs4 import BeautifulSoup
    except Exception:
        return ""

    try:
        term = query.strip()
        response = requests.get(
            "https://duckduckgo.com/html/",
            params={"q": term},
            headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"},
            timeout=10,
        )
        page = BeautifulSoup(response.text, "html.parser")

        # Each hit on the HTML page lives in a `div.result` container.
        lines = []
        for hit in page.find_all("div", class_="result", limit=top_k):
            link = hit.find("a", class_="result__a") or hit.find("a", href=True)
            if link:
                title = link.get_text(strip=True)
                href = link.get("href")
            else:
                title = ""
                href = ""

            snippet_node = hit.find("a", class_="result__snippet") or hit.find("div", class_="result__snippet")
            snippet = snippet_node.get_text(strip=True) if snippet_node else ""

            lines.append(f"{title} - {snippet} - {href}")
        return "\n".join(lines)
    except Exception:
        return ""
def calc(expr: str) -> str:
    """Safely evaluate a simple arithmetic expression.

    The expression is parsed with ``ast`` and only numeric literals, the
    binary operators ``+ - * / ** ^ % //`` (``^`` is bitwise xor) and unary
    ``+``/``-`` are evaluated. Names, calls, attributes and every other
    construct are rejected.

    Args:
        expr: The expression text.

    Returns:
        The result converted with ``str``, or ``"ERROR: <reason>"`` when the
        expression is invalid, unsupported, or fails to evaluate.
    """
    try:
        import ast
        import operator as op

        binary_ops = {
            ast.Add: op.add,
            ast.Sub: op.sub,
            ast.Mult: op.mul,
            ast.Div: op.truediv,
            ast.Pow: op.pow,
            ast.BitXor: op.xor,
            ast.Mod: op.mod,
            ast.FloorDiv: op.floordiv,
        }
        unary_ops = {
            ast.USub: op.neg,
            ast.UAdd: op.pos,
        }
        # Upper bound on the bit size of an integer power, so an expression
        # such as 9**9**9 cannot exhaust CPU and memory.
        max_pow_bits = 100_000

        def _eval(node):
            # ast.Constant replaces the deprecated ast.Num; booleans and
            # non-numeric constants (strings, None, ...) stay rejected.
            if isinstance(node, ast.Constant):
                value = node.value
                if isinstance(value, bool) or not isinstance(value, (int, float, complex)):
                    raise ValueError("Unsupported expression type")
                return value
            if isinstance(node, ast.BinOp):
                left = _eval(node.left)
                right = _eval(node.right)
                handler = binary_ops.get(type(node.op))
                if handler is None:
                    raise ValueError("Unsupported operator")
                if (
                    isinstance(node.op, ast.Pow)
                    and isinstance(left, int)
                    and isinstance(right, int)
                    and right > 0
                    and left.bit_length() * right > max_pow_bits
                ):
                    raise ValueError("Exponent too large")
                return handler(left, right)
            if isinstance(node, ast.UnaryOp):
                operand = _eval(node.operand)
                handler = unary_ops.get(type(node.op))
                if handler is None:
                    raise ValueError("Unsupported unary op")
                return handler(operand)
            raise ValueError("Unsupported expression type")

        tree = ast.parse(expr, mode='eval')
        return str(_eval(tree.body))
    except Exception as e:
        # Errors are reported as text because the result is fed into a prompt.
        return f"ERROR: {e}"
def detect_tools_and_reasoning(text_or_messages) -> dict:
    """Guess from keywords which tools a prompt is likely to need.

    Args:
        text_or_messages: Either a prompt string or a list of messages.
            List items may be dicts with a ``content`` key or objects with
            a ``content`` attribute; their contents are joined with blank
            lines before matching.

    Returns:
        A dict with the booleans ``need_calc``, ``need_web_search``,
        ``need_reasoning`` and ``need_universal``, a ``detected_tools`` list
        of ``{"name", "args"}`` entries for calc / web_search, and a
        ``confidence`` dict with one fixed score per category.
    """
    if isinstance(text_or_messages, list):
        try:
            text = "\n\n".join(
                # "or ''" also covers an explicit None content, which would
                # otherwise make join() fail and discard every message.
                (m.get('content') if isinstance(m, dict) else getattr(m, 'content', '')) or ''
                for m in text_or_messages
                if m
            )
        except Exception:
            text = ""
    else:
        text = str(text_or_messages or "")
    t = text.lower()

    inline_math = r"\d+\s*[-+*/%]\s*\d+"
    has_inline_math = re.search(inline_math, t) is not None

    need_calc = False
    need_web_search = False
    need_reasoning = False
    need_universal = False
    detected_tools = []

    # Calc: an inline "number operator number", or a math verb plus a digit.
    if has_inline_math or (
        re.search(r"\b(calculate|compute|solve|evaluate|sum|add|subtract|multiply|divide)\b", t)
        and re.search(r"\d", t)
    ):
        need_calc = True
        # Take the first run of digits/operators/parentheses that really
        # contains both a digit and an operator. The very first run in the
        # text is usually just the space between two words, so it cannot be
        # used blindly.
        expr = None
        for candidate in re.finditer(r"[\d()\s+\-*/%^.]+", text):
            snippet = candidate.group(0).strip()
            if re.search(r"\d", snippet) and re.search(r"[-+*/%]", snippet):
                expr = snippet
                break
        detected_tools.append({"name": "calc", "args": {"expression": expr, "confidence": 0.95 if expr else 0.5}})

    # Web search: factual question phrases or recency words, unless the text
    # contains inline arithmetic. The pattern is all lowercase because it is
    # matched against the lowercased text.
    if (
        re.search(r"\b(who is|who's|what is|what's|when is|where is|current|latest|news|is the president|president of|population of|capital of|how many|gdp of)\b", t)
        and not has_inline_math
    ):
        need_web_search = True
        detected_tools.append({"name": "web_search", "args": {"query": text, "confidence": 0.9}})

    # Reasoning: explanatory or argumentative vocabulary.
    if re.search(r"\b(explain|why|because|reason|prove|derive|compare|analysis|analysis:|evaluate|argue|consequence|trade-offs)\b", t):
        need_reasoning = True

    # Universal tool: explicit requests to use / call / run a tool.
    if re.search(r"\b(use (a )?tool|execute (a )?tool|call (a )?tool|function call|run tool|do this via a tool|invoke tool|call tool)\b", t):
        need_universal = True

    # Fixed per-category scores; these are not derived from the match itself.
    confs = {
        "calc_confidence": 0.95 if need_calc else 0.0,
        "web_search_confidence": 0.9 if need_web_search else 0.0,
        "reasoning_confidence": 0.85 if need_reasoning else 0.0,
        "universal_confidence": 0.65 if need_universal else 0.0,
    }
    return {
        "need_calc": need_calc,
        "need_web_search": need_web_search,
        "need_reasoning": need_reasoning,
        "need_universal": need_universal,
        "detected_tools": detected_tools,
        "confidence": confs,
    }
def ensure_upload_dir():
    """Create the directory named by ``CONFIG.UPLOAD_DIR`` if it is missing.

    Best effort: any failure while creating the directory is ignored.
    """
    from config import CONFIG

    try:
        target_dir = CONFIG.UPLOAD_DIR
        os.makedirs(target_dir, exist_ok=True)
    except Exception:
        # Creation problems are deliberately swallowed here.
        return
from typing import Optional
def save_bytes_to_upload(filename: Optional[str], data: bytes) -> dict:
    """Write ``data`` into the upload directory under a fresh UUID.

    The file is stored as ``<uuid>_<basename of filename>`` inside
    ``CONFIG.UPLOAD_DIR``; ``uploaded_file`` is used as the name when
    ``filename`` is empty.

    Args:
        filename: Original file name supplied by the uploader, if any.
        data: Raw bytes to store.

    Returns:
        On success a dict with ``file_id``, ``filename``, ``path``,
        ``mime_type``, ``size`` and ``uploaded_at`` (Unix seconds).
        On failure ``{'error': <message>}``.
    """
    from config import CONFIG
    import hashlib
    import mimetypes
    import time
    import uuid

    ensure_upload_dir()

    file_id = str(uuid.uuid4())
    # basename() drops any directory part of the supplied name.
    base_name = os.path.basename(str(filename or 'uploaded_file'))
    destination = os.path.join(CONFIG.UPLOAD_DIR, f"{file_id}_{base_name}")

    try:
        with open(destination, 'wb') as handle:
            handle.write(data)
        stored_size = os.path.getsize(destination)
        guessed_type = mimetypes.guess_type(destination)[0]
        record = {
            'file_id': file_id,
            'filename': filename,
            'path': destination,
            'mime_type': guessed_type,
            'size': stored_size,
            'uploaded_at': int(time.time()),
        }
    except Exception as exc:
        return {'error': str(exc)}
    return record
def file_read_from_path(path: str, max_bytes: int = 100000) -> str:
    """Read up to ``max_bytes`` from a file and return them as text.

    Args:
        path: File to read.
        max_bytes: Maximum number of bytes to read from the start of the file.

    Returns:
        The bytes decoded as UTF-8 with undecodable sequences replaced, or
        ``""`` when the path is empty, does not exist, or cannot be read.
    """
    try:
        if path and os.path.exists(path):
            with open(path, 'rb') as stream:
                raw = stream.read(max_bytes)
                # errors='replace' makes decoding total, so no fallback is needed.
                return raw.decode('utf-8', errors='replace')
        return ""
    except Exception:
        return ""
def universal_tool(args: dict, allow_web_search: bool = True, allow_tools: bool = True, allow_file_tool: bool = True) -> dict:
    """Dispatch a tool request, auto-detecting the tool when none is named.

    Supported explicit actions are ``'calc'``, ``'web_search'`` and
    ``'file_read'``. Without a recognised action the ``query`` decides: text
    containing inline arithmetic goes to calc, anything else to web search.

    Args:
        args: Request dict. Keys read: ``action``, ``query``, ``expression``,
            ``top_k``, ``path`` / ``file_path``, ``file_id``, ``max_bytes``.
        allow_web_search: When false, web searches are refused.
        allow_tools: When false, calc is refused.
        allow_file_tool: When false, file reads are refused.

    Returns:
        ``{"action", "result", "metadata"}`` for a handled request (refusals
        carry ``metadata["error"] == "disabled_by_policy"``), or
        ``{"error": ...}`` when ``args`` is not a dict or no action could be
        determined.
    """
    if not isinstance(args, dict):
        return {"error": "ERROR: invalid args for universal tool"}
    action = args.get("action")
    query = args.get("query")

    # --- explicit actions -------------------------------------------------
    if action == "calc":
        if not allow_tools:
            return {"action": "calc", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        expr = args.get("expression") or query
        if not expr:
            return {"action": "calc", "result": None, "metadata": {"error": "no expression provided", "confidence": 0.0}}
        res = calc(str(expr))
        return {"action": "calc", "result": str(res), "metadata": {"expression": expr, "confidence": 0.98}}

    if action == "web_search":
        if not allow_web_search:
            return {"action": "web_search", "result": "", "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        if not query:
            return {"action": "web_search", "result": "", "metadata": {"confidence": 0.0}}
        top_k = _universal_tool_int(args, "top_k", 3)
        res = web_search(str(query), top_k)
        return {"action": "web_search", "result": str(res), "metadata": {"query": query, "top_k": top_k, "confidence": 0.9}}

    if action == 'file_read':
        if not allow_file_tool:
            return {"action": "file_read", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        # NOTE(review): an explicit path is read as given, so a caller that
        # forwards untrusted args can read any file this process can open.
        # Consider restricting reads to the upload directory.
        fpath = args.get('path') or args.get('file_path')
        if not fpath and args.get('file_id'):
            fpath = _universal_tool_find_upload(args.get('file_id'))
        if not fpath:
            return {"action": "file_read", "result": None, "metadata": {"error": "no_path_or_id", "confidence": 0.0}}
        content = file_read_from_path(fpath, _universal_tool_int(args, 'max_bytes', 100000))
        return {"action": "file_read", "result": str(content), "metadata": {"path": fpath, "confidence": 0.9}}

    # --- auto-detect from the query ---------------------------------------
    if query:
        if re.search(r"\d+\s*[-+*/%]\s*\d+", str(query)):
            if not allow_tools:
                return {"action": "calc", "result": None, "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
            res = calc(str(query))
            return {"action": "calc", "result": str(res), "metadata": {"expression": str(query), "confidence": 0.95}}
        if not allow_web_search:
            return {"action": "web_search", "result": "", "metadata": {"error": "disabled_by_policy", "confidence": 0.0}}
        top_k = _universal_tool_int(args, "top_k", 3)
        res = web_search(str(query), top_k)
        return {"action": "web_search", "result": str(res), "metadata": {"query": str(query), "top_k": top_k, "confidence": 0.9}}

    return {"error": "ERROR: could not determine action for universal tool"}


def _universal_tool_int(args: dict, key: str, default: int) -> int:
    """Read ``args[key]`` as an int, using ``default`` when missing or invalid."""
    try:
        return int(args.get(key) or default)
    except (TypeError, ValueError):
        return default


def _universal_tool_find_upload(file_id):
    """Return the path of the upload stored for ``file_id``, or ``None``.

    A file named exactly like the id is preferred; otherwise the first
    directory entry starting with ``<id>_`` is used, which is the naming
    scheme ``save_bytes_to_upload`` writes.
    """
    from config import CONFIG

    # basename() keeps the lookup inside the upload directory.
    name = os.path.basename(str(file_id))
    if not name:
        return None
    exact = os.path.join(CONFIG.UPLOAD_DIR, name)
    if os.path.exists(exact):
        return exact
    prefix = f"{name}_"
    try:
        entries = sorted(os.listdir(CONFIG.UPLOAD_DIR))
    except OSError:
        return None
    for entry in entries:
        if entry.startswith(prefix):
            return os.path.join(CONFIG.UPLOAD_DIR, entry)
    return None