#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI automated research system V7 — history tab rebuilt on native Gradio widgets.

Revision notes:
  * The history page drops the JS fetch / gr.HTML approach in favour of
    gr.Dataframe + gr.Dropdown + gr.File.
  * Downloads return a file path to gr.File through a Python callback,
    which is stable and reliable.
  * The FastAPI routes are kept as a fallback, but the UI no longer
    depends on them.
"""
# =============================================================================
# 0. Auto-install non-standard packages
# =============================================================================
import subprocess, sys


def _pip(pkg: str):
    """Quietly install *pkg* into the running interpreter via pip."""
    cmd = [sys.executable, "-m", "pip", "install", "-q", pkg]
    subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


_REQUIRED = ["gradio>=4.0.0", "gradio-client>=0.8.0", "ddgs", "requests",
             "beautifulsoup4", "lxml", "fastapi", "uvicorn[standard]",
             "python-multipart"]

print("📦 檢查套件依賴...")
for _pkg in _REQUIRED:
    # Derive the importable module name from the requirement spec:
    # strip the version pin, strip extras, dashes become underscores.
    _name = _pkg.split(">=")[0].split("[")[0].replace("-", "_")
    try:
        __import__(_name)
    except ImportError:
        print(f" 安裝 {_pkg}...")
        _pip(_pkg)
print("✅ 套件就緒\n")
# =============================================================================
# 1-11.
研究引擎(原封不動) # ============================================================================= import os, re, time, hashlib, logging, threading, math, json, zipfile, uuid, queue from typing import List, Dict, Optional, Tuple, Set from datetime import datetime from urllib.parse import urlparse, unquote, parse_qs from concurrent.futures import ThreadPoolExecutor, as_completed from collections import Counter, defaultdict import requests from bs4 import BeautifulSoup try: from ddgs import DDGS DDGS_AVAILABLE = True except ImportError: DDGS_AVAILABLE = False try: from gradio_client import Client GRADIO_AVAILABLE = True except ImportError: GRADIO_AVAILABLE = False # ── 日誌 ────────────────────────────────────────────────────────────────────── _LOG_QUEUE: "queue.Queue[str]" = queue.Queue(maxsize=2000) _SESSION_SENTINEL = "\x00SESSION_START\x00" class _QueueHandler(logging.Handler): def emit(self, record): try: _LOG_QUEUE.put_nowait(self.format(record)) except queue.Full: try: _LOG_QUEUE.get_nowait() except queue.Empty: pass _LOG_QUEUE.put_nowait(self.format(record)) _FMT = logging.Formatter('%(asctime)s %(levelname)s [%(funcName)s] %(message)s', datefmt='%H:%M:%S') _qh = _QueueHandler(); _qh.setFormatter(_FMT) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s', handlers=[logging.StreamHandler()]) logging.getLogger().addHandler(_qh) logger = logging.getLogger(__name__) # ── Config ──────────────────────────────────────────────────────────────────── class Config: MODEL_POOL = [ {"name": "Llama-3.1-8B", "url": "taide/Llama-3.1-TAIDE-LX-8B-Chat", "priority": 1, "max_tokens": 2048}, {"name": "Llama3-8B-Alpha", "url": "taide/Llama3-TAIDE-LX-8B-Chat-Alpha1", "priority": 2, "max_tokens": 2048}, {"name": "TAIDE-7B", "url": "taide/TAIDE-LX-7B-Chat", "priority": 3, "max_tokens": 2048}, {"name": "Gemma-3-12B", "url": "taide/Gemma-3-TAIDE-12b-Chat", "priority": 4, "max_tokens": 4096}, ] PREFERRED_LARGE_MODEL = "Gemma-3-12B" 
MAX_CONCURRENT_LLM_CALLS = 2 MAX_CONCURRENT_FETCH = 8 MAX_CONCURRENT_SEARCH = 3 RETRY_ON_FAILURE = 2 ENABLE_ROUND_ROBIN = True NUM_SUBTOPICS = 5 QUERIES_PER_SUBTOPIC = 3 NUM_RESEARCH_QUESTIONS = 8 PAGES_PER_QUERY = 3 MAX_SEARCH_ATTEMPTS = 12 MAX_RESULTS_PER_SEARCH = 25 MIN_BODY_LENGTH = 80 FETCH_RESERVE_RATIO = 3 MIN_FULL_CONTENT_LENGTH = 300 MIN_SNIPPET_FALLBACK_LEN = 200 SOURCE_QUALITY_THRESHOLD = 3 MAX_SOURCES_PER_DOMAIN = 3 MIN_CONTENT_LENGTH = 150 LLM_CALL_INTERVAL = 1.5 SUMMARY_LENGTH = 600 SUMMARY_LENGTH_FLEXIBILITY = 0.2 MAX_NEW_TOKENS = 1800 CONTEXT_MAX_LENGTH = 12000 CHUNK_SIZE = 1000 TOP_K_CHUNKS = 6 SECTION_TARGET_WORDS = 450 SECTION_MAX_TOKENS = 1200 ENABLE_DRAFT_CRITIQUE = False ENABLE_FACT_VALIDATION = True SEARCH_DELAY = 1.0 TIMEOUT_SECONDS = 25 USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" DEDUP_SIMILARITY_THRESHOLD = 0.82 PLAN_TEMPERATURE = 0.9 SUMMARY_TEMPERATURE = 0.7 SCORE_TEMPERATURE = 0.3 REPORT_TEMPERATURE = 0.6 CRITIQUE_TEMPERATURE = 0.5 OUTPUT_DIR = "./sessions" SAVE_INTERMEDIATE = True @classmethod def get_summary_word_range(cls): lo = int(cls.SUMMARY_LENGTH * (1 - cls.SUMMARY_LENGTH_FLEXIBILITY)) hi = int(cls.SUMMARY_LENGTH * (1 + cls.SUMMARY_LENGTH_FLEXIBILITY)) return f"{lo}-{hi} 字" @classmethod def validate(cls): return bool(cls.MODEL_POOL) # ── TokenTracker ────────────────────────────────────────────────────────────── class TokenTracker: def __init__(self): self.total_input = self.total_output = self.call_count = 0 self.model_stats: Dict[str, Dict] = {} self.call_details: List[Dict] = [] self._lock = threading.Lock() def record(self, model, inp, out, call_type, success=True): itok = self._est(inp); otok = self._est(out) if success else 0 with self._lock: self.total_input += itok; self.total_output += otok; self.call_count += 1 s = self.model_stats.setdefault(model, {"calls":0,"input":0,"output":0,"failures":0}) s["calls"] += 1; s["input"] += itok if success: s["output"] += otok else: s["failures"] += 1 
self.call_details.append({"model":model,"type":call_type,"in":itok,"out":otok,"ok":success}) def _est(self, text): if not text: return 0 zh = len(re.findall(r'[\u4e00-\u9fff]', text)) en = len(re.findall(r'[a-zA-Z]', text)) return max(1, int(zh/1.5 + en/4 + (len(text)-zh-en)/3)) def total(self): return self.total_input + self.total_output def summary(self): lines = [f"總呼叫:{self.call_count} | 總 Token:{self.total():,}"] for m, s in self.model_stats.items(): lines.append(f" • {m}: {s['input']+s['output']:,} Token (呼叫:{s['calls']} 失敗:{s['failures']})") return "\n".join(lines) def detailed_report(self): lines = ["=== Token 明細 ==="] for i, c in enumerate(self.call_details, 1): lines.append(f"{i}. {'✅' if c['ok'] else '❌'} [{c['model']}/{c['type']}] in:{c['in']:,} out:{c['out']:,}") lines.append(f"\n總計:{self.total():,} Token") return "\n".join(lines) # ── CircuitBreaker & ApiSpec ────────────────────────────────────────────────── import random as _random class _CircuitBreaker: THRESHOLD = 5; RESET_SECS = 90 def __init__(self, name): self.name = name; self._lock = threading.Lock() self._failures = 0; self._open_until = 0.0 @property def is_open(self): with self._lock: if self._open_until == 0: return False if time.time() >= self._open_until: self._failures = 0; self._open_until = 0 logger.info(f"🔄 熔斷器重置:{self.name}"); return False return True def record_success(self): with self._lock: self._failures = 0; self._open_until = 0 def record_failure(self): with self._lock: self._failures += 1 if self._failures >= self.THRESHOLD: self._open_until = time.time() + self.RESET_SECS logger.warning(f"⚡ 熔斷:{self.name} ({self._failures}次)") def failure_count(self): with self._lock: return self._failures class _ApiSpec: API_NAME_CANDIDATES = ["/chat", "/predict", "/run/predict", "/infer"] PARAM_TEMPLATES = [ lambda m,t,x: dict(message=m, temperature=t, max_new_tokens=x), lambda m,t,x: dict(input=m, temperature=t, max_new_tokens=x), lambda m,t,x: dict(message=m), lambda m,t,x: 
dict(prompt=m, temperature=t, max_tokens=x), lambda m,t,x: (m,), ] def __init__(self, name): self.model_name = name; self._lock = threading.Lock() self._discovered = False; self.api_name = None self.param_fn = None; self.use_positional = False def mark_discovered(self, api_name, param_fn, use_positional=False): with self._lock: self.api_name = api_name; self.param_fn = param_fn self.use_positional = use_positional; self._discovered = True logger.info(f"✅ API 探索:{self.model_name} → {api_name}") @property def ready(self): with self._lock: return self._discovered # ── MultiModelClient ────────────────────────────────────────────────────────── class MultiModelClient: def __init__(self, model_configs, tracker): self.tracker = tracker self._configs = {m["name"]: m for m in model_configs} self._priority = {m["name"]: m["priority"] for m in model_configs} self._max_tokens = {m["name"]: m["max_tokens"] for m in model_configs} self._breakers = {m["name"]: _CircuitBreaker(m["name"]) for m in model_configs} self._specs = {m["name"]: _ApiSpec(m["name"]) for m in model_configs} self._local = threading.local() self._rr_counter = 0; self._rr_lock = threading.Lock() logger.info(f"✅ MultiModelClient 初始化:{len(self._configs)} 個模型") def _thread_client(self, name, force_new=False): if not hasattr(self._local, "clients"): self._local.clients = {} if force_new or name not in self._local.clients: cfg = self._configs.get(name) if not cfg: return None try: c = Client(cfg["url"]); self._local.clients[name] = c; return c except Exception as e: logger.warning(f"⚠️ {name} Client 建立失敗:{e}") self._local.clients.pop(name, None); return None return self._local.clients[name] def _discover_api(self, name, client): spec = self._specs[name] if spec.ready: return True probe = "你好"; discovered = [] try: info = client.view_api(print_info=False, return_format="dict") discovered = [k for k in info if isinstance(info[k], dict)] except Exception: pass candidates = list(dict.fromkeys(discovered + 
_ApiSpec.API_NAME_CANDIDATES)) for api_name in candidates: for i, pfn in enumerate(_ApiSpec.PARAM_TEMPLATES): try: params = pfn(probe, 0.5, 10) r = client.predict(*params, api_name=api_name) if isinstance(params, tuple) \ else client.predict(**params, api_name=api_name) if r is not None: spec.mark_discovered(api_name, pfn, isinstance(params, tuple)); return True except Exception as e: if "Cannot find" in str(e) or "not found" in str(e).lower(): break return False def _call_once(self, name, msg, temp, max_tok, force_new=False): client = self._thread_client(name, force_new) if not client: return False, "", "connection" spec = self._specs[name] if not spec.ready: if not self._discover_api(name, client): return False, "", "api" try: params = spec.param_fn(msg, temp, max_tok) r = client.predict(*params, api_name=spec.api_name) if spec.use_positional or isinstance(params, tuple) \ else client.predict(**params, api_name=spec.api_name) result = str(r).strip() if r is not None else "" if not result: return False, "", "empty" return True, result, "" except Exception as e: err = str(e) conn = any(k.lower() in err.lower() for k in ["Connection","WebSocket","timeout","SSL","BrokenPipe"]) if conn: self._local.clients.pop(name, None) return False, "", "connection" if conn else "api" def _call_model(self, name, msg, temp, max_tok): for attempt in range(3): ok, result, cat = self._call_once(name, msg, temp, max_tok, attempt > 0) if ok: self._breakers[name].record_success(); return True, result if cat == "api" and attempt == 0: self._specs[name] = _ApiSpec(name) if attempt < 2: time.sleep((2**attempt) + _random.uniform(0, 1)) self._breakers[name].record_failure(); return False, "" def _model_order(self, preferred=None): all_names = sorted(self._priority, key=lambda x: self._priority[x]) healthy = [n for n in all_names if not self._breakers[n].is_open] unhealthy = [n for n in all_names if self._breakers[n].is_open] if not healthy: healthy = sorted(all_names, key=lambda n: 
self._breakers[n].failure_count()) if preferred and preferred in healthy: order = [preferred] + [n for n in healthy if n != preferred] elif Config.ENABLE_ROUND_ROBIN and len(healthy) > 1: with self._rr_lock: idx = self._rr_counter % len(healthy); self._rr_counter += 1 order = [healthy[idx]] + [n for n in healthy if n != healthy[idx]] else: order = healthy return order + unhealthy def chat(self, message, temperature=0.8, max_tokens=None, call_type="general", preferred=None): with self._rr_lock: now = time.time() if not hasattr(self, '_last_call_time'): self._last_call_time = 0.0 elapsed = now - self._last_call_time if elapsed < Config.LLM_CALL_INTERVAL: time.sleep(Config.LLM_CALL_INTERVAL - elapsed) self._last_call_time = time.time() for name in self._model_order(preferred): if self._breakers[name].is_open: continue mt = max_tokens or self._max_tokens.get(name, Config.MAX_NEW_TOKENS) ok, result = self._call_model(name, message, temperature, mt) self.tracker.record(name, message, result if ok else "", call_type, ok) if ok and result: return result logger.error(f"❌ 所有模型失敗 call_type={call_type}") return "[錯誤] 無法取得回應" def health_report(self): lines = ["=== 模型健康狀態 ==="] for name in sorted(self._priority, key=lambda x: self._priority[x]): cb = self._breakers[name]; sp = self._specs[name] status = "🔴 熔斷中" if cb.is_open else "🟢 正常" api = sp.api_name if sp.ready else "(未探索)" lines.append(f" {status} {name}|{api}|失敗:{cb.failure_count()}") return "\n".join(lines) # ── ContentDeduplicator ─────────────────────────────────────────────────────── class ContentDeduplicator: IGNORED_PARAMS = {'utm_source','utm_medium','ref','fbclid','gclid','si'} def __init__(self): self.seen_fps: Set[str] = set() self.seen_urls: Set[str] = set() self.domain_counts: Dict[str, int] = defaultdict(int) self.filtered = 0; self._lock = threading.Lock() def _norm_url(self, url): try: p = urlparse(url); domain = re.sub(r'^[a-z]{2}\.', '', p.netloc.lower()) path = unquote(p.path.rstrip('/')) if p.query: q = 
{k:v for part in p.query.split('&') if '=' in part for k,v in [part.split('=',1)] if k not in self.IGNORED_PARAMS} qs = '&'.join(f"{k}={v}" for k,v in sorted(q.items())) else: qs = "" return f"{domain}{path}?{qs}" if qs else f"{domain}{path}" except: return url.lower().strip() def _fp(self, url, title, content): ch = hashlib.md5(re.sub(r'\s+', ' ', content[:300]).lower().encode()).hexdigest()[:10] return f"{self._norm_url(url)}|{re.sub(r'[^\\w\\u4e00-\\u9fff]+','',title.lower())}|{ch}" def _sim(self, f1, f2): p1, p2 = f1.split('|'), f2.split('|') if len(p1) != 3 or len(p2) != 3: return 0.0 u1,t1,c1 = p1; u2,t2,c2 = p2 if u1 == u2: return 1.0 if t1 == t2 and c1 == c2: return 0.95 if c1 == c2: return 0.75 return 0.0 def get_domain(self, url): try: return urlparse(url).netloc.lower() except: return url def is_duplicate(self, url, title, content): with self._lock: norm = url.lower().strip() if norm in self.seen_urls: return True, "URL重複" domain = self.get_domain(url) if self.domain_counts[domain] >= Config.MAX_SOURCES_PER_DOMAIN: return True, "域名超額" new_fp = self._fp(url, title, content) for fp in self.seen_fps: if self._sim(new_fp, fp) >= Config.DEDUP_SIMILARITY_THRESHOLD: return True, "內容重複" if 'wikipedia.org' in url.lower(): m = re.search(r'wikipedia\.org/wiki/([^/#?]+)', url) if m: art = m.group(1).lower() for su in self.seen_urls: sm = re.search(r'wikipedia\.org/wiki/([^/#?]+)', su) if sm and sm.group(1).lower() == art: return True, "維基跨語言" self.seen_fps.add(new_fp); self.seen_urls.add(norm) self.domain_counts[domain] += 1 return False, "" # ── SmartContextBuilder ─────────────────────────────────────────────────────── class SmartContextBuilder: def __init__(self, chunk_size=Config.CHUNK_SIZE): self.chunk_size = chunk_size def _tokenize(self, text): zh = list(re.findall(r'[\u4e00-\u9fff]', text)) en = re.findall(r'[a-zA-Z]{2,}', text.lower()) bg = [zh[i]+zh[i+1] for i in range(len(zh)-1)] return zh + bg + en def _tf(self, toks): cnt = Counter(toks); total = 
max(len(toks), 1) return {t: c/total for t, c in cnt.items()} def _idf(self, chunks_tokens, all_terms): N = len(chunks_tokens) return {t: math.log((N+1)/(sum(1 for ts in chunks_tokens if t in ts)+1))+1 for t in all_terms} def _chunk(self, text): sentences = re.split(r'[。!?\n]', text) chunks, cur = [], "" for s in sentences: s = s.strip() if not s: continue if len(cur)+len(s) > self.chunk_size and cur: chunks.append(cur.strip()); cur = s else: cur += s + "。" if cur.strip(): chunks.append(cur.strip()) return chunks def build_ranked_context(self, sources, query, top_k=Config.TOP_K_CHUNKS): query_tokens = set(self._tokenize(query)) all_chunks: List[Tuple[str,str,List[str]]] = [] for src in sources: full = src.get("full_content") or src.get("content", "") label = f"[{src.get('title','')[:30]}]" for chunk in self._chunk(full): toks = self._tokenize(chunk) all_chunks.append((chunk, label, toks)) if not all_chunks: return "" all_terms = set(); [all_terms.update(t) for _,_,t in all_chunks] idf = self._idf([t for _,_,t in all_chunks], all_terms) scored = [] for text, label, toks in all_chunks: if len(text) < 50: continue tf = self._tf(toks) score = sum(tf.get(t,0)*idf.get(t,0) for t in query_tokens) scored.append((score, text, label)) scored.sort(key=lambda x: -x[0]) parts = [f"--- 片段{i} {label} ---\n{text}" for i,(sc,text,label) in enumerate(scored[:top_k],1)] return "\n\n".join(parts)[:Config.CONTEXT_MAX_LENGTH] # ── SourceQualityScorer ─────────────────────────────────────────────────────── class SourceQualityScorer: TRUSTED = ['wikipedia.org','britannica.com','reuters.com','bbc.com','nytimes.com', 'myanimelist.net','anilist.co','animenewsnetwork.com','crunchyroll.com'] SPAM = ['click here','buy now','free download','立即購買','免費下載','casino','博弈'] NAV = ['search results','page not found','404','搜尋結果','頁面不存在','login required'] def score_source(self, source, topic): score = 5 content = (source.get("full_content") or source.get("content","")).strip() title = 
source.get("title","").strip() url = source.get("url","").lower() cl = len(content) if cl >= 3000: score += 2 elif cl >= 1000: score += 1 elif cl < 150: score -= 2 tt = set(re.findall(r'[\w\u4e00-\u9fff]{2,}', topic.lower())) tit = set(re.findall(r'[\w\u4e00-\u9fff]{2,}', title.lower())) score += min(len(tt & tit), 2) ct = set(re.findall(r'[\w\u4e00-\u9fff]{2,}', content.lower())) if len(tt & ct) >= len(tt)*0.8: score += 1 elif len(tt & ct) == 0: score -= 1 if any(d in url for d in self.TRUSTED): score += 1 if any(s in content.lower()[:500] for s in self.SPAM): score -= 3 if any(s in content.lower()[:200] for s in self.NAV): score -= 2 sents = [s for s in re.split(r'[。!?.!?\n]', content) if len(s.strip()) > 20] if len(sents) >= 5: score += 1 return max(0, min(10, score)) def batch_score(self, sources, topic): scored = [(s, self.score_source(s, topic)) for s in sources] high = sum(1 for _,sc in scored if sc >= Config.SOURCE_QUALITY_THRESHOLD) logger.info(f" 📊 評分完成:{len(scored)} 筆,{high} 筆通過閾值") return scored # ── AggressiveSearcher ──────────────────────────────────────────────────────── class AggressiveSearcher: def __init__(self, dedup): self.dedup = dedup; self.ddgs = None if DDGS_AVAILABLE: try: self.ddgs = DDGS(); logger.info("✅ DDGS 初始化") except Exception as e: logger.warning(f"⚠️ DDGS:{e}") def _ddgs_search(self, query, n): if not self.ddgs: return [] for backend in ["html", "lite"]: try: raw = list(self.ddgs.text(query=query, region="us-en", safesearch="off", backend=backend, max_results=n)) out = [] for r in raw: url = r.get("href") or r.get("link") or r.get("url","") title = r.get("title") or r.get("headline","無標題") body = r.get("body") or r.get("snippet") or r.get("description","") if not url or len(body) < Config.MIN_BODY_LENGTH: continue if any(x in url.lower() for x in ['bing.com/search','duckduckgo.com/html']): continue out.append({"url":url,"title":title,"content":body[:3000], "timestamp":datetime.now().isoformat()}) if out: return out except 
Exception as e: logger.warning(f"[{backend}] {str(e)[:80]}") return [] def _fallback_search(self, query, n): try: r = requests.post("https://html.duckduckgo.com/html/", data={"q":query,"kl":"us-en"}, headers={"User-Agent":Config.USER_AGENT}, timeout=Config.TIMEOUT_SECONDS, verify=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser"); out = [] for item in soup.find_all("div", class_="result")[:n]: ta = item.find("a", class_="result__a"); sa = item.find("a", class_="result__snippet") if not ta: continue title = ta.get_text(strip=True); href = ta.get("href","") if href.startswith("/l/?"): params = parse_qs(urlparse(href).query); url = params.get("uddg",[None])[0] if not url: continue else: url = href body = sa.get_text(strip=True) if sa else "" if not url or len(body) < Config.MIN_BODY_LENGTH: continue out.append({"url":url,"title":title,"content":body[:3000], "timestamp":datetime.now().isoformat()}) return out except Exception as e: logger.error(f"[備用搜尋] {e}"); return [] def fetch_full_content(self, url): try: r = requests.get(url, headers={"User-Agent":Config.USER_AGENT}, timeout=Config.TIMEOUT_SECONDS, verify=True) r.raise_for_status() soup = BeautifulSoup(r.content, "lxml") for tag in soup(['script','style','nav','footer','header','aside']): tag.decompose() main = soup.find('article') or soup.find('main') or \ soup.find('div', class_=re.compile(r'content|article|post|body', re.I)) target = main if main else soup text = target.get_text(separator='\n') lines = [l.strip() for l in text.split('\n') if l.strip() and len(l.strip()) > 10] return '\n'.join(lines[:200]) if lines else None except: return None def search_candidates(self, query, target, label=""): need = target * Config.FETCH_RESERVE_RATIO valid: List[Dict] = []; attempt = 0; seen: Set[str] = set() while len(valid) < need and attempt < Config.MAX_SEARCH_ATTEMPTS: attempt += 1 raw = self._ddgs_search(query, max(need*2, Config.MAX_RESULTS_PER_SEARCH)) if not raw: raw = 
self._fallback_search(query, Config.MAX_RESULTS_PER_SEARCH) if not raw: time.sleep(Config.SEARCH_DELAY); continue for item in raw: if item['url'].lower() in seen: continue dup, _ = self.dedup.is_duplicate(item['url'], item['title'], item['content']) if dup: continue valid.append(item); seen.add(item['url'].lower()) if len(valid) >= need: break time.sleep(Config.SEARCH_DELAY) return valid def search_with_fetch_fallback(self, query, target, label=""): candidates = self.search_candidates(query, target, label) if not candidates: return [] confirmed: List[Dict] = [] for i, src in enumerate(candidates): if len(confirmed) >= target: break full = self.fetch_full_content(src["url"]) if full and len(full) >= Config.MIN_FULL_CONTENT_LENGTH: src["full_content"] = full; src["fetch_status"] = "full"; confirmed.append(src) logger.info(f" ✅ [{label}] ({len(confirmed)}/{target}) 全文成功 {src['url'][:55]}") elif len(src.get("content","")) >= Config.MIN_SNIPPET_FALLBACK_LEN: src["full_content"] = src["content"]; src["fetch_status"] = "snippet_fallback" confirmed.append(src) logger.info(f" ⚠️ [{label}] ({len(confirmed)}/{target}) snippet降級 {src['url'][:55]}") else: logger.info(f" ❌ [{label}] 跳過,備援池剩 {len(candidates)-i-1}") return confirmed def search_multi_query(self, queries, pages_per_query): all_results: List[Dict] = [] with ThreadPoolExecutor(max_workers=Config.MAX_CONCURRENT_SEARCH) as ex: futures = {ex.submit(self.search_with_fetch_fallback,q,pages_per_query,f"Q{i}"):q for i,q in enumerate(queries)} for f in as_completed(futures): try: all_results.extend(f.result()) except Exception as e: logger.warning(f" ⚠️ 查詢失敗:{e}") return all_results # ── ResearchPlanner ─────────────────────────────────────────────────────────── class ResearchPlanner: def __init__(self, llm): self.llm = llm def generate_plan(self, topic): logger.info("📋 生成研究計劃...") prompt = f"""你是一位專業研究規劃師。請為以下主題制定研究計劃。 研究主題:{topic} 請輸出以下格式(嚴格遵守,每行一條): === 子主題 === [列出 {Config.NUM_SUBTOPICS} 個需要深入研究的子主題,每行一個] === 研究問題 === [列出 
{Config.NUM_RESEARCH_QUESTIONS} 個具體的研究問題,每行一個] === 搜尋查詢 === [列出 {Config.NUM_SUBTOPICS * Config.QUERIES_PER_SUBTOPIC} 條多樣化的搜尋查詢(繁體/簡體/英文混合),每行一個]""" raw = self.llm.chat(prompt, temperature=Config.PLAN_TEMPERATURE, max_tokens=2000, call_type="research_plan", preferred=Config.PREFERRED_LARGE_MODEL) return self._parse_plan(raw, topic) def _parse_section(self, text, header): m = re.search(rf'=== {re.escape(header)} ===(.*?)(?====|$)', text, re.DOTALL) if not m: return [] return [l.strip() for l in m.group(1).split('\n') if l.strip() and not l.strip().startswith('[') and len(l.strip()) > 3] def _parse_plan(self, raw, topic): subtopics = self._parse_section(raw, "子主題") questions = self._parse_section(raw, "研究問題") queries = self._parse_section(raw, "搜尋查詢") if not subtopics: subtopics = [f"{topic}的歷史背景",f"{topic}的主要特徵", f"{topic}的影響與意義",f"{topic}的發展現況",f"{topic}的未來展望"] if not questions: questions = [f"{topic}是什麼?",f"{topic}的起源?",f"{topic}的重要特點?",f"{topic}的社會影響?"] if not queries: queries = [topic] + subtopics[:Config.NUM_SUBTOPICS] logger.info(f"✅ 計劃:{len(subtopics)} 子主題,{len(questions)} 問題,{len(queries)} 查詢") return {"subtopics":subtopics,"questions":questions,"queries":queries} # ── ResearchSystem ──────────────────────────────────────────────────────────── class ResearchSystem: def __init__(self): if not GRADIO_AVAILABLE: raise ImportError("gradio_client 未安裝") self.tracker = TokenTracker() self.dedup = ContentDeduplicator() self.llm = MultiModelClient(Config.MODEL_POOL, self.tracker) self.searcher = AggressiveSearcher(self.dedup) self.planner = ResearchPlanner(self.llm) self.scorer = SourceQualityScorer() self.ctx_builder = SmartContextBuilder() self.all_sources: List[Dict] = [] self._src_lock = threading.Lock() def phase0_plan(self, topic): logger.info("="*60); logger.info("📌 PHASE 0:研究規劃") return self.planner.generate_plan(topic) def phase1_search_and_fetch(self, topic, plan): logger.info("="*60); logger.info("📌 PHASE 1:搜尋+備援抓取+評分") all_q = [topic] + plan["queries"] 
        # De-duplicate queries case-insensitively, preserving order.
        seen_q: Set[str] = set(); uq = []
        for q in all_q:
            if q.lower() not in seen_q: seen_q.add(q.lower()); uq.append(q)
        logger.info(f" 🔍 並行執行 {len(uq)} 條查詢...")
        raw = self.searcher.search_multi_query(uq, Config.PAGES_PER_QUERY)
        logger.info(f" 📄 備援抓取後:{len(raw)} 篇")
        if not raw: raise RuntimeError("❌ Phase 1 搜尋無結果")
        scored = self.scorer.batch_score(raw, topic)
        quality = [(s,sc) for s,sc in scored if sc >= Config.SOURCE_QUALITY_THRESHOLD]
        quality.sort(key=lambda x: -x[1])
        # Fall back to the unfiltered pool if nothing passes the threshold.
        filtered = [s for s,_ in quality] or raw
        logger.info(f" ✅ 品質過濾:{len(filtered)}/{len(raw)} 篇保留")
        with self._src_lock: self.all_sources.extend(filtered)
        return filtered

    def phase2_subtopic_analysis(self, topic, sources, plan):
        """PHASE 2: one in-depth LLM analysis per planned subtopic,
        run concurrently up to MAX_CONCURRENT_LLM_CALLS."""
        logger.info("="*60); logger.info("📌 PHASE 2:子主題深度分析")
        summaries: Dict[str,str] = {}
        def analyze(st):
            # Build a context ranked against "<topic> <subtopic>".
            ctx = self.ctx_builder.build_ranked_context(sources, f"{topic} {st}", top_k=Config.TOP_K_CHUNKS)
            lo = int(Config.SUMMARY_LENGTH*(1-Config.SUMMARY_LENGTH_FLEXIBILITY))
            hi = int(Config.SUMMARY_LENGTH*(1+Config.SUMMARY_LENGTH_FLEXIBILITY))
            prompt = f"""你是一位專業研究員。深度分析主題:{topic} ─ 子主題:{st}
