| import { env } from "$env/dynamic/private"; |
| import { collections } from "$lib/server/database"; |
| import type { Message } from "$lib/types/Message"; |
| import { error } from "@sveltejs/kit"; |
| import { pathToFileURL } from "node:url"; |
| import { unlink } from "node:fs/promises"; |
| import { uploadFile } from "@huggingface/hub"; |
| import parquet from "parquetjs"; |
| import { z } from "zod"; |
| import { logger } from "$lib/server/logger.js"; |
|
|
| |
| |
|
|
/** Shape of one row produced by the export aggregation pipelines. */
interface ExportedConversation {
  title: string;
  created_at: Date;
  updated_at: Date;
  messages: Message[];
}

/** A progress line is logged every time this many conversations have been written. */
const EXPORT_LOG_INTERVAL = 1_000;

/**
 * Builds the aggregation pipeline that starts from `settings` documents,
 * joins the matching `conversations` documents and flattens the result into
 * one row per conversation.
 *
 * @param settingsFilter `$match` applied to the `settings` collection.
 * @param joinField Field used on both sides of the `$lookup` join.
 * @param conversationFilter `$match` applied to the joined conversations.
 * @returns The pipeline stages, ready to be passed to `aggregate`.
 */
function buildExportPipeline(
  settingsFilter: Record<string, unknown>,
  joinField: "sessionId" | "userId",
  conversationFilter: Record<string, unknown>
): Record<string, unknown>[] {
  return [
    { $match: settingsFilter },
    {
      $lookup: {
        from: "conversations",
        localField: joinField,
        foreignField: joinField,
        as: "conversations",
        pipeline: [{ $match: conversationFilter }],
      },
    },
    // One output document per joined conversation.
    { $unwind: "$conversations" },
    {
      $project: {
        title: "$conversations.title",
        created_at: "$conversations.createdAt",
        updated_at: "$conversations.updatedAt",
        messages: "$conversations.messages",
      },
    },
  ];
}

/**
 * Exports the conversations of one model to a Parquet file and uploads that
 * file to the Hugging Face dataset named by `PARQUET_EXPORT_DATASET`.
 *
 * Only conversations whose owner's settings have
 * `shareConversationsWithModelAuthors` set to `true` are exported: first the
 * ones tied to a session without a user, then the ones tied to a user.
 *
 * The request body must be JSON of the form `{ "model": string }`.
 *
 * @returns An empty response once the upload has finished and the temporary
 * file has been removed.
 * @throws A 500 error when `PARQUET_EXPORT_DATASET` or
 * `PARQUET_EXPORT_HF_TOKEN` is not set. Validation, database, file-system and
 * upload errors propagate to the caller after the temporary file has been
 * cleaned up.
 */
export async function POST({ request }) {
  if (!env.PARQUET_EXPORT_DATASET || !env.PARQUET_EXPORT_HF_TOKEN) {
    throw error(500, "Parquet export is not configured.");
  }

  const { model } = z
    .object({
      model: z.string(),
    })
    .parse(await request.json());

  const schema = new parquet.ParquetSchema({
    title: { type: "UTF8" },
    created_at: { type: "TIMESTAMP_MILLIS" },
    updated_at: { type: "TIMESTAMP_MILLIS" },
    messages: {
      repeated: true,
      fields: {
        from: { type: "UTF8" },
        content: { type: "UTF8" },
        score: { type: "INT_8", optional: true },
      },
    },
  });

  // The date makes the file easy to identify, the timestamp keeps it unique.
  const fileName = `/tmp/conversations-${new Date().toJSON().slice(0, 10)}-${Date.now()}.parquet`;

  const writer = await parquet.ParquetWriter.openFile(schema, fileName);

  let count = 0;

  // Streams every conversation returned by `pipeline` into the Parquet file.
  const exportConversations = async (pipeline: Record<string, unknown>[]): Promise<void> => {
    for await (const conversation of collections.settings.aggregate<ExportedConversation>(
      pipeline
    )) {
      await writer.appendRow({
        title: conversation.title,
        created_at: conversation.created_at,
        updated_at: conversation.updated_at,
        messages: conversation.messages.map((message: Message) => ({
          from: message.from,
          content: message.content,
          // Falsy scores (missing, or 0) are left out of the row.
          ...(message.score ? { score: message.score } : undefined),
        })),
      });
      ++count;

      if (count % EXPORT_LOG_INTERVAL === 0) {
        logger.info("Exported", count, "conversations");
      }
    }
  };

  // Guards against closing the writer a second time during error cleanup.
  let closeAttempted = false;

  try {
    logger.info("Exporting conversations for model", model);

    // Conversations tied to a session that has no user.
    await exportConversations(
      buildExportPipeline(
        {
          shareConversationsWithModelAuthors: true,
          sessionId: { $exists: true },
          userId: { $exists: false },
        },
        "sessionId",
        { model, userId: { $exists: false } }
      )
    );

    logger.info("exporting convos with userId");

    // Conversations tied to a user.
    await exportConversations(
      buildExportPipeline(
        { shareConversationsWithModelAuthors: true, userId: { $exists: true } },
        "userId",
        { model }
      )
    );

    // The writer must be closed before the upload so the file is complete on disk.
    closeAttempted = true;
    await writer.close();

    logger.info("Uploading", fileName, "to Hugging Face Hub");

    await uploadFile({
      file: pathToFileURL(fileName) as URL,
      credentials: { accessToken: env.PARQUET_EXPORT_HF_TOKEN },
      repo: {
        type: "dataset",
        name: env.PARQUET_EXPORT_DATASET,
      },
    });

    logger.info("Upload done");
  } catch (err) {
    // Best-effort cleanup so a failed export does not leave an open writer or
    // a dump of conversations in /tmp. Cleanup failures are ignored on purpose
    // so that the original error is the one reported to the caller.
    if (!closeAttempted) {
      try {
        await writer.close();
      } catch {
        // Ignored: the original error is more useful than the close failure.
      }
    }
    try {
      await unlink(fileName);
    } catch {
      // Ignored: the file may never have been created or is already gone.
    }
    throw err;
  }

  await unlink(fileName);

  return new Response();
}
|
|