STORAGE_DIR 环境变量使用分析
概述
STORAGE_DIR="/app/server/storage" 是 AnythingLLM 系统中核心的环境变量,用于定义所有持久化数据的存储根目录。该变量在 Docker 容器化部署中尤为重要,确保数据持久化和容器重启后的数据保留。
主要使用场景
1. 数据库文件存储
位置: server/prisma/schema.prisma
datasource db {
provider = "sqlite"
url = "file:../storage/anythingllm.db"
}
2. 向量数据库存储
LanceDB 向量数据库:
// server/utils/vectorDbProviders/lance/index.js
uri: `${
!!process.env.STORAGE_DIR ? `${process.env.STORAGE_DIR}/` : "./storage/"
}lancedb`,
3. 文档存储路径
主文档存储:
// server/utils/files/index.js
const documentsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "documents")
: path.resolve(__dirname, `../../storage/documents`);
const directUploadsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "direct-uploads")
: path.resolve(__dirname, `../../storage/direct-uploads`);
const vectorCachePath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "vector-cache")
: path.resolve(__dirname, `../../storage/vector-cache`);
4. Agent 相关存储
Agent Flows 存储:
// server/utils/agentFlows/index.js
static flowsDir = process.env.STORAGE_DIR
? path.join(process.env.STORAGE_DIR, "plugins", "agent-flows")
: path.join(process.cwd(), "storage", "plugins", "agent-flows");
Agent Skills 存储:
// server/utils/agents/imported.js
const importedSkillsDir = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "plugins", "agent-skills")
: path.resolve(__dirname, "../../storage/plugins/agent-skills");
5. MCP 服务器配置
MCP 配置文件路径:
// server/utils/MCP/hypervisor/index.js
this.mcpServerJSONPath =
process.env.NODE_ENV === "development"
? path.resolve(
__dirname,
`../../../storage/plugins/anythingllm_mcp_servers.json`
)
: path.resolve(
process.env.STORAGE_DIR ??
path.resolve(__dirname, `../../../storage`),
`plugins/anythingllm_mcp_servers.json`
);
6. 模型和缓存存储
Embedding 模型缓存:
// server/utils/EmbeddingEngines/native/index.js
this.cacheDir = path.resolve(
process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, `models`)
: path.resolve(__dirname, `../../../storage/models`)
);
AI Provider 缓存:
// server/utils/AiProviders/openRouter/index.js
const cacheFolder = path.resolve(
process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "models", "openrouter")
: path.resolve(__dirname, `../../../storage/models/openrouter`)
);
7. 资源文件存储
Logo 和 PFP 存储:
// server/utils/files/logo.js
const assetsDirectory = process.env.STORAGE_DIR
? path.join(process.env.STORAGE_DIR, "assets")
: path.join(__dirname, "../../storage/assets");
// server/utils/files/pfp.js
const basePath = process.env.STORAGE_DIR
? path.join(process.env.STORAGE_DIR, "assets/pfp")
: path.join(__dirname, "../../storage/assets/pfp");
8. Collector 相关存储
文档收集器使用:
// collector/utils/files/index.js
const documentsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, `documents`)
: path.resolve(__dirname, `../../../server/storage/documents`);
const directUploadsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, `direct-uploads`)
: path.resolve(__dirname, `../../../server/storage/direct-uploads`);
9. 临时文件存储
临时文件目录:
// server/utils/EmbeddingEngines/native/index.js
const tmpPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "tmp")
: path.resolve(__dirname, `../../../storage/tmp`);
10. 系统配置中的使用
// server/models/systemSettings.js
StorageDir: process.env.STORAGE_DIR,
Docker 部署中的重要性
在 Docker 容器化部署中,STORAGE_DIR 的正确配置至关重要:
- 数据持久化: 通过 Docker 卷挂载确保容器重启后数据不丢失
- 路径映射: 容器内路径
/app/server/storage映射到宿主机路径 - 权限管理: 确保容器内进程有足够的权限访问存储目录
Docker Compose 配置示例
services:
anything-llm:
volumes:
- "../server/storage:/app/server/storage" # 数据持久化
- "../collector/hotdir/:/app/collector/hotdir" # 热目录
目录结构
当 STORAGE_DIR="/app/server/storage" 时,系统会创建以下目录结构:
/app/server/storage/
├── anythingllm.db # SQLite 数据库
├── documents/ # 存储的文档
├── direct-uploads/ # 直接上传的文件
├── vector-cache/ # 向量缓存
├── assets/
│ ├── pfp/ # 用户头像
│ └── logos/ # 自定义 logos
├── models/ # 下载的模型文件
│ ├── context-windows/
│ ├── openrouter/
│ ├── gemini/
│ └── ...
├── plugins/
│ ├── agent-flows/ # Agent 工作流定义
│ ├── agent-skills/ # 导入的 Agent 技能
│ └── anythingllm_mcp_servers.json # MCP 服务器配置
├── tmp/ # 临时文件
└── logs/ # 日志文件
开发环境 vs 生产环境
系统会根据 NODE_ENV 和 STORAGE_DIR 的存在与否来决定使用哪个路径:
生产环境 (Docker):
- 使用
STORAGE_DIR指定的路径 - 通常为
/app/server/storage
- 使用
开发环境:
- 如果
STORAGE_DIR未设置,使用相对路径./storage/或../../storage/ - 确保开发时数据存储在项目目录中
- 如果
最佳实践
- 始终设置 STORAGE_DIR: 在生产环境中明确设置此变量
- 使用绝对路径: 避免使用相对路径,确保路径可解析
- 权限管理: 确保运行进程对存储目录有读写权限
- 备份策略: 定期备份
STORAGE_DIR目录以防止数据丢失 - 监控空间: 监控存储目录的磁盘空间使用情况
故障排除
如果遇到存储相关问题,检查:
- 环境变量是否正确设置
- 目录是否存在且有正确的权限
- 磁盘空间是否充足
- Docker 卷是否正确挂载(仅容器化部署)
13. Collector Hotdir 详细分析
概述
collector/hotdir 是 AnythingLLM 系统中的热目录(Hot Directory)功能,用于自动监控和处理文档文件。该功能允许用户将文件放置在指定目录中,系统会自动检测、处理并向量化这些文件,无需手动上传。
目录结构
collector/
├── hotdir/ # 热目录根目录
│ ├── processing/ # 正在处理的文件
│ ├── completed/ # 处理完成的文件
│ └── failed/ # 处理失败的文件
├── middleware/ # 中间件
│ ├── setDataSigner.js # 数据签名中间件
│ └── verifyIntegrity.js # 完整性验证中间件
└── utils/ # 工具函数
└── files/
└── index.js # 文件处理工具
核心功能
1. 自动文件监控
Hotdir 通过文件系统监控机制,自动检测新添加的文件:
// 文件路径处理
const documentsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, `documents`)
: path.resolve(__dirname, `../../../server/storage/documents`);
const directUploadsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, `direct-uploads`)
: path.resolve(__dirname, `../../../server/storage/direct-uploads`);
2. 文件处理流程
flowchart TD
A[文件放入 hotdir] --> B[文件监控检测]
B --> C[移动到 processing 目录]
C --> D[文件完整性验证]
D --> E[格式解析和文本提取]
E --> F[分块处理]
F --> G[向量化]
G --> H[存储到向量数据库]
H --> I[移动到 completed 目录]
I --> J[更新工作空间文档列表]
D --> K[验证失败?]
K --> L[移动到 failed 目录]
L --> M[记录错误日志]
3. 中间件系统
数据签名中间件 (setDataSigner.js):
- 为处理的文件生成数字签名
- 确保文件的完整性和来源可信
- 防止文件被篡改
完整性验证中间件 (verifyIntegrity.js):
- 验证文件的数字签名
- 检查文件是否损坏
- 确保文件处理的安全性
Docker 部署配置
在 Docker Compose 中,hotdir 需要单独挂载:
services:
anything-llm:
volumes:
- "../server/storage:/app/server/storage" # 主存储目录
- "../collector/hotdir/:/app/collector/hotdir" # 热目录
与 STORAGE_DIR 的关系
Hotdir 与 STORAGE_DIR 紧密协作:
- 文件来源: Hotdir 作为文件输入源
- 存储目标: 处理后的文件存储在 STORAGE_DIR/documents
- 临时处理: 使用 STORAGE_DIR/direct-uploads 作为中转
- 状态跟踪: 在数据库中记录文件处理状态
使用场景
1. 批量文档导入
# 将多个文档文件放入 hotdir
cp /path/to/documents/*.pdf ../collector/hotdir/
cp /path/to/documents/*.docx ../collector/hotdir/
# 系统自动处理这些文件
2. 定期文档更新
# 设置定时任务,定期同步文档到 hotdir
*/5 * * * * rsync -av /shared/documents/ /app/collector/hotdir/
3. 外部系统集成
// 其他系统可以通过 API 触发 hotdir 扫描
POST /api/collector/scan-hotdir
{
"workspace": "workspace-slug",
"options": {
"recursive": true,
"fileTypes": ["pdf", "docx", "txt"]
}
}
配置选项
系统支持通过环境变量配置 hotdir 行为:
# Hotdir 监控间隔(毫秒)
HOTDIR_WATCH_INTERVAL=5000
# 支持的文件类型
HOTDIR_ALLOWED_EXTENSIONS=pdf,docx,txt,md,html
# 并发处理数量
HOTDIR_CONCURRENT_LIMIT=3
# 自动删除已处理文件
HOTDIR_AUTO_CLEANUP=true
监控和日志
1. 处理状态监控
// 查看处理状态
GET /api/collector/status
{
"processing": [
{
"filename": "document.pdf",
"status": "processing",
"progress": 45,
"startedAt": "2024-01-01T10:00:00Z"
}
],
"completed": 150,
"failed": 2
}
2. 错误处理
// 处理失败的文件会记录详细错误信息
{
"filename": "corrupted.pdf",
"error": "File integrity check failed",
"timestamp": "2024-01-01T10:05:00Z",
"stack": "..."
}
最佳实践
文件组织
- 按类型或项目创建子目录
- 使用有意义的文件名
- 避免特殊字符
性能优化
- 控制单次处理的文件数量
- 避免超大文件(>100MB)
- 定期清理 completed 目录
安全考虑
- 设置适当的文件权限
- 定期检查 failed 目录
- 监控异常文件活动
备份策略
- 定期备份 hotdir 中的原始文件
- 保留处理日志
- 实施灾难恢复计划
故障排除
常见问题
文件不被处理
- 检查文件扩展名是否支持
- 确认文件权限
- 查看监控服务状态
处理失败
- 检查 failed 目录中的错误日志
- 验证文件完整性
- 确认磁盘空间充足
性能问题
- 调整并发处理限制
- 优化文件监控间隔
- 清理历史文件
调试命令
# 查看 hotdir 状态
ls -la collector/hotdir/{processing,completed,failed}
# 检查文件权限
stat collector/hotdir/filename.pdf
# 监控文件系统事件
inotifywatch -v collector/hotdir
扩展功能
1. 自定义处理器
// 添加自定义文件处理器
const customProcessor = {
name: 'custom-csv',
extensions: ['csv'],
handler: async (filePath) => {
// 自定义 CSV 处理逻辑
return processedData;
}
};
2. 钩子系统
// 处理前钩子
beforeProcess: async (file) => {
// 预处理逻辑
}
// 处理后钩子
afterProcess: async (file, result) => {
// 后处理逻辑
}
通过 collector/hotdir 功能,AnythingLLM 提供了一个强大而灵活的文档自动化处理系统,大大简化了批量文档导入和持续更新的工作流程。
14. 工作空间创建 API 流程分析
概述
工作空间创建是 AnythingLLM 系统中的核心功能,通过 /api/workspace/new 端点实现。该 API 允许用户创建新的工作空间,配置 LLM 和嵌入模型参数,并自动设置相应的权限和向量数据库。
前端实现流程
1. 创建工作空间组件 (frontend/src/components/WorkspaceNew/index.jsx)
// 1. 状态管理
const [loading, setLoading] = useState(false);
const [error, setError] = useState(null);
// 2. 表单数据处理
const handleSubmit = async (e) => {
e.preventDefault();
setLoading(true);
const form = e.target;
const formData = new FormData(form);
// 3. 构建请求数据
const data = {
name: formData.get("name"),
slug: slugify(formData.get("name")),
...Object.fromEntries(formData.entries()),
similarityThreshold: parseFloat(formData.get("similarityThreshold")),
};
try {
// 4. 发送创建请求
const { workspace } = await System.workspaceNew(data);
// 5. 成功后跳转
window.location.href = `/workspace/${workspace.slug}`;
} catch (err) {
setError(err.message);
} finally {
setLoading(false);
}
};
2. API 请求封装 (frontend/src/utils/request.js)
// System.workspaceNew 实现
async function workspaceNew(data) {
return await fetch(`${API_BASE}/api/workspace/new`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${getAuthToken()}`,
},
body: JSON.stringify(data),
}).then(handleResponse);
}
后端 API 实现
1. 路由定义 (server/endpoints/workspaces.js)
// POST /api/workspace/new
router.post(
"/new",
[validatedRequest(WorkspaceNewSchema)],
async (request, response) => {
try {
const body = request.body;
const { user } = response.locals;
// 1. 验证用户权限
if (!canCreateWorkspace(user)) {
response.status(403).json({
success: false,
error: "You are not authorized to create workspaces",
});
return;
}
// 2. 调用工作空间创建服务
const { workspace, message } = await Workspace.new(body, user.id);
// 3. 返回成功响应
response.status(201).json({
success: true,
message,
workspace,
});
} catch (e) {
console.error(e.message, e);
response.status(500).json({
success: false,
error: e.message,
});
}
}
);
2. 数据验证模式 (server/utils/middleware/schemaValidation.js)
// WorkspaceNewSchema 定义
const WorkspaceNewSchema = {
name: {
type: "string",
required: true,
minLength: 2,
maxLength: 50,
pattern: "^[a-zA-Z0-9_-\\s]+$",
},
similarityThreshold: {
type: "number",
required: false,
min: 0.0,
max: 1.0,
default: 0.25,
},
openAiTemp: {
type: "number",
required: false,
min: 0.0,
max: 2.0,
default: 0.7,
},
// ... 其他验证规则
};
数据库操作流程
1. 工作空间模型 (server/models/workspace.js)
// Workspace.new 静态方法
static async new(data = {}, ownerId = null) {
try {
// 1. 数据准备和验证
const { name, slug, ...rest } = data;
// 2. 生成唯一标识
const workspaceData = {
name,
slug: await this.generateUniqueSlug(slug),
vectorTag: `ws_${randomUUID().slice(0, 8)}`,
createdAt: new Date(),
lastUpdatedAt: new Date(),
...rest,
};
// 3. 数据库事务处理
const workspace = await prisma.$transaction(async (prisma) => {
// 3.1 创建工作空间记录
const newWorkspace = await prisma.workspaces.create({
data: workspaceData,
});
// 3.2 如果指定了所有者,创建权限记录
if (ownerId) {
await prisma.workspace_users.create({
data: {
user_id: ownerId,
workspace_id: newWorkspace.id,
createdAt: new Date(),
},
});
}
return newWorkspace;
});
// 4. 初始化向量数据库
await this.initVectorDatabase(workspace);
// 5. 记录事件日志
await EventLogs.logEvent(
"workspace_created",
{
workspaceName: workspace.name,
workspaceId: workspace.id,
ownerId,
},
ownerId
);
return { workspace, message: "Workspace created successfully" };
} catch (error) {
console.error("Error creating workspace:", error);
throw new Error("Failed to create workspace");
}
}
2. 用户权限模型 (server/models/workspaceUsers.js)
// 添加用户到工作空间
static async create(workspaceId, userId) {
try {
const workspaceUser = await prisma.workspace_users.create({
data: {
user_id: userId,
workspace_id: workspaceId,
createdAt: new Date(),
},
});
return workspaceUser;
} catch (error) {
console.error("Error creating workspace user:", error);
throw new Error("Failed to add user to workspace");
}
}
向量数据库初始化
1. 向量数据库管理 (server/utils/vectors/index.js)
// 初始化工作空间的向量数据库
async function initVectorDatabase(workspace) {
const vectorDb = getVectorDbClass();
try {
// 创建工作空间特定的向量索引
const result = await vectorDb.createNamespace(workspace.vectorTag);
if (!result.success) {
throw new Error(`Failed to create vector namespace: ${result.error}`);
}
console.log(`Vector database initialized for workspace: ${workspace.name}`);
return result;
} catch (error) {
console.error("Error initializing vector database:", error);
throw error;
}
}
完整的生命周期序列图
sequenceDiagram
participant U as 用户
participant F as 前端 (React)
participant R as 路由器
participant V as 验证中间件
participant W as Workspace 模型
participant P as Prisma ORM
participant VDB as 向量数据库
participant E as 事件日志
participant DB as 数据库
U->>F: 填写工作空间表单
F->>F: 验证表单数据
F->>R: POST /api/workspace/new
R->>V: 执行数据验证
V->>V: 检查字段格式和必填项
V->>R: 验证通过
R->>R: 检查用户权限
R->>W: Workspace.new(data, userId)
W->>W: 生成唯一 slug
W->>P: 开始事务
P->>DB: 创建工作空间记录
DB-->>P: 返回工作空间数据
alt 指定了所有者
P->>DB: 创建 workspace_users 记录
DB-->>P: 返回权限记录
end
P->>P: 提交事务
P-->>W: 返回创建的工作空间
W->>VDB: initVectorDatabase(workspace)
VDB->>VDB: 创建向量命名空间
VDB-->>W: 初始化完成
W->>E: 记录 workspace_created 事件
E->>DB: 保存事件日志
DB-->>E: 保存成功
W-->>R: 返回工作空间和成功消息
R-->>F: 201 Created 响应
F->>F: 更新 UI 状态
F->>U: 重定向到新工作空间
错误处理机制
1. 数据验证错误
// 验证中间件错误处理
if (validationError) {
return response.status(400).json({
success: false,
error: "Invalid request data",
details: validationError.details,
});
}
2. 数据库事务错误
// 事务回滚处理
await prisma.$transaction(async (prisma) => {
// 如果任何操作失败,整个事务会自动回滚
const workspace = await prisma.workspaces.create({ data: workspaceData });
await prisma.workspace_users.create({ data: userWorkspaceData });
return workspace;
}).catch(error => {
console.error("Transaction failed:", error);
throw new Error("Failed to create workspace due to database error");
});
3. 向量数据库错误
// 向量数据库初始化失败处理
if (!vectorResult.success) {
// 工作空间已创建,但向量数据库失败
// 需要清理已创建的工作空间或标记为需要修复
await markWorkspaceForRepair(workspace.id);
throw new Error(`Workspace created but vector database initialization failed: ${vectorResult.error}`);
}
性能优化措施
1. 并发控制
// 使用信号量限制并发创建
const workspaceCreationSemaphore = new Semaphore(5); // 最多同时5个创建操作
router.post("/new", [
validatedRequest(WorkspaceNewSchema),
async (request, response) => {
const release = await workspaceCreationSemaphore.acquire();
try {
// 创建工作空间逻辑
} finally {
release();
}
}
]);
2. 缓存策略
// 缓存工作空间列表
const workspaceCache = new TTLCache({
ttl: 300, // 5分钟缓存
max: 100,
});
// 创建后清除缓存
await workspaceCache.del(`user:${userId}:workspaces`);
监控和日志
1. 性能指标
// 记录创建时间
const startTime = Date.now();
const { workspace } = await Workspace.new(data, userId);
const creationTime = Date.now() - startTime;
// 发送到监控系统
metrics.timing('workspace.creation.time', creationTime);
2. 审计日志
// 记录详细的审计信息
await AuditLog.create({
action: 'workspace_created',
userId: user.id,
details: {
workspaceName: data.name,
workspaceConfig: {
similarityThreshold: data.similarityThreshold,
chatProvider: data.chatProvider,
// ... 其他配置
},
ipAddress: request.ip,
userAgent: request.get('User-Agent'),
},
});
通过这个完整的工作空间创建 API 流程分析,我们可以看到 AnythingLLM 系统如何通过分层架构、事务处理、错误处理和性能优化来确保工作空间创建的可靠性和性能。整个流程从前端表单到后端数据库操作,再到向量数据库初始化,形成了一个完整的数据处理链路。
15. 工作空间存储位置和数据结构
概述
工作空间是 AnythingLLM 系统中的核心隔离单元,每个工作空间拥有独立的文档、向量数据、聊天记录和配置。系统通过多层次的存储机制确保工作空间间的数据隔离和安全性。
工作空间存储架构
1. 文档存储结构
工作空间的文档存储采用分层组织方式:
// server/utils/files/index.js
const documentsPath = process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "documents")
: path.resolve(__dirname, `../../storage/documents`);
文档存储路径结构:
/app/server/storage/documents/
├── workspace-slug-1/
│ ├── doc-uuid-1/
│ │ ├── original-file.pdf
│ │ ├── processed.json
│ │ └── chunks/
│ │ ├── chunk-1.json
│ │ └── chunk-2.json
│ └── doc-uuid-2/
│ ├── original-file.docx
│ └── processed.json
└── workspace-slug-2/
└── doc-uuid-3/
└── original-file.txt
2. 向量数据库隔离机制
每个工作空间通过 vectorTag 实现向量数据的完全隔离:
// 工作空间创建时生成唯一标识
vectorTag: `ws_${randomUUID().slice(0, 8)}`
// 向量数据库查询时使用工作空间标签
const vectorDb = getVectorDbClass();
const results = await vectorDb.performSimilaritySearch({
namespace: workspace.vectorTag, // 工作空间隔离
inputText: query,
similarityThreshold: 0.25,
});
支持的向量数据库隔离方式:
- LanceDB: 每个工作空间独立的命名空间
- Pinecone: 每个工作空间独立的索引
- Chroma: 每个工作空间独立的集合
- Qdrant: 每个工作空间独立的集合
3. 聊天记录存储
聊天记录存储在关系数据库中,通过 workspace_id 进行隔离:
// server/models/workspaceChats.js
static async forWorkspace(workspaceId) {
return await prisma.workspace_chats.findMany({
where: {
workspace_id: workspaceId,
},
orderBy: {
createdAt: 'desc',
},
});
}
聊天数据结构:
{
id: 1,
workspace_id: 1,
prompt: "用户查询内容",
response: "AI回复内容",
include: true, // 是否包含在上下文中
session_id: "session-uuid",
thread_id: "thread-uuid", // 对话线程ID
createdAt: "2024-01-01T10:00:00Z"
}
4. Agent 相关数据存储
Agent Flows 存储
// server/utils/agentFlows/index.js
static flowsDir = process.env.STORAGE_DIR
? path.join(process.env.STORAGE_DIR, "plugins", "agent-flows")
: path.join(process.cwd(), "storage", "plugins", "agent-flows");
// 工作空间特定的 Flow 存储路径
const workspaceFlowDir = path.join(AgentFlows.flowsDir, workspace.slug);
Agent Invocations 记录
// server/models/workspaceAgentInvocations.js
// 记录所有 Agent 调用历史
{
id: 1,
uuid: "invocation-uuid",
prompt: "Agent 执行的提示",
response: "Agent 的响应结果",
closed: true, // 是否已完成
user_id: 1,
workspace_id: 1,
createdAt: "2024-01-01T10:00:00Z"
}
完整的工作空间存储目录结构
graph TD
A[STORAGE_DIR] --> B[documents/]
A --> C[lancedb/]
A --> D[plugins/]
A --> E[assets/]
A --> F[models/]
A --> G[tmp/]
A --> H[logs/]
B --> B1[workspace-slug-1/]
B --> B2[workspace-slug-2/]
B1 --> B11[doc-uuid-1/]
B1 --> B12[doc-uuid-2/]
B11 --> B111[original-file.pdf]
B11 --> B112[processed.json]
B11 --> B113[chunks/]
C --> C1[ws_12345678/] <!-- workspace 1 向量数据 -->
C --> C2[ws_87654321/] <!-- workspace 2 向量数据 -->
D --> D1[agent-flows/]
D --> D2[agent-skills/]
D1 --> D11[workspace-slug-1/]
D1 --> D12[workspace-slug-2/]
D11 --> D111[flow-1.json]
D11 --> D112[flow-2.json]
E --> E1[pfp/]
E --> E2[logos/]
E1 --> E11[workspace-slug-1.png]
F --> F1[context-windows/]
F --> F2[openrouter/]
F --> F3[gemini/]
数据隔离机制详解
1. 数据库层面隔离
erDiagram
workspaces ||--o{ workspace_documents : "包含"
workspaces ||--o{ workspace_chats : "拥有"
workspaces ||--o{ workspace_users : "授权"
workspaces ||--o{ workspace_agent_invocations : "记录"
workspaces ||--o{ workspace_threads : "组织"
workspace_documents {
int id PK
string docId UK
string filename
string docpath
int workspaceId FK
string metadata
boolean pinned
boolean watched
datetime createdAt
}
workspace_chats {
int id PK
workspaceId FK
string prompt
string response
boolean include
string sessionId
string threadId
datetime createdAt
}
2. 文件系统隔离
每个工作空间的文档存储在独立的子目录中:
// 文档路径生成函数
function getWorkspaceDocumentPath(workspaceSlug, docId) {
return path.join(
process.env.STORAGE_DIR || './storage',
'documents',
workspaceSlug,
docId
);
}
// 示例路径
// /app/server/storage/documents/my-workspace/doc-123456/
3. 向量数据库隔离
不同向量数据库的隔离实现:
// LanceDB 隔离
const lancedbUri = `${
process.env.STORAGE_DIR ? `${process.env.STORAGE_DIR}/` : "./storage/"
}lancedb`;
// Chroma 隔离
const chromaCollection = `workspace_${workspace.id}`;
// Pinecone 隔离
const pineconeIndex = `anythingllm-${workspace.vectorTag}`;
工作空间数据备份和恢复
1. 备份策略
# 完整工作空间备份脚本
#!/bin/bash
WORKSPACE_SLUG=$1
BACKUP_DIR="./backups/${WORKSPACE_SLUG}"
STORAGE_DIR="/app/server/storage"
# 创建备份目录
mkdir -p $BACKUP_DIR
# 备份文档
cp -r $STORAGE_DIR/documents/$WORKSPACE_SLUG $BACKUP_DIR/
# 备份向量数据
cp -r $STORAGE_DIR/lancedb/ws_${WORKSPACE_SLUG} $BACKUP_DIR/
# 备份 Agent Flows
cp -r $STORAGE_DIR/plugins/agent-flows/$WORKSPACE_SLUG $BACKUP_DIR/
# 导出数据库记录
sqlite3 $STORAGE_DIR/anythingllm.db \
".backup $BACKUP_DIR/workspace_${WORKSPACE_SLUG}.db"
2. 恢复流程
// 工作空间恢复服务
async function restoreWorkspace(backupPath, workspaceSlug) {
// 1. 恢复文件系统数据
await restoreDocuments(backupPath, workspaceSlug);
await restoreVectorData(backupPath, workspaceSlug);
await restoreAgentFlows(backupPath, workspaceSlug);
// 2. 恢复数据库记录
await restoreDatabaseRecords(backupPath, workspaceSlug);
// 3. 重新索引向量数据库
await reindexWorkspace(workspaceSlug);
}
性能优化措施
1. 文档存储优化
// 文档分片存储
const CHUNK_SIZE = 1000; // 1MB per chunk
async function storeLargeFile(file, workspaceSlug) {
const chunks = splitFileIntoChunks(file, CHUNK_SIZE);
const chunkPaths = [];
for (const chunk of chunks) {
const chunkPath = await storeChunk(chunk, workspaceSlug);
chunkPaths.push(chunkPath);
}
return {
originalPath: await storeOriginal(file, workspaceSlug),
chunkPaths,
manifest: createManifest(chunks)
};
}
2. 向量缓存优化
// 工作空间级别的向量缓存
class WorkspaceVectorCache {
constructor(workspaceId) {
this.cachePath = path.join(
process.env.STORAGE_DIR,
'vector-cache',
`workspace-${workspaceId}.cache`
);
this.cache = new LRUCache({
max: 1000, // 最多缓存1000个向量
ttl: 3600 // 1小时过期
});
}
}
安全性考虑
1. 访问控制
// 工作空间访问权限检查
async function checkWorkspaceAccess(userId, workspaceSlug) {
const workspace = await Workspace.get({ slug: workspaceSlug });
if (!workspace) {
throw new Error('Workspace not found');
}
const hasAccess = await WorkspaceUser.hasAccess(userId, workspace.id);
if (!hasAccess) {
throw new Error('Access denied');
}
return workspace;
}
2. 数据加密
// 敏感数据加密存储
const crypto = require('crypto');
const algorithm = 'aes-256-gcm';
function encryptWorkspaceData(data, secretKey) {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, secretKey, iv);
let encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex');
encrypted += cipher.final('hex');
const authTag = cipher.getAuthTag();
return {
encrypted,
iv: iv.toString('hex'),
authTag: authTag.toString('hex')
};
}
监控和维护
1. 存储空间监控
// 工作空间存储使用情况监控
async function getWorkspaceStorageStats(workspaceSlug) {
const workspacePath = path.join(
process.env.STORAGE_DIR,
'documents',
workspaceSlug
);
const stats = await getDirectorySize(workspacePath);
const documentCount = await countDocuments(workspaceSlug);
const vectorCount = await getVectorCount(workspaceSlug);
return {
totalSize: stats.size,
fileCount: stats.files,
documentCount,
vectorCount,
lastUpdated: stats.lastModified
};
}
2. 清理策略
// 定期清理未使用的文件
async function cleanupWorkspaceFiles(workspaceSlug, retentionDays = 30) {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
// 清理临时文件
await cleanupTempFiles(workspaceSlug, cutoffDate);
// 清理未引用的文档
await cleanupOrphanedDocuments(workspaceSlug, cutoffDate);
// 压缩旧日志
await compressOldLogs(workspaceSlug, cutoffDate);
}
通过这种多层次的工作空间存储架构,AnythingLLM 确保了数据的隔离性、安全性和可扩展性,为企业级应用提供了可靠的数据管理基础。
16. Prisma ORM 配置和 PostgreSQL 迁移
概述
AnythingLLM 系统使用 Prisma 作为 ORM(对象关系映射)工具,管理数据库操作和数据模型。系统默认使用 SQLite 数据库,但支持迁移到 PostgreSQL 等企业级数据库。本章详细分析 Prisma 的配置机制、迁移流程以及 PostgreSQL 迁移的具体步骤。
Prisma 配置架构
1. 数据源配置 (server/prisma/schema.prisma)
// 数据源定义
datasource db {
provider = "sqlite"
url = "file:../storage/anythingllm.db"
}
// 生成配置
generator client {
provider = "prisma-client-js"
}
关键配置说明:
provider: 数据库类型,支持 "sqlite"、"postgresql"、"mysql" 等url: 数据库连接字符串,支持环境变量占位符shadowDatabaseUrl: PostgreSQL 迁移时使用的影子数据库
2. 环境变量集成
Prisma 通过环境变量实现不同环境的配置切换:
// 环境变量检测逻辑
const databaseUrl = process.env.DATABASE_URL ||
(process.env.POSTGRES_USER && process.env.POSTGRES_PASSWORD && process.env.POSTGRES_HOST
? `postgresql://${process.env.POSTGRES_USER}:${process.env.POSTGRES_PASSWORD}@${process.env.POSTGRES_HOST}:5432/${process.env.POSTGRES_DB || 'anythingllm'}`
: 'file:../storage/anythingllm.db');
数据模型关系设计
1. 核心实体关系图
erDiagram
workspaces ||--o{ workspace_documents : "contains"
workspaces ||--o{ workspace_users : "has"
workspaces ||--o{ workspace_chats : "has"
workspaces ||--o{ workspace_agent_invocations : "has"
workspaces ||--o{ workspace_threads : "has"
users ||--o{ workspace_users : "belongs_to"
users ||--o{ recovery_codes : "has"
users ||--o{ password_reset_tokens : "has"
system_settings ||--|| system_settings : "unique"
event_logs ||--o{ users : "tracked_by"
2. 关键模型定义
// 工作空间模型
model workspaces {
id Int @id @default(autoincrement())
name String
slug String @unique
vectorTag String?
similarityThreshold Float @default(0.25)
chatProvider String? @default("openai")
chatModel String? @default("gpt-3.5-turbo")
agentProvider String? @default("openai")
agentModel String? @default("gpt-3.5-turbo")
createdAt DateTime @default(now())
lastUpdatedAt DateTime @updatedAt
// 关系定义
documents workspace_documents[]
users workspace_users[]
chats workspace_chats[]
agent_invocations workspace_agent_invocations[]
threads workspace_threads[]
@@map("workspaces")
}
// 用户模型
model users {
id Int @id @default(autoincrement())
username String @unique
password String
role String @default("default")
suspended Int @default(0)
dailyMessageLimit Int? @default(-1)
createdAt DateTime @default(now())
// 关系定义
workspaces workspace_users[]
recovery_codes recovery_codes[]
password_reset_tokens password_reset_tokens[]
system_prompt_variables system_prompt_variables[]
event_logs event_logs[]
@@map("users")
}
迁移机制详解
1. 迁移文件组织
server/prisma/migrations/
├── 20250226005538_init/ # 初始迁移
│ ├── migration.sql # SQL 迁移脚本
│ └── README.md # 迁移说明
├── 20250808171557_init/ # PostgreSQL 迁移
│ └── migration.sql # PostgreSQL 适配脚本
└── migration_lock.toml # 迁移锁文件
2. 迁移锁机制
# migration_lock.toml
provider = "postgresql"
# 记录当前数据库状态和迁移历史
PostgreSQL 迁移完整流程
1. 环境准备
# 1. 安装 PostgreSQL
sudo apt-get update
sudo apt-get install postgresql postgresql-contrib
# 2. 创建数据库和用户
sudo -u postgres psql
CREATE DATABASE anythingllm;
CREATE USER anythingllm_user WITH ENCRYPTED PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE anythingllm TO anythingllm_user;
\q
# 3. 设置环境变量
export POSTGRES_USER=anythingllm_user
export POSTGRES_PASSWORD=your_password
export POSTGRES_HOST=localhost
export POSTGRES_DB=anythingllm
export DATABASE_URL=postgresql://anythingllm_user:your_password@localhost:5432/anythingllm
2. Prisma 配置更新
// 更新 schema.prisma
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
shadowDatabaseUrl = env("SHADOW_DATABASE_URL") // 可选,用于迁移测试
}
3. 数据迁移步骤
# 1. 生成 PostgreSQL 迁移文件
npx prisma migrate dev --name postgres-migration
# 2. 查看生成的迁移
npx prisma migrate status
# 3. 应用迁移
npx prisma migrate deploy
# 4. 生成 Prisma 客户端
npx prisma generate
# 5. 验证数据库结构
npx prisma db pull
4. 数据迁移脚本
// server/prisma/migrateToPostgres.js
const { PrismaClient } = require('@prisma/client');
const fs = require('fs');
const path = require('path');
async function migrateData() {
// 1. 连接 SQLite 数据库
const sqlite = new PrismaClient({
datasources: {
db: {
url: 'file:../storage/anythingllm.db'
}
}
});
// 2. 连接 PostgreSQL 数据库
const postgres = new PrismaClient();
try {
// 3. 迁移工作空间数据
const workspaces = await sqlite.workspaces.findMany();
for (const workspace of workspaces) {
await postgres.workspaces.create({
data: {
name: workspace.name,
slug: workspace.slug,
vectorTag: workspace.vectorTag,
similarityThreshold: workspace.similarityThreshold,
chatProvider: workspace.chatProvider,
chatModel: workspace.chatModel,
agentProvider: workspace.agentProvider,
agentModel: workspace.agentModel,
createdAt: workspace.createdAt,
lastUpdatedAt: workspace.lastUpdatedAt
}
});
}
// 4. 迁移用户数据
const users = await sqlite.users.findMany();
for (const user of users) {
await postgres.users.create({
data: {
username: user.username,
password: user.password,
role: user.role,
suspended: user.suspended,
dailyMessageLimit: user.dailyMessageLimit,
createdAt: user.createdAt
}
});
}
// 5. 迁移关系数据
// ... 其他表的迁移逻辑
console.log('数据迁移完成');
} catch (error) {
console.error('迁移失败:', error);
} finally {
await sqlite.$disconnect();
await postgres.$disconnect();
}
}
migrateData();
迁移中的常见问题和解决方案
1. 数据类型兼容性
-- SQLite 到 PostgreSQL 的类型映射
-- TEXT -> VARCHAR or TEXT
-- INTEGER -> INTEGER or BIGINT
-- REAL -> DOUBLE PRECISION
-- BLOB -> BYTEA
-- DATETIME -> TIMESTAMP or TIMESTAMPTZ
2. 自定义 SQL 迁移
-- 处理 SQLite 特有的语法
-- 1. AUTOINCREMENT -> SERIAL or BIGSERIAL
CREATE TABLE workspaces (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
-- 其他字段
);
-- 2. 处理布尔值
-- SQLite: 0/1 -> PostgreSQL: true/false
UPDATE users SET suspended = true WHERE suspended = 1;
-- 3. 处理日期时间
-- SQLite: Unix timestamp -> PostgreSQL: TIMESTAMP
UPDATE workspaces SET created_at = to_timestamp(created_at) WHERE created_at > 0;
3. 索引和约束优化
-- 添加必要的索引
CREATE INDEX idx_workspace_documents_workspace_id ON workspace_documents(workspace_id);
CREATE INDEX idx_workspace_chats_workspace_id ON workspace_chats(workspace_id);
CREATE INDEX idx_workspace_users_user_id ON workspace_users(user_id);
-- 添加外键约束
ALTER TABLE workspace_documents
ADD CONSTRAINT fk_workspace_documents_workspace_id
FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE;
生产环境迁移策略
1. 零停机迁移流程
sequenceDiagram
participant U as 用户
participant A as 应用
participant S as SQLite
participant P as PostgreSQL
participant M as 迁移工具
Note over U,M: 阶段1: 准备
M->>P: 创建目标数据库
M->>P: 应用 schema 迁移
Note over U,M: 阶段2: 数据同步
M->>S: 读取现有数据
M->>P: 写入到 PostgreSQL
loop 增量同步
M->>S: 检查新数据
M->>P: 同步新数据
end
Note over U,M: 阶段3: 切换
A->>A: 停止写入
M->>S: 最终同步
M->>P: 验证数据
A->>A: 切换数据库连接
A->>P: 恢复服务
2. 回滚策略
# 1. 备份 SQLite 数据库
cp server/storage/anythingllm.db server/storage/anythingllm.db.backup
# 2. 创建回滚脚本
cat > rollback.sh << 'EOF'
#!/bin/bash
# 停止应用
pm2 stop anythingllm
# 恢复环境变量
export DATABASE_URL="file:../storage/anythingllm.db"
# 重启应用
pm2 start anythingllm
EOF
chmod +x rollback.sh
性能优化建议
1. PostgreSQL 配置优化
# postgresql.conf
# 内存配置
shared_buffers = 256MB
effective_cache_size = 1GB
work_mem = 16MB
# 连接配置
max_connections = 100
# 日志配置
log_statement = 'all'
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
# 查询优化
random_page_cost = 1.1
effective_io_concurrency = 200
2. Prisma 查询优化
// 使用 include 减少查询次数
const workspaces = await prisma.workspaces.findMany({
include: {
users: true,
documents: {
take: 10,
orderBy: { createdAt: 'desc' }
}
}
});
// 使用分页避免大数据集
const chats = await prisma.workspace_chats.findMany({
where: { workspace_id: workspaceId },
take: 50,
skip: page * 50,
orderBy: { createdAt: 'desc' }
});
// 使用事务确保数据一致性
await prisma.$transaction(async (tx) => {
const workspace = await tx.workspaces.create({
data: workspaceData
});
await tx.workspace_users.create({
data: {
user_id: userId,
workspace_id: workspace.id
}
});
return workspace;
});
监控和维护
1. 数据库监控
-- 监控查询性能
SELECT query, calls, total_time, mean_time, rows
FROM pg_stat_statements
ORDER BY total_time DESC
LIMIT 10;
-- 监控表大小
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- 监控连接数
SELECT state, count(*)
FROM pg_stat_activity
GROUP BY state;
2. 自动化维护脚本
#!/bin/bash
# 数据库维护脚本
# db-maintenance.sh
# 1. 清理旧数据
psql $DATABASE_URL -c "DELETE FROM event_logs WHERE occurred_at < NOW() - INTERVAL '90 days';"
# 2. 更新统计信息
psql $DATABASE_URL -c "ANALYZE;"
# 3. 重建索引
psql $DATABASE_URL -c "REINDEX DATABASE anythingllm;"
# 4. 备份数据库
pg_dump $DATABASE_URL | gzip > backup_$(date +%Y%m%d).sql.gz
最佳实践总结
开发环境
- 使用 SQLite 进行快速开发和测试
- 定期运行
prisma format保持代码风格一致 - 使用
prisma studio可视化查看数据
生产环境
- 使用 PostgreSQL 提供更好的性能和可靠性
- 配置适当的连接池大小
- 实施定期备份策略
- 监控数据库性能指标
迁移流程
- 始终在测试环境验证迁移脚本
- 使用事务确保数据一致性
- 准备详细的回滚计划
- 在低峰期执行生产迁移
安全考虑
- 使用环境变量管理数据库凭据
- 限制数据库访问权限
- 定期更新 Prisma 版本
- 实施数据加密策略
通过 Prisma ORM 和 PostgreSQL 的结合,AnythingLLM 系统能够支持更大规模的数据处理需求,提供更好的性能、可靠性和扩展性,为企业级部署提供了坚实的基础。