|
|
|
|
import { randomBytes } from "crypto";
import { PassThrough } from "stream";

import fetch from "node-fetch";

import { log } from '../config/logger.js';
import { logApiCall, generateId, getFileExtension, getFileType } from "./utils.js";
|
|
|
|
|
/**
 * Split a base64 `data:` URL into its MIME type and decoded bytes.
 *
 * Media-type parameters (e.g. `;charset=…`, `;name=…`) are discarded and the
 * non-standard `image/jpg` is normalized to `image/jpeg`.
 *
 * @param {string} dataUrl - String of the form `data:<mime>[;params];base64,<payload>`.
 * @returns {{ mimeType: string, bytes: Buffer }}
 * @throws {Error} "Invalid base64 data" when the input is not a well-formed base64 data URL.
 */
function decodeBase64DataUrl(dataUrl) {
  const marker = ";base64,";
  const markerAt = typeof dataUrl === "string" ? dataUrl.indexOf(marker) : -1;
  if (markerAt === -1 || !dataUrl.startsWith("data:")) {
    throw new Error("Invalid base64 data");
  }

  let mimeType = dataUrl.slice("data:".length, markerAt).split(";")[0].trim().toLowerCase();
  const payload = dataUrl.slice(markerAt + marker.length);

  // The MIME type ends up inside a multipart header and a quoted filename, so
  // only plain token characters are accepted (no quotes, spaces or CR/LF).
  if (!/^[\w.+-]+\/[\w.+-]+$/.test(mimeType) || payload.trim() === "") {
    throw new Error("Invalid base64 data");
  }

  if (mimeType === "image/jpg") {
    mimeType = "image/jpeg";
  }

  return { mimeType, bytes: Buffer.from(payload, "base64") };
}

/**
 * Encode one file part (field name `file`) followed by plain text fields as a
 * multipart/form-data payload.
 *
 * @param {{ filename: string, mimeType: string, bytes: Buffer }} file
 * @param {Record<string, string>} fields - Extra text fields appended after the file part.
 * @returns {{ body: Buffer, contentType: string }} The encoded body and the matching
 *   `Content-Type` header value (including the boundary).
 */
function encodeMultipartUpload({ filename, mimeType, bytes }, fields) {
  // A random boundary keeps the file content from being able to terminate the part.
  const boundary = `----DifyUpload${randomBytes(16).toString("hex")}`;
  const parts = [
    Buffer.from(
      `--${boundary}\r\n` +
        `Content-Disposition: form-data; name="file"; filename="${filename}"\r\n` +
        `Content-Type: ${mimeType}\r\n\r\n`
    ),
    bytes,
    Buffer.from("\r\n"),
  ];

  for (const [name, value] of Object.entries(fields)) {
    parts.push(
      Buffer.from(
        `--${boundary}\r\nContent-Disposition: form-data; name="${name}"\r\n\r\n${value}\r\n`
      )
    );
  }
  parts.push(Buffer.from(`--${boundary}--\r\n`));

  return {
    body: Buffer.concat(parts),
    contentType: `multipart/form-data; boundary=${boundary}`,
  };
}

/**
 * Upload a base64 data-URL file to `${config.DIFY_API_URL}/files/upload` and
 * return the `id` field of the JSON response.
 *
 * The multipart body is built by hand with `Buffer`, because the previous code
 * relied on a `FormData` implementation with `getHeaders()` that this file never imports.
 * The API key is sent to the server but masked in log output.
 *
 * @param {string} base64Data - `data:<mime>;base64,<payload>` URL of the file.
 * @param {{ DIFY_API_URL: string, API_KEY: string }} config - Target base URL and bearer token.
 * @param {string} userId - Value sent as the `user` form field.
 * @returns {Promise<*>} The `id` property of the upload response.
 * @throws {Error} When the data URL is invalid or the server answers with a non-2xx status.
 */
