// luowuyin's picture
// 25:05:05 10:41:39 v0.3.7
// a572854
import {
createClient,
QueryResult as VercelQueryResult,
} from "@vercel/postgres";
import { Pool, PoolClient } from "pg";
/** Bookkeeping for the single shared `@vercel/postgres` client. */
type VercelPoolState = {
  /** Client produced by `createClient()`. */
  client: ReturnType<typeof createClient>;
  /** Whether this module currently considers `client` connected. */
  isConnected: boolean;
};

/** True only when the `VERCEL` environment variable is exactly "1". */
const isVercel = process.env.VERCEL === "1";

/** Lazily created Vercel client state; `null` until first use. */
let vercelPool: VercelPoolState | null = null;

/** Lazily created node-postgres pool; `null` until first use. */
let pgPool: Pool | null = null;
/**
 * Connection attempt currently in progress, shared by concurrent
 * `getVercelClient` callers so that `connect()` is issued only once per
 * client object.
 */
let vercelConnectInFlight: Promise<ReturnType<typeof createClient>> | null =
  null;

/**
 * Returns the shared `@vercel/postgres` client, connecting it when needed.
 *
 * Other code in this module flags the shared state as disconnected (for
 * example after a failed query or after `end()`). A node-postgres-style client
 * is single-use: calling `connect()` on an object that was already connected
 * is rejected by the driver. So whenever the state is not marked connected,
 * this function builds a brand-new client instead of reconnecting the old one.
 *
 * @returns The connected client.
 * @throws Re-throws the connection error after logging it.
 */
async function getVercelClient() {
  if (vercelPool?.isConnected) {
    return vercelPool.client;
  }
  // Another caller is already connecting; wait for the same attempt rather
  // than creating (and tearing down) a competing client.
  if (vercelConnectInFlight) {
    return vercelConnectInFlight;
  }

  const stale = vercelPool?.client;
  const attempt = (async () => {
    if (stale) {
      // Best-effort close of the discarded client so its connection is not
      // leaked; failures are irrelevant because it is never used again.
      void Promise.resolve()
        .then(() => stale.end())
        .catch(() => undefined);
    }

    const state = { client: createClient(), isConnected: false };
    vercelPool = state;
    try {
      await state.client.connect();
    } catch (error) {
      console.error("Vercel DB connection error:", error);
      throw error;
    }
    state.isConnected = true;
    return state.client;
  })();

  vercelConnectInFlight = attempt;
  try {
    return await attempt;
  } finally {
    // Clear the marker only if it still refers to this attempt.
    if (vercelConnectInFlight === attempt) {
      vercelConnectInFlight = null;
    }
  }
}
/**
 * Returns the database handle for the current runtime.
 *
 * - On Vercel (`isVercel`): the promise returned by `getVercelClient()`.
 * - Otherwise: the module-wide `pg` pool, created on first call either from
 *   `POSTGRES_URL` (with certificate verification disabled) or from the
 *   individual `POSTGRES_*` environment variables.
 */
function getClient() {
  if (isVercel) {
    return getVercelClient();
  }
  if (pgPool) {
    return pgPool;
  }

  // Pool sizing and timeouts are the same for both configuration styles.
  const tuning = {
    max: 20,
    idleTimeoutMillis: 30000,
    connectionTimeoutMillis: 30000,
    statement_timeout: 30000,
  };

  const connectionString = process.env.POSTGRES_URL;
  const created = connectionString
    ? new Pool({
        connectionString,
        ssl: {
          // NOTE: the server certificate is not verified on this path.
          rejectUnauthorized: false,
        },
        ...tuning,
      })
    : new Pool({
        host: process.env.POSTGRES_HOST || "db",
        user: process.env.POSTGRES_USER || "postgres",
        password: process.env.POSTGRES_PASSWORD,
        database: process.env.POSTGRES_DATABASE || "openwebui_monitor",
        port: parseInt(process.env.POSTGRES_PORT || "5432"),
        ...tuning,
      });

  pgPool = created;
  // An error on an idle pooled client terminates the whole process.
  created.on("error", (err) => {
    console.error("Unexpected error on idle client", err);
    process.exit(-1);
  });

  return created;
}
/**
 * Driver-independent result shape returned by `query`, so callers do not
 * depend on whether the Vercel client or the `pg` pool executed the statement.
 */
