diff --git a/.env.example b/.env.example new file mode 100644 index 0000000000000000000000000000000000000000..6e8e325b246907dee937fe855f3cf8c95ff7abf5 --- /dev/null +++ b/.env.example @@ -0,0 +1,93 @@ +# ====================================== +# GitHub Agent Demo - 环境变量配置 +# ====================================== + +# --- LLM 供应商选择 --- +# 支持: openai, deepseek, anthropic, gemini +# 默认: deepseek +LLM_PROVIDER=deepseek + +# --- API Keys (根据选择的供应商配置对应的 Key) --- + +# OpenAI (如果 LLM_PROVIDER=openai) +OPENAI_API_KEY= +# OPENAI_BASE_URL= # 可选: 自定义端点 (如 Azure OpenAI) + +# DeepSeek (如果 LLM_PROVIDER=deepseek) +DEEPSEEK_API_KEY= +# DEEPSEEK_BASE_URL=https://api.deepseek.com # 可选: 默认值 + +# Anthropic Claude (如果 LLM_PROVIDER=anthropic) +ANTHROPIC_API_KEY= + +# Google Gemini (如果 LLM_PROVIDER=gemini) +GEMINI_API_KEY= +# GEMINI_BASE_URL= # 可选: OpenAI 兼容端点 + +# --- 模型配置 --- +# 如果不指定,将使用各供应商的默认模型: +# - openai: gpt-4o-mini +# - deepseek: deepseek-chat +# - anthropic: claude-3-5-sonnet-20241022 +# - gemini: gemini-1.5-flash +# MODEL_NAME=deepseek-chat + +# --- GitHub Token --- +# 用于访问 GitHub API,提高请求限制 +GITHUB_TOKEN= + +# --- Embedding 服务 --- +# SiliconFlow API Key (用于 BGE-M3 Embedding) +SILICON_API_KEY= + +# --- Langfuse 追踪配置 (可选) --- +# LANGFUSE_ENABLED=true +# LANGFUSE_HOST=http://localhost:3000 +# LANGFUSE_PUBLIC_KEY= +# LANGFUSE_SECRET_KEY= + +# --- Qdrant 向量数据库配置 --- +# 模式选择: "local" | "server" | "cloud" +# - local: 本地嵌入式存储 (开发环境, 单 Worker) +# - server: Qdrant Server Docker (生产环境, 多 Worker) +# - cloud: Qdrant Cloud 托管服务 +QDRANT_MODE=local +QDRANT_LOCAL_PATH=data/qdrant_db + +# Server 模式: 连接 Qdrant Server (Docker) +# QDRANT_MODE=server +# QDRANT_URL=http://localhost:6333 +# 或分开配置: +# QDRANT_HOST=localhost +# QDRANT_PORT=6333 + +# Cloud 模式: 连接 Qdrant Cloud +# QDRANT_MODE=cloud +# QDRANT_URL=https://xxx.qdrant.tech +# QDRANT_API_KEY=your-api-key + +# 向量维度 (BGE-M3 = 1024) +# QDRANT_VECTOR_SIZE=1024 + +# --- Gunicorn Worker 配置 --- +# 2核2G服务器建议设为 2 +# 4核8G服务器可设为 4 +GUNICORN_WORKERS=2 + 
+# --- 分布式锁配置 --- +# 锁后端: "memory" | "file" | "redis" +# - memory: 内存锁 (单进程) +# - file: 文件锁 (多 Worker 单节点) +# - redis: Redis 分布式锁 (多节点) +LOCK_BACKEND=file +LOCK_DIR=data/locks +# REDIS_URL=redis://localhost:6379/0 + +# --- 服务配置 --- +HOST=0.0.0.0 +PORT=8000 + +# --- LLM 参数 (可选) --- +# LLM_TEMPERATURE=0.1 +# LLM_MAX_TOKENS=4096 +# LLM_TIMEOUT=600 \ No newline at end of file diff --git a/.github/workflows/sync_to_hub.yml b/.github/workflows/sync_to_hub.yml new file mode 100644 index 0000000000000000000000000000000000000000..a07b8c3c4bb7f452273787d9d1ebe3f62d23e4d4 --- /dev/null +++ b/.github/workflows/sync_to_hub.yml @@ -0,0 +1,58 @@ +name: Sync to Hugging Face hub +on: + push: + branches: [main] + workflow_dispatch: + +jobs: + sync-to-hub: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Push to hub + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + HF_USERNAME: realdexter + SPACE_NAME: RepoReaper + run: | + echo "🚀 Starting deployment to Hugging Face..." + + # 1. 配置 Git + git config --global user.email "bot@github.com" + git config --global user.name "GitHub Actions Bot" + + # 2. 【核心魔法】动态生成 Hugging Face 专用的 README + # 这一步会在发送给 HF 之前,强行在 README.md 顶部插入配置头 + # GitHub 本地的文件不会受影响,依然保持干净漂亮 + echo "---" > hf_header.yml + echo "title: RepoReaper" >> hf_header.yml + echo "emoji: 💀" >> hf_header.yml + echo "colorFrom: blue" >> hf_header.yml + echo "colorTo: indigo" >> hf_header.yml + echo "sdk: docker" >> hf_header.yml + echo "pinned: false" >> hf_header.yml + echo "app_port: 8000" >> hf_header.yml # 👈 关键:这里指定端口,你就不用改代码了 + echo "---" >> hf_header.yml + echo "" >> hf_header.yml + + # 将配置头和原 README 内容拼接 + cat hf_header.yml README.md > README_temp.md + mv README_temp.md README.md + + # 3. 清理不需要的文件 + rm -rf docs/ + rm -f *.jpg *.png *.gif hf_header.yml + rm -rf .git + + # 4. 初始化新仓库并推送 + git init -b main + git add . 
+ git commit -m "deploy: auto-inject hf config & sync" + + git remote add space https://$HF_USERNAME:$HF_TOKEN@huggingface.co/spaces/$HF_USERNAME/$SPACE_NAME + git push --force space main + + echo "✅ Deployment successful! Config header injected on-the-fly." \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..e1f6e29f6e3f6d7b89de136deb4bb2e5215dc444 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# .gitignore +__pycache__/ +*.py[cod] +.env +.venv/ +venv/ +.DS_Store +data/ +# Vue 构建输出 +#frontend-dist/ +frontend-vue/node_modules/ +frontend-vue/dist/ + +# 锁文件目录 +data/locks/ + +# 日志 +logs/ +*.log + +# IDE +.idea/ +.vscode/ +*.swp + +# 临时文件 +*.tmp +*.bak +QUICKSTART.md +docs/INTERVIEW_QA.md +docs/ROADMAP.md +docs/TECHNICAL_REPORT.md +evaluation/000_START_HERE.md +evaluation/golden_dataset.json +evaluation/HIGH_QUALITY_QUESTIONS.md + +evaluation/README_EVALUATION_SYSTEM.md +evaluation/ragas_eval_dataset.json +evaluation/sft_data/eval_results.jsonl +evaluation/sft_data/negative_samples.jsonl +evaluation/sft_data/positive_samples.jsonl +evaluation/sft_data/skipped_samples.jsonl +evaluation/sft_data/cleaned/rejected_20260128_010745.jsonl diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..67ffb714e5c9bf3a0006a52635d7ea0c479c8e1a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,45 @@ +# 1. 基础镜像:选择 Python 3.10 的轻量版 (Slim) +FROM python:3.10-slim + +# 2. 设置环境变量 +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + # 默认 LLM 供应商 (可通过 docker run -e 覆盖) + LLM_PROVIDER=deepseek + +# 3. 设置工作目录 +WORKDIR /app + +# 4. 安装系统级依赖 +# build-essential: ChromaDB 编译需要 +# curl: 健康检查 +# git: 某些 pip 包可能需要 +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + curl \ + git \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + +# 5. 复制依赖文件并安装 (利用 Docker 层缓存) +COPY requirements.txt . + +# 6. 
安装 Python 依赖 +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt + +# 7. 复制项目代码 +COPY . . + +# 8. 创建数据目录 (Qdrant 本地存储 + 上下文缓存) +RUN mkdir -p /app/data/qdrant_db /app/data/contexts + +# 9. 暴露端口 +EXPOSE 8000 + +# 10. 健康检查 +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# 11. 启动命令 +CMD ["gunicorn", "-c", "gunicorn_conf.py", "app.main:app"] \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..242fa6488fcde1ae993f6959de666741a3f71eab --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 tzzp1224 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..05f8c1528f6176026d9e06e4eec497f5086b488b --- /dev/null +++ b/README.md @@ -0,0 +1,224 @@ +--- +title: RepoReaper +emoji: 💀 +colorFrom: blue +colorTo: indigo +sdk: docker +pinned: false +app_port: 8000 +--- + +
+ + RepoReaper Logo + +

RepoReaper

+ +

💀 Harvest Logic. Dissect Architecture. Chat with Code.

+ +

+ English • + 简体中文 +

+ + + License + + Python Version + DeepSeek Powered + Agent Architecture + +
+ + RAG + Qdrant + FastAPI + Vue 3 + Docker + +
+
+ +

+ 👇 Live Demo / 在线体验 👇 +

+

+ + Global Demo + +     + + China Demo + +

+ +

+ + ⚠️ Public demos use shared API quotas. Deploy locally for the best experience. + +