async function uploadFileToDify(base64Data, config, userId) {
  try {
    const { mimeType, bytes } = decodeBase64DataUrl(base64Data);
    // The file is always named "image.<subtype>", whatever the MIME type is.
    const filename = `image.${mimeType.split("/")[1]}`;
    const { body, contentType } = encodeMultipartUpload(
      { filename, mimeType, bytes },
      { user: String(userId) }
    );
    const uploadUrl = `${config.DIFY_API_URL}/files/upload`;

    log("info", "正在上传文件到 Dify", {
      url: uploadUrl,
      headers: {
        // Never write the real bearer token to the log.
        Authorization: "Bearer ***",
        "Content-Type": contentType,
      },
      formData: "<<FILE DATA>>",
    });

    const response = await fetch(uploadUrl, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${config.API_KEY}`,
        "Content-Type": contentType,
      },
      body,
    });

    log("info", "文件上传响应", {
      status: response.status,
      statusText: response.statusText,
    });

    if (!response.ok) {
      const errorBody = await response.text();
      log("error", "文件上传失败", {
        status: response.status,
        statusText: response.statusText,
        errorBody,
      });
      throw new Error(
        `文件上传失败: ${response.status} ${response.statusText}: ${errorBody}`
      );
    }

    const result = await response.json();
    log("info", "文件上传成功", { fileId: result.id });
    return result.id;
  } catch (error) {
    console.error("上传文件出错:", error);
    throw error;
  }
}
|
|
|
|
|
/**
 * Return a shallow copy of a header map with credential-bearing values masked,
 * so the copy can be handed to the logger.
 *
 * @param {Record<string, *>} [headers]
 * @returns {Record<string, *>}
 */
function maskSensitiveHeaders(headers) {
  const masked = { ...headers };
  for (const name of Object.keys(masked)) {
    const lower = name.toLowerCase();
    if (
      lower === "authorization" ||
      lower === "proxy-authorization" ||
      lower === "cookie" ||
      lower === "x-api-key"
    ) {
      masked[name] = "***";
    }
  }
  return masked;
}

/**
 * Build the `files` array for the upstream request from every `image_url`
 * content part found in any message.
 *
 * `data:` URLs are uploaded through `uploadFileToDify` (one at a time, in
 * message order) and referenced by the returned id; any other URL is passed
 * through as a remote file.
 *
 * @returns {Promise<Array<object>>}
 */
async function collectMessageFiles(messages, config, userId, requestId) {
  const files = [];
  for (const message of messages) {
    if (!Array.isArray(message?.content)) continue;

    for (const part of message.content) {
      const imageUrl = part?.type === "image_url" ? part.image_url?.url : undefined;
      if (typeof imageUrl !== "string" || imageUrl === "") continue;

      const fileExt = getFileExtension(imageUrl);
      const fileType = getFileType(fileExt);

      if (imageUrl.startsWith("data:")) {
        log("info", "检测到base64数据,准备上传", { requestId, fileType, fileExt });
        const fileId = await uploadFileToDify(imageUrl, config, userId);
        files.push({
          type: fileType,
          transfer_method: "local_file",
          upload_file_id: fileId,
        });
      } else {
        log("info", "检测到远程文件URL", {
          requestId,
          url: `${imageUrl.substring(0, 30)}...`,
          fileType,
          fileExt,
        });
        files.push({
          type: fileType,
          transfer_method: "remote_url",
          url: imageUrl,
        });
      }
    }
  }
  return files;
}

/**
 * Extract the prompt text from a message: string content is used as is; for
 * array content the string items and `type: "text"` parts are joined with
 * newlines and trimmed.
 *
 * @returns {string}
 */
function extractQueryText(message) {
  const content = message?.content;
  if (!Array.isArray(content)) {
    if (typeof content === "string") return content;
    return content == null ? "" : String(content);
  }

  const pieces = [];
  for (const part of content) {
    if (typeof part === "string") {
      pieces.push(part);
    } else if (part?.type === "text") {
      pieces.push(String(part.text ?? ""));
    }
  }
  return pieces.join("\n").trim();
}

/**
 * Parse one line of the upstream event stream.
 *
 * @param {string} rawLine
 * @returns {object|null} The decoded JSON object, or null for blank lines,
 *   non-JSON lines and lines that fail to parse.
 */
function parseSseLine(rawLine) {
  let text = rawLine.trim();
  if (text.startsWith("data:")) {
    text = text.slice("data:".length).trim();
  }
  if (!text.startsWith("{")) return null;

  try {
    return JSON.parse(text);
  } catch (error) {
    console.error("解析 chunk 出错:", error);
    return null;
  }
}

/**
 * Yield the JSON events contained in an async-iterable byte stream, one per line.
 *
 * Bytes are decoded with a streaming UTF-8 decoder so that a multi-byte
 * character split across two chunks is not corrupted.
 *
 * @param {AsyncIterable<Uint8Array|string>} body
 */
async function* readSseEvents(body) {
  const decoder = new TextDecoder("utf-8");
  let pending = "";

  for await (const chunk of body) {
    pending += typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
    const lines = pending.split("\n");
    // The last element is an incomplete line; keep it for the next chunk.
    pending = lines.pop();
    for (const line of lines) {
      const event = parseSseLine(line);
      if (event) yield event;
    }
  }

  // Flush the decoder and handle a final line that had no trailing newline.
  pending += decoder.decode();
  const tail = parseSseLine(pending);
  if (tail) yield tail;
}

/**
 * Build the `usage` object of the final response, substituting the
 * placeholder values 100 / 10 / 110 for figures the upstream event lacks.
 */
function toUsage(usage, totalTokens = usage?.total_tokens) {
  return {
    prompt_tokens: usage?.prompt_tokens ?? 100,
    completion_tokens: usage?.completion_tokens ?? 10,
    total_tokens: totalTokens ?? 110,
  };
}

/** Render a workflow output as text; objects become JSON rather than "[object Object]". */
function stringifyOutput(value) {
  if (value == null) return "";
  if (typeof value === "string") return value;
  if (typeof value === "object") return JSON.stringify(value);
  return String(value);
}

/**
 * Relay upstream events to the client as `chat.completion.chunk` server-sent
 * events, then always terminate with `data: [DONE]` and end the response,
 * including when the upstream closes without a terminal event.
 */
async function relayStreamingResponse(body, res, model, requestId) {
  res.setHeader("Content-Type", "text/event-stream");

  const writeEvent = (payload) => {
    const text = typeof payload === "string" ? payload : JSON.stringify(payload);
    res.write(`data: ${text}\n\n`);
  };
  const buildChunk = (event, delta, finishReason) => ({
    id: `chatcmpl-${Date.now()}`,
    object: "chat.completion.chunk",
    // Not every event carries `created_at`; fall back to the current time.
    created: event.created_at ?? event.data?.created_at ?? Math.floor(Date.now() / 1000),
    model,
    choices: [{ index: 0, delta, finish_reason: finishReason }],
  });

  for await (const event of readSseEvents(body)) {
    log("debug", "处理 chunk", { requestId, chunkObj: event });

    let isTerminal = false;
    switch (event.event) {
      case "message":
      case "agent_message":
      case "text_chunk": {
        const text = event.event === "text_chunk" ? event.data?.text : event.answer;
        if (text != null && text !== "") {
          writeEvent(buildChunk(event, { content: text }, null));
        }
        break;
      }
      case "workflow_finished":
      case "message_end":
        writeEvent(buildChunk(event, {}, "stop"));
        isTerminal = true;
        break;
      case "error":
        console.error(`Error: ${event.code}, ${event.message}`);
        // The status can only still be changed if nothing has been written yet.
        if (!res.headersSent) res.status(500);
        writeEvent({ error: event.message });
        isTerminal = true;
        break;
      default:
        // "agent_thought", "ping" and unknown events are ignored.
        break;
    }

    // Stop reading after the first terminal event so nothing is written after end().
    if (isTerminal) break;
  }

  writeEvent("[DONE]");
  res.end();
  log("info", "响应结束", { requestId });
}

/**
 * Consume the whole upstream event stream and reduce it to a single answer.
 *
 * @returns {Promise<{ failed: boolean, answer: string, usage: object }>}
 *   `failed` is true when the upstream sent an `error` event.
 */
async function collectBlockingResponse(body, config, requestId) {
  let answer = "";
  let usage = toUsage();

  for await (const event of readSseEvents(body)) {
    log("debug", "处理 chunk", { requestId, chunkObj: event });

    switch (event.event) {
      case "message":
      case "agent_message":
        answer += event.answer ?? "";
        break;
      case "message_end":
        usage = toUsage(event.metadata?.usage);
        break;
      case "workflow_finished": {
        const outputs = event.data?.outputs;
        answer = stringifyOutput(
          config.OUTPUT_VARIABLE ? outputs?.[config.OUTPUT_VARIABLE] : outputs
        );
        usage = toUsage(event.metadata?.usage, event.data?.total_tokens);
        break;
      }
      case "error":
        console.error(`Error: ${event.code}, ${event.message}`);
        return { failed: true, answer, usage };
      default:
        // "agent_thought", "ping" and unknown events are ignored.
        break;
    }
  }

  return { failed: false, answer, usage };
}

/**
 * Translate an OpenAI-style chat completion request into a call to
 * `${config.DIFY_API_URL}/completion-messages` and write the translated
 * result to `res`.
 *
 * The upstream call always uses `response_mode: "streaming"`. When
 * `req.body.stream` is truthy the events are relayed as server-sent events,
 * otherwise they are aggregated into one `chat.completion` JSON document.
 * The promise resolves only after the response has been fully written, so
 * upstream read failures are handled by the catch block.
 *
 * @param {object} req - Request; `req.body` holds the payload (`messages`, `model`,
 *   `stream`) and `req.headers` is logged with credentials masked.
 * @param {object} res - Response; used through `status`, `json`, `send`, `set`,
 *   `setHeader`, `write`, `end`, `headersSent` and `writableEnded`
 *   (presumably an Express response — confirm against the caller).
 * @param {object} config - Reads `DIFY_API_URL`, `API_KEY`, `INPUT_VARIABLE`, `OUTPUT_VARIABLE`.
 * @param {string} requestId - Correlation id attached to log entries.
 * @param {number} startTime - Epoch milliseconds used to compute the duration passed to `logApiCall`.
 * @returns {Promise<void>}
 */
async function handleRequest(req, res, config, requestId, startTime) {
  try {
    const apiPath = "/completion-messages";
    const data = req.body ?? {};
    const messages = data.messages;
    const userId = "apiuser";

    log("info", "收到请求", {
      requestId,
      headers: maskSensitiveHeaders(req.headers),
      body: data,
    });

    if (!Array.isArray(messages) || messages.length === 0) {
      res.status(400).json({ error: "`messages` must be a non-empty array." });
      return;
    }

    log("info", "开始扫描所有消息中的图片", { requestId, messageCount: messages.length });
    const files = await collectMessageFiles(messages, config, userId, requestId);

    // Only the last message supplies the prompt text.
    const queryString = extractQueryText(messages[messages.length - 1]);

    log("info", "处理 Completion 类型消息", {
      requestId,
      contentLength: queryString.length,
      queryString,
      files,
    });

    const stream = data.stream ?? false;
    const inputKey = config.INPUT_VARIABLE || "query";
    const requestBody = {
      inputs: { [inputKey]: queryString },
      response_mode: "streaming",
      user: userId,
      files,
    };
    const targetUrl = config.DIFY_API_URL + apiPath;

    log("info", "发送请求到 Dify", {
      requestId,
      url: targetUrl,
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // Never write the real bearer token to the log.
        Authorization: "Bearer ***",
      },
      body: requestBody,
    });

    const resp = await fetch(targetUrl, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${config.API_KEY}`,
      },
      body: JSON.stringify(requestBody),
    });

    logApiCall(requestId, config, apiPath, Date.now() - startTime);

    log("info", "收到 Dify 响应", {
      requestId,
      status: resp.status,
      statusText: resp.statusText,
    });

    if (!resp.ok) {
      const errorBody = await resp.text();
      log("error", "Dify API 请求失败", {
        requestId,
        status: resp.status,
        statusText: resp.statusText,
        errorBody,
      });
      res.status(resp.status).send(errorBody);
      return;
    }

    if (stream) {
      await relayStreamingResponse(resp.body, res, data.model, requestId);
      return;
    }

    log("info", "开始处理普通响应", {
      requestId,
      timestamp: new Date().toISOString(),
    });

    const outcome = await collectBlockingResponse(resp.body, config, requestId);
    if (outcome.failed) {
      res
        .status(500)
        .json({ error: "An error occurred while processing the request." });
      return;
    }

    const formattedResponse = {
      id: `chatcmpl-${generateId()}`,
      object: "chat.completion",
      created: Math.floor(Date.now() / 1000),
      model: data.model,
      choices: [
        {
          index: 0,
          message: {
            role: "assistant",
            content: outcome.answer.trim(),
          },
          logprobs: null,
          finish_reason: "stop",
        },
      ],
      usage: outcome.usage,
      system_fingerprint: "fp_2f57f81c11",
    };

    log("info", "发送响应", {
      requestId,
      response: formattedResponse,
    });

    res.set("Content-Type", "application/json");
    res.send(JSON.stringify(formattedResponse, null, 2));
  } catch (error) {
    console.error("处理 Completion 请求时发生错误:", error);

    log("error", "处理 Completion 请求时发生错误", {
      requestId,
      error: error.message,
      stack: error.stack,
    });

    // Once streaming has started a JSON error can no longer be sent; just close.
    if (res.headersSent) {
      if (!res.writableEnded) res.end();
      return;
    }
    res.status(500).json({ error: error.message });
  }
}
|
|
|
// Default export of this module: an object whose only member is the request handler.
const completionHandler = { handleRequest };

export default completionHandler;
|
|
|