type CommonQueryResult<T = any> = {
  /** Result rows as returned by the driver. */
  rows: T[];
  /** Row count reported by the driver; `query` substitutes 0 when it is absent. */
  rowCount: number;
};
/**
 * Executes one SQL statement against whichever backend is active and returns
 * the rows in a driver-independent shape.
 *
 * @param text   SQL text, using `$1`, `$2`, ... placeholders for `params`.
 * @param params Optional positional parameter values.
 * @returns The rows plus the row count (0 when the driver reports none).
 * @throws Re-throws any connection or query error after logging it. On the
 *         `pg` path the statement text and parameters are logged as well, so
 *         avoid passing secrets as parameters if logs are not private.
 */
export async function query<T = any>(
  text: string,
  params?: any[]
): Promise<CommonQueryResult<T>> {
  const client = await getClient();

  if (isVercel) {
    const vercelClient = client as ReturnType<typeof createClient>;
    try {
      const result = await vercelClient.query({
        text,
        values: params ?? [],
      });
      return {
        rows: result.rows,
        rowCount: result.rowCount ?? 0,
      };
    } catch (error) {
      console.error("[DB Query Error]", error);
      // Flag the shared client so getVercelClient() re-establishes a
      // connection on the next call.
      if (vercelPool) {
        vercelPool.isConnected = false;
      }
      throw error;
    }
  }

  try {
    const pgClient = await (client as Pool).connect();
    try {
      const result = await pgClient.query(text, params);
      return {
        rows: result.rows,
        rowCount: result.rowCount ?? 0,
      };
    } finally {
      // Always hand the checked-out client back to the pool.
      pgClient.release();
    }
  } catch (error) {
    console.error("[DB Query Error]", error);
    console.error(`Query text: ${text}`);
    console.error(`Query params:`, params);
    throw error;
  }
}
// Outside a browser context (no global `window`), close database connections
// when the process receives SIGTERM.
if (typeof window === "undefined") {
  process.on("SIGTERM", async () => {
    console.log("SIGTERM received, closing database connections");

    // Snapshot the module state: it may be reassigned while we await below.
    const activePool = pgPool;
    const vercelState = vercelPool;

    // Each close is guarded separately so that a failure in one neither
    // surfaces as an unhandled rejection nor prevents closing the other.
    if (activePool) {
      try {
        await activePool.end();
      } catch (error) {
        console.error("Error closing pg pool on SIGTERM:", error);
      }
    }

    if (vercelState?.client) {
      try {
        await vercelState.client.end();
      } catch (error) {
        console.error("Error closing Vercel client on SIGTERM:", error);
      }
      vercelState.isConnected = false;
    }
  });
}
export { getClient };
/**
 * Parses a price default taken from the environment.
 * Falls back when the variable is unset, empty, or not a finite number, so
 * that a bad value can never be spliced into DDL as `NaN` or `Infinity`.
 */
function parseDefaultPrice(
  raw: string | undefined,
  fallback: number,
  envName: string
): number {
  if (!raw) {
    return fallback;
  }
  const parsed = parseFloat(raw);
  if (!Number.isFinite(parsed)) {
    console.warn(
      `Ignoring non-numeric ${envName}; using default ${fallback} instead`
    );
    return fallback;
  }
  return parsed;
}

/**
 * Reports whether `tableName` exists in the schema that unqualified
 * `CREATE TABLE` / `ALTER TABLE` statements on this connection target.
 */
async function tableExistsInCurrentSchema(tableName: string): Promise<boolean> {
  const result = await query(
    `SELECT EXISTS (
       SELECT FROM information_schema.tables
       WHERE table_schema = current_schema()
         AND table_name = $1
     ) AS "exists"`,
    [tableName]
  );
  return Boolean(result.rows[0]?.exists);
}

/**
 * Adds a column to an existing table, treating "column already exists" as
 * success. Any other failure is logged with `errorLabel` and not re-thrown.
 * `table` and `columnDefinition` are spliced into the statement, so they must
 * only ever be trusted, code-defined fragments.
 */
async function addColumnIgnoringDuplicate(
  table: string,
  columnDefinition: string,
  errorLabel: string
): Promise<void> {
  try {
    await query(`
      DO $$
      BEGIN
        BEGIN
          ALTER TABLE ${table}
          ADD COLUMN ${columnDefinition};
        EXCEPTION
          WHEN duplicate_column THEN NULL;
        END;
      END $$;
    `);
  } catch (error) {
    console.error(errorLabel, error);
  }
}

