# STORAGE_DIR 环境变量使用分析 ## 概述 `STORAGE_DIR="/app/server/storage"` 是 AnythingLLM 系统中核心的环境变量,用于定义所有持久化数据的存储根目录。该变量在 Docker 容器化部署中尤为重要,确保数据持久化和容器重启后的数据保留。 ## 主要使用场景 ### 1. 数据库文件存储 **位置**: `server/prisma/schema.prisma` ```prisma datasource db { provider = "sqlite" url = "file:../storage/anythingllm.db" } ``` ### 2. 向量数据库存储 **LanceDB 向量数据库**: ```javascript // server/utils/vectorDbProviders/lance/index.js uri: `${ !!process.env.STORAGE_DIR ? `${process.env.STORAGE_DIR}/` : "./storage/" }lancedb`, ``` ### 3. 文档存储路径 **主文档存储**: ```javascript // server/utils/files/index.js const documentsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "documents") : path.resolve(__dirname, `../../storage/documents`); const directUploadsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "direct-uploads") : path.resolve(__dirname, `../../storage/direct-uploads`); const vectorCachePath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "vector-cache") : path.resolve(__dirname, `../../storage/vector-cache`); ``` ### 4. Agent 相关存储 **Agent Flows 存储**: ```javascript // server/utils/agentFlows/index.js static flowsDir = process.env.STORAGE_DIR ? path.join(process.env.STORAGE_DIR, "plugins", "agent-flows") : path.join(process.cwd(), "storage", "plugins", "agent-flows"); ``` **Agent Skills 存储**: ```javascript // server/utils/agents/imported.js const importedSkillsDir = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "plugins", "agent-skills") : path.resolve(__dirname, "../../storage/plugins/agent-skills"); ``` ### 5. MCP 服务器配置 **MCP 配置文件路径**: ```javascript // server/utils/MCP/hypervisor/index.js this.mcpServerJSONPath = process.env.NODE_ENV === "development" ? path.resolve( __dirname, `../../../storage/plugins/anythingllm_mcp_servers.json` ) : path.resolve( process.env.STORAGE_DIR ?? path.resolve(__dirname, `../../../storage`), `plugins/anythingllm_mcp_servers.json` ); ``` ### 6. 模型和缓存存储 **Embedding 模型缓存**: ```javascript // server/utils/EmbeddingEngines/native/index.js this.cacheDir = path.resolve( process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, `models`) : path.resolve(__dirname, `../../../storage/models`) ); ``` **AI Provider 缓存**: ```javascript // server/utils/AiProviders/openRouter/index.js const cacheFolder = path.resolve( process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "models", "openrouter") : path.resolve(__dirname, `../../../storage/models/openrouter`) ); ``` ### 7. 资源文件存储 **Logo 和 PFP 存储**: ```javascript // server/utils/files/logo.js const assetsDirectory = process.env.STORAGE_DIR ? path.join(process.env.STORAGE_DIR, "assets") : path.join(__dirname, "../../storage/assets"); // server/utils/files/pfp.js const basePath = process.env.STORAGE_DIR ? path.join(process.env.STORAGE_DIR, "assets/pfp") : path.join(__dirname, "../../storage/assets/pfp"); ``` ### 8. Collector 相关存储 **文档收集器使用**: ```javascript // collector/utils/files/index.js const documentsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, `documents`) : path.resolve(__dirname, `../../../server/storage/documents`); const directUploadsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, `direct-uploads`) : path.resolve(__dirname, `../../../server/storage/direct-uploads`); ``` ### 9. 临时文件存储 **临时文件目录**: ```javascript // server/utils/EmbeddingEngines/native/index.js const tmpPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "tmp") : path.resolve(__dirname, `../../../storage/tmp`); ``` ### 10. 系统配置中的使用 ```javascript // server/models/systemSettings.js StorageDir: process.env.STORAGE_DIR, ``` ## Docker 部署中的重要性 在 Docker 容器化部署中,`STORAGE_DIR` 的正确配置至关重要: 1. **数据持久化**: 通过 Docker 卷挂载确保容器重启后数据不丢失 2. **路径映射**: 容器内路径 `/app/server/storage` 映射到宿主机路径 3. **权限管理**: 确保容器内进程有足够的权限访问存储目录 ### Docker Compose 配置示例 ```yaml services: anything-llm: volumes: - "../server/storage:/app/server/storage" # 数据持久化 - "../collector/hotdir/:/app/collector/hotdir" # 热目录 ``` ## 目录结构 当 `STORAGE_DIR="/app/server/storage"` 时,系统会创建以下目录结构: ``` /app/server/storage/ ├── anythingllm.db # SQLite 数据库 ├── documents/ # 存储的文档 ├── direct-uploads/ # 直接上传的文件 ├── vector-cache/ # 向量缓存 ├── assets/ │ ├── pfp/ # 用户头像 │ └── logos/ # 自定义 logos ├── models/ # 下载的模型文件 │ ├── context-windows/ │ ├── openrouter/ │ ├── gemini/ │ └── ... ├── plugins/ │ ├── agent-flows/ # Agent 工作流定义 │ ├── agent-skills/ # 导入的 Agent 技能 │ └── anythingllm_mcp_servers.json # MCP 服务器配置 ├── tmp/ # 临时文件 └── logs/ # 日志文件 ``` ## 开发环境 vs 生产环境 系统会根据 `NODE_ENV` 和 `STORAGE_DIR` 的存在与否来决定使用哪个路径: 1. **生产环境** (Docker): - 使用 `STORAGE_DIR` 指定的路径 - 通常为 `/app/server/storage` 2. **开发环境**: - 如果 `STORAGE_DIR` 未设置,使用相对路径 `./storage/` 或 `../../storage/` - 确保开发时数据存储在项目目录中 ## 最佳实践 1. **始终设置 STORAGE_DIR**: 在生产环境中明确设置此变量 2. **使用绝对路径**: 避免使用相对路径,确保路径可解析 3. **权限管理**: 确保运行进程对存储目录有读写权限 4. **备份策略**: 定期备份 `STORAGE_DIR` 目录以防止数据丢失 5. **监控空间**: 监控存储目录的磁盘空间使用情况 ## 故障排除 如果遇到存储相关问题,检查: 1. 环境变量是否正确设置 2. 目录是否存在且有正确的权限 3. 磁盘空间是否充足 4. Docker 卷是否正确挂载(仅容器化部署) ## 13. Collector Hotdir 详细分析 ### 概述 `collector/hotdir` 是 AnythingLLM 系统中的热目录(Hot Directory)功能,用于自动监控和处理文档文件。该功能允许用户将文件放置在指定目录中,系统会自动检测、处理并向量化这些文件,无需手动上传。 ### 目录结构 ``` collector/ ├── hotdir/ # 热目录根目录 │ ├── processing/ # 正在处理的文件 │ ├── completed/ # 处理完成的文件 │ └── failed/ # 处理失败的文件 ├── middleware/ # 中间件 │ ├── setDataSigner.js # 数据签名中间件 │ └── verifyIntegrity.js # 完整性验证中间件 └── utils/ # 工具函数 └── files/ └── index.js # 文件处理工具 ``` ### 核心功能 #### 1. 自动文件监控 Hotdir 通过文件系统监控机制,自动检测新添加的文件: ```javascript // 文件路径处理 const documentsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, `documents`) : path.resolve(__dirname, `../../../server/storage/documents`); const directUploadsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, `direct-uploads`) : path.resolve(__dirname, `../../../server/storage/direct-uploads`); ``` #### 2. 文件处理流程 ```mermaid flowchart TD A[文件放入 hotdir] --> B[文件监控检测] B --> C[移动到 processing 目录] C --> D[文件完整性验证] D --> E[格式解析和文本提取] E --> F[分块处理] F --> G[向量化] G --> H[存储到向量数据库] H --> I[移动到 completed 目录] I --> J[更新工作空间文档列表] D --> K[验证失败?] K --> L[移动到 failed 目录] L --> M[记录错误日志] ``` #### 3. 中间件系统 **数据签名中间件** (`setDataSigner.js`): - 为处理的文件生成数字签名 - 确保文件的完整性和来源可信 - 防止文件被篡改 **完整性验证中间件** (`verifyIntegrity.js`): - 验证文件的数字签名 - 检查文件是否损坏 - 确保文件处理的安全性 ### Docker 部署配置 在 Docker Compose 中,hotdir 需要单独挂载: ```yaml services: anything-llm: volumes: - "../server/storage:/app/server/storage" # 主存储目录 - "../collector/hotdir/:/app/collector/hotdir" # 热目录 ``` ### 与 STORAGE_DIR 的关系 Hotdir 与 STORAGE_DIR 紧密协作: 1. **文件来源**: Hotdir 作为文件输入源 2. **存储目标**: 处理后的文件存储在 STORAGE_DIR/documents 3. **临时处理**: 使用 STORAGE_DIR/direct-uploads 作为中转 4. **状态跟踪**: 在数据库中记录文件处理状态 ### 使用场景 #### 1. 批量文档导入 ```bash # 将多个文档文件放入 hotdir cp /path/to/documents/*.pdf ../collector/hotdir/ cp /path/to/documents/*.docx ../collector/hotdir/ # 系统自动处理这些文件 ``` #### 2. 定期文档更新 ```bash # 设置定时任务,定期同步文档到 hotdir */5 * * * * rsync -av /shared/documents/ /app/collector/hotdir/ ``` #### 3. 外部系统集成 ```javascript // 其他系统可以通过 API 触发 hotdir 扫描 POST /api/collector/scan-hotdir { "workspace": "workspace-slug", "options": { "recursive": true, "fileTypes": ["pdf", "docx", "txt"] } } ``` ### 配置选项 系统支持通过环境变量配置 hotdir 行为: ```bash # Hotdir 监控间隔(毫秒) HOTDIR_WATCH_INTERVAL=5000 # 支持的文件类型 HOTDIR_ALLOWED_EXTENSIONS=pdf,docx,txt,md,html # 并发处理数量 HOTDIR_CONCURRENT_LIMIT=3 # 自动删除已处理文件 HOTDIR_AUTO_CLEANUP=true ``` ### 监控和日志 #### 1. 处理状态监控 ```javascript // 查看处理状态 GET /api/collector/status { "processing": [ { "filename": "document.pdf", "status": "processing", "progress": 45, "startedAt": "2024-01-01T10:00:00Z" } ], "completed": 150, "failed": 2 } ``` #### 2. 错误处理 ```javascript // 处理失败的文件会记录详细错误信息 { "filename": "corrupted.pdf", "error": "File integrity check failed", "timestamp": "2024-01-01T10:05:00Z", "stack": "..." } ``` ### 最佳实践 1. **文件组织** - 按类型或项目创建子目录 - 使用有意义的文件名 - 避免特殊字符 2. **性能优化** - 控制单次处理的文件数量 - 避免超大文件(>100MB) - 定期清理 completed 目录 3. **安全考虑** - 设置适当的文件权限 - 定期检查 failed 目录 - 监控异常文件活动 4. **备份策略** - 定期备份 hotdir 中的原始文件 - 保留处理日志 - 实施灾难恢复计划 ### 故障排除 #### 常见问题 1. **文件不被处理** - 检查文件扩展名是否支持 - 确认文件权限 - 查看监控服务状态 2. **处理失败** - 检查 failed 目录中的错误日志 - 验证文件完整性 - 确认磁盘空间充足 3. **性能问题** - 调整并发处理限制 - 优化文件监控间隔 - 清理历史文件 #### 调试命令 ```bash # 查看 hotdir 状态 ls -la collector/hotdir/{processing,completed,failed} # 检查文件权限 stat collector/hotdir/filename.pdf # 监控文件系统事件 inotifywatch -v collector/hotdir ``` ### 扩展功能 #### 1. 自定义处理器 ```javascript // 添加自定义文件处理器 const customProcessor = { name: 'custom-csv', extensions: ['csv'], handler: async (filePath) => { // 自定义 CSV 处理逻辑 return processedData; } }; ``` #### 2. 钩子系统 ```javascript // 处理前钩子 beforeProcess: async (file) => { // 预处理逻辑 } // 处理后钩子 afterProcess: async (file, result) => { // 后处理逻辑 } ``` 通过 collector/hotdir 功能,AnythingLLM 提供了一个强大而灵活的文档自动化处理系统,大大简化了批量文档导入和持续更新的工作流程。 ## 14. 工作空间创建 API 流程分析 ### 概述 工作空间创建是 AnythingLLM 系统中的核心功能,通过 `/api/workspace/new` 端点实现。该 API 允许用户创建新的工作空间,配置 LLM 和嵌入模型参数,并自动设置相应的权限和向量数据库。 ### 前端实现流程 #### 1. 创建工作空间组件 (`frontend/src/components/WorkspaceNew/index.jsx`) ```javascript // 1. 状态管理 const [loading, setLoading] = useState(false); const [error, setError] = useState(null); // 2. 表单数据处理 const handleSubmit = async (e) => { e.preventDefault(); setLoading(true); const form = e.target; const formData = new FormData(form); // 3. 构建请求数据 const data = { name: formData.get("name"), slug: slugify(formData.get("name")), ...Object.fromEntries(formData.entries()), similarityThreshold: parseFloat(formData.get("similarityThreshold")), }; try { // 4. 发送创建请求 const { workspace } = await System.workspaceNew(data); // 5. 成功后跳转 window.location.href = `/workspace/${workspace.slug}`; } catch (err) { setError(err.message); } finally { setLoading(false); } }; ``` #### 2. API 请求封装 (`frontend/src/utils/request.js`) ```javascript // System.workspaceNew 实现 async function workspaceNew(data) { return await fetch(`${API_BASE}/api/workspace/new`, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${getAuthToken()}`, }, body: JSON.stringify(data), }).then(handleResponse); } ``` ### 后端 API 实现 #### 1. 路由定义 (`server/endpoints/workspaces.js`) ```javascript // POST /api/workspace/new router.post( "/new", [validatedRequest(WorkspaceNewSchema)], async (request, response) => { try { const body = request.body; const { user } = response.locals; // 1. 验证用户权限 if (!canCreateWorkspace(user)) { response.status(403).json({ success: false, error: "You are not authorized to create workspaces", }); return; } // 2. 调用工作空间创建服务 const { workspace, message } = await Workspace.new(body, user.id); // 3. 返回成功响应 response.status(201).json({ success: true, message, workspace, }); } catch (e) { console.error(e.message, e); response.status(500).json({ success: false, error: e.message, }); } } ); ``` #### 2. 数据验证模式 (`server/utils/middleware/schemaValidation.js`) ```javascript // WorkspaceNewSchema 定义 const WorkspaceNewSchema = { name: { type: "string", required: true, minLength: 2, maxLength: 50, pattern: "^[a-zA-Z0-9_-\\s]+$", }, similarityThreshold: { type: "number", required: false, min: 0.0, max: 1.0, default: 0.25, }, openAiTemp: { type: "number", required: false, min: 0.0, max: 2.0, default: 0.7, }, // ... 其他验证规则 }; ``` ### 数据库操作流程 #### 1. 工作空间模型 (`server/models/workspace.js`) ```javascript // Workspace.new 静态方法 static async new(data = {}, ownerId = null) { try { // 1. 数据准备和验证 const { name, slug, ...rest } = data; // 2. 生成唯一标识 const workspaceData = { name, slug: await this.generateUniqueSlug(slug), vectorTag: `ws_${randomUUID().slice(0, 8)}`, createdAt: new Date(), lastUpdatedAt: new Date(), ...rest, }; // 3. 数据库事务处理 const workspace = await prisma.$transaction(async (prisma) => { // 3.1 创建工作空间记录 const newWorkspace = await prisma.workspaces.create({ data: workspaceData, }); // 3.2 如果指定了所有者,创建权限记录 if (ownerId) { await prisma.workspace_users.create({ data: { user_id: ownerId, workspace_id: newWorkspace.id, createdAt: new Date(), }, }); } return newWorkspace; }); // 4. 初始化向量数据库 await this.initVectorDatabase(workspace); // 5. 记录事件日志 await EventLogs.logEvent( "workspace_created", { workspaceName: workspace.name, workspaceId: workspace.id, ownerId, }, ownerId ); return { workspace, message: "Workspace created successfully" }; } catch (error) { console.error("Error creating workspace:", error); throw new Error("Failed to create workspace"); } } ``` #### 2. 用户权限模型 (`server/models/workspaceUsers.js`) ```javascript // 添加用户到工作空间 static async create(workspaceId, userId) { try { const workspaceUser = await prisma.workspace_users.create({ data: { user_id: userId, workspace_id: workspaceId, createdAt: new Date(), }, }); return workspaceUser; } catch (error) { console.error("Error creating workspace user:", error); throw new Error("Failed to add user to workspace"); } } ``` ### 向量数据库初始化 #### 1. 向量数据库管理 (`server/utils/vectors/index.js`) ```javascript // 初始化工作空间的向量数据库 async function initVectorDatabase(workspace) { const vectorDb = getVectorDbClass(); try { // 创建工作空间特定的向量索引 const result = await vectorDb.createNamespace(workspace.vectorTag); if (!result.success) { throw new Error(`Failed to create vector namespace: ${result.error}`); } console.log(`Vector database initialized for workspace: ${workspace.name}`); return result; } catch (error) { console.error("Error initializing vector database:", error); throw error; } } ``` ### 完整的生命周期序列图 ```mermaid sequenceDiagram participant U as 用户 participant F as 前端 (React) participant R as 路由器 participant V as 验证中间件 participant W as Workspace 模型 participant P as Prisma ORM participant VDB as 向量数据库 participant E as 事件日志 participant DB as 数据库 U->>F: 填写工作空间表单 F->>F: 验证表单数据 F->>R: POST /api/workspace/new R->>V: 执行数据验证 V->>V: 检查字段格式和必填项 V->>R: 验证通过 R->>R: 检查用户权限 R->>W: Workspace.new(data, userId) W->>W: 生成唯一 slug W->>P: 开始事务 P->>DB: 创建工作空间记录 DB-->>P: 返回工作空间数据 alt 指定了所有者 P->>DB: 创建 workspace_users 记录 DB-->>P: 返回权限记录 end P->>P: 提交事务 P-->>W: 返回创建的工作空间 W->>VDB: initVectorDatabase(workspace) VDB->>VDB: 创建向量命名空间 VDB-->>W: 初始化完成 W->>E: 记录 workspace_created 事件 E->>DB: 保存事件日志 DB-->>E: 保存成功 W-->>R: 返回工作空间和成功消息 R-->>F: 201 Created 响应 F->>F: 更新 UI 状态 F->>U: 重定向到新工作空间 ``` ### 错误处理机制 #### 1. 数据验证错误 ```javascript // 验证中间件错误处理 if (validationError) { return response.status(400).json({ success: false, error: "Invalid request data", details: validationError.details, }); } ``` #### 2. 数据库事务错误 ```javascript // 事务回滚处理 await prisma.$transaction(async (prisma) => { // 如果任何操作失败,整个事务会自动回滚 const workspace = await prisma.workspaces.create({ data: workspaceData }); await prisma.workspace_users.create({ data: userWorkspaceData }); return workspace; }).catch(error => { console.error("Transaction failed:", error); throw new Error("Failed to create workspace due to database error"); }); ``` #### 3. 向量数据库错误 ```javascript // 向量数据库初始化失败处理 if (!vectorResult.success) { // 工作空间已创建,但向量数据库失败 // 需要清理已创建的工作空间或标记为需要修复 await markWorkspaceForRepair(workspace.id); throw new Error(`Workspace created but vector database initialization failed: ${vectorResult.error}`); } ``` ### 性能优化措施 #### 1. 并发控制 ```javascript // 使用信号量限制并发创建 const workspaceCreationSemaphore = new Semaphore(5); // 最多同时5个创建操作 router.post("/new", [ validatedRequest(WorkspaceNewSchema), async (request, response) => { const release = await workspaceCreationSemaphore.acquire(); try { // 创建工作空间逻辑 } finally { release(); } } ]); ``` #### 2. 缓存策略 ```javascript // 缓存工作空间列表 const workspaceCache = new TTLCache({ ttl: 300, // 5分钟缓存 max: 100, }); // 创建后清除缓存 await workspaceCache.del(`user:${userId}:workspaces`); ``` ### 监控和日志 #### 1. 性能指标 ```javascript // 记录创建时间 const startTime = Date.now(); const { workspace } = await Workspace.new(data, userId); const creationTime = Date.now() - startTime; // 发送到监控系统 metrics.timing('workspace.creation.time', creationTime); ``` #### 2. 审计日志 ```javascript // 记录详细的审计信息 await AuditLog.create({ action: 'workspace_created', userId: user.id, details: { workspaceName: data.name, workspaceConfig: { similarityThreshold: data.similarityThreshold, chatProvider: data.chatProvider, // ... 其他配置 }, ipAddress: request.ip, userAgent: request.get('User-Agent'), }, }); ``` 通过这个完整的工作空间创建 API 流程分析,我们可以看到 AnythingLLM 系统如何通过分层架构、事务处理、错误处理和性能优化来确保工作空间创建的可靠性和性能。整个流程从前端表单到后端数据库操作,再到向量数据库初始化,形成了一个完整的数据处理链路。 ## 15. 工作空间存储位置和数据结构 ### 概述 工作空间是 AnythingLLM 系统中的核心隔离单元,每个工作空间拥有独立的文档、向量数据、聊天记录和配置。系统通过多层次的存储机制确保工作空间间的数据隔离和安全性。 ### 工作空间存储架构 #### 1. 文档存储结构 工作空间的文档存储采用分层组织方式: ```javascript // server/utils/files/index.js const documentsPath = process.env.STORAGE_DIR ? path.resolve(process.env.STORAGE_DIR, "documents") : path.resolve(__dirname, `../../storage/documents`); ``` 文档存储路径结构: ``` /app/server/storage/documents/ ├── workspace-slug-1/ │ ├── doc-uuid-1/ │ │ ├── original-file.pdf │ │ ├── processed.json │ │ └── chunks/ │ │ ├── chunk-1.json │ │ └── chunk-2.json │ └── doc-uuid-2/ │ ├── original-file.docx │ └── processed.json └── workspace-slug-2/ └── doc-uuid-3/ └── original-file.txt ``` #### 2. 向量数据库隔离机制 每个工作空间通过 `vectorTag` 实现向量数据的完全隔离: ```javascript // 工作空间创建时生成唯一标识 vectorTag: `ws_${randomUUID().slice(0, 8)}` // 向量数据库查询时使用工作空间标签 const vectorDb = getVectorDbClass(); const results = await vectorDb.performSimilaritySearch({ namespace: workspace.vectorTag, // 工作空间隔离 inputText: query, similarityThreshold: 0.25, }); ``` 支持的向量数据库隔离方式: 1. **LanceDB**: 每个工作空间独立的命名空间 2. **Pinecone**: 每个工作空间独立的索引 3. **Chroma**: 每个工作空间独立的集合 4. **Qdrant**: 每个工作空间独立的集合 #### 3. 聊天记录存储 聊天记录存储在关系数据库中,通过 `workspace_id` 进行隔离: ```javascript // server/models/workspaceChats.js static async forWorkspace(workspaceId) { return await prisma.workspace_chats.findMany({ where: { workspace_id: workspaceId, }, orderBy: { createdAt: 'desc', }, }); } ``` 聊天数据结构: ```javascript { id: 1, workspace_id: 1, prompt: "用户查询内容", response: "AI回复内容", include: true, // 是否包含在上下文中 session_id: "session-uuid", thread_id: "thread-uuid", // 对话线程ID createdAt: "2024-01-01T10:00:00Z" } ``` #### 4. Agent 相关数据存储 ##### Agent Flows 存储 ```javascript // server/utils/agentFlows/index.js static flowsDir = process.env.STORAGE_DIR ? path.join(process.env.STORAGE_DIR, "plugins", "agent-flows") : path.join(process.cwd(), "storage", "plugins", "agent-flows"); // 工作空间特定的 Flow 存储路径 const workspaceFlowDir = path.join(AgentFlows.flowsDir, workspace.slug); ``` ##### Agent Invocations 记录 ```javascript // server/models/workspaceAgentInvocations.js // 记录所有 Agent 调用历史 { id: 1, uuid: "invocation-uuid", prompt: "Agent 执行的提示", response: "Agent 的响应结果", closed: true, // 是否已完成 user_id: 1, workspace_id: 1, createdAt: "2024-01-01T10:00:00Z" } ``` ### 完整的工作空间存储目录结构 ```mermaid graph TD A[STORAGE_DIR] --> B[documents/] A --> C[lancedb/] A --> D[plugins/] A --> E[assets/] A --> F[models/] A --> G[tmp/] A --> H[logs/] B --> B1[workspace-slug-1/] B --> B2[workspace-slug-2/] B1 --> B11[doc-uuid-1/] B1 --> B12[doc-uuid-2/] B11 --> B111[original-file.pdf] B11 --> B112[processed.json] B11 --> B113[chunks/] C --> C1[ws_12345678/] C --> C2[ws_87654321/] D --> D1[agent-flows/] D --> D2[agent-skills/] D1 --> D11[workspace-slug-1/] D1 --> D12[workspace-slug-2/] D11 --> D111[flow-1.json] D11 --> D112[flow-2.json] E --> E1[pfp/] E --> E2[logos/] E1 --> E11[workspace-slug-1.png] F --> F1[context-windows/] F --> F2[openrouter/] F --> F3[gemini/] ``` ### 数据隔离机制详解 #### 1. 数据库层面隔离 ```mermaid erDiagram workspaces ||--o{ workspace_documents : "包含" workspaces ||--o{ workspace_chats : "拥有" workspaces ||--o{ workspace_users : "授权" workspaces ||--o{ workspace_agent_invocations : "记录" workspaces ||--o{ workspace_threads : "组织" workspace_documents { int id PK string docId UK string filename string docpath int workspaceId FK string metadata boolean pinned boolean watched datetime createdAt } workspace_chats { int id PK workspaceId FK string prompt string response boolean include string sessionId string threadId datetime createdAt } ``` #### 2. 文件系统隔离 每个工作空间的文档存储在独立的子目录中: ```javascript // 文档路径生成函数 function getWorkspaceDocumentPath(workspaceSlug, docId) { return path.join( process.env.STORAGE_DIR || './storage', 'documents', workspaceSlug, docId ); } // 示例路径 // /app/server/storage/documents/my-workspace/doc-123456/ ``` #### 3. 向量数据库隔离 不同向量数据库的隔离实现: ```javascript // LanceDB 隔离 const lancedbUri = `${ process.env.STORAGE_DIR ? `${process.env.STORAGE_DIR}/` : "./storage/" }lancedb`; // Chroma 隔离 const chromaCollection = `workspace_${workspace.id}`; // Pinecone 隔离 const pineconeIndex = `anythingllm-${workspace.vectorTag}`; ``` ### 工作空间数据备份和恢复 #### 1. 备份策略 ```bash # 完整工作空间备份脚本 #!/bin/bash WORKSPACE_SLUG=$1 BACKUP_DIR="./backups/${WORKSPACE_SLUG}" STORAGE_DIR="/app/server/storage" # 创建备份目录 mkdir -p $BACKUP_DIR # 备份文档 cp -r $STORAGE_DIR/documents/$WORKSPACE_SLUG $BACKUP_DIR/ # 备份向量数据 cp -r $STORAGE_DIR/lancedb/ws_${WORKSPACE_SLUG} $BACKUP_DIR/ # 备份 Agent Flows cp -r $STORAGE_DIR/plugins/agent-flows/$WORKSPACE_SLUG $BACKUP_DIR/ # 导出数据库记录 sqlite3 $STORAGE_DIR/anythingllm.db \ ".backup $BACKUP_DIR/workspace_${WORKSPACE_SLUG}.db" ``` #### 2. 恢复流程 ```javascript // 工作空间恢复服务 async function restoreWorkspace(backupPath, workspaceSlug) { // 1. 恢复文件系统数据 await restoreDocuments(backupPath, workspaceSlug); await restoreVectorData(backupPath, workspaceSlug); await restoreAgentFlows(backupPath, workspaceSlug); // 2. 恢复数据库记录 await restoreDatabaseRecords(backupPath, workspaceSlug); // 3. 重新索引向量数据库 await reindexWorkspace(workspaceSlug); } ``` ### 性能优化措施 #### 1. 文档存储优化 ```javascript // 文档分片存储 const CHUNK_SIZE = 1000; // 1MB per chunk async function storeLargeFile(file, workspaceSlug) { const chunks = splitFileIntoChunks(file, CHUNK_SIZE); const chunkPaths = []; for (const chunk of chunks) { const chunkPath = await storeChunk(chunk, workspaceSlug); chunkPaths.push(chunkPath); } return { originalPath: await storeOriginal(file, workspaceSlug), chunkPaths, manifest: createManifest(chunks) }; } ``` #### 2. 向量缓存优化 ```javascript // 工作空间级别的向量缓存 class WorkspaceVectorCache { constructor(workspaceId) { this.cachePath = path.join( process.env.STORAGE_DIR, 'vector-cache', `workspace-${workspaceId}.cache` ); this.cache = new LRUCache({ max: 1000, // 最多缓存1000个向量 ttl: 3600 // 1小时过期 }); } } ``` ### 安全性考虑 #### 1. 访问控制 ```javascript // 工作空间访问权限检查 async function checkWorkspaceAccess(userId, workspaceSlug) { const workspace = await Workspace.get({ slug: workspaceSlug }); if (!workspace) { throw new Error('Workspace not found'); } const hasAccess = await WorkspaceUser.hasAccess(userId, workspace.id); if (!hasAccess) { throw new Error('Access denied'); } return workspace; } ``` #### 2. 数据加密 ```javascript // 敏感数据加密存储 const crypto = require('crypto'); const algorithm = 'aes-256-gcm'; function encryptWorkspaceData(data, secretKey) { const iv = crypto.randomBytes(16); const cipher = crypto.createCipheriv(algorithm, secretKey, iv); let encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex'); encrypted += cipher.final('hex'); const authTag = cipher.getAuthTag(); return { encrypted, iv: iv.toString('hex'), authTag: authTag.toString('hex') }; } ``` ### 监控和维护 #### 1. 存储空间监控 ```javascript // 工作空间存储使用情况监控 async function getWorkspaceStorageStats(workspaceSlug) { const workspacePath = path.join( process.env.STORAGE_DIR, 'documents', workspaceSlug ); const stats = await getDirectorySize(workspacePath); const documentCount = await countDocuments(workspaceSlug); const vectorCount = await getVectorCount(workspaceSlug); return { totalSize: stats.size, fileCount: stats.files, documentCount, vectorCount, lastUpdated: stats.lastModified }; } ``` #### 2. 清理策略 ```javascript // 定期清理未使用的文件 async function cleanupWorkspaceFiles(workspaceSlug, retentionDays = 30) { const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - retentionDays); // 清理临时文件 await cleanupTempFiles(workspaceSlug, cutoffDate); // 清理未引用的文档 await cleanupOrphanedDocuments(workspaceSlug, cutoffDate); // 压缩旧日志 await compressOldLogs(workspaceSlug, cutoffDate); } ``` 通过这种多层次的工作空间存储架构,AnythingLLM 确保了数据的隔离性、安全性和可扩展性,为企业级应用提供了可靠的数据管理基础。 ## 16. Prisma ORM 配置和 PostgreSQL 迁移 ### 概述 AnythingLLM 系统使用 Prisma 作为 ORM(对象关系映射)工具,管理数据库操作和数据模型。系统默认使用 SQLite 数据库,但支持迁移到 PostgreSQL 等企业级数据库。本章详细分析 Prisma 的配置机制、迁移流程以及 PostgreSQL 迁移的具体步骤。 ### Prisma 配置架构 #### 1. 数据源配置 (`server/prisma/schema.prisma`) ```prisma // 数据源定义 datasource db { provider = "sqlite" url = "file:../storage/anythingllm.db" } // 生成配置 generator client { provider = "prisma-client-js" } ``` 关键配置说明: - `provider`: 数据库类型,支持 "sqlite"、"postgresql"、"mysql" 等 - `url`: 数据库连接字符串,支持环境变量占位符 - `shadowDatabaseUrl`: PostgreSQL 迁移时使用的影子数据库 #### 2. 环境变量集成 Prisma 通过环境变量实现不同环境的配置切换: ```javascript // 环境变量检测逻辑 const databaseUrl = process.env.DATABASE_URL || (process.env.POSTGRES_USER && process.env.POSTGRES_PASSWORD && process.env.POSTGRES_HOST ? `postgresql://${process.env.POSTGRES_USER}:${process.env.POSTGRES_PASSWORD}@${process.env.POSTGRES_HOST}:5432/${process.env.POSTGRES_DB || 'anythingllm'}` : 'file:../storage/anythingllm.db'); ``` ### 数据模型关系设计 #### 1. 核心实体关系图 ```mermaid erDiagram workspaces ||--o{ workspace_documents : "contains" workspaces ||--o{ workspace_users : "has" workspaces ||--o{ workspace_chats : "has" workspaces ||--o{ workspace_agent_invocations : "has" workspaces ||--o{ workspace_threads : "has" users ||--o{ workspace_users : "belongs_to" users ||--o{ recovery_codes : "has" users ||--o{ password_reset_tokens : "has" system_settings ||--|| system_settings : "unique" event_logs ||--o{ users : "tracked_by" ``` #### 2. 关键模型定义 ```prisma // 工作空间模型 model workspaces { id Int @id @default(autoincrement()) name String slug String @unique vectorTag String? similarityThreshold Float @default(0.25) chatProvider String? @default("openai") chatModel String? @default("gpt-3.5-turbo") agentProvider String? @default("openai") agentModel String? @default("gpt-3.5-turbo") createdAt DateTime @default(now()) lastUpdatedAt DateTime @updatedAt // 关系定义 documents workspace_documents[] users workspace_users[] chats workspace_chats[] agent_invocations workspace_agent_invocations[] threads workspace_threads[] @@map("workspaces") } // 用户模型 model users { id Int @id @default(autoincrement()) username String @unique password String role String @default("default") suspended Int @default(0) dailyMessageLimit Int? @default(-1) createdAt DateTime @default(now()) // 关系定义 workspaces workspace_users[] recovery_codes recovery_codes[] password_reset_tokens password_reset_tokens[] system_prompt_variables system_prompt_variables[] event_logs event_logs[] @@map("users") } ``` ### 迁移机制详解 #### 1. 迁移文件组织 ``` server/prisma/migrations/ ├── 20250226005538_init/ # 初始迁移 │ ├── migration.sql # SQL 迁移脚本 │ └── README.md # 迁移说明 ├── 20250808171557_init/ # PostgreSQL 迁移 │ └── migration.sql # PostgreSQL 适配脚本 └── migration_lock.toml # 迁移锁文件 ``` #### 2. 迁移锁机制 ```toml # migration_lock.toml provider = "postgresql" # 记录当前数据库状态和迁移历史 ``` ### PostgreSQL 迁移完整流程 #### 1. 环境准备 ```bash # 1. 安装 PostgreSQL sudo apt-get update sudo apt-get install postgresql postgresql-contrib # 2. 创建数据库和用户 sudo -u postgres psql CREATE DATABASE anythingllm; CREATE USER anythingllm_user WITH ENCRYPTED PASSWORD 'your_password'; GRANT ALL PRIVILEGES ON DATABASE anythingllm TO anythingllm_user; \q # 3. 设置环境变量 export POSTGRES_USER=anythingllm_user export POSTGRES_PASSWORD=your_password export POSTGRES_HOST=localhost export POSTGRES_DB=anythingllm export DATABASE_URL=postgresql://anythingllm_user:your_password@localhost:5432/anythingllm ``` #### 2. Prisma 配置更新 ```prisma // 更新 schema.prisma datasource db { provider = "postgresql" url = env("DATABASE_URL") shadowDatabaseUrl = env("SHADOW_DATABASE_URL") // 可选,用于迁移测试 } ``` #### 3. 数据迁移步骤 ```bash # 1. 生成 PostgreSQL 迁移文件 npx prisma migrate dev --name postgres-migration # 2. 查看生成的迁移 npx prisma migrate status # 3. 应用迁移 npx prisma migrate deploy # 4. 生成 Prisma 客户端 npx prisma generate # 5. 验证数据库结构 npx prisma db pull ``` #### 4. 数据迁移脚本 ```javascript // server/prisma/migrateToPostgres.js const { PrismaClient } = require('@prisma/client'); const fs = require('fs'); const path = require('path'); async function migrateData() { // 1. 连接 SQLite 数据库 const sqlite = new PrismaClient({ datasources: { db: { url: 'file:../storage/anythingllm.db' } } }); // 2. 连接 PostgreSQL 数据库 const postgres = new PrismaClient(); try { // 3. 迁移工作空间数据 const workspaces = await sqlite.workspaces.findMany(); for (const workspace of workspaces) { await postgres.workspaces.create({ data: { name: workspace.name, slug: workspace.slug, vectorTag: workspace.vectorTag, similarityThreshold: workspace.similarityThreshold, chatProvider: workspace.chatProvider, chatModel: workspace.chatModel, agentProvider: workspace.agentProvider, agentModel: workspace.agentModel, createdAt: workspace.createdAt, lastUpdatedAt: workspace.lastUpdatedAt } }); } // 4. 迁移用户数据 const users = await sqlite.users.findMany(); for (const user of users) { await postgres.users.create({ data: { username: user.username, password: user.password, role: user.role, suspended: user.suspended, dailyMessageLimit: user.dailyMessageLimit, createdAt: user.createdAt } }); } // 5. 迁移关系数据 // ... 其他表的迁移逻辑 console.log('数据迁移完成'); } catch (error) { console.error('迁移失败:', error); } finally { await sqlite.$disconnect(); await postgres.$disconnect(); } } migrateData(); ``` ### 迁移中的常见问题和解决方案 #### 1. 数据类型兼容性 ```sql -- SQLite 到 PostgreSQL 的类型映射 -- TEXT -> VARCHAR or TEXT -- INTEGER -> INTEGER or BIGINT -- REAL -> DOUBLE PRECISION -- BLOB -> BYTEA -- DATETIME -> TIMESTAMP or TIMESTAMPTZ ``` #### 2. 自定义 SQL 迁移 ```sql -- 处理 SQLite 特有的语法 -- 1. AUTOINCREMENT -> SERIAL or BIGSERIAL CREATE TABLE workspaces ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, -- 其他字段 ); -- 2. 处理布尔值 -- SQLite: 0/1 -> PostgreSQL: true/false UPDATE users SET suspended = true WHERE suspended = 1; -- 3. 处理日期时间 -- SQLite: Unix timestamp -> PostgreSQL: TIMESTAMP UPDATE workspaces SET created_at = to_timestamp(created_at) WHERE created_at > 0; ``` #### 3. 索引和约束优化 ```sql -- 添加必要的索引 CREATE INDEX idx_workspace_documents_workspace_id ON workspace_documents(workspace_id); CREATE INDEX idx_workspace_chats_workspace_id ON workspace_chats(workspace_id); CREATE INDEX idx_workspace_users_user_id ON workspace_users(user_id); -- 添加外键约束 ALTER TABLE workspace_documents ADD CONSTRAINT fk_workspace_documents_workspace_id FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE; ``` ### 生产环境迁移策略 #### 1. 零停机迁移流程 ```mermaid sequenceDiagram participant U as 用户 participant A as 应用 participant S as SQLite participant P as PostgreSQL participant M as 迁移工具 Note over U,M: 阶段1: 准备 M->>P: 创建目标数据库 M->>P: 应用 schema 迁移 Note over U,M: 阶段2: 数据同步 M->>S: 读取现有数据 M->>P: 写入到 PostgreSQL loop 增量同步 M->>S: 检查新数据 M->>P: 同步新数据 end Note over U,M: 阶段3: 切换 A->>A: 停止写入 M->>S: 最终同步 M->>P: 验证数据 A->>A: 切换数据库连接 A->>P: 恢复服务 ``` #### 2. 回滚策略 ```bash # 1. 备份 SQLite 数据库 cp server/storage/anythingllm.db server/storage/anythingllm.db.backup # 2. 创建回滚脚本 cat > rollback.sh << 'EOF' #!/bin/bash # 停止应用 pm2 stop anythingllm # 恢复环境变量 export DATABASE_URL="file:../storage/anythingllm.db" # 重启应用 pm2 start anythingllm EOF chmod +x rollback.sh ``` ### 性能优化建议 #### 1. PostgreSQL 配置优化 ```ini # postgresql.conf # 内存配置 shared_buffers = 256MB effective_cache_size = 1GB work_mem = 16MB # 连接配置 max_connections = 100 # 日志配置 log_statement = 'all' log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h ' # 查询优化 random_page_cost = 1.1 effective_io_concurrency = 200 ``` #### 2. Prisma 查询优化 ```javascript // 使用 include 减少查询次数 const workspaces = await prisma.workspaces.findMany({ include: { users: true, documents: { take: 10, orderBy: { createdAt: 'desc' } } } }); // 使用分页避免大数据集 const chats = await prisma.workspace_chats.findMany({ where: { workspace_id: workspaceId }, take: 50, skip: page * 50, orderBy: { createdAt: 'desc' } }); // 使用事务确保数据一致性 await prisma.$transaction(async (tx) => { const workspace = await tx.workspaces.create({ data: workspaceData }); await tx.workspace_users.create({ data: { user_id: userId, workspace_id: workspace.id } }); return workspace; }); ``` ### 监控和维护 #### 1. 数据库监控 ```sql -- 监控查询性能 SELECT query, calls, total_time, mean_time, rows FROM pg_stat_statements ORDER BY total_time DESC LIMIT 10; -- 监控表大小 SELECT schemaname, tablename, pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size FROM pg_tables WHERE schemaname = 'public' ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC; -- 监控连接数 SELECT state, count(*) FROM pg_stat_activity GROUP BY state; ``` #### 2. 自动化维护脚本 ```bash #!/bin/bash # 数据库维护脚本 # db-maintenance.sh # 1. 清理旧数据 psql $DATABASE_URL -c "DELETE FROM event_logs WHERE occurred_at < NOW() - INTERVAL '90 days';" # 2. 更新统计信息 psql $DATABASE_URL -c "ANALYZE;" # 3. 重建索引 psql $DATABASE_URL -c "REINDEX DATABASE anythingllm;" # 4. 备份数据库 pg_dump $DATABASE_URL | gzip > backup_$(date +%Y%m%d).sql.gz ``` ### 最佳实践总结 1. **开发环境** - 使用 SQLite 进行快速开发和测试 - 定期运行 `prisma format` 保持代码风格一致 - 使用 `prisma studio` 可视化查看数据 2. **生产环境** - 使用 PostgreSQL 提供更好的性能和可靠性 - 配置适当的连接池大小 - 实施定期备份策略 - 监控数据库性能指标 3. **迁移流程** - 始终在测试环境验证迁移脚本 - 使用事务确保数据一致性 - 准备详细的回滚计划 - 在低峰期执行生产迁移 4. **安全考虑** - 使用环境变量管理数据库凭据 - 限制数据库访问权限 - 定期更新 Prisma 版本 - 实施数据加密策略 通过 Prisma ORM 和 PostgreSQL 的结合,AnythingLLM 系统能够支持更大规模的数据处理需求,提供更好的性能、可靠性和扩展性,为企业级部署提供了坚实的基础。