【分析要求】
1. 具體論點 + 資料事實支撐
2. 分析深度:{lo}-{hi} 字,繁體中文
3. 結構:背景→核心→細節→小結
4. 指出不同觀點或爭議(如有)
【相關資料片段】
{ctx}
請輸出針對「{st}」的深度分析:"""
            r = self.llm.chat(prompt, temperature=Config.SUMMARY_TEMPERATURE,
                              max_tokens=Config.MAX_NEW_TOKENS, call_type=f"subtopic_{st[:20]}")
            logger.info(f" ✅ 子主題完成:{st[:30]} ({len(r)} 字元)")
            return st, r
        with ThreadPoolExecutor(max_workers=Config.MAX_CONCURRENT_LLM_CALLS) as ex:
            futures = {ex.submit(analyze, st): st for st in plan["subtopics"]}
            for f in as_completed(futures):
                try:
                    st, r = f.result(); summaries[st] = r
                except Exception as e:
                    logger.warning(f" ⚠️ {futures[f]}:{e}")
        logger.info(f"[Phase 2] ✅ 完成 {len(summaries)} 個子主題")
        return summaries

    def phase3_fact_validation(self, topic, subtopic_summaries, sources):
        """PHASE 3: cross-check the first two subtopic summaries against the
        raw sources. Returns "" when validation is disabled."""
        if not Config.ENABLE_FACT_VALIDATION: return ""
        logger.info("="*60); logger.info("📌 PHASE 3:事實驗證")
        # Only the first two summaries are validated to bound token cost.
        top2 = dict(list(subtopic_summaries.items())[:2])
        st = "\n\n".join([f"【{k}】\n{v[:600]}" for k,v in top2.items()])
        ctx = self.ctx_builder.build_ranked_context(sources, topic, top_k=4)
        prompt = f"""請核查以下研究摘要的準確性。