+ +
+ + RepoReaper Demo + +
+
+ +--- + +An autonomous Agent that dissects any GitHub repository. It maps code architecture, warms up semantic cache, and answers questions with Just-In-Time context retrieval. + +--- + +## ✨ Key Features + +| Feature | Description | +|:--------|:------------| +| **Multi-Language AST Parsing** | Python AST + Regex patterns for Java, TypeScript, Go, Rust, etc. | +| **Hybrid Search** | Qdrant vectors + BM25 with RRF fusion | +| **JIT Context Loading** | Auto-fetches missing files during Q&A | +| **Query Rewrite** | Translates natural language to code keywords | +| **End-to-End Tracing** | Langfuse integration for observability | +| **Auto Evaluation** | LLM-as-Judge scoring pipeline | + +--- + +## 🏗 Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Vue 3 Frontend (SSE Streaming + Mermaid Diagrams) │ +└─────────────────────┬───────────────────────────────────────┘ + │ +┌─────────────────────▼───────────────────────────────────────┐ +│ FastAPI Backend │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Agent │ │ Chat │ │ Evaluation │ │ +│ │ Service │ │ Service │ │ Framework │ │ +│ └──────┬──────┘ └──────┬──────┘ └─────────────────────┘ │ +│ │ │ │ +│ ┌──────▼───────────────▼──────┐ ┌─────────────────────┐ │ +│ │ Vector Service (Qdrant+BM25)│ │ Tracing (Langfuse) │ │ +│ └─────────────────────────────┘ └─────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +--- + +## 🛠 Tech Stack + +**Backend:** Python 3.10+ · FastAPI · AsyncIO · Qdrant · BM25 +**Frontend:** Vue 3 · Pinia · Mermaid.js · SSE +**LLM:** DeepSeek V3 · SiliconFlow BGE-M3 +**Ops:** Docker · Gunicorn · Langfuse + +--- + +## 🏁 Quick Start + +**Prerequisites:** Python 3.10+ · (Optional) Node 18+ for rebuilding frontend · GitHub Token (recommended) · LLM API Key (required) + +```bash +# Clone & Setup +git clone https://github.com/tzzp1224/RepoReaper.git && cd RepoReaper +python -m venv venv && source venv/bin/activate 
+pip install -r requirements.txt + +# Configure .env (copy from example and fill in your keys) +cp .env.example .env +# Required: set LLM_PROVIDER and the matching *_API_KEY +# Recommended: GITHUB_TOKEN and SILICON_API_KEY (embeddings) + +# (Optional) Build frontend (repo already contains frontend-dist) +cd frontend-vue +npm install +npm run build +cd .. + +# Run +python -m app.main +``` + +Open `http://localhost:8000` and paste any GitHub repo URL. + +**Docker (single container, local Qdrant):** +```bash +cp .env.example .env +docker build -t reporeaper . +docker run -d -p 8000:8000 --env-file .env reporeaper +``` + +**Docker Compose (recommended, with Qdrant Server):** +```bash +cp .env.example .env +# Set QDRANT_MODE=server and QDRANT_URL=http://qdrant:6333 in .env +docker compose up -d --build +``` + + + + + +## 📊 Evaluation & Tracing Status + +| Component | Status | Notes | +|:----------|:------:|:------| +| **Self-built Eval Engine** | ✅ Working | 4-layer metrics (QueryRewrite / Retrieval / Generation / Agentic), LLM-as-Judge | +| **Auto Evaluation** | ✅ Working | Triggers after every `/chat`, async, writes to `evaluation/sft_data/` | +| **Data Routing (SFT)** | ✅ Working | Auto-grades Gold/Silver/Bronze/Rejected → JSONL files | +| **Eval API Endpoints** | ✅ Working | `/evaluate`, `/evaluation/stats`, `/dashboard/*`, `/auto-eval/*` (7 endpoints) | +| **Offline Retrieval Eval** | ✅ Working | `test_retrieval.py` — Hit Rate, Recall@K, Precision@K, MRR | +| **Langfuse Tracing** | ⚠️ Partial | Framework + 14 call sites wired in agent/chat services; falls back to local JSON logs (`logs/traces/`) when Langfuse unavailable | +| **Ragas Integration** | ❌ Placeholder | `use_ragas=False` by default; `_ragas_eval()` API call doesn't match latest Ragas SDK | +| **Langfuse ↔ Eval** | ❌ Not connected | Eval results only write JSONL, not reported to Langfuse Scores API | + +> **Overall completion: ~65%** — the self-built eval loop is production-ready; Ragas and Langfuse 
integrations are scaffolded but not functional. + +--- + +## ⚠️ Known Issues + +1. **Python 3.14 + Langfuse import error** + `pydantic.v1.errors.ConfigError: unable to infer type for attribute "description"` — Langfuse 3.x internally uses the `pydantic.v1` compat layer, which breaks on Python 3.14. + **Workaround:** set `LANGFUSE_ENABLED=false` in `.env`, or use Python 3.10–3.12. + +2. **Langfuse Server not included in `docker-compose.yml`** + Even if the import works, you need a running Langfuse instance. Add it yourself or use [app.langfuse.com](https://app.langfuse.com). + +3. **Trace spans are not linked** + `tracing_service` records spans/events but doesn't pass `trace_id` to Langfuse API calls — the Langfuse UI will show isolated events instead of a connected trace tree. + +4. **Ragas `_ragas_eval()` uses outdated API** + Passes a plain dict to `ragas.evaluate()`, but latest Ragas requires a `Dataset` object. The `ragas_eval_dataset.json` export exists but no script consumes it. + +5. **Golden dataset has no reference answers** + All 26 test cases have `expected_answer: ""` — generation quality cannot be compared against ground truth. + +6. **Heuristic fallback is coarse** + When no LLM client is available, `faithfulness` uses keyword overlap + 0.2 baseline; `completeness` is purely length-based. 
+ +--- + +## 🗺 Roadmap + +- [ ] **Fix Langfuse compat** — pin `langfuse`/`pydantic` versions or gate import behind Python version check +- [ ] **Add Langfuse to `docker-compose.yml`** — one-command local observability +- [ ] **Wire trace_id through spans** — enable full trace tree in Langfuse UI +- [ ] **Integrate Ragas properly** — update `_ragas_eval()` to use `ragas.evaluate(Dataset(...))`, add a standalone eval script +- [ ] **Enrich golden dataset** — add `expected_answer` for generation benchmarking, expand to 50+ cases +- [ ] **Eval dashboard frontend** — Vue component to visualize quality distribution and bad cases +- [ ] **CI regression baseline** — run `test_retrieval.py` in GitHub Actions, fail on metric regression +- [ ] **Export to Langfuse Datasets** — push eval results to Langfuse Scores/Datasets API for unified observability + +--- + +## 📈 Star History + + + + + + Star History Chart + + \ No newline at end of file diff --git a/README_zh.md b/README_zh.md new file mode 100644 index 0000000000000000000000000000000000000000..647001b61060e425ccf3d93f487f9816e4ea3db1 --- /dev/null +++ b/README_zh.md @@ -0,0 +1,212 @@ +
+ + RepoReaper Logo + +

RepoReaper

+ +

💀 Harvest Logic. Dissect Architecture. Chat with Code.

+ +

+ English • + 简体中文 +

+ + + License + + Python Version + DeepSeek Powered + Agent Architecture + +
+ + RAG + Qdrant + FastAPI + Vue 3 + Docker + +
+
+ +

+ 👇 在线体验 👇 +

+

+ + Global Demo + +     + + China Demo + +

+ +

+ + ⚠️ 中国用户请使用 Seoul Server。如遇限流,建议本地部署。 + +

+ +
+ + RepoReaper Demo + +
+
+ +--- + +自治型代码审计 Agent:解析任意 GitHub 仓库架构,构建语义缓存,支持即时上下文检索问答。 + +--- + +## ✨ 核心特性 + +| 特性 | 说明 | +|:----|:----| +| **多语言 AST 解析** | Python AST + 正则适配 Java / TS / Go / Rust 等 | +| **混合检索** | Qdrant 向量 + BM25 关键词,RRF 融合排序 | +| **JIT 动态加载** | 问答时自动拉取缺失文件 | +| **查询重写** | 自然语言 → 代码检索关键词 | +| **端到端追踪** | Langfuse 集成,全链路可观测 | +| **自动评估** | LLM-as-Judge 质量评分 | + +--- + +## 🏗 系统架构 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Vue 3 前端 (SSE 流式 + Mermaid 架构图) │ +└─────────────────────┬───────────────────────────────────────┘ + │ +┌─────────────────────▼───────────────────────────────────────┐ +│ FastAPI 后端 │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Agent │ │ Chat │ │ Evaluation │ │ +│ │ Service │ │ Service │ │ Framework │ │ +│ └──────┬──────┘ └──────┬──────┘ └─────────────────────┘ │ +│ │ │ │ +│ ┌──────▼───────────────▼──────┐ ┌─────────────────────┐ │ +│ │ Vector Service (Qdrant+BM25)│ │ Tracing (Langfuse) │ │ +│ └─────────────────────────────┘ └─────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +--- + +## 🛠 技术栈 + +**后端:** Python 3.10+ · FastAPI · AsyncIO · Qdrant · BM25 +**前端:** Vue 3 · Pinia · Mermaid.js · SSE +**模型:** DeepSeek V3 · SiliconFlow BGE-M3 +**运维:** Docker · Gunicorn · Langfuse + +--- + +## 🏁 快速开始 + +**前置要求:** Python 3.10+ ·(可选)Node 18+ 用于重新构建前端 · GitHub Token(推荐)· LLM API Key(必需) + +```bash +# 克隆 & 安装 +git clone https://github.com/tzzp1224/RepoReaper.git && cd RepoReaper +python -m venv venv && source venv/bin/activate +pip install -r requirements.txt + +# 配置 .env(建议从示例复制) +cp .env.example .env +# 必需:设置 LLM_PROVIDER 以及对应的 *_API_KEY +# 推荐:GITHUB_TOKEN 和 SILICON_API_KEY(Embedding) + +# (可选)构建前端(仓库已包含 frontend-dist) +cd frontend-vue +npm install +npm run build +cd .. + +# 启动 +python -m app.main +``` + +访问 `http://localhost:8000`,输入任意 GitHub 仓库地址开始审计。 + +**Docker(单容器,本地 Qdrant):** +```bash +cp .env.example .env +docker build -t reporeaper . 
+docker run -d -p 8000:8000 --env-file .env reporeaper +``` + +**Docker Compose(推荐,包含 Qdrant Server):** +```bash +cp .env.example .env +# 在 .env 中设置 QDRANT_MODE=server 与 QDRANT_URL=http://qdrant:6333 +docker compose up -d --build +``` + +--- + +## 📊 评估与追踪现状 + +| 组件 | 状态 | 说明 | +|:----|:----:|:----| +| **自研评估引擎** | ✅ 可用 | 四层指标(QueryRewrite / Retrieval / Generation / Agentic),LLM-as-Judge 判分 | +| **在线自动评估** | ✅ 可用 | 每次 `/chat` 结束后异步触发,结果写入 `evaluation/sft_data/` | +| **数据路由 (SFT)** | ✅ 可用 | 按评分自动分流 Gold/Silver/Bronze/Rejected → JSONL 文件 | +| **评估 API** | ✅ 可用 | `/evaluate`、`/evaluation/stats`、`/dashboard/*`、`/auto-eval/*` 共 7 个端点 | +| **离线检索评估** | ✅ 可用 | `test_retrieval.py` — Hit Rate、Recall@K、Precision@K、MRR | +| **Langfuse 追踪** | ⚠️ 部分完成 | 框架 + 14 处埋点已就位(agent/chat service);不可用时自动降级为本地日志 `logs/traces/` | +| **Ragas 集成** | ❌ 占位 | 默认 `use_ragas=False`;`_ragas_eval()` 调用方式与最新 Ragas SDK 不兼容 | +| **Langfuse ↔ 评估** | ❌ 未打通 | 评估结果仅写 JSONL,未上报 Langfuse Scores API | + +> **综合完成度约 65%**:自研评估链路已闭环可用;Ragas 与 Langfuse 集成均为半成品。 + +--- + +## ⚠️ 已知问题 + +1. **Python 3.14 + Langfuse 导入报错** + `pydantic.v1.errors.ConfigError: unable to infer type for attribute "description"` — Langfuse 3.x 内部依赖 `pydantic.v1` 兼容层,在 Python 3.14 下不兼容。 + **临时方案:** 在 `.env` 中设置 `LANGFUSE_ENABLED=false`,或使用 Python 3.10–3.12。 + +2. **`docker-compose.yml` 未包含 Langfuse 服务** + 即使导入成功,仍需运行中的 Langfuse 实例。请自行添加或使用 [app.langfuse.com](https://app.langfuse.com)。 + +3. **Trace 链路未关联** + `tracing_service` 记录了 span/event,但调用 Langfuse API 时未传 `trace_id`,Langfuse UI 中只能看到孤立事件而非完整链路树。 + +4. **Ragas `_ragas_eval()` API 过时** + 当前向 `ragas.evaluate()` 传递 dict,最新 Ragas 要求 `Dataset` 对象。已导出 `ragas_eval_dataset.json` 但无脚本消费它。 + +5. **黄金数据集缺少标准答案** + 26 条测试用例的 `expected_answer` 均为空,无法做生成质量的 ground truth 对比。 + +6. 
**启发式降级较粗糙** + 无 LLM client 时,`faithfulness` 用关键词重叠 + 0.2 基础分;`completeness` 纯粹按字数判断。 + +--- + +## 🗺 路线图 + +- [ ] **修复 Langfuse 兼容性** — 固定 `langfuse`/`pydantic` 版本或按 Python 版本门控导入 +- [ ] **`docker-compose.yml` 加入 Langfuse** — 一键启动本地可观测平台 +- [ ] **串联 trace_id** — 让 Langfuse UI 展示完整链路树 +- [ ] **正式接入 Ragas** — 更新 `_ragas_eval()` 使用 `ragas.evaluate(Dataset(...))`,新增独立评估脚本 +- [ ] **丰富黄金数据集** — 补充 `expected_answer`,扩展至 50+ 条用例 +- [ ] **评估仪表盘前端** — Vue 组件可视化质量分布与 Bad Case +- [ ] **CI 回归基线** — 在 GitHub Actions 中运行 `test_retrieval.py`,指标回退时失败 +- [ ] **对接 Langfuse Datasets** — 将评估结果推送到 Langfuse Scores/Datasets API,统一可观测 + +--- + +## 📈 Star History + + + + + + Star History Chart + + diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000000000000000000000000000000000000..dfdc2f70187b44d11fe3cf99cbd2664061a7dd4f --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,246 @@ +# 文件路径: app/core/config.py +""" +应用配置模块 - 统一配置中心 + +支持多 LLM 供应商配置: +- OpenAI (GPT-4, GPT-4o 等) +- DeepSeek (deepseek-chat 等) +- Anthropic (Claude 系列) +- Google Gemini (gemini-3-flash-preview 等) +""" +import os +from dataclasses import dataclass, field +from typing import Optional, Tuple +from dotenv import load_dotenv + +# 加载 .env 文件 +load_dotenv() + + +# ============================================================ +# Agent 分析配置 +# ============================================================ + +@dataclass +class AgentAnalysisConfig: + """Agent 分析引擎配置""" + # Repo Map 配置 + initial_map_limit: int = 25 # 初始 Repo Map 文件数量 (提高精度) + max_symbols_per_file: int = 40 # 每文件最大符号数 (提高精度) + + # 分析轮次配置 + max_rounds: int = 4 # 最大分析轮数 (提高精度,因为报告可复用) + files_per_round: int = 5 # 每轮选择文件数 (提高精度) + max_context_length: int = 20000 # 上下文最大长度 (提高精度) + + # 优先级配置 + priority_exts: Tuple[str, ...] = ( + '.py', '.java', '.go', '.js', '.ts', '.tsx', '.cpp', '.cs', '.rs' + ) + priority_keywords: Tuple[str, ...] 
@dataclass
class VectorServiceConfig:
    """Configuration values for the vector service.

    Groups the settings for on-disk data locations, the embedding API,
    BM25 tokenization, hybrid-search (RRF) fusion and the in-memory
    session cache. All values are plain defaults; nothing here is read
    from the environment.
    """
    # Data directories
    data_dir: str = "data"
    context_dir: str = "data/contexts"
    cache_version: str = "2.0"

    # Embedding settings
    embedding_api_url: str = "https://api.siliconflow.cn/v1"
    embedding_model: str = "BAAI/bge-m3"
    embedding_batch_size: int = 50
    embedding_max_length: int = 8000
    embedding_concurrency: int = 5
    embedding_dimensions: int = 1024

    # BM25 settings
    # Matches runs of characters that are NOT ASCII letters/digits, "_", ".",
    # "@" or in the range U+4E00-U+9FA5.
    # NOTE(review): presumably used as the token separator pattern — confirm
    # against the tokenizer that consumes it.
    tokenize_regex: str = r'[^a-zA-Z0-9_\.@\u4e00-\u9fa5]+'

    # Hybrid-search RRF parameters
    rrf_k: int = 60
    rrf_weight_vector: float = 1.0
    rrf_weight_bm25: float = 0.3
    search_oversample: int = 2
    default_top_k: int = 3

    # Session LRU cache settings
    session_max_count: int = 100  # maximum number of sessions kept in memory