/**
 * Creates the `users`, `model_prices` and `user_usage_records` tables when
 * they are missing and adds later-introduced columns to tables that already
 * exist.
 *
 * Existence is checked within the current schema only, so a same-named table
 * in another schema does not cause table creation to be skipped.
 *
 * @throws Re-throws any error from the existence checks or `CREATE TABLE`
 *         statements after logging it. Failures while adding columns are
 *         logged but do not abort initialization.
 */
export async function ensureTablesExist() {
  try {
    if (!(await tableExistsInCurrentSchema("users"))) {
      await query(`
        CREATE TABLE IF NOT EXISTS users (
          id TEXT PRIMARY KEY,
          email TEXT NOT NULL,
          name TEXT NOT NULL,
          role TEXT NOT NULL DEFAULT 'user',
          balance DECIMAL(16, 6) NOT NULL DEFAULT 0,
          created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
          deleted BOOLEAN DEFAULT FALSE
        );
      `);
    } else {
      await addColumnIgnoringDuplicate(
        "users",
        "deleted BOOLEAN DEFAULT FALSE",
        "Error adding deleted column to users table:"
      );
    }

    const modelPricesExists = await tableExistsInCurrentSchema("model_prices");

    // These values are interpolated into DDL below; parseDefaultPrice
    // guarantees each one is a finite number.
    const defaultInputPrice = parseDefaultPrice(
      process.env.DEFAULT_MODEL_INPUT_PRICE,
      60,
      "DEFAULT_MODEL_INPUT_PRICE"
    );
    const defaultOutputPrice = parseDefaultPrice(
      process.env.DEFAULT_MODEL_OUTPUT_PRICE,
      60,
      "DEFAULT_MODEL_OUTPUT_PRICE"
    );
    const defaultPerMsgPrice = parseDefaultPrice(
      process.env.DEFAULT_MODEL_PER_MSG_PRICE,
      -1,
      "DEFAULT_MODEL_PER_MSG_PRICE"
    );

    if (!modelPricesExists) {
      await query(`
        CREATE TABLE IF NOT EXISTS model_prices (
          id TEXT PRIMARY KEY,
          name TEXT NOT NULL,
          base_model_id TEXT,
          input_price NUMERIC(10, 6) DEFAULT ${defaultInputPrice},
          output_price NUMERIC(10, 6) DEFAULT ${defaultOutputPrice},
          per_msg_price NUMERIC(10, 6) DEFAULT ${defaultPerMsgPrice},
          updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
        );
      `);
    } else {
      await addColumnIgnoringDuplicate(
        "model_prices",
        `per_msg_price NUMERIC(10, 6) DEFAULT ${defaultPerMsgPrice}`,
        "Error adding per_msg_price column:"
      );
      await addColumnIgnoringDuplicate(
        "model_prices",
        "base_model_id TEXT",
        "Error adding base_model_id column:"
      );
    }

    if (!(await tableExistsInCurrentSchema("user_usage_records"))) {
      await query(`
        CREATE TABLE IF NOT EXISTS user_usage_records (
          id SERIAL PRIMARY KEY,
          user_id TEXT NOT NULL,
          nickname VARCHAR(255) NOT NULL,
          use_time TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
          model_name VARCHAR(255) NOT NULL,
          input_tokens INTEGER NOT NULL,
          output_tokens INTEGER NOT NULL,
          cost DECIMAL(10, 4) NOT NULL,
          balance_after DECIMAL(10, 4) NOT NULL,
          FOREIGN KEY (user_id) REFERENCES users(id)
        );
      `);
    }

    console.log("Database tables initialized successfully");
  } catch (error) {
    console.error("Failed to initialize database tables:", error);
    throw error;
  }
}
/**
 * Initializes the database by delegating to `ensureTablesExist`.
 *
 * Logs a confirmation on success. On failure the error is logged and then
 * re-thrown unchanged so the caller can react to it.
 */
export async function initDatabase() {
  await ensureTablesExist().catch((cause: unknown) => {
    console.error("Failed to initialize database:", cause);
    throw cause;
  });
  console.log("Database initialized successfully");
}
/**
 * Pricing entry for one model, as returned by `getOrCreateModelPrices` and
 * `updateModelPrice`. The three price fields are converted with `Number()`
 * from the values the database returns.
 *
 * NOTE(review): the `model_prices` table also has a `base_model_id` column
 * that is not represented here.
 */