主題:{topic}
【待核查內容】
{st}
【原始資料】
{ctx[:2000]}
請列出:
1. 已確認的重要事實(3-5條)
2. 有疑問或需補充的地方(2-3條)
格式:✅ 確認:[事實] / ⚠️ 待查:[問題]"""
        r = self.llm.chat(prompt, temperature=Config.CRITIQUE_TEMPERATURE,
                          max_tokens=Config.MAX_NEW_TOKENS, call_type="fact_validation")
        logger.info(f"[Phase 3] ✅ 驗證完成 ({len(r)} 字元)")
        return r

    def _build_outline(self, topic, plan):
        """Build the section outline: abstract, background, one section per
        subtopic, Q&A, discussion, conclusion. Each entry carries the query
        used for context ranking and the prompt template for generation."""
        st = plan["subtopics"]; qs = plan["questions"]
        # Chinese numerals for section headings.
        roman = ["一","二","三","四","五","六","七","八","九","十","十一","十二","十三","十四","十五"]
        out = []
        out.append({"id":"abstract","heading":"## 一、執行摘要","title":"執行摘要","query":topic,
                    "template":f"請為主題「{topic}」撰寫學術性執行摘要。\n涵蓋:研究背景、核心發現、主要結論。\n約 {Config.SECTION_TARGET_WORDS} 字,段落敘述。",
                    "top_k":4,"max_tokens":Config.SECTION_MAX_TOKENS})
        out.append({"id":"background","heading":"## 二、研究背景與目的","title":"研究背景與目的",
                    "query":f"{topic} 歷史背景起源",
                    "template":f"請說明「{topic}」的研究背景:\n1. 起源與發展脈絡\n2. 研究重要性\n3. 本報告研究目的\n約 {Config.SECTION_TARGET_WORDS} 字。",
                    "top_k":5,"max_tokens":Config.SECTION_MAX_TOKENS})
        for i,s in enumerate(st):
            # Subtopic sections start at numeral index 2 ("三").
            rn = roman[i+2] if i+2 < len(roman) else str(i+3)
            out.append({"id":f"subtopic_{i}","heading":f"## {rn}、{s}","title":s,
                        "query":f"{topic} {s}",
                        "template":f"請深入分析「{topic}」中關於「{s}」的面向。\n1. 具體論點+資料事實\n2. 重要性與影響\n3. 不同觀點(如有)\n約 {Config.SECTION_TARGET_WORDS} 字,學術論文語氣。",
                        "top_k":Config.TOP_K_CHUNKS,"max_tokens":Config.SECTION_MAX_TOKENS})
        rn_qa = roman[len(st)+2] if len(st)+2 < len(roman) else "問題"
        ql = "\n".join([f"• {q}" for q in qs[:5]])
        out.append({"id":"qa","heading":f"## {rn_qa}、核心問題解答","title":"核心問題解答","query":topic,
                    "template":f"請針對以下研究問題,逐一簡明回答:\n{ql}\n每題 2-4 句,總計約 {Config.SECTION_TARGET_WORDS} 字。",
                    "top_k":5,"max_tokens":Config.SECTION_MAX_TOKENS})
        rn_d = roman[len(st)+3] if len(st)+3 < len(roman) else "討論"
        out.append({"id":"discussion","heading":f"## {rn_d}、綜合討論","title":"綜合討論",
                    "query":f"{topic} 影響 分析",
                    "template":f"請對「{topic}」各面向進行綜合討論:\n1. 各子主題間相互關係\n2. 研究侷限與未解問題\n3. 與相關領域比較\n約 {Config.SECTION_TARGET_WORDS} 字。",
                    "top_k":5,"max_tokens":Config.SECTION_MAX_TOKENS})
        rn_c = roman[len(st)+4] if len(st)+4 < len(roman) else "結論"
        out.append({"id":"conclusion","heading":f"## {rn_c}、結論與未來展望","title":"結論與未來展望",
                    "query":f"{topic} 結論 未來 展望",
                    "template":f"請撰寫「{topic}」研究的結論:\n1. 核心發現摘要(3-4點)\n2. 未來發展方向\n3. 對讀者的建議\n約 {Config.SECTION_TARGET_WORDS} 字。",
                    "top_k":4,"max_tokens":Config.SECTION_MAX_TOKENS})
        return out

    def _generate_section(self, section, topic, sources, prev_summary, validation_notes):
        """Generate one report section from its outline entry.
        prev_summary is injected to discourage repetition; validation notes
        are only injected into the abstract and background sections."""
        ctx = self.ctx_builder.build_ranked_context(sources, section["query"], top_k=section["top_k"])
        ctx = ctx[:Config.CONTEXT_MAX_LENGTH-800]
        prev = f"\n【前節已涵蓋(本節請勿重複)】\n{prev_summary[:300]}\n" if prev_summary else ""
        val = f"\n【已驗證關鍵事實】\n{validation_notes[:400]}\n" \
            if validation_notes and section["id"] in ("abstract","background") else ""
        prompt = f"""你是一位專業學術研究員,正在撰寫關於「{topic}」的小論文。
{prev}{val}
【參考資料(請從中提取具體事實)】
{ctx}
【本節撰寫任務】
{section["template"]}
注意:
- 使用繁體中文,段落式學術寫作
- 必須引用資料中的具體內容(人名、數字、事件)
- 不要重複前節,不要輸出標題行
開始撰寫:"""
        r = self.llm.chat(prompt, temperature=Config.REPORT_TEMPERATURE,
                          max_tokens=section["max_tokens"], call_type=f"section_{section['id']}")
        # Strip any heading line the model emitted for its own section.
        lines = r.strip().split("\n")
        cleaned = [l for l in lines if not (l.strip().startswith("#") and section["title"] in l)]
        return "\n".join(cleaned).strip()

    def phase4_generate_report(self, topic, plan, subtopic_summaries, validation_notes):
        """PHASE 4: generate the report section by section from the outline,
        augmenting sources with the Phase-2 summaries as internal documents."""
        logger.info("="*60); logger.info("📌 PHASE 4:分節模板式報告生成")
        outline = self._build_outline(topic, plan); total = len(outline)
        logger.info(f" 📋 大綱:{total} 節")
        sections_output: List[Tuple[str,str]] = []; prev = ""
        for i, sec in enumerate(outline, 1):
            logger.info(f" ✍️ [{i}/{total}] {sec['title']}")
            # Augment the source pool with subtopic summaries so the ranker
            # can pull from them as internal:// pseudo-sources.
            aug = list(self.all_sources)
            for st, summ in subtopic_summaries.items():
                aug.append({"url":f"internal://{st}","title":f"子主題:{st}",
                            "content":summ,"full_content":summ,"fetch_status":"full"})
            content = self._generate_section(sec, topic, aug, prev, validation_notes)
            if not content or content.startswith("[錯誤]"):
                # Generation failed: fall back to the Phase-2 summary.
                content = subtopic_summaries.get(sec["title"], f"({sec['title']} 生成失敗)")
            sections_output.append((sec["heading"], content))
            logger.info(f" ✅ [{i}/{total}] {sec['title']} ({len(content)} 字元)")
            prev = content[:300]
        body = f"# {topic}:深度研究報告\n" + "".join(f"\n{h}\n\n{c}" for h,c in sections_output)
        # Remove any references section the LLM hallucinated; the real one
        # is appended by _build_references_section.
        body = self._strip_llm_references(body)
        logger.info(f"[Phase 4] ✅ {total} 節,{sum(len(c) for _,c in sections_output):,} 字元")
        return body

    def _build_references_section(self, sources):
        """Return (markdown references section, unique sources list),
        de-duplicated by lowercased URL."""
        seen: Set[str] = set(); unique: List[Dict] = []
        for s in sources:
            if s["url"].lower().strip() not in seen:
                unique.append(s); seen.add(s["url"].lower().strip())
        today = datetime.now().strftime("%Y-%m-%d")
        lines = ["","---","","## 參考資料","",
                 f"> 本報告共引用 {len(unique)} 筆資料來源,存取日期:{today}",""]
        for i, src in enumerate(unique, 1):
            title = src.get("title","(無標題)").strip()
            url = src.get("url","").strip()
            # NOTE(review): bare except kept as-is; it only guards netloc
            # parsing and falls back to the raw URL.
            try: site = re.sub(r'^www\.', '', urlparse(url).netloc.lower())
            except: site = url
            lines += [f"[{i}] **{title}**",
                      f" - 來源網站:{site}",
                      f" - 連結:<{url}>",
                      f" - 存取日期:{today}", ""]
        return "\n".join(lines), unique

    def _strip_llm_references(self, report):
        """Truncate the report at the earliest LLM-generated references
        heading, if any."""
        patterns = [r'\n#{1,3}\s*(參考文獻|參考資料|References|Bibliography|引用來源)[^\n]*',
                    r'\n\*\*(參考文獻|參考資料|References)\*\*[^\n]*']
        earliest = len(report)
        for pat in patterns:
            m = re.search(pat, report, re.IGNORECASE)
            if m and m.start() < earliest: earliest = m.start()
        return report[:earliest] if earliest < len(report) else report

    def _save(self, path, content):
        """Best-effort UTF-8 file write; failures are logged, not raised."""
        try:
            with open(path, 'w', encoding='utf-8') as f: f.write(content)
        except Exception as e:
            logger.warning(f"⚠️ 儲存失敗 {path}:{e}")

    def execute_gui(self, topic: str, work_dir: str) -> Dict:
        """Run the full pipeline for *topic*, saving intermediate artifacts
        into *work_dir*. Returns a result dict with success=True/False."""
        os.makedirs(work_dir, exist_ok=True)
        start = time.time(); logger.info(f"🚀 開始研究:'{topic}'")
        try:
            plan = self.phase0_plan(topic)
            self._save(os.path.join(work_dir,"phase0_plan.txt"),
                       "# 研究計劃\n\n## 子主題\n"+"\n".join(f"- {s}" for s in plan["subtopics"])+
                       "\n\n## 研究問題\n"+"\n".join(f"- {q}" for q in plan["questions"])+
                       "\n\n## 搜尋查詢\n"+"\n".join(f"- {q}" for q in plan["queries"]))
            sources = self.phase1_search_and_fetch(topic, plan)
            self._save(os.path.join(work_dir,"phase1_sources.txt"),
                       "\n".join(f"{i}. {s['title'][:60]} | {s['url']}" for i,s in enumerate(sources,1)))
            subtopic_summaries = self.phase2_subtopic_analysis(topic, sources, plan)
            self._save(os.path.join(work_dir,"phase2_subtopics.txt"),
                       "\n\n".join(f"## {k}\n{v}" for k,v in subtopic_summaries.items()))
            validation = self.phase3_fact_validation(topic, subtopic_summaries, sources)
            if validation:
                self._save(os.path.join(work_dir,"phase3_validation.txt"), validation)
            report_body = self.phase4_generate_report(topic, plan, subtopic_summaries, validation)
            refs_section, unique_sources = self._build_references_section(sources)
            report = report_body.rstrip() + "\n\n" + refs_section
            elapsed = time.time() - start
            total_tok = self.tracker.total()
            header = (f"# 📊 研究報告:{topic}\n\n"
                      f"> **時間**:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                      f"> **耗時**:{elapsed:.1f} 秒\n"
                      f"> **總 Token**:{total_tok:,}\n"
                      f"> **來源**:{len(unique_sources)} 筆\n"
                      f"> **子主題**:{len(subtopic_summaries)} 個\n\n---\n\n")
            full_report = header + report
            report_path = os.path.join(work_dir, "report_final.md")
            self._save(report_path, full_report)
            self._save(os.path.join(work_dir,"token_usage.txt"), f"# Token\n{self.tracker.detailed_report()}")
            logger.info(f"✅ 完成!{elapsed:.1f}s | Token:{total_tok:,}")
            logger.info(self.llm.health_report())
            return {"success":True,"report":full_report,"elapsed":elapsed,
                    "total_tok":total_tok,"sources_count":len(unique_sources),
                    "work_dir":work_dir,"subtopics":list(subtopic_summaries.keys()),
                    "queries":plan.get("queries",[])}
        except Exception as e:
            import traceback; tb = traceback.format_exc()
            logger.critical(f"❌ 終止:{e}\n{tb}")
            return {"success":False,"error":str(e),"work_dir":work_dir}
# =============================================================================
# 12.
# HistoryManager
# =============================================================================
HISTORY_ROOT = "./history"
HISTORY_INDEX = os.path.join(HISTORY_ROOT, "index.json")
_hist_lock = threading.Lock()


def _ensure_history_dir():
    """Create the history root and an empty JSON index on first use."""
    os.makedirs(HISTORY_ROOT, exist_ok=True)
    if not os.path.exists(HISTORY_INDEX):
        with open(HISTORY_INDEX, 'w', encoding='utf-8') as f:
            json.dump([], f)


def _load_index() -> List[Dict]:
    """Load the history index; return [] when missing or unreadable (best effort)."""
    _ensure_history_dir()
    try:
        with open(HISTORY_INDEX, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception:  # narrowed from a bare ``except:``; behavior stays best-effort
        return []


def _save_index(index: List[Dict]):
    """Atomically-enough rewrite of the whole history index (UTF-8, pretty-printed)."""
    _ensure_history_dir()
    with open(HISTORY_INDEX, 'w', encoding='utf-8') as f:
        json.dump(index, f, ensure_ascii=False, indent=2)


def _sanitize_topic(topic: str) -> str:
    """Replace every non-word character with '_' and truncate to 20 chars.

    Used for directory and zip names derived from user topics. Fixes the
    previous ``r'[^\\w]'`` variant, whose doubled backslash made the class
    match "anything except a backslash or the letter w".
    """
    return re.sub(r'[^\w]', '_', topic)[:20]


def save_to_history(topic: str, result: Dict) -> str:
    """Prepend a new entry to the history index and return its short id."""
    with _hist_lock:
        index = _load_index()
        sid = str(uuid.uuid4())[:8]
        entry = {
            "id": sid,
            "topic": topic,
            "timestamp": datetime.now().isoformat(),
            "elapsed": result.get("elapsed", 0),
            "sources": result.get("sources_count", 0),
            "tok": result.get("total_tok", 0),
            "work_dir": result.get("work_dir", ""),
            "keywords": _extract_keywords(topic, result),
        }
        index.insert(0, entry)  # newest first
        _save_index(index)
        return sid


def _extract_keywords(topic: str, result: Dict) -> List[str]:
    """Build a sorted keyword set from topic, subtopics and search queries.

    Keywords are: individual CJK characters, CJK character bigrams, and
    lowercased Latin words of length >= 2 — matching the tokenization used
    by fuzzy_search_history so lookups hit.
    """
    words: Set[str] = set()

    def _add(text: str):
        chs = re.findall(r'[\u4e00-\u9fff]', text)
        words.update(chs)
        words.update(chs[i] + chs[i + 1] for i in range(len(chs) - 1))
        words.update(w.lower() for w in re.findall(r'[a-zA-Z]{2,}', text))

    _add(topic)
    for st in result.get("subtopics", []):
        _add(st)
    for q in result.get("queries", []):
        _add(q)
    return sorted(words)


def fuzzy_search_history(query: str, threshold: float = 0.3) -> List[Dict]:
    """Rank history entries by keyword coverage of *query*.

    An exact substring match on the topic forces full coverage. Entries with
    coverage below *threshold* are dropped; results are sorted best-first.
    An empty/untokenizable query returns the whole index unchanged.
    """
    index = _load_index()
    if not query.strip():
        return index
    q_words: Set[str] = set()
    chs = re.findall(r'[\u4e00-\u9fff]', query)
    q_words.update(chs)
    q_words.update(chs[i] + chs[i + 1] for i in range(len(chs) - 1))
    q_words.update(w.lower() for w in re.findall(r'[a-zA-Z]{2,}', query))
    if not q_words:
        return index
    scored = []
    for entry in index:
        kw_set = set(entry.get("keywords", []))
        if not kw_set:
            continue
        hits = len(q_words & kw_set)
        coverage = hits / len(q_words)
        if query.strip() in entry.get("topic", ""):
            coverage = max(coverage, 1.0)
        if coverage >= threshold:
            scored.append((coverage, entry))
    scored.sort(key=lambda x: -x[0])
    return [e for _, e in scored]


def make_zip(work_dirs: List[str], zip_name: str) -> str:
    """Zip the top-level files of each work dir into ./zips/<zip_name>.zip.

    Each work dir becomes a folder (its basename) inside the archive.
    Missing directories are skipped silently. Returns the zip path.
    """
    os.makedirs("./zips", exist_ok=True)
    zip_path = f"./zips/{zip_name}.zip"
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for wd in work_dirs:
            if not os.path.isdir(wd):
                continue
            folder_name = os.path.basename(wd)
            for fname in os.listdir(wd):
                fpath = os.path.join(wd, fname)
                if os.path.isfile(fpath):
                    zf.write(fpath, arcname=f"{folder_name}/{fname}")
    return zip_path


# =============================================================================
# 13. ResearchRunner + ResearchQueue
# =============================================================================
PENDING_QUEUE_FILE = os.path.join(HISTORY_ROOT, "_pending_queue.json")


def _save_pending_to_disk(pending: List[str]):
    """Persist the waiting-topic list so it survives restarts (best effort)."""
    try:
        _ensure_history_dir()
        with open(PENDING_QUEUE_FILE, 'w', encoding='utf-8') as f:
            json.dump({"pending": pending, "saved_at": datetime.now().isoformat()}, f)
    except Exception as e:
        logger.warning(f"待辦佇列存檔失敗:{e}")


def _load_pending_from_disk() -> List[str]:
    """Restore the waiting-topic list saved by _save_pending_to_disk (best effort)."""
    try:
        if os.path.exists(PENDING_QUEUE_FILE):
            with open(PENDING_QUEUE_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
            pending = data.get("pending", [])
            if pending:
                logger.info(f"從磁碟恢復待辦佇列:{len(pending)} 個")
            return pending
    except Exception as e:
        logger.warning(f"待辦佇列讀取失敗:{e}")
    return []


class ResearchRunner:
    """Runs one research job at a time on a background daemon thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self.running = False
        self.status = "idle"           # idle | running | done | error
        self.result: Optional[Dict] = None
        self.topic = ""
        self.zip_path: Optional[str] = None

    def start(self, topic: str) -> bool:
        """Start a job; returns False if one is already running."""
        with self._lock:
            if self.running:
                return False
            self.running = True; self.result = None
            self.status = "running"; self.topic = topic; self.zip_path = None
            # Sentinel tells the log view to clear the previous session.
            _LOG_QUEUE.put(_SESSION_SENTINEL)
            threading.Thread(target=self._run, args=(topic,), daemon=True).start()
            return True

    def _run(self, topic: str):
        """Worker body: execute the pipeline, archive results, publish state."""
        sid = str(uuid.uuid4())[:8]
        # BUG FIX: was re.sub(r'[^\\w]','_',topic) — the doubled backslash in a raw
        # string matched "not backslash / not 'w'", mangling nearly every character.
        work_dir = os.path.join(HISTORY_ROOT, f"{sid}_{_sanitize_topic(topic)}")
        try:
            sys_inst = ResearchSystem()
            result = sys_inst.execute_gui(topic, work_dir)
            if result["success"]:
                save_to_history(topic, result)
                self.zip_path = make_zip([work_dir], f"{sid}_{_sanitize_topic(topic)}")
            with self._lock:
                self.result = result
                self.status = "done" if result["success"] else "error"
                self.running = False
        except Exception as e:
            logger.critical(f"Runner 失敗:{e}")
            with self._lock:
                self.result = {"success": False, "error": str(e), "work_dir": work_dir}
                self.status = "error"; self.running = False

    def get_state(self) -> Dict:
        """Snapshot of the runner state (thread-safe)."""
        with self._lock:
            return {"running": self.running, "status": self.status,
                    "result": self.result, "topic": self.topic,
                    "zip_path": self.zip_path}