class Settings:
    """Application settings read from environment variables.

    Every class attribute is evaluated once, when the class body runs, so
    the environment must be populated before this module is imported.
    The properties resolve provider-specific values (API key, base URL,
    default model) for the provider named by ``LLM_PROVIDER``.
    """

    # --- LLM provider selection ---
    # Providers for which an API key and a default model are known below.
    SUPPORTED_PROVIDERS = ("openai", "deepseek", "anthropic", "gemini")
    LLM_PROVIDER = os.getenv("LLM_PROVIDER", "deepseek")

    # --- API keys (set the one matching the selected provider) ---
    GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")

    # OpenAI
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL")  # optional custom endpoint

    # DeepSeek
    DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
    DEEPSEEK_BASE_URL = os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")

    # Anthropic (Claude)
    ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

    # Google Gemini
    GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
    GEMINI_BASE_URL = os.getenv("GEMINI_BASE_URL")  # optional OpenAI-compatible endpoint

    # SiliconFlow (embeddings)
    SILICON_API_KEY = os.getenv("SILICON_API_KEY")

    # --- Model ---
    # When unset, the provider's default model is used (see default_model_name).
    MODEL_NAME = os.getenv("MODEL_NAME")

    # --- Server ---
    HOST = os.getenv("HOST", "127.0.0.1")
    PORT = int(os.getenv("PORT", 8000))

    # --- LLM default parameters ---
    LLM_TEMPERATURE = float(os.getenv("LLM_TEMPERATURE", "0.1"))
    LLM_MAX_TOKENS = int(os.getenv("LLM_MAX_TOKENS", "4096"))
    LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", "600"))

    @property
    def _provider(self) -> str:
        """Return ``LLM_PROVIDER`` normalised for lookups (stripped, lower-case)."""
        return (self.LLM_PROVIDER or "").strip().lower()

    @property
    def current_api_key(self) -> Optional[str]:
        """Return the API key of the selected provider, or None if unknown/unset."""
        key_mapping = {
            "openai": self.OPENAI_API_KEY,
            "deepseek": self.DEEPSEEK_API_KEY,
            "anthropic": self.ANTHROPIC_API_KEY,
            "gemini": self.GEMINI_API_KEY,
        }
        return key_mapping.get(self._provider)

    @property
    def current_base_url(self) -> Optional[str]:
        """Return the base URL of the selected provider, or None if it has none."""
        url_mapping = {
            "openai": self.OPENAI_BASE_URL,
            "deepseek": self.DEEPSEEK_BASE_URL,
            "anthropic": None,
            "gemini": self.GEMINI_BASE_URL,
        }
        return url_mapping.get(self._provider)

    @property
    def default_model_name(self) -> str:
        """Return ``MODEL_NAME`` if set, else the selected provider's default model."""
        defaults = {
            "openai": "gpt-4o-mini",
            "deepseek": "deepseek-chat",
            "anthropic": "claude-3-5-sonnet-20241022",
            "gemini": "gemini-3-flash-preview",
        }
        return self.MODEL_NAME or defaults.get(self._provider, "default")

    def validate(self) -> None:
        """Check at startup that the required configuration is present.

        Prints the selected provider, warnings for optional keys that are
        missing, and a success line.

        Raises:
            ValueError: if ``LLM_PROVIDER`` is not a supported provider, or
                if the selected provider's API key is missing/empty.
        """
        provider = self._provider
        print(f"🔧 LLM Provider: {provider.upper()}")

        # 0. Reject unknown providers up front. Without this check a typo in
        #    LLM_PROVIDER falls through to the "missing <TYPO>_API_KEY" error
        #    below, which points the user at a variable that does not exist.
        if provider not in self.SUPPORTED_PROVIDERS:
            supported = ", ".join(self.SUPPORTED_PROVIDERS)
            raise ValueError(
                f"❌ 错误: 不支持的 LLM_PROVIDER: {provider!r}。\n"
                f" 支持的供应商: {supported}。"
            )

        # 1. The selected provider's API key is mandatory.
        if not self.current_api_key:
            key_name = f"{provider.upper()}_API_KEY"
            raise ValueError(
                f"❌ 错误: 缺少 {key_name}。\n"
                f" 当前选择的 LLM 供应商是: {provider}\n"
                f" 请在 .env 文件中设置 {key_name},或更改 LLM_PROVIDER 为其他供应商。"
            )

        # 2. SiliconFlow key (embeddings) — only warned about, not enforced.
        if not self.SILICON_API_KEY:
            print("⚠️ 警告: 未找到 SILICON_API_KEY,向量检索功能可能无法工作。")

        # 3. GitHub token — optional but recommended.
        if not self.GITHUB_TOKEN:
            print("⚠️ 警告: 未找到 GITHUB_TOKEN,GitHub API 请求将受到每小时 60 次的严格限制。")

        print(f"✅ 配置验证通过 (Model: {self.default_model_name})")
