jm / src /lib /server.ts
nanoppa's picture
Upload 49 files
d9b2169 verified
import Koa from 'koa';
import KoaRouter from 'koa-router';
import koaRange from 'koa-range';
import koaCors from "koa2-cors";
import koaBody from 'koa-body';
import _ from 'lodash';
import Exception from './exceptions/Exception.ts';
import Request from './request/Request.ts';
import Response from './response/Response.js';
import FailureBody from './response/FailureBody.ts';
import EX from './consts/exceptions.ts';
import logger from './logger.ts';
import config from './config.ts';
class Server {
app;
router;
koaBodyMiddleware;
constructor() {
this.app = new Koa();
this.app.use(koaCors());
// 范围请求支持
this.app.use(koaRange);
this.router = new KoaRouter({ prefix: config.service.urlPrefix });
// 预先创建 koa-body 中间件,支持 multipart 文件上传
this.koaBodyMiddleware = koaBody({
multipart: true,
formidable: {
maxFileSize: 100 * 1024 * 1024, // 100MB
keepExtensions: true,
},
formLimit: '100mb',
jsonLimit: '100mb',
textLimit: '100mb',
parsedMethods: ['POST', 'PUT', 'PATCH'],
});
// 前置处理异常拦截
this.app.use(async (ctx: any, next: Function) => {
if(ctx.request.type === "application/xml" || ctx.request.type === "application/ssml+xml")
ctx.req.headers["content-type"] = "text/xml";
try { await next() }
catch (err) {
logger.error(err);
const failureBody = new FailureBody(err);
new Response(failureBody).injectTo(ctx);
}
});
// 自定义 JSON 解析中间件
this.app.use(async (ctx: any, next: Function) => {
// 跳过 multipart 请求,让 koa-body 处理
if (ctx.is('multipart')) {
await next();
return;
}
if (ctx.is('application/json') && ['POST', 'PUT', 'PATCH'].includes(ctx.method)) {
logger.debug('开始自定义 JSON 解析');
const chunks: Buffer[] = [];
await new Promise((resolve, reject) => {
ctx.req.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
ctx.req.on('end', () => {
resolve(null);
});
ctx.req.on('error', reject);
});
const body = Buffer.concat(chunks).toString('utf8');
// 清理问题字符
let cleanedBody = body
.replace(/\r\n/g, '\n')
.replace(/\r/g, '\n')
.replace(/\u00A0/g, ' ')
.replace(/[\u2000-\u200B]/g, ' ')
.replace(/\uFEFF/g, '')
.trim();
const parsedBody = JSON.parse(cleanedBody);
logger.debug('JSON 解析成功,跳过 koa-body');
ctx.request.body = parsedBody;
ctx.request.rawBody = cleanedBody;
// 标记已处理,避免 koa-body 再次处理
ctx._jsonProcessed = true;
}
await next();
});
// 载荷解析器支持(只处理未被自定义解析器处理的请求)
this.app.use(async (ctx: any, next: Function) => {
if (!ctx._jsonProcessed) {
await this.koaBodyMiddleware(ctx, next);
} else {
await next();
}
});
this.app.on("error", (err: any) => {
// 忽略连接重试、中断、管道、取消错误
if (["ECONNRESET", "ECONNABORTED", "EPIPE", "ECANCELED"].includes(err.code)) return;
logger.error(err);
});
logger.success("Server initialized");
}
/**
* 附加路由
*
* @param routes 路由列表
*/
attachRoutes(routes: any[]) {
routes.forEach((route: any) => {
const prefix = route.prefix || "";
for (let method in route) {
if(method === "prefix") continue;
if (!_.isObject(route[method])) {
logger.warn(`Router ${prefix} ${method} invalid`);
continue;
}
for (let uri in route[method]) {
this.router[method](`${prefix}${uri}`, async ctx => {
const { request, response } = await this.#requestProcessing(ctx, route[method][uri]);
if(response != null && config.system.requestLog)
logger.info(`<- ${request.method} ${request.url} ${response.time - request.time}ms`);
});
}
}
logger.info(`Route ${config.service.urlPrefix || ""}${prefix} attached`);
});
this.app.use(this.router.routes());
this.app.use((ctx: any) => {
const request = new Request(ctx);
logger.debug(`-> ${ctx.request.method} ${ctx.request.url} request is not supported - ${request.remoteIP || "unknown"}`);
// const failureBody = new FailureBody(new Exception(EX.SYSTEM_NOT_ROUTE_MATCHING, "Request is not supported"));
// const response = new Response(failureBody);
const message = `[请求有误]: 正确请求为 POST -> /v1/chat/completions,当前请求为 ${ctx.request.method} -> ${ctx.request.url} 请纠正`;
logger.warn(message);
const failureBody = new FailureBody(new Error(message));
const response = new Response(failureBody);
response.injectTo(ctx);
if(config.system.requestLog)
logger.info(`<- ${request.method} ${request.url} ${response.time - request.time}ms`);
});
}
/**
* 请求处理
*
* @param ctx 上下文
* @param routeFn 路由方法
*/
#requestProcessing(ctx: any, routeFn: Function): Promise<any> {
return new Promise(resolve => {
const request = new Request(ctx);
try {
if(config.system.requestLog)
logger.info(`-> ${request.method} ${request.url}`);
routeFn(request)
.then(response => {
try {
if(!Response.isInstance(response)) {
const _response = new Response(response);
_response.injectTo(ctx);
return resolve({ request, response: _response });
}
response.injectTo(ctx);
resolve({ request, response });
}
catch(err) {
logger.error(err);
const failureBody = new FailureBody(err);
const response = new Response(failureBody);
response.injectTo(ctx);
resolve({ request, response });
}
})
.catch(err => {
try {
logger.error(err);
const failureBody = new FailureBody(err);
const response = new Response(failureBody);
response.injectTo(ctx);
resolve({ request, response });
}
catch(err) {
logger.error(err);
const failureBody = new FailureBody(err);
const response = new Response(failureBody);
response.injectTo(ctx);
resolve({ request, response });
}
});
}
catch(err) {
logger.error(err);
const failureBody = new FailureBody(err);
const response = new Response(failureBody);
response.injectTo(ctx);
resolve({ request, response });
}
});
}
/**
* 监听端口
*/
async listen() {
const host = config.service.host;
const port = config.service.port;
await Promise.all([
new Promise((resolve, reject) => {
if(host === "0.0.0.0" || host === "localhost" || host === "127.0.0.1")
return resolve(null);
this.app.listen(port, "localhost", err => {
if(err) return reject(err);
resolve(null);
});
}),
new Promise((resolve, reject) => {
this.app.listen(port, host, err => {
if(err) return reject(err);
resolve(null);
});
})
]);
logger.success(`Server listening on port ${port} (${host})`);
}
}
export default new Server();