class ResearchQueue:
    """FIFO of pending topics feeding a single ResearchRunner.

    A watcher thread promotes the next pending topic whenever the runner is
    idle; the pending list is mirrored to disk after every mutation.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._pending: List[str] = _load_pending_from_disk()
        self._runner = ResearchRunner()
        threading.Thread(target=self._watcher, daemon=True).start()

    def enqueue(self, topic: str) -> str:
        """Start *topic* immediately if idle, otherwise append to the queue."""
        topic = topic.strip()
        if not topic:
            return "⚠️ 請輸入研究主題"
        with self._lock:
            state = self._runner.get_state()
            if not state["running"] and not self._pending:
                self._runner.start(topic)
                _save_pending_to_disk(self._pending)
                return f"🟢 直接開始研究:{topic}"
            else:
                self._pending.append(topic)
                _save_pending_to_disk(self._pending)
                return f"✅ 已加入佇列(第 {len(self._pending)} 位等待):{topic}"

    def remove(self, idx: int) -> str:
        """Remove the pending topic at *idx*; tolerates out-of-range indices."""
        with self._lock:
            if 0 <= idx < len(self._pending):
                removed = self._pending.pop(idx)
                _save_pending_to_disk(self._pending)
                return f"🗑 已移除:{removed}"
            return "⚠️ 無效的索引"

    def get_full_state(self) -> Dict:
        """Runner snapshot plus a copy of the pending list."""
        with self._lock:
            return {"runner": self._runner.get_state(), "pending": list(self._pending)}

    def _watcher(self):
        """Poll every 2 s; start the next pending topic once the runner is idle."""
        while True:
            time.sleep(2)
            with self._lock:
                state = self._runner.get_state()
                if not state["running"] and self._pending:
                    next_topic = self._pending.pop(0)
                    _save_pending_to_disk(self._pending)
                    self._runner.start(next_topic)


_queue = ResearchQueue()

# =============================================================================
# 14. History operations (pure Python, used by the Gradio callbacks)
# =============================================================================
def _entry_to_row(e: Dict) -> List:
    """Convert an index entry to one Dataframe row."""
    ts = e.get("timestamp", "")[:16].replace("T", " ")
    tok = e.get("tok", 0)
    tok_str = f"{tok:,}" if tok else "0"
    return [
        e.get("id", ""),
        e.get("topic", ""),
        ts,
        e.get("sources", 0),
        f"{round(e.get('elapsed', 0))}s",
        tok_str,
    ]


def _build_dropdown_choices(entries: List[Dict]) -> List[str]:
    """Render entries as '[id] topic' labels for the selection dropdown."""
    return [f"[{e.get('id','')}] {e.get('topic','')}" for e in entries]


def _id_from_choice(choice: str) -> str:
    """Extract the id from a '[abc1] topic' label; '' when it does not match."""
    m = re.match(r'^\[([^\]]+)\]', choice or "")
    return m.group(1) if m else ""


# ── Tab 2 callbacks ───────────────────────────────────────────────────────────
def cb_hist_load(search_query: str = ""):
    """Load / search history; refresh the Dataframe, Dropdown and status line."""
    entries = fuzzy_search_history(search_query) if search_query.strip() else _load_index()
    rows = [_entry_to_row(e) for e in entries]
    choices = _build_dropdown_choices(entries)
    status = f"共 **{len(entries)}** 筆紀錄" + (f"(搜尋:{search_query})" if search_query.strip() else "")
    return (
        gr.update(value=rows if rows else []),
        gr.update(choices=choices, value=None),
        gr.update(value=status),
        gr.update(value=""),        # clear the preview pane
        gr.update(visible=False),   # hide the download widget
    )


def cb_hist_preview(choice: str):
    """Render the selected entry's final report into the preview pane."""
    entry_id = _id_from_choice(choice)
    if not entry_id:
        return gr.update(value="⚠️ 請先在下拉選單中選擇一筆紀錄"), gr.update(visible=False)
    index = _load_index()
    entry = next((e for e in index if e.get("id") == entry_id), None)
    if not entry:
        return gr.update(value="⚠️ 找不到此紀錄"), gr.update(visible=False)
    wd = entry.get("work_dir", "")
    rep_path = os.path.join(wd, "report_final.md") if wd else ""
    if not rep_path or not os.path.exists(rep_path):
        return gr.update(value="⚠️ 找不到報告檔案"), gr.update(visible=False)
    with open(rep_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return gr.update(value=content), gr.update(visible=False)


def cb_hist_download_one(choice: str):
    """Zip the selected entry's work dir and hand the path to gr.File."""
    entry_id = _id_from_choice(choice)
    if not entry_id:
        return gr.update(visible=False, value=None)
    index = _load_index()
    entry = next((e for e in index if e.get("id") == entry_id), None)
    if not entry:
        return gr.update(visible=False, value=None)
    wd = entry.get("work_dir", "")
    if not wd or not os.path.isdir(wd):
        return gr.update(visible=False, value=None)
    topic_safe = _sanitize_topic(entry.get("topic", "unknown"))
    zip_path = make_zip([wd], f"{entry_id}_{topic_safe}")
    return gr.update(visible=True, value=zip_path)


def cb_hist_download_all():
    """Zip every history work dir into one timestamped archive."""
    entries = _load_index()
    work_dirs = [e.get("work_dir", "") for e in entries if e.get("work_dir", "")]
    if not work_dirs:
        return gr.update(visible=False, value=None)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    zip_path = make_zip(work_dirs, f"all_history_{stamp}")
    return gr.update(visible=True, value=zip_path)

# =============================================================================
# 15.
# Gradio UI
# =============================================================================
import gradio as gr

# GitHub-dark inspired theme applied to the whole app via Blocks(css=...).
DARK_CSS = """
body, .gradio-container { background-color: #0d1117 !important; color: #c9d1d9 !important; font-family: 'Inter', 'Segoe UI', sans-serif; }
.tab-nav button { background: #161b22 !important; color: #8b949e !important; border: 1px solid #30363d !important; border-radius: 8px 8px 0 0 !important; font-size: 14px !important; padding: 8px 20px !important; }
.tab-nav button.selected, .tab-nav button:hover { background: #1f6feb !important; color: #fff !important; border-color: #1f6feb !important; }
.gr-box, .gradio-box, .block, .form { background: #161b22 !important; border: 1px solid #30363d !important; border-radius: 10px !important; }
input[type=text], textarea, .gr-input, .gr-textarea { background: #0d1117 !important; color: #c9d1d9 !important; border: 1px solid #30363d !important; border-radius: 8px !important; }
input[type=text]:focus, textarea:focus { border-color: #1f6feb !important; box-shadow: 0 0 0 2px rgba(31,111,235,0.25) !important; }
button.primary, .gr-button-primary { background: linear-gradient(135deg,#1f6feb,#388bfd) !important; color: #fff !important; border: none !important; border-radius: 8px !important; font-weight: 600 !important; padding: 10px 22px !important; }
button.primary:hover { opacity:.88; }
button.secondary, .gr-button-secondary { background: #21262d !important; color: #c9d1d9 !important; border: 1px solid #30363d !important; border-radius: 8px !important; }
button.secondary:hover { border-color: #388bfd !important; }
.gr-markdown, .prose { background: #161b22 !important; color: #c9d1d9 !important; padding: 16px !important; border-radius: 10px !important; border: 1px solid #30363d !important; max-height: 520px; overflow-y: auto; line-height: 1.7; }
.prose h1,.prose h2,.prose h3 { color: #79c0ff !important; }
.prose a { color: #58a6ff !important; }
.prose code { background:#0d1117 !important; color:#ffa657 !important; border-radius:4px; padding:2px 6px; }
.prose blockquote { border-left:3px solid #388bfd; padding-left:12px; color:#8b949e; }
.prose hr { border-color:#30363d; }
#log_box textarea { background: #010409 !important; color: #3fb950 !important; font-family: 'Fira Code','Consolas',monospace !important; font-size: 12px !important; border: 1px solid #1a7f37 !important; border-radius: 8px !important; line-height: 1.5; }
#app-header { text-align:center; padding:20px 0 10px; }
#app-header h1 { color:#e6edf3; font-size:26px; font-weight:700; margin:0; }
#app-header p { color:#8b949e; font-size:13px; margin:4px 0 0; }
"""


# ── Helper functions ──────────────────────────────────────────────────────────
def _queue_current_text(runner: Dict) -> str:
    """Render the runner-state dict as the status markdown for the queue panel."""
    s = runner.get("status", "idle")
    t = runner.get("topic", "")
    if s == "running":
        return f"🔄 **研究中:** {t}\n\n> 研究在背景執行,關閉頁面後仍會繼續,可回來查看結果。"
    elif s == "done" and t:
        return f"✅ **完成:** {t}"
    elif s == "error" and t:
        return f"❌ **錯誤:** {t}"
    return "⚪ 待命中(無進行中的研究)"


def _pending_choices(pending: List[str]) -> List[str]:
    """Label pending topics as '[1] topic' for the deletion dropdown."""
    return [f"[{i+1}] {t}" for i, t in enumerate(pending)]


def _drain_log_queue() -> List[str]:
    """Non-blocking drain of the shared log queue; returns all buffered lines."""
    lines = []
    while True:
        try:
            lines.append(_LOG_QUEUE.get_nowait())
        except queue.Empty:
            break
    return lines


# ── Tab 1 callbacks ───────────────────────────────────────────────────────────
def cb_enqueue(topic: str):
    """Enqueue a topic and refresh message, current-status and pending widgets."""
    msg = _queue.enqueue(topic)
    full = _queue.get_full_state()
    runner, pending = full["runner"], full["pending"]
    return (
        gr.update(value=f"💬 {msg}"),
        gr.update(value=_queue_current_text(runner)),
        gr.update(choices=_pending_choices(pending), value=None),
        gr.update(value=""),  # clear the topic textbox
    )


def cb_delete_pending(selected_label: str):
    """Remove the pending item chosen in the dropdown ('[n] topic' labels)."""
    if not selected_label:
        full = _queue.get_full_state()
        return (gr.update(value="⚠️ 請先在下拉選單中選取要刪除的項目"),
                gr.update(value=_queue_current_text(full["runner"])),
                gr.update(choices=_pending_choices(full["pending"]), value=None))
    try:
        # Parse the 1-based index back out of the '[n] topic' label.
        idx = int(selected_label.split("]")[0].lstrip("[")) - 1
        msg = _queue.remove(idx)
    except (ValueError, IndexError):
        msg = "⚠️ 刪除失敗,請重試"
    full = _queue.get_full_state()
    runner = full["runner"]
    return (gr.update(value=f"💬 {msg}"),
            gr.update(value=_queue_current_text(runner)),
            gr.update(choices=_pending_choices(full["pending"]), value=None))


def cb_poll():
    """Timer callback: mirror runner/queue state into the six Tab-1 widgets."""
    full = _queue.get_full_state()
    runner = full["runner"]
    pending = full["pending"]
    current_upd = gr.update(value=_queue_current_text(runner))
    pending_upd = gr.update(choices=_pending_choices(pending), value=None)
    if runner["status"] == "running":
        return (current_upd, pending_upd,
                gr.update(value=f"🟢 **研究中:{runner['topic']}** — 請稍候..."),
                gr.update(), gr.update(visible=False), gr.update(visible=False))
    elif runner["status"] == "done" and runner.get("result"):
        r = runner["result"]
        stat = (f"✅ **完成!** | 耗時 {r.get('elapsed',0):.0f}s "
                f"| 來源 {r.get('sources_count',0)} 篇 "
                f"| Token {r.get('total_tok',0):,}")
        zip_p = runner.get("zip_path")
        return (current_upd, pending_upd, gr.update(value=stat),
                gr.update(value=r.get("report","")),
                gr.update(visible=bool(zip_p), value=zip_p),
                gr.update(visible=True))
    elif runner["status"] == "error":
        err = (runner.get("result") or {}).get("error","未知錯誤")
        return (current_upd, pending_upd,
                gr.update(value=f"🔴 **錯誤:** {err}"),
                gr.update(), gr.update(visible=False), gr.update(visible=False))
    else:
        return (current_upd, pending_upd, gr.update(), gr.update(),
                gr.update(visible=False), gr.update(visible=False))


def cb_load_latest():
    """Load the most recent history entry's report into the Tab-1 report pane."""
    index = _load_index()
    if not index:
        return (gr.update(value="⚠️ 尚無歷史紀錄"), gr.update(),
                gr.update(visible=False), gr.update(visible=False))
    latest = index[0]  # index is newest-first
    work_dir = latest.get("work_dir","")
    rep_path = os.path.join(work_dir, "report_final.md") if work_dir else ""
    if not rep_path or not os.path.exists(rep_path):
        return (gr.update(value="⚠️ 找不到最新報告檔案,請前往「歷史紀錄」分頁"),
                gr.update(), gr.update(visible=False), gr.update(visible=False))
    with open(rep_path, 'r', encoding='utf-8') as f:
        report_content = f.read()
    topic = latest.get("topic","未知主題")
    status_txt = (f"📂 **載入歷史:{topic}** | "
                  f"耗時 {latest.get('elapsed',0):.0f}s | "
                  f"來源 {latest.get('sources',0)} 篇")
    sid = latest.get("id","unknown")
    topic_safe = re.sub(r'[^\w]','_',topic)[:20]
    zip_path = make_zip([work_dir], f"{sid}_{topic_safe}")
    return (gr.update(value=status_txt), gr.update(value=report_content),
            gr.update(visible=True, value=zip_path), gr.update(visible=True))


def cb_clear_report():
    """Reset the Tab-1 status/report pane and hide download/clear widgets."""
    return (gr.update(value="⚪ 待命中"), gr.update(value=""),
            gr.update(visible=False), gr.update(visible=False))


# ── Tab 3 callbacks ───────────────────────────────────────────────────────────
# Rolling buffer of log lines shown in the log textbox (capped at 600 lines).
_log_buffer: List[str] = []


def cb_refresh_log():
    """Timer callback: fold new queue lines into the buffer and render status."""
    global _log_buffer
    for line in _drain_log_queue():
        if line == _SESSION_SENTINEL:
            # A new research session started — drop the previous session's lines.
            _log_buffer.clear()
        else:
            _log_buffer.append(line)
    if len(_log_buffer) > 600:
        _log_buffer = _log_buffer[-600:]
    full = _queue.get_full_state()
    runner = full["runner"]
    pending_cnt = len(full["pending"])
    badge_map = {"running":"🟢 研究中","done":"🔵 完成","error":"🔴 錯誤","idle":"⚪ 待命"}
    badge = badge_map.get(runner["status"],"?")
    status_txt = f"系統狀態:{badge}"
    if pending_cnt > 0:
        status_txt += f" | 等待佇列:{pending_cnt} 個"
    return gr.update(value="\n".join(_log_buffer)), gr.update(value=status_txt)


def cb_clear_log():
    """Empty both the display buffer and the shared log queue."""
    global _log_buffer
    _log_buffer.clear()
    while True:
        try:
            _LOG_QUEUE.get_nowait()
        except queue.Empty:
            break
    return gr.update(value="")


# =============================================================================
# Build the Gradio Blocks app
# =============================================================================
_HIST_DF_HEADERS = ["ID", "主題", "時間", "來源數", "耗時", "Token"]
_HIST_DF_TYPES = ["str", "str", "str", "number", "str", "str"]

with gr.Blocks(css=DARK_CSS, theme=gr.themes.Base(), title="AI 研究系統 V7") as demo:
    # NOTE(review): the header markup below appears to have lost its HTML tags
    # in transit (DARK_CSS styles an #app-header element) — confirm against the
    # original source before shipping.
    gr.HTML("""

🔬 AI 自動化研究系統 V7

研究在伺服器背景執行 · 關閉頁面後仍繼續 · 歷史紀錄全面改用 Gradio 原生元件(穩定可靠)

""")
    with gr.Tabs():
        # ════════════════════════════════════════════════════════════
        # TAB 1 — research + queue management
        # ════════════════════════════════════════════════════════════
        with gr.Tab("🔍 研究", id="tab_research"):
            with gr.Row():
                with gr.Column(scale=5):
                    t1_topic = gr.Textbox(
                        label="研究主題",
                        placeholder="輸入主題後按「加入佇列」,可連續排程多個主題...",
                        lines=1,
                    )
                with gr.Column(scale=1, min_width=130):
                    t1_enqueue_btn = gr.Button("➕ 加入佇列", variant="primary", size="lg")
            t1_msg = gr.Markdown(value="")
            with gr.Row():
                with gr.Column(scale=1, min_width=260):
                    gr.Markdown("#### 📡 研究佇列")
                    t1_current_md = gr.Markdown(
                        value="⚪ 待命中(無進行中的研究)",
                        label="目前狀態",
                    )
                    t1_pending_dd = gr.Dropdown(
                        label="⏳ 等待中的研究(選取後可刪除)",
                        choices=[],
                        value=None,
                        interactive=True,
                    )
                    t1_del_btn = gr.Button("🗑 刪除選取的等待項目", variant="secondary", size="sm")
                with gr.Column(scale=3):
                    t1_status_md = gr.Markdown(value="⚪ 待命中")
                    with gr.Row():
                        t1_load_latest_btn = gr.Button(
                            "📂 載入最新歷史結果", variant="secondary", size="sm"
                        )
                        t1_clear_btn = gr.Button(
                            "🔄 清除報告區", variant="secondary", size="sm", visible=False
                        )
                    t1_download = gr.File(
                        label="📦 下載本次研究包(ZIP)",
                        visible=False,
                        interactive=False,
                    )
                    t1_report = gr.Markdown(value="", label="研究報告", elem_classes=["prose"])
            # Poll runner state every 3 s so a reopened page catches up.
            t1_timer = gr.Timer(value=3)

            _enqueue_outputs = [t1_msg, t1_current_md, t1_pending_dd, t1_topic]
            t1_enqueue_btn.click(cb_enqueue, inputs=[t1_topic], outputs=_enqueue_outputs)
            t1_topic.submit(cb_enqueue, inputs=[t1_topic], outputs=_enqueue_outputs)
            t1_del_btn.click(
                cb_delete_pending,
                inputs=[t1_pending_dd],
                outputs=[t1_msg, t1_current_md, t1_pending_dd],
            )
            t1_timer.tick(
                cb_poll,
                outputs=[t1_current_md, t1_pending_dd, t1_status_md,
                         t1_report, t1_download, t1_clear_btn],
            )
            t1_load_latest_btn.click(
                cb_load_latest,
                outputs=[t1_status_md, t1_report, t1_download, t1_clear_btn],
            )
            t1_clear_btn.click(
                cb_clear_report,
                outputs=[t1_status_md, t1_report, t1_download, t1_clear_btn],
            )

        # ════════════════════════════════════════════════════════════
        # TAB 2 — history
        # ★ built entirely from native Gradio components + Python callbacks ★
        # ★ no longer relies on JS fetch / gr.HTML / FastAPI routes ★
        # ════════════════════════════════════════════════════════════
        with gr.Tab("📚 歷史紀錄", id="tab_history"):
            # ── search bar ─────────────────────────────────────────
            with gr.Row():
                hist_search_input = gr.Textbox(
                    label="關鍵字搜尋",
                    placeholder="例如:量子力學 / anime / climate...",
                    lines=1,
                    scale=4,
                )
                hist_search_btn = gr.Button("🔎 搜尋", variant="primary", scale=1, min_width=90)
                hist_refresh_btn = gr.Button("🔄 重新整理", variant="secondary", scale=1, min_width=100)
                hist_dl_all_btn = gr.Button("📦 下載全部", variant="secondary", scale=1, min_width=100)
            hist_status_md = gr.Markdown(value="點擊「重新整理」載入歷史紀錄")
            # ── history table (read-only Dataframe) ────────────────
            hist_df = gr.Dataframe(
                headers=_HIST_DF_HEADERS,
                datatype=_HIST_DF_TYPES,
                value=[],
                interactive=False,
                wrap=False,
                label="歷史紀錄列表",
                row_count=(8, "dynamic"),
                col_count=(len(_HIST_DF_HEADERS), "fixed"),
            )
            gr.Markdown("---\n#### 🔎 選擇一筆紀錄進行預覽 / 下載")
            # ── selection + actions ────────────────────────────────
            with gr.Row():
                hist_select_dd = gr.Dropdown(
                    label="選擇紀錄(ID + 主題)",
                    choices=[],
                    value=None,
                    interactive=True,
                    scale=4,
                )
                hist_preview_btn = gr.Button("👁 預覽報告", variant="primary", scale=1, min_width=100)
                hist_dl_one_btn = gr.Button("⬇ 下載 ZIP", variant="secondary", scale=1, min_width=100)
            # ── download output (gr.File, filled by Python callback) ──
            hist_dl_file = gr.File(
                label="📦 ZIP 下載(點擊即下載)",
                visible=False,
                interactive=False,
            )
            # ── report preview ─────────────────────────────────────
            hist_preview_md = gr.Markdown(
                value="",
                label="報告預覽",
                elem_classes=["prose"],
            )
            # ── event wiring ───────────────────────────────────────
            _hist_load_outputs = [hist_df, hist_select_dd, hist_status_md,
                                  hist_preview_md, hist_dl_file]
            hist_refresh_btn.click(
                fn=lambda: cb_hist_load(""),
                outputs=_hist_load_outputs,
            )
            hist_search_btn.click(
                fn=cb_hist_load,
                inputs=[hist_search_input],
                outputs=_hist_load_outputs,
            )
            hist_search_input.submit(
                fn=cb_hist_load,
                inputs=[hist_search_input],
                outputs=_hist_load_outputs,
            )
            hist_preview_btn.click(
                fn=cb_hist_preview,
                inputs=[hist_select_dd],
                outputs=[hist_preview_md, hist_dl_file],
            )
            hist_dl_one_btn.click(
                fn=cb_hist_download_one,
                inputs=[hist_select_dd],
                outputs=[hist_dl_file],
            )
            hist_dl_all_btn.click(
                fn=cb_hist_download_all,
                outputs=[hist_dl_file],
            )
            # Auto-refresh the history view when the app loads.
            demo.load(
                fn=lambda: cb_hist_load(""),
                outputs=_hist_load_outputs,
            )

        # ════════════════════════════════════════════════════════════
        # TAB 3 — live log
        # ════════════════════════════════════════════════════════════
        with gr.Tab("📡 執行日誌", id="tab_log"):
            with gr.Row():
                t3_status_label = gr.Markdown(value="系統狀態:⚪ 待命")
                t3_clear_btn = gr.Button("🗑 清除日誌", variant="secondary", size="sm")
            t3_log = gr.Textbox(
                label="即時日誌(每次新研究開始自動清除舊 session)",
                lines=30,
                max_lines=30,
                interactive=False,
                elem_id="log_box",
                autoscroll=True,
            )
            # NOTE(review): this gr.HTML is empty in the source as received —
            # its original payload (likely a <style>/<script> block) may have
            # been stripped; confirm against the original.
            gr.HTML("""""")
            t3_timer = gr.Timer(value=2)
            t3_timer.tick(cb_refresh_log, outputs=[t3_log, t3_status_label])
            t3_clear_btn.click(cb_clear_log, outputs=[t3_log])

# =============================================================================
# 16.
# Startup (FastAPI fallback routes are kept, but the UI no longer depends on them)
# =============================================================================
from fastapi import FastAPI
from fastapi.responses import JSONResponse, FileResponse, Response
from fastapi import Query as FastQuery

fastapi_app = FastAPI(title="AI Research API")


@fastapi_app.get("/api/history")
async def api_get_history(q: str = FastQuery(default="")):
    """Return a slim JSON listing of history entries; ?q= enables fuzzy search."""
    try:
        entries = fuzzy_search_history(q) if q.strip() else _load_index()
        # Only ship the fields the (legacy) JS client needs; trim the timestamp
        # to minute precision and swap the ISO 'T' separator for a space.
        slim = [{"id":e.get("id",""),"topic":e.get("topic",""),
                 "timestamp":e.get("timestamp","")[:16].replace("T"," "),
                 "sources":e.get("sources",0),"elapsed":round(e.get("elapsed",0)),
                 "tok":e.get("tok",0)} for e in entries]
        return JSONResponse(slim)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)