export interface ModelPrice {
  /** Primary key of the `model_prices` row. */
  id: string;
  /** Display name of the model. */
  name: string;
  /** Input price; units are not established in this file. */
  input_price: number;
  /** Output price; units are not established in this file. */
  output_price: number;
  /**
   * Per-message price. The built-in default is -1; presumably a sentinel
   * meaning "not used" — TODO confirm against the billing code.
   */
  per_msg_price: number;
  /** Last modification time of the row. */
  updated_at: Date;
}
/**
 * One usage/billing record. The fields appear to be camelCase counterparts of
 * the `user_usage_records` columns created by `ensureTablesExist`; the mapping
 * itself is not performed in this file — TODO confirm at the call sites.
 */
export interface UserUsageRecord {
  /** Record identifier (`SERIAL` in the table definition). */
  id: number;
  /**
   * Owning user.
   * NOTE(review): `user_usage_records.user_id` and `users.id` are declared as
   * TEXT in `ensureTablesExist`, so `number` looks inconsistent here — confirm
   * whether this should be `string`.
   */
  userId: number;
  /** User nickname stored with the record. */
  nickname: string;
  /** When the usage happened. */
  useTime: Date;
  /** Name of the model that was used. */
  modelName: string;
  /** Number of input tokens. */
  inputTokens: number;
  /** Number of output tokens. */
  outputTokens: number;
  /** Amount charged for this usage. */
  cost: number;
  /** User balance after the charge was applied. */
  balanceAfter: number;
}
/**
 * Ensures a `model_prices` row exists for every given model and returns the
 * current pricing rows for those models.
 *
 * - Models that already have a row get their `name` refreshed.
 * - Models without a row are inserted, copying prices from the row of their
 *   `base_model_id` when one exists, otherwise using the
 *   `DEFAULT_MODEL_*_PRICE` environment defaults (60 / 60 / -1 when unset or
 *   not numeric).
 * - Inserts use `ON CONFLICT`, so a repeated id in `models` or an overlapping
 *   call refreshes the name instead of failing with a duplicate-key error.
 *
 * NOTE(review): `base_model_id` is only used to look up prices; it is not
 * written to the inserted row.
 *
 * @param models Models to look up or create.
 * @returns One entry per distinct model id, in the order the database
 *          returns them (not necessarily the input order).
 * @throws Re-throws any database error after logging it.
 */
export async function getOrCreateModelPrices(
  models: Array<{ id: string; name: string; base_model_id?: string }>
): Promise<ModelPrice[]> {
  // Nothing to look up or create; skip the database round-trips.
  if (models.length === 0) {
    return [];
  }

  try {
    // A non-numeric env value would otherwise be sent as NaN, which NUMERIC
    // columns accept and store, so fall back to the built-in default instead.
    const priceFromEnv = (raw: string | undefined, fallback: number): number => {
      const parsed = parseFloat(raw || String(fallback));
      return Number.isFinite(parsed) ? parsed : fallback;
    };
    const defaultInputPrice = priceFromEnv(
      process.env.DEFAULT_MODEL_INPUT_PRICE,
      60
    );
    const defaultOutputPrice = priceFromEnv(
      process.env.DEFAULT_MODEL_OUTPUT_PRICE,
      60
    );
    const defaultPerMsgPrice = priceFromEnv(
      process.env.DEFAULT_MODEL_PER_MSG_PRICE,
      -1
    );

    const modelIds = models.map((m) => m.id);

    const existingResult = await query(
      `SELECT id FROM model_prices WHERE id = ANY($1::text[])`,
      [modelIds]
    );
    const existingIds = new Set<string>(
      existingResult.rows.map((row) => row.id)
    );

    const modelsToUpdate = models.filter((m) => existingIds.has(m.id));
    const missingModels = models.filter((m) => !existingIds.has(m.id));

    // Base-model prices are only needed for rows that are about to be created.
    const baseModelIds = Array.from(
      new Set(
        missingModels
          .map((m) => m.base_model_id)
          .filter((id): id is string => Boolean(id))
      )
    );
    const baseModels = new Map<string, any>();
    if (baseModelIds.length > 0) {
      const baseResult = await query(
        `SELECT * FROM model_prices WHERE id = ANY($1::text[])`,
        [baseModelIds]
      );
      for (const row of baseResult.rows) {
        baseModels.set(row.id, row);
      }
    }

    for (const model of modelsToUpdate) {
      await query(`UPDATE model_prices SET name = $2 WHERE id = $1`, [
        model.id,
        model.name,
      ]);
    }

    for (const model of missingModels) {
      const baseModel = model.base_model_id
        ? baseModels.get(model.base_model_id)
        : null;
      await query(
        `INSERT INTO model_prices (id, name, input_price, output_price, per_msg_price)
         VALUES ($1, $2, $3, $4, $5)
         ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name`,
        [
          model.id,
          model.name,
          baseModel?.input_price ?? defaultInputPrice,
          baseModel?.output_price ?? defaultOutputPrice,
          baseModel?.per_msg_price ?? defaultPerMsgPrice,
        ]
      );
    }

    const updatedModelsResult = await query(
      `SELECT * FROM model_prices WHERE id = ANY($1::text[])`,
      [modelIds]
    );
    return updatedModelsResult.rows.map((row) => ({
      id: row.id,
      name: row.name,
      input_price: Number(row.input_price),
      output_price: Number(row.output_price),
      per_msg_price: Number(row.per_msg_price),
      updated_at: row.updated_at,
    }));
  } catch (error) {
    console.error("Error in getOrCreateModelPrices:", error);
    throw error;
  }
}
/**
 * Updates the three prices of one model and stamps `updated_at`.
 *
 * @param id            Primary key of the `model_prices` row.
 * @param input_price   New input price.
 * @param output_price  New output price.
 * @param per_msg_price New per-message price.
 * @returns The updated row, or `null` when no row has the given id.
 * @throws Re-throws any database error after logging it.
 */