@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serve the frontend entry page.

    Prefers the Vue 3 build (``<FRONTEND_DIST>/index.html``) and falls back
    to the legacy ``frontend/index.html``.

    Returns:
        The HTML document as a string, or a 404 ``HTMLResponse`` when
        neither file exists.
    """
    # Resolve the legacy page against the project root (the parent of
    # FRONTEND_DIST) rather than the process working directory, so the
    # fallback does not depend on where the server was started from.
    project_root = os.path.dirname(FRONTEND_DIST)
    candidates = (
        os.path.join(FRONTEND_DIST, "index.html"),           # Vue 3 build
        os.path.join(project_root, "frontend", "index.html"),  # legacy frontend
    )

    for index_path in candidates:
        try:
            with open(index_path, "r", encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            continue

    # Neither frontend is present: answer with an explicit 404 instead of
    # letting FileNotFoundError bubble up as an HTTP 500.
    return HTMLResponse("Frontend not found", status_code=404)
# Strong references to in-flight auto-evaluation tasks. The event loop only
# holds weak references to tasks, so a fire-and-forget task with no other
# owner may be garbage-collected before it finishes.
_auto_eval_tasks: set = set()


@app.post("/chat")
async def chat(request: Request):
    """Chat endpoint with automatic evaluation.

    Streams the answer produced by ``process_chat_stream`` back to the
    client. Once the stream has been fully sent, an evaluation of the
    exchange is scheduled as a background task so it does not delay the
    response.

    Request JSON: ``query`` (required), ``session_id`` (required),
    ``repo_url`` (optional).

    Returns:
        A plain-text ``StreamingResponse``, or a small JSON dict with an
        ``answer`` message when ``query`` or ``session_id`` is missing.
    """
    data = await request.json()
    user_query = data.get("query")
    session_id = data.get("session_id")
    repo_url = data.get("repo_url", "")

    if not user_query:
        return {"answer": "Please enter your question"}
    if not session_id:
        return {"answer": "Session lost"}

    async def chat_stream_with_eval():
        """Wrap process_chat_stream and trigger evaluation after the stream ends."""
        # Drop evaluation data left over from a previous turn of this session.
        clear_eval_data(session_id)

        # Relay the chat stream unchanged.
        async for chunk in process_chat_stream(user_query, session_id):
            yield chunk

        # The stream is complete; the data needed for evaluation has been
        # stored by chat_service at this point.
        try:
            auto_eval_service = get_auto_evaluation_service()
            eval_data = get_eval_data(session_id)

            if auto_eval_service and eval_data and eval_data.answer:
                print(f"\n📊 [Auto-Eval] Starting evaluation for session {session_id}")
                print(f" - Query: {user_query[:50]}...")
                print(f" - Context length: {len(eval_data.retrieved_context)} chars")
                print(f" - Answer length: {len(eval_data.answer)} chars")

                # Run the evaluation in the background so the end of the
                # stream is not delayed. Keep a reference until it is done.
                eval_task = asyncio.create_task(
                    auto_eval_service.auto_evaluate_async(
                        query=user_query,
                        retrieved_context=eval_data.retrieved_context,
                        generated_answer=eval_data.answer,
                        session_id=session_id,
                        repo_url=repo_url,
                        # "zh" when the query contains any CJK ideograph.
                        language="zh" if any('\u4e00' <= c <= '\u9fff' for c in user_query) else "en"
                    )
                )
                _auto_eval_tasks.add(eval_task)
                eval_task.add_done_callback(_auto_eval_tasks.discard)
            else:
                if not auto_eval_service:
                    print("⚠️ Auto evaluation service not initialized")
                elif not eval_data:
                    print(f"⚠️ No eval data found for session {session_id}")
                elif not eval_data.answer:
                    print(f"⚠️ Empty answer for session {session_id}")
        except Exception as e:
            # Best effort: a failure to schedule evaluation must never break
            # a chat response that has already been delivered.
            print(f"⚠️ Failed to trigger auto-eval: {e}")
            import traceback
            traceback.print_exc()

    return StreamingResponse(
        chat_stream_with_eval(),
        media_type="text/plain"
    )
@app.get("/auto-eval/review-queue")
async def get_review_queue():
    """
    List the samples that are waiting for manual review.

    Per the original notes, these are samples whose custom score and Ragas
    score differ too much, so a human has to decide which evaluator is right.

    GET /auto-eval/review-queue
    """
    try:
        service = get_auto_evaluation_service()
        if not service:
            return {"error": "Auto evaluation service not initialized", "status": "failed"}

        pending = service.get_review_queue()
        queue_size = len(pending)

        samples = []
        for position, entry in enumerate(pending):
            samples.append({
                "index": position,
                "query": entry["eval_result"].query,
                "custom_score": entry["custom_score"],
                "ragas_score": entry["ragas_score"],
                "diff": entry["diff"],
                "quality_tier": entry["eval_result"].data_quality_tier.value,
                "timestamp": entry["timestamp"]
            })

        return {
            "status": "success",
            "queue_size": queue_size,
            "samples": samples
        }
    except Exception as exc:
        return {"error": str(exc), "status": "failed"}
@app.get("/evaluation/stats")
async def evaluation_stats():
    """
    Return aggregate evaluation statistics from the evaluation engine.

    GET /evaluation/stats
    """
    try:
        raw_stats = eval_engine.get_statistics()
        # (key, fallback) pairs in output order. Built on every call so the
        # mutable fallbacks are never shared between responses.
        fields = (
            ("total_evaluations", 0),
            ("average_score", 0),
            ("quality_distribution", {}),
            ("top_issues", []),
        )
        statistics = {key: raw_stats.get(key, fallback) for key, fallback in fields}
        return {
            "status": "success",
            "statistics": statistics
        }
    except Exception as exc:
        return {
            "error": str(exc),
            "status": "failed"
        }


@app.get("/dashboard/quality-distribution")
async def quality_distribution():
    """
    Return the per-tier sample counts used by the dashboard.

    GET /dashboard/quality-distribution
    """
    try:
        counts = data_router.get_distribution()
        tiers = ("gold", "silver", "bronze", "rejected", "corrected")
        per_tier = {tier: counts.get(tier, 0) for tier in tiers}
        return {
            "status": "success",
            "distribution": per_tier,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as exc:
        return {
            "error": str(exc),
            "status": "failed"
        }
def extract_json_from_text(text):
    """
    Best-effort extraction of a JSON value (normally a list) from LLM output.

    Strategy:
      1. Strip Markdown code fences (``` or ```json) and parse what is left.
      2. If that fails, parse the outermost ``[...]`` span found in the text.
      3. If nothing parses, return an empty list.

    Args:
        text: Raw model output. ``None`` or any non-string value is treated
            as "no content" (a chat completion's message content can be None).

    Returns:
        The parsed JSON value, or ``[]`` when nothing could be parsed.
    """
    if not isinstance(text, str):
        return []

    cleaned = re.sub(r"^```(json)?|```$", "", text.strip(), flags=re.MULTILINE).strip()
    try:
        return json.loads(cleaned)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; fall through to the bracket scan.
        pass

    match = re.search(r"\[.*\]", cleaned, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(0))
        except (ValueError, RecursionError):
            pass
    return []
其他语言使用正则 (Java, TS, JS, Go, C++) + elif ext in ['java', 'ts', 'tsx', 'js', 'jsx', 'go', 'cpp', 'cs', 'rs']: + return _extract_symbols_regex(content, ext) + + return [] + +def _extract_symbols_python(content): + try: + tree = ast.parse(content) + symbols = [] + for node in tree.body: + if isinstance(node, ast.ClassDef): + symbols.append(f" [C] {node.name}") + for sub in node.body: + if isinstance(sub, (ast.FunctionDef, ast.AsyncFunctionDef)): + if not sub.name.startswith("_") or sub.name == "__init__": + symbols.append(f" - {sub.name}") + elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + symbols.append(f" [F] {node.name}") + return symbols + except: + return [] + +def _extract_symbols_regex(content, ext): + """ + 针对类 C 语言的通用正则提取。 + """ + symbols = [] + lines = content.split('\n') + + # 定义各语言的正则模式 + patterns = { + 'java': { + 'class': re.compile(r'(?:public|protected|private)?\s*(?:static|abstract)?\s*(?:class|interface|enum)\s+([a-zA-Z0-9_]+)'), + 'func': re.compile(r'(?:public|protected|private)\s+(?:static\s+)?[\w<>[\]]+\s+([a-zA-Z0-9_]+)\s*\(') + }, + 'ts': { + 'class': re.compile(r'class\s+([a-zA-Z0-9_]+)'), + 'func': re.compile(r'(?:function\s+([a-zA-Z0-9_]+)|const\s+([a-zA-Z0-9_]+)\s*=\s*(?:async\s*)?\(|([a-zA-Z0-9_]+)\s*\([^)]*\)\s*[:\{])') + }, + 'go': { + 'class': re.compile(r'type\s+([a-zA-Z0-9_]+)\s+(?:struct|interface)'), + 'func': re.compile(r'func\s+(?:(?:\(.*\)\s+)?([a-zA-Z0-9_]+)|([a-zA-Z0-9_]+)\()') + } + } + + lang_key = 'java' if ext in ['java', 'cs', 'cpp', 'rs'] else 'go' if ext == 'go' else 'ts' + rules = patterns.get(lang_key, patterns['java']) + + count = 0 + for line in lines: + line = line.strip() + # === 正则解析优化 (过滤更多干扰项) === + if not line or line.startswith(("//", "/*", "*", "#", "print", "console.")): continue + if count > agent_config.max_symbols_per_file: break + + # 匹配类 + c_match = rules['class'].search(line) + if c_match: + name = next((g for g in c_match.groups() if g), "Unknown") + symbols.append(f" [C] {name}") + 
async def generate_repo_map(repo_url, file_list, limit=agent_config.initial_map_limit) -> Tuple[str, Set[str]]:
    """
    Build the multi-language repository map.

    Priority files (matching ``agent_config.priority_exts`` and either at
    most two directories deep or containing a priority keyword) are fetched
    concurrently and listed with their extracted symbols. All other files
    are listed by path only, capped at 300 entries.

    Args:
        repo_url: Repository URL passed through to ``get_file_content``.
        file_list: All file paths in the repository.
        limit: Maximum number of priority files to fetch and outline.

    Returns:
        A ``(map_text, mapped_files)`` tuple: the rendered map, and the set
        of paths already outlined in it (used by callers to avoid mapping
        the same file twice).
    """
    max_other_files = 300

    priority_files = [
        f for f in file_list
        if f.endswith(agent_config.priority_exts) and
        (f.count('/') <= 2 or any(k in f.lower() for k in agent_config.priority_keywords))
    ]

    # De-duplicate, order deterministically, then truncate.
    targets = sorted(set(priority_files))[:limit]
    mapped_files_set = set(targets)
    # Test membership against the set, not the list: O(1) per path.
    remaining = [f for f in file_list if f not in mapped_files_set]

    async def process_file(path):
        """Render one map entry: the path plus its symbols, if any were found."""
        content = await get_file_content(repo_url, path)
        if not content:
            return f"{path} (Read Failed)"

        # Symbol extraction is CPU-bound; keep it off the event loop.
        symbols = await asyncio.to_thread(_extract_symbols, content, path)
        if symbols:
            return f"{path}\n" + "\n".join(symbols)
        return path

    repo_map_lines = [f"--- Key Files Structure (Top {len(targets)}) ---"]
    results = await asyncio.gather(*(process_file(f) for f in targets))
    repo_map_lines.extend(results)

    if remaining:
        repo_map_lines.append("\n--- Other Files ---")
        if len(remaining) > max_other_files:
            repo_map_lines.extend(remaining[:max_other_files])
            repo_map_lines.append(f"... ({len(remaining)-max_other_files} more files)")
        else:
            repo_map_lines.extend(remaining)

    return "\n".join(repo_map_lines), mapped_files_set
+ }) + + +async def _agent_stream_inner( + repo_url: str, session_id: str, language: str, regenerate_only: bool, + short_id: str, trace_id: str, start_time: float +): + """ + 实际的分析流程 (在锁保护下执行) + """ + try: + vector_db = store_manager.get_store(session_id) + + # 调试日志:确认 session 隔离 + print(f"🔍 [DEBUG] session_id: {session_id}, collection: {vector_db.collection_name}, context_file: {vector_db._context_file}") + + # === regenerate_only 模式:跳过索引,直接生成报告 === + if regenerate_only: + yield json.dumps({"step": "init", "message": f"🔄 [Session: {short_id}] Regenerating report in {language}..."}) + await asyncio.sleep(0.3) + + # 从已有索引加载上下文 + context = vector_db.load_context() + if not context: + yield json.dumps({"step": "error", "message": "❌ No existing index found. Please analyze the repository first."}) + return + + # 正确读取 global_context 内的字段 + global_ctx = context.get("global_context", {}) + file_tree_str = global_ctx.get("file_tree", "") + context_summary = global_ctx.get("summary", "") + visited_files = set() # regenerate 模式不需要这个,但报告生成需要引用 + + # 验证上下文与请求的仓库匹配 + stored_repo_url = context.get("repo_url", "") + if stored_repo_url and repo_url not in stored_repo_url and stored_repo_url not in repo_url: + print(f"⚠️ [WARNING] repo_url mismatch! Request: {repo_url}, Stored: {stored_repo_url}") + + yield json.dumps({"step": "generating", "message": f"📝 Generating report in {'Chinese' if language == 'zh' else 'English'}..."}) + else: + # === 正常分析模式 === + yield json.dumps({"step": "init", "message": f"🚀 [Session: {short_id}] Connecting to GitHub..."}) + await asyncio.sleep(0.5) + + await vector_db.reset() # 使用异步方法 + + chunker = UniversalChunker(config=ChunkingConfig(min_chunk_size=50)) + + file_list = await get_repo_structure(repo_url) + if not file_list: + raise Exception("Repository is empty or unreadable.") + + yield json.dumps({"step": "fetched", "message": f"📦 Found {len(file_list)} files. 
Building Repo Map (AST Parsing)..."}) + + # === 接收 mapped_files 用于后续查重 + 计时 === + map_start = time.time() + file_tree_str, mapped_files = await generate_repo_map(repo_url, file_list, limit=agent_config.initial_map_limit) + map_latency_ms = (time.time() - map_start) * 1000 + tracing_service.add_event("repo_map_generated", {"latency_ms": map_latency_ms, "files_mapped": len(mapped_files)}) + + visited_files = set() + context_summary = "" + readme_file = next((f for f in file_list if f.lower().endswith("readme.md")), None) + + for round_idx in range(agent_config.max_rounds): + yield json.dumps({"step": "thinking", "message": f"🕵️ [Round {round_idx+1}/{agent_config.max_rounds}] DeepSeek is analyzing Repo Map..."}) + + system_prompt = "You are a Senior Software Architect. Your goal is to understand the codebase." + user_content = f""" + [Project Repo Map] + (Contains file paths and key Class/Function signatures) + {file_tree_str} + + [Files Already Read] + {list(visited_files)} + + [Current Knowledge] + {context_summary} + + [Task] + Select 1-{agent_config.files_per_round} MOST CRITICAL files to read next to understand the core logic. + Focus on files that seem to contain main logic based on the Repo Map symbols. + + [Constraint] + Return ONLY a raw JSON list of strings. No markdown. 
+ Example: ["src/main.py", "app/auth.py"] + """ + + if not client: + yield json.dumps({"step": "error", "message": "❌ LLM Client Not Initialized."}) + return + + # === Token & Latency Tracing === + llm_start_time = time.time() + plan_messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_content} + ] + + response = await client.chat.completions.create( + model=settings.default_model_name, + messages=plan_messages, + temperature=0.1, + timeout=settings.LLM_TIMEOUT + ) + + llm_latency_ms = (time.time() - llm_start_time) * 1000 + raw_content = response.choices[0].message.content + + # 记录 Token 使用量 + usage = getattr(response, 'usage', None) + tracing_service.record_llm_generation( + model=settings.default_model_name, + prompt_messages=plan_messages, + generated_text=raw_content, + total_latency_ms=llm_latency_ms, + prompt_tokens=usage.prompt_tokens if usage else None, + completion_tokens=usage.completion_tokens if usage else None, + total_tokens=usage.total_tokens if usage else None, + is_streaming=False, + metadata={"step": "file_selection", "round": round_idx + 1} + ) + target_files = extract_json_from_text(raw_content) + + valid_files = [f for f in target_files if f in file_list and f not in visited_files] + + if round_idx == 0 and readme_file and readme_file not in visited_files and readme_file not in valid_files: + valid_files.insert(0, readme_file) + + if not valid_files: + yield json.dumps({"step": "plan", "message": f"🛑 [Round {round_idx+1}] Sufficient context gathered."}) + break + + yield json.dumps({"step": "plan", "message": f"👉 [Round {round_idx+1}] Selected: {valid_files}"}) + + # === 并发模型缺陷优化 (并行下载处理) === + async def process_single_file(file_path): + try: + file_start = time.time() + + # 🔧 异步 GitHub API (已优化为非阻塞) + content = await get_file_content(repo_url, file_path) + if not content: + tracing_service.add_event("file_read_failed", {"file": file_path}) + return None + + # 1. 
摘要与 Context + lines = content.split('\n')[:50] + preview = "\n".join(lines) + file_knowledge = f"\n--- File: {file_path} ---\n{preview}\n" + + # 2. Repo Map 增量更新与查重 + new_map_entry = None + if file_path not in mapped_files: + symbols = await asyncio.to_thread(_extract_symbols, content, file_path) + if symbols: + new_map_entry = f"{file_path}\n" + "\n".join(symbols) + + # 3. 切片与入库 + chunks = await asyncio.to_thread(chunker.chunk_file, content, file_path) + if chunks: + documents = [c["content"] for c in chunks] + metadatas = [] + for c in chunks: + meta = c["metadata"] + metadatas.append({ + "file": meta["file"], + "type": meta["type"], + "name": meta.get("name", ""), + "class": meta.get("class") or "" + }) + if documents: + try: + await vector_db.add_documents(documents, metadatas) + except Exception as e: + print(f"❌ 索引错误 {file_path}: {e}") + # 不中断,继续处理其他文件 + return None + + file_latency_ms = (time.time() - file_start) * 1000 + tracing_service.add_event("file_processed", { + "file": file_path, + "latency_ms": file_latency_ms, + "chunks_count": len(chunks) if chunks else 0 + }) + + return { + "path": file_path, + "knowledge": file_knowledge, + "map_entry": new_map_entry + } + except Exception as e: + print(f"❌ 处理文件错误 {file_path}: {e}") + return None + + # 提示开始并发下载 + yield json.dumps({"step": "download", "message": f"📥 Starting parallel download for {len(valid_files)} files..."}) + + # 启动并发任务 (return_exceptions=True 防止单个失败导致整个中断) + tasks = [process_single_file(f) for f in valid_files] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 聚合结果 + download_count = 0 + for res in results: + if not res or isinstance(res, Exception): + if isinstance(res, Exception): + print(f"❌ Task 异常: {res}") + continue + download_count += 1 + visited_files.add(res["path"]) + context_summary += res["knowledge"] + + # 增量更新 Map + if res["map_entry"]: + file_tree_str = f"{res['map_entry']}\n\n{file_tree_str}" + mapped_files.add(res["path"]) + + # === 硬编码截断解耦 === + 
context_summary = context_summary[:agent_config.max_context_length] + + global_context_data = { + "file_tree": file_tree_str, + "summary": context_summary[:8000] + } + await vector_db.save_context(repo_url, global_context_data) + + yield json.dumps({"step": "indexing", "message": f"🧠 [Round {round_idx+1}] Processed {download_count} files. Knowledge graph updated."}) + + # Final Report (正常分析模式下的提示) + yield json.dumps({"step": "generating", "message": "📝 Generating technical report..."}) + + # === 报告生成 (两种模式共用) === + + # === P0: 向量检索补充关键代码片段 === + yield json.dumps({"step": "enriching", "message": "🔍 Retrieving key code snippets..."}) + + key_queries = [ + "main entry point initialization startup", + "core business logic handler processor", + "API routes endpoints controllers", + "database models schema ORM", + "authentication authorization middleware" + ] + + retrieved_snippets = [] + try: + await vector_db.initialize() + for query in key_queries: + results = await vector_db.search_hybrid(query, top_k=2) + for r in results: + snippet = r.get("content", "")[:400] + file_path = r.get("file", "unknown") + if snippet and snippet not in [s.split("]")[1] if "]" in s else s for s in retrieved_snippets]: + retrieved_snippets.append(f"[{file_path}]\n{snippet}") + except Exception as e: + print(f"⚠️ 向量检索失败: {e}") + + code_snippets_section = "\n\n".join(retrieved_snippets[:8]) if retrieved_snippets else "" + + # === P1: 依赖文件解析 === + dep_files = ["requirements.txt", "pyproject.toml", "package.json", "go.mod", "Cargo.toml", "pom.xml", "build.gradle"] + dependencies_info = "" + + # 获取 file_list(regenerate_only 模式下需要重新获取) + if regenerate_only: + try: + temp_file_list = await get_repo_structure(repo_url) + except: + temp_file_list = [] + else: + temp_file_list = file_list if 'file_list' in dir() else [] + + for dep_file in dep_files: + matching = [f for f in temp_file_list if f.endswith(dep_file)] + for f in matching[:1]: # 只取第一个匹配 + try: + content = await get_file_content(repo_url, 
f) + if content: + dependencies_info += f"\n[{f}]\n{content[:800]}\n" + except: + pass + + # 构建增强的上下文 + enhanced_context = f""" + {context_summary[:12000]} + + [Key Code Snippets (Retrieved by Semantic Search)] + {code_snippets_section} + + [Project Dependencies] + {dependencies_info if dependencies_info else "No dependency file found."} + """ + + repo_map_injection = f""" + [Project Repo Map (Structure)] + {file_tree_str} + """ + + # === 根据语言选择 Prompt === + if language == "zh": + # --- 中文 Prompt --- + system_role = "你是一位务实的技术专家。目标是为开发者创建一个'3页纸'架构概览,让他们能在5分钟内看懂这个仓库。重点关注架构和数据流,不要纠结细节。" + analysis_user_content = f""" + [角色] + 你是一位务实的技术专家(Tech Lead)。 + + [输入数据] + {repo_map_injection} + + 分析的文件: {list(visited_files)} + + [代码知识库与关键片段] + {enhanced_context} + + [严格限制] + 1. **不进行代码审查**: 不要列出 Bug、缺失功能或改进建议。 + 2. **不评价**: 不要评价代码质量,只描述它**如何工作**。 + 3. **语调**: 专业、结构化、描述性。使用中文回答。 + 4. **不要废话**: 不要写"安全性"、"未来规划"等未请求的章节。 + + [输出格式要求 (Markdown)] + + # 项目分析报告 + + ## 1. 执行摘要 (Executive Summary) + - **用途**: (这个项目具体解决什么问题?1-2句话) + - **核心功能**: (列出Top 3功能点) + - **技术栈**: (语言、框架、数据库、关键库) + + ## 2. 系统架构 (Mermaid) + 创建一个 `graph TD` 图。 + - 展示高层组件 (如 Client, API Server, Database, Worker, External Service)。 + - 在连线上标注数据流 (如 "HTTP", "SQL")。 + - **风格**: 保持概念清晰简单,节点数量控制在 8 个以内。 + + **⚠️ Mermaid 语法严格要求 (v10.x)**: + 1. **所有节点文本必须用双引号包裹**: `A["用户界面"]` ✓, `A[用户界面]` ✗ + 2. **所有连线标签必须用双引号包裹**: `-->|"HTTP请求"|` ✓, `-->|HTTP请求|` ✗ + 3. **禁止使用特殊字符**: 不要在文本中使用 `
`, `/`, `(`, `)`, `&`, `<`, `>` 等 + 4. **使用简短英文ID**: 节点ID用简短英文如 `A`, `B`, `Client`, `API` + 5. **subgraph 标题也需引号**: `subgraph "核心服务"` ✓ + 6. **数据库节点**: 使用 `[("数据库")]` 格式 + + - **正确示例**: + ```mermaid + graph TD + Client["客户端"] -->|"HTTP请求"| API["API网关"] + API --> Service["业务服务"] + Service --> DB[("数据库")] + Service -->|"调用"| External["外部服务"] + ``` + + ## 3. 核心逻辑分析 (Table) + (总结关键模块,不要列出所有文件,只列最重要的) + + | 组件/文件 | 职责 (它做什么?) | 关键设计模式/逻辑 | + | :--- | :--- | :--- | + | 例如 `auth_service.py` | 处理JWT颁发与验证 | 单例模式, 路由装饰器 | + | ... | ... | ... | + + ## 4. 🔬 核心方法深度解析 + (精选 3-5 个最关键的 `.py` 文件。针对每个文件,列出驱动逻辑的 Top 2-3 个方法) + + ### 4.1 `[文件名]` + * **`[方法名]`**: [解释它做什么以及为什么重要,不要贴代码] + * **`[方法名]`**: [解释...] + + ## 5. 主要工作流 (Mermaid) + 选择**一个最重要**的业务流程 (Happy Path)。 + 创建一个 `sequenceDiagram`。 + - 参与者应该是高层概念 (如 User, API, DB),不要用具体变量名。 + + **⚠️ sequenceDiagram 语法要求**: + 1. **participant 别名格式**: `participant API as "API服务"` ✓ + 2. **消息文本用双引号**: `User->>API: "发起请求"` ✓ + 3. **避免特殊字符**: 不要在消息中使用 `/`, `&`, `<`, `>` 等 + + - **正确示例**: + ```mermaid + sequenceDiagram + participant User as "用户" + participant API as "API服务" + participant DB as "数据库" + User->>API: "发起请求" + API->>DB: "查询数据" + DB-->>API: "返回结果" + API-->>User: "响应数据" + ``` + + ## 6. 快速开始 (Quick Start) + - **前置条件**: (如 Docker, Python 3.9+, .env 配置) + - **入口**: (如何启动主逻辑?如 `python main.py`) + """ + else: + analysis_user_content = f""" + [Role] + You are a **Pragmatic Tech Lead**. Your goal is to create a **"3-Pages" Architecture Overview** for a developer who wants to understand this repo in 5 minutes. + [Input Data] + {repo_map_injection} + + Files analyzed: {list(visited_files)} + + [Code Knowledge & Key Snippets] + {enhanced_context} + + [Strict Constraints] + 1. **NO Code Review**: Do NOT list bugs, issues, missing features, or recommendations. + 2. **NO Critique**: Do not judge the code quality. Focus on HOW it works. + 3. **Tone**: Professional, descriptive, and structural. + 4. 
**NO "FLUFF"**: Do NOT add unrequested sections like "Security", "Scalability", "Data Models", "Future Enhancements", etc. + + [Required Output Format (Markdown)] + + # Project Analysis Report + + ## 1. Executive Summary + - **Purpose**: (What specific problem does this project solve? 1-2 sentences) + - **Key Features**: (Bullet points of top 3 features) + - **Tech Stack**: (List languages, frameworks, databases, and key libs) + + ## 2. System Architecture + Create a `graph TD` diagram. + - Show high-level components (e.g., Client, API Server, Database, Worker, External Service). + - Label the edges with data flow (e.g., "HTTP", "SQL"). + - **Style**: Keep it simple and conceptual. Limit to 8 nodes max. + + **⚠️ Mermaid Syntax Rules (v10.x - MUST FOLLOW)**: + 1. **Wrap ALL node text in double quotes**: `A["User Client"]` ✓, `A[User Client]` ✗ + 2. **Wrap ALL edge labels in double quotes**: `-->|"HTTP Request"|` ✓, `-->|HTTP Request|` ✗ + 3. **NO special characters in text**: Avoid `/`, `()`, `&`, `<>`, `
` in labels + 4. **Use short alphanumeric IDs**: e.g., `A`, `B`, `Client`, `API`, `DB` + 5. **Subgraph titles need quotes**: `subgraph "Core Services"` ✓ + 6. **Database node format**: Use `[("Database")]` for cylinder shape + + - **Correct Example**: + ```mermaid + graph TD + Client["User Client"] -->|"HTTP Request"| API["API Gateway"] + API --> Service["Business Service"] + Service --> DB[("Database")] + Service -->|"Calls"| External["External API"] + ``` + + ## 3. Core Logic Analysis + (Create a Markdown Table to summarize key modules. Do not list every file, only the most important ones.) + + | Component/File | Responsibility (What does it do?) | Key Design Pattern / Logic | + | :--- | :--- | :--- | + | e.g. `auth_service.py` | Handles JWT issuance and verification | Singleton, Decorator for routes | + | ... | ... | ... | + + ## 4. Core Methods Deep Dive + (Select the 3-5 most critical `.py` files. For each, list the top 2-3 methods that drive the logic.) + + ### 4.1 `[Filename, e.g., agent_service.py]` + * **`[Method Name]`**: [Explanation of what it does and why it matters. No code.] + * **`[Method Name]`**: [Explanation...] + + ### 4.2 `[Filename, e.g., vector_service.py]` + * **`[Method Name]`**: [Explanation...] + * ... + + ## 5. Main Workflow (Mermaid) + Select the **Single Most Important** business flow (The "Happy Path"). + Create a `sequenceDiagram`. + - Participants should be high-level (e.g., User, API, DB), not specific variable names. + + **⚠️ sequenceDiagram Syntax Rules**: + 1. **Wrap participant aliases in quotes**: `participant API as "API Server"` ✓ + 2. **Wrap message text in quotes**: `User->>API: "Send Request"` ✓ + 3. 
**NO special characters**: Avoid `/`, `&`, `<`, `>` in messages + + - **Correct Example**: + ```mermaid + sequenceDiagram + participant User as "User" + participant API as "API Server" + participant DB as "Database" + User->>API: "Send Request" + API->>DB: "Query Data" + DB-->>API: "Return Result" + API-->>User: "Send Response" + ``` + + ## 6. Quick Start Guide + - **Prerequisites**: (e.g. Docker, Python 3.9+, .env file) + - **Entry Point**: (How to run the main logic? e.g. `python main.py` or `uvicorn`) + + """ + + # === 增加 timeout 防止长文本生成时断连 === + report_messages = [ + {"role": "system", "content": "You are a pragmatic Tech Lead. Focus on architecture and data flow, not implementation details."}, + {"role": "user", "content": analysis_user_content} + ] + + stream_start_time = time.time() + stream = await client.chat.completions.create( + model=settings.default_model_name, + messages=report_messages, + stream=True, + timeout=settings.LLM_TIMEOUT # 使用统一配置 + ) + + # === TTFT & Token Tracking === + first_token_received = False + ttft_ms = None + generated_text = "" + completion_tokens_estimate = 0 + + # === 增加 try-except 捕获流式传输中断 === + try: + async for chunk in stream: + if chunk.choices[0].delta.content: + content = chunk.choices[0].delta.content + + # 记录 TTFT (首 Token 时间) + if not first_token_received: + ttft_ms = (time.time() - stream_start_time) * 1000 + tracing_service.record_ttft( + ttft_ms=ttft_ms, + model=settings.default_model_name, + metadata={"step": "report_generation"} + ) + first_token_received = True + + generated_text += content + completion_tokens_estimate += 1 # 粗略估计每个 chunk 约 1 token + yield json.dumps({"step": "report_chunk", "chunk": content}) + except (httpx.ReadError, httpx.ConnectError) as e: + yield json.dumps({"step": "error", "message": f"⚠️ Network Timeout during generation: {str(e)}"}) + return + + # 流结束后记录完整的 LLM 生成信息 + total_latency_ms = (time.time() - stream_start_time) * 1000 + tracing_service.record_llm_generation( + 
model=settings.default_model_name, + prompt_messages=report_messages, + generated_text=generated_text, + ttft_ms=ttft_ms, + total_latency_ms=total_latency_ms, + completion_tokens=completion_tokens_estimate, + is_streaming=True, + metadata={"step": "report_generation", "generated_chars": len(generated_text)} + ) + + # === 保存报告 (按语言存储,异步避免阻塞) === + await vector_db.save_report(generated_text, language) + + yield json.dumps({"step": "finish", "message": "✅ Analysis Complete!"}) + + except Exception as e: + # === 全局异常捕获 === + import traceback + traceback.print_exc() + + # 提取友好的错误信息 + error_msg = str(e) + if "401" in error_msg: + ui_msg = "❌ GitHub Token Invalid. Please check your settings." + elif "403" in error_msg: + ui_msg = "❌ GitHub API Rate Limit Exceeded. Try again later or add a Token." + elif "404" in error_msg: + ui_msg = "❌ Repository Not Found. Check the URL." + elif "Timeout" in error_msg or "ConnectError" in error_msg: + ui_msg = "❌ Network Timeout. LLM or GitHub is not responding." + else: + ui_msg = f"💥 System Error: {error_msg}" + + yield json.dumps({"step": "error", "message": ui_msg}) + return # 终止流 \ No newline at end of file diff --git a/app/services/auto_evaluation_service.py b/app/services/auto_evaluation_service.py new file mode 100644 index 0000000000000000000000000000000000000000..ebd205cee8dd4df99a846ae3b968d6b8d8c625aa --- /dev/null +++ b/app/services/auto_evaluation_service.py @@ -0,0 +1,481 @@ +# 文件路径: app/services/auto_evaluation_service.py +""" +自动评估服务 - Phase 1 +在后台异步进行评估,不阻塞用户请求 + +工作流程: + 1. 用户调用 /chat 或 /analyze + 2. 获得立即响应 + 3. 后台异步执行评估 + 4. 
def _record_skipped(self, reason: str, query: str, session_id: str,
                    repo_url: str = "", context_len: int = 0, answer_len: int = 0) -> None:
    """Append one skipped-sample record to the JSONL file for later analysis.

    The record holds an ISO timestamp, the skip reason, the session id, the
    first 200 characters of the query, the repo URL and the context/answer
    lengths. Writing is best-effort: any failure is printed and swallowed so
    that bookkeeping can never break the evaluation flow.
    """
    query_preview = query[:200] if query else ""
    entry = dict(
        timestamp=datetime.now().isoformat(),
        reason=reason,
        session_id=session_id,
        query=query_preview,
        repo_url=repo_url,
        context_length=context_len,
        answer_length=answer_len,
    )
    try:
        with open(self.skipped_samples_file, 'a', encoding='utf-8') as sink:
            serialized = json.dumps(entry, ensure_ascii=False)
            sink.write(serialized + '\n')
    except Exception as e:
        print(f" ⚠️ 记录跳过样本失败: {e}")
def _check_duplicate(self, query: str, session_id: str) -> bool:
    """Report whether this (session, query) pair was already evaluated.

    A key is built from the session id and the first 8 hex digits of the
    query's MD5 digest. Unseen keys are remembered, so the next call with the
    same pair returns True.

    The cache is bounded: once it exceeds 1000 keys, the OLDEST keys are
    dropped until 500 remain. Insertion order is tracked in a deque because
    a set has no order; slicing ``list(a_set)`` would evict an arbitrary half
    and could forget the key that was just added.

    Args:
        query: The user query text.
        session_id: The session the query belongs to.

    Returns:
        True if the pair is a duplicate, False if it is new (and now recorded).
    """
    import hashlib
    from collections import deque

    max_keys = 1000   # eviction triggers above this size
    keep_keys = 500   # size after eviction

    query_hash = hashlib.md5(query.encode()).hexdigest()[:8]
    eval_key = f"{session_id}:{query_hash}"

    if eval_key in self._evaluated_keys:
        return True

    # The order deque is created lazily (and rebuilt if the set was replaced
    # or cleared elsewhere) so this method does not depend on __init__.
    order = getattr(self, "_evaluated_order", None)
    if order is None or len(order) != len(self._evaluated_keys):
        order = self._evaluated_order = deque(self._evaluated_keys)

    self._evaluated_keys.add(eval_key)
    order.append(eval_key)

    if len(self._evaluated_keys) > max_keys:
        while len(order) > keep_keys:
            self._evaluated_keys.discard(order.popleft())

    return False
context=retrieved_context, + answer=generated_answer + ) + print(f" ✓ Ragas Score: {ragas_score:.3f}") + if ragas_details: + print(f" - {ragas_details}") + except Exception as e: + print(f" ⚠️ Ragas 评估失败: {e}") + # Ragas 失败不应该中断主流程 + + # ============================================================ + # Step 3: 混合评估 + 异常检测 + # ============================================================ + final_score, quality_status = self._compute_final_score( + custom_score=custom_score, + ragas_score=ragas_score + ) + + print(f" ✓ Final Score: {final_score:.3f} | Status: {quality_status}") + + # ============================================================ + # Step 4: 构建评估结果并存储 + # ============================================================ + eval_result = EvaluationResult( + session_id=session_id, + query=query, + repo_url=repo_url, + timestamp=start_time, + language=language, + generation_metrics=custom_metrics, + notes=f"ragas_score={ragas_score:.3f}" if ragas_score else "" + ) + + # 设置综合得分 + eval_result.overall_score = final_score + + # 根据状态和得分确定质量等级 + print(f" [DEBUG] quality_status={quality_status}, final_score={final_score:.3f}, threshold={self.config.min_quality_score}") + + if quality_status == "needs_review": + eval_result.data_quality_tier = DataQualityTier.BRONZE + eval_result.notes += " | needs_review=true" + # 加入审查队列 + self.needs_review_queue.append({ + "eval_result": eval_result, + "custom_score": custom_score, + "ragas_score": ragas_score, + "diff": abs(custom_score - (ragas_score or custom_score)), + "timestamp": start_time.isoformat() + }) + print(f" ⚠️ 需要人工审查 (needs_review),暂存队列") + # 同时也路由到数据存储,便于后续分析 + self.data_router.route_sample(eval_result) + elif final_score > self.config.min_quality_score: + # score > 0.4: 路由到 positive (>0.6) 或 negative (0.4-0.6) + print(f" ✓ 路由到 data_router (score {final_score:.2f} > {self.config.min_quality_score})") + self.data_router.route_sample(eval_result) + else: + # score <= 0.4: 质量太差,直接拒绝 + eval_result.data_quality_tier = 
async def auto_evaluate_async(
    self,
    query: str,
    retrieved_context: str,
    generated_answer: str,
    session_id: str = "auto",
    repo_url: str = "",
    language: str = "en"
) -> None:
    """Run the evaluation without blocking the caller.

    When ``config.async_evaluation`` is False the evaluation is awaited
    inline. Otherwise it is scheduled as a background task and this
    coroutine returns immediately without waiting for the result.

    The event loop only keeps weak references to tasks, so a task whose
    handle is dropped can be garbage-collected before it finishes. The
    scheduled task is therefore kept in ``self._background_tasks`` until it
    completes.

    Args:
        query: The user query.
        retrieved_context: Context retrieved for the query.
        generated_answer: The answer produced for the query.
        session_id: Session identifier passed through to the evaluation.
        repo_url: Repository URL passed through to the evaluation.
        language: Language code passed through to the evaluation.
    """
    eval_kwargs = dict(
        query=query,
        retrieved_context=retrieved_context,
        generated_answer=generated_answer,
        session_id=session_id,
        repo_url=repo_url,
        language=language,
    )

    if not self.config.async_evaluation:
        # Synchronous mode (not recommended in production).
        await self.auto_evaluate(**eval_kwargs)
        return

    # Created lazily so this method does not depend on __init__.
    pending = getattr(self, "_background_tasks", None)
    if pending is None:
        pending = self._background_tasks = set()

    task = asyncio.create_task(self._eval_task(**eval_kwargs))
    pending.add(task)
    # Drop the reference once the task is done so the set cannot grow forever.
    task.add_done_callback(pending.discard)
"high_confidence" + """ + + if ragas_score is None: + # 没有 Ragas 分数,直接用 custom 分数 + return custom_score, "normal" + + # 计算差异 + diff = abs(custom_score - ragas_score) + + # 判断异常 + if diff > self.config.diff_threshold: + # 差异过大,标记为需要审查 + return custom_score, "needs_review" + + # 混合评分 + final_score = ( + self.config.custom_weight * custom_score + + self.config.ragas_weight * ragas_score + ) + + # 两者都高分 → 高置信度 + if custom_score > 0.75 and ragas_score > 0.75: + status = "high_confidence" + else: + status = "normal" + + return final_score, status + + async def _ragas_eval( + self, + query: str, + context: str, + answer: str + ) -> tuple[Optional[float], Optional[str]]: + """ + 使用 Ragas 进行 sanity check + + Returns: + (score, details) + """ + try: + from ragas.metrics import faithfulness, answer_relevancy + from ragas import evaluate + + # 构造 Ragas 数据集 + dataset_dict = { + "question": [query], + "contexts": [[context]], + "answer": [answer] + } + + # 执行评估 + result = evaluate( + dataset=dataset_dict, + metrics=[faithfulness, answer_relevancy] + ) + + # 提取分数 + faithfulness_score = result["faithfulness"][0] if "faithfulness" in result else 0.5 + relevancy_score = result["answer_relevancy"][0] if "answer_relevancy" in result else 0.5 + + # 平均得分 + ragas_score = (faithfulness_score + relevancy_score) / 2 + + details = f"Ragas: faithfulness={faithfulness_score:.3f}, relevancy={relevancy_score:.3f}" + + return ragas_score, details + + except ImportError: + print("⚠️ Ragas 未安装,跳过 sanity check") + return None, None + except Exception as e: + print(f"⚠️ Ragas 评估异常: {e}") + return None, None + + def get_review_queue(self) -> list: + """获取需要审查的样本列表""" + return self.needs_review_queue + + def clear_review_queue(self) -> None: + """清空审查队列""" + self.needs_review_queue.clear() + + def approve_sample(self, index: int) -> None: + """人工批准某个样本""" + if 0 <= index < len(self.needs_review_queue): + item = self.needs_review_queue[index] + # 直接存储到评估结果 + 
def reject_sample(self, index: int) -> None:
    """Manually reject a queued sample: drop it from the review queue.

    Indexes outside ``0 .. len(queue) - 1`` (including negative ones) are
    ignored silently.
    """
    queue = self.needs_review_queue
    if index < 0 or index >= len(queue):
        return
    print(f"❌ 样本 {index} 已拒绝")
    del queue[index]
def is_chinese_query(text: str) -> bool:
    """Return True if *text* has at least one character in U+4E00..U+9FFF."""
    return any('\u4e00' <= ch <= '\u9fff' for ch in text)
# Markers the model uses to request a file: <tool_code>path/to/file</tool_code>
_TOOL_CODE_CLOSE = "</tool_code>"
_TOOL_CODE_RE = re.compile(r"<tool_code>\s*(.*?)\s*</tool_code>", re.DOTALL)


def _extract_tool_files(text: str) -> List[str]:
    """Return the cleaned file paths from every complete <tool_code> tag in *text*."""
    paths = []
    for raw in _TOOL_CODE_RE.findall(text):
        cleaned = raw.replace("'", "").replace('"', "").replace("`", "").strip()
        if cleaned:
            paths.append(cleaned)
    return paths


async def process_chat_stream(user_query: str, session_id: str):
    """Stream an answer, with multi-round JIT file loading and conversation memory.

    Flow:
        1. Record the user message; compress the history if the memory asks for it.
        2. Rewrite the query and run the initial hybrid retrieval.
        3. Let the LLM answer; it may request files via <tool_code> tags.
        4. Load requested files and continue, for at most ``max_jit_rounds`` rounds.
        5. Save the reply to memory and store the data needed for evaluation.

    Yields:
        Text fragments for the client: UI notices, LLM content chunks,
        JIT progress lines and, on failure, an error line.
    """
    vector_db = store_manager.get_store(session_id)
    cfg = chat_config  # global configuration

    # Conversation memory: record the user message immediately.
    memory = get_conversation_memory(session_id)
    memory.add_user_message(user_query)

    if memory.needs_summarization():
        yield "> 📝 *Compressing conversation history...*\n\n"
        await _compress_conversation_history(memory)

    # Data collected for the later automatic evaluation.
    collected_response = ""

    # JIT state.
    all_loaded_files: Set[str] = set()
    all_failed_files: Set[str] = set()
    jit_round = 0

    use_chinese = is_chinese_query(user_query)
    ui_msgs = _get_ui_messages(use_chinese)

    # Step 0: query rewrite.
    search_query = await _rewrite_query(user_query)
    yield f"{ui_msgs['thinking']}`{search_query}`...\n\n"

    # Step 1: initial RAG retrieval.
    retrieval_start = time.time()
    relevant_docs = await vector_db.search_hybrid(search_query, top_k=cfg.retrieval_top_k)
    retrieval_latency_ms = (time.time() - retrieval_start) * 1000
    tracing_service.add_event("retrieval_completed", {
        "latency_ms": retrieval_latency_ms,
        "documents_retrieved": len(relevant_docs) if relevant_docs else 0
    })

    rag_context = _build_context(relevant_docs, cfg.context_max_chars)
    collected_context = rag_context

    # Step 2: initial prompt.
    global_context = vector_db.global_context or {}
    file_tree = global_context.get("file_tree", "(File tree not available.)")
    agent_summary = global_context.get("summary", "")
    conversation_context = _build_conversation_context(memory)

    system_instruction = _build_system_prompt(
        file_tree=file_tree,
        agent_summary=agent_summary,
        rag_context=rag_context,
        use_chinese=use_chinese,
        is_final_round=False,
        conversation_context=conversation_context
    )

    # The note names the tag explicitly so the model knows the request format.
    augmented_user_query = (
        f"\n{user_query}\n\n"
        "(System Note: Priority 1: Answer using context. "
        "Priority 2: Use <tool_code>path/to/file</tool_code> ONLY if critical info is missing.)\n"
    )

    if not client:
        yield "❌ LLM Error: Client not initialized"
        return

    messages = [
        {"role": "system", "content": system_instruction},
        {"role": "user", "content": augmented_user_query}
    ]

    try:
        generation_start = time.time()

        while jit_round <= cfg.max_jit_rounds:
            is_final_round = (jit_round == cfg.max_jit_rounds)

            if is_final_round and jit_round > 0:
                # Final round: rebuild the system prompt to disable tools.
                # The conversation history is passed again so it is not lost.
                messages[0] = {"role": "system", "content": _build_system_prompt(
                    file_tree=file_tree,
                    agent_summary=agent_summary,
                    rag_context=collected_context,
                    use_chinese=use_chinese,
                    is_final_round=True,
                    failed_files=list(all_failed_files),
                    conversation_context=conversation_context
                )}

            stream = await client.chat.completions.create(
                model=settings.default_model_name,
                messages=messages,
                stream=True,
                temperature=cfg.temperature_final if is_final_round else cfg.temperature_thinking,
                max_tokens=cfg.max_tokens
            )

            buffer = ""
            round_response = ""
            # A list (not a set) keeps request order, so the per-round limit
            # below always selects the files the model asked for first.
            requested_files: List[str] = []

            async for chunk in stream:
                # Some providers emit chunks with an empty `choices` list.
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content or ""
                if not content:
                    continue

                buffer += content
                round_response += content
                collected_response += content

                # A tag may be split across chunks, so it is only parsed once
                # its closing marker has arrived in the buffer.
                if _TOOL_CODE_CLOSE in buffer:
                    for path in _extract_tool_files(buffer):
                        if (path not in all_loaded_files
                                and path not in all_failed_files
                                and path not in requested_files):
                            requested_files.append(path)
                    # Keep whatever follows the last closing tag: it may be
                    # the beginning of the next request.
                    buffer = buffer.rsplit(_TOOL_CODE_CLOSE, 1)[1]

                yield content

            # No leftover handling needed: the buffer is parsed whenever a
            # closing tag arrives, so it cannot hold a complete tag here.

            if not requested_files or is_final_round:
                break

            # JIT file loading.
            jit_round += 1
            files_to_load = requested_files[:cfg.max_files_per_round]
            file_list_str = ", ".join([f"`{f}`" for f in files_to_load])

            yield f"\n\n> 🔍 **[JIT Round {jit_round}/{cfg.max_jit_rounds}]** {ui_msgs['action_short']}{file_list_str}...\n\n"

            if not vector_db.repo_url:
                yield ui_msgs['error_url']
                break

            round_loaded_docs = []
            round_failed_files = []

            for file_path in files_to_load:
                if file_path in vector_db.indexed_files:
                    round_loaded_docs.extend(vector_db.get_documents_by_file(file_path))
                    all_loaded_files.add(file_path)
                    yield f"> ✅ Loaded: `{file_path}`\n"
                elif await _download_and_index(vector_db, file_path):
                    round_loaded_docs.extend(vector_db.get_documents_by_file(file_path))
                    all_loaded_files.add(file_path)
                    yield f"> ✅ Downloaded: `{file_path}`\n"
                else:
                    round_failed_files.append(file_path)
                    all_failed_files.add(file_path)
                    yield f"> ⚠️ Failed: `{file_path}`\n"

            # Build the round context once and reuse it for both consumers.
            context_section = ""
            if round_loaded_docs:
                new_context = _build_context(round_loaded_docs, cfg.context_max_chars)
                collected_context += f"\n\n[JIT Round {jit_round} Context]\n{new_context}"
                context_section = f"\n\n[New Code Context]\n{new_context}"

            status_msg = _build_jit_status_message(
                loaded_count=len(round_loaded_docs),
                failed_files=round_failed_files,
                remaining_rounds=cfg.max_jit_rounds - jit_round,
                use_chinese=use_chinese
            )

            # Extend the dialogue and continue.
            messages.append({"role": "assistant", "content": round_response})
            messages.append({"role": "user", "content": f"{status_msg}{context_section}\n\nPlease continue your analysis."})

            yield "\n\n"  # separator between rounds

        # Generation finished.
        generation_latency_ms = (time.time() - generation_start) * 1000

        tracing_service.add_event("generation_completed", {
            "latency_ms": generation_latency_ms,
            "jit_rounds": jit_round,
            "files_loaded": len(all_loaded_files),
            "files_failed": len(all_failed_files)
        })

        memory.add_assistant_message(collected_response)

        _eval_data_store[session_id] = ChatResult(
            answer=collected_response,
            retrieved_context=collected_context,
            generation_latency_ms=generation_latency_ms,
            retrieval_latency_ms=retrieval_latency_ms
        )
        print(f"📦 [EvalData] Session {session_id}: {len(collected_context)} chars context, {len(collected_response)} chars answer, {jit_round} JIT rounds, {memory.get_turn_count()} turns")

    except Exception as e:
        import traceback
        traceback.print_exc()
        error_msg = str(e)
        # Keep the partial reply in memory even when generation failed.
        if collected_response:
            memory.add_assistant_message(collected_response + f"\n\n[Error: {error_msg}]")
        tracing_service.add_event("generation_error", {
            "error": error_msg,
            "error_type": type(e).__name__,
            "jit_round": jit_round
        })
        yield f"\n\n❌ System Error: {error_msg}"
def _build_conversation_context(
    memory: "ConversationMemory",
    max_messages: int = 6,
    max_chars: int = 500
) -> str:
    """Render recent conversation history as plain text for the system prompt.

    The last message returned by ``memory.get_context_messages()`` is
    treated as the current user message and left out. With two messages or
    fewer there is no history to show and "" is returned.

    Args:
        memory: Object exposing ``get_context_messages()``, which returns a
            list of ``{"role": ..., "content": ...}`` dicts.
        max_messages: Maximum number of history messages to include
            (default 6, i.e. three turns).
        max_chars: Per-message character limit; longer content is cut and
            suffixed with "...".

    Returns:
        One "User: ..." / "Assistant: ..." line per message joined with
        newlines, or "" when there is nothing to show.
    """
    messages = memory.get_context_messages()

    # Two messages or fewer means only the current turn exists.
    if len(messages) <= 2 or max_messages <= 0:
        return ""

    # Drop the current user message; at least two entries remain here.
    history = messages[:-1]

    lines = []
    for msg in history[-max_messages:]:
        # Any role other than "user" is labelled as the assistant.
        role = "User" if msg["role"] == "user" else "Assistant"
        text = msg["content"]
        snippet = text[:max_chars]
        if len(text) > max_chars:
            snippet += "..."
        lines.append(f"{role}: {snippet}")

    return "\n".join(lines)
def _build_jit_status_message(
    loaded_count: int,
    failed_files: List[str],
    remaining_rounds: int,
    use_chinese: bool
) -> str:
    """Build the notification sent to the LLM after a JIT loading round.

    Args:
        loaded_count: Number of items loaded this round.
        failed_files: Paths that could not be accessed (None counts as empty).
        remaining_rounds: JIT rounds still available to the model.
        use_chinese: Emit the Chinese variant instead of the English one.

    Returns:
        A single-sentence status message in the selected language.
    """
    failed = list(failed_files or [])
    failed_list = ", ".join(failed)
    something_loaded = loaded_count > 0

    if something_loaded and not failed:
        if use_chinese:
            return f"系统通知: 成功加载 {loaded_count} 个文件。"
        return f"System Notification: Successfully loaded {loaded_count} files."

    if something_loaded:
        if use_chinese:
            return f"系统通知: 加载了 {loaded_count} 个文件,但以下文件无法访问: {failed_list}。"
        return f"System Notification: Loaded {loaded_count} files, but the following could not be accessed: {failed_list}."

    if not failed:
        # Nothing loaded and nothing failed. Without this branch the message
        # would claim that "Files ()" could not be accessed.
        if use_chinese:
            return "系统通知: 没有加载到新的文件内容。请基于现有上下文回答。"
        return "System Notification: No new file content was loaded. Please answer based on available context."

    if remaining_rounds > 0:
        if use_chinese:
            return f"系统通知: 文件 ({failed_list}) 无法访问。你还有 {remaining_rounds} 次机会请求其他文件,或者基于现有上下文回答。"
        return f"System Notification: Files ({failed_list}) could not be accessed. You have {remaining_rounds} more attempts to request other files, or answer based on available context."

    if use_chinese:
        return f"系统通知: 文件 ({failed_list}) 无法访问。请基于现有上下文给出最佳回答。"
    return f"System Notification: Files ({failed_list}) could not be accessed. Please provide the best possible answer based on existing context."
def _build_context(docs: List[Dict], max_chars: int = 2000) -> str:
    """Render retrieved documents as one context string for the prompt.

    Each document becomes a section of the form::

        --- File: <path>[ (Class: <class>)] ---
        <content truncated to max_chars>

    The path comes from the top-level ``file`` key and falls back to
    ``metadata["file"]``, then to "unknown". Missing or None ``metadata``
    and ``content`` are treated as empty.

    Args:
        docs: Documents as dicts with optional ``file``, ``metadata`` and
            ``content`` keys.
        max_chars: Maximum number of content characters kept per document.

    Returns:
        The concatenated sections, or a placeholder sentence when *docs*
        is empty or None.
    """
    if not docs:
        return "(No relevant code snippets found yet)"

    sections = []
    for doc in docs:
        metadata = doc.get('metadata') or {}
        file_info = doc.get('file') or metadata.get('file') or 'unknown'

        class_name = metadata.get('class')
        if class_name:
            file_info += f" (Class: {class_name})"

        content = (doc.get('content') or '')[:max_chars]
        sections.append(f"\n--- File: {file_info} ---\n{content}\n")

    # Join once instead of growing a string inside the loop.
    return "".join(sections)
def chunk_file(self, content: str, file_path: str):
    """Split *content* into chunks with the strategy chosen by file extension.

    Empty content yields an empty list. ``.py`` files use the Python
    chunker, the listed brace-style extensions use the C-style chunker, and
    everything else uses the fallback chunker. The extension is compared
    case-insensitively.
    """
    if not content:
        return []

    c_style_exts = (
        '.java', '.js', '.ts', '.jsx', '.tsx', '.go',
        '.cpp', '.c', '.h', '.cs', '.php', '.rs',
    )
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    if suffix == '.py':
        handler = self._chunk_python
    elif suffix in c_style_exts:
        handler = self._chunk_c_style
    else:
        handler = self._fallback_chunking
    return handler(content, file_path)
def _chunk_large_python_class(self, class_node, content, file_path):
    """Split an oversized class into one chunk per direct method.

    Every method chunk is prefixed with a synthetic class header made of
    the class line, the class docstring (if any) and the class-level
    assignments that precede the first method, so a method keeps its
    parent context when it is stored on its own.

    Args:
        class_node: The ``ast.ClassDef`` node of the class to split.
        content: Full source text that ``class_node`` was parsed from.
        file_path: Path passed through to ``self._create_chunk``.

    Returns:
        A list of chunks built by ``self._create_chunk``: one "method"
        chunk per direct method, or a single "class" chunk holding the
        whole class source when the class defines no methods.
    """
    func_types = (ast.FunctionDef, ast.AsyncFunctionDef)
    class_name = class_node.name
    docstring = ast.get_docstring(class_node) or ""

    # Collect class-level assignments that appear before the first method.
    # Stop at the first method so statements between methods are not
    # pulled into the header.
    class_vars = []
    for node in class_node.body:
        if isinstance(node, (ast.Assign, ast.AnnAssign)):
            seg = ast.get_source_segment(content, node)
            if seg:
                class_vars.append(seg)
        elif isinstance(node, func_types):
            break

    # Build the header line by line. The docstring comes right after the
    # class line, and is omitted when empty so that no `""""""` noise
    # ends up in every chunk.
    header_lines = [f"class {class_name}:"]
    if docstring:
        header_lines.append(f'    """{docstring}"""')
    header_lines.extend(f"    {var}" for var in class_vars)
    header_lines.append("    # ... (Parent Context)")
    context_header = "\n".join(header_lines) + "\n"

    chunks = []
    for node in class_node.body:
        if not isinstance(node, func_types):
            continue
        method_code = ast.get_source_segment(content, node)
        if not method_code:
            continue
        chunks.append(self._create_chunk(
            context_header + "\n" + method_code,
            file_path, "method", node.name, node.lineno, class_name
        ))

    if not chunks:
        # A class without methods (e.g. a large constants holder) used to
        # yield nothing here and so dropped its source; keep it as a
        # single class chunk instead.
        class_code = ast.get_source_segment(content, class_node)
        if class_code:
            chunks.append(self._create_chunk(
                class_code, file_path, "class", class_name,
                class_node.lineno, class_name
            ))

    return chunks