@fastapi_app.get("/api/download/{entry_id}")
async def api_download_entry(entry_id: str):
    """Zip one history entry's work dir and stream it as an attachment."""
    try:
        index = _load_index()
        entry = next((e for e in index if e.get("id") == entry_id), None)
        if not entry:
            return Response("找不到此筆紀錄", status_code=404)
        wd = entry.get("work_dir","")
        if not wd or not os.path.isdir(wd):
            return Response("工作目錄不存在", status_code=404)
        topic_safe = re.sub(r'[^\w]','_', entry.get("topic","unknown"))[:20]
        zip_path = make_zip([wd], f"{entry_id}_{topic_safe}")
        fname = f"{entry_id}_{topic_safe}.zip"
        return FileResponse(zip_path, media_type="application/zip", filename=fname,
                            headers={"Content-Disposition": f'attachment; filename="{fname}"'})
    except Exception as e:
        return Response(f"下載失敗:{e}", status_code=500)


# NOTE(review): redundant re-import — gradio is already imported as gr in the
# UI section above; kept as-is since removing imports is out of scope here.
import gradio as gr
# Serve the Gradio UI at "/" on top of the FastAPI app (API routes stay live).
mounted_app = gr.mount_gradio_app(fastapi_app, demo, path="/")

if __name__ == "__main__":
    import uvicorn
    _ensure_history_dir()
    os.makedirs("./sessions", exist_ok=True)
    os.makedirs("./zips", exist_ok=True)
    if not DDGS_AVAILABLE:
        print("⚠️ ddgs 套件未安裝,搜尋功能受限")
    if not GRADIO_AVAILABLE:
        print("⚠️ gradio_client 未安裝,LLM 呼叫受限")
    print("🚀 啟動中:http://0.0.0.0:7860")
    uvicorn.run(mounted_app, host="0.0.0.0", port=7860, log_level="warning")