export async function updateModelPrice(
  id: string,
  input_price: number,
  output_price: number,
  per_msg_price: number
): Promise<ModelPrice | null> {
  try {
    const result = await query(
      `UPDATE model_prices
       SET
         input_price = CAST($2 AS NUMERIC(10,6)),
         output_price = CAST($3 AS NUMERIC(10,6)),
         per_msg_price = CAST($4 AS NUMERIC(10,6)),
         updated_at = CURRENT_TIMESTAMP
       WHERE id = $1
       RETURNING *`,
      [id, input_price, output_price, per_msg_price]
    );

    const row = result.rows[0];
    if (!row) {
      return null;
    }
    return {
      id: row.id,
      // The column is `name`; `model_prices` has no `model_name` column, so
      // reading `model_name` always produced `undefined`.
      name: row.name,
      input_price: Number(row.input_price),
      output_price: Number(row.output_price),
      per_msg_price: Number(row.per_msg_price),
      updated_at: row.updated_at,
    };
  } catch (error) {
    console.error("Error updating model price:", error);
    throw error;
  }
}
/**
 * Overwrites a user's balance.
 *
 * @param userId  Primary key of the `users` row.
 * @param balance New balance value to store.
 * @returns The updated row's `id`, `email` and `balance`, or `undefined` when
 *          no user has the given id.
 * @throws Re-throws any database error after logging it.
 */
export async function updateUserBalance(userId: string, balance: number) {
  const statement = [
    "UPDATE users",
    "SET balance = $2",
    "WHERE id = $1",
    "RETURNING id, email, balance",
  ].join("\n");

  try {
    const { rows } = await query(statement, [userId, balance]);
    return rows[0];
  } catch (err) {
    console.error("Error in updateUserBalance:", err);
    throw err;
  }
}
/**
 * Pool-shaped facade over whichever backend is active, for callers that want
 * `connect()` / `query()` / `end()` instead of the `query` helper.
 */
export const pool = {
  /**
   * Obtains something with `query` and `release`.
   * On Vercel this is a thin wrapper around the single shared client, so
   * `release` is a no-op; otherwise it is a client checked out of the `pg`
   * pool, which the caller must release.
   */
  connect: async () => {
    if (isVercel) {
      return {
        query: async (text: string, params?: any[]) => {
          const client = await getVercelClient();
          return client.query({
            text,
            values: params || [],
          });
        },
        release: () => {},
      };
    }
    return (pgPool || (getClient() as Pool)).connect();
  },

  /** Runs one statement and returns the driver's own result object. */
  query: async (text: string, params?: any[]) => {
    if (isVercel) {
      const client = await getVercelClient();
      return client.query({
        text,
        values: params || [],
      });
    }
    return (pgPool || (getClient() as Pool)).query(text, params);
  },

  /**
   * Closes the active backend connection(s).
   *
   * The module-level singleton is cleared before closing, so a later call
   * lazily builds a fresh pool/client instead of reusing a closed one, and a
   * second `end()` is a no-op rather than an error.
   */
  end: async () => {
    if (isVercel) {
      const state = vercelPool;
      if (state?.client) {
        vercelPool = null;
        state.isConnected = false;
        await state.client.end();
      }
    } else if (pgPool) {
      const closing = pgPool;
      pgPool = null;
      await closing.end();
    }
  },
};