import { FastifyInstance } from 'fastify';
import { prisma } from '../services/prisma';
import { logger } from '../logger';
import OpenAI from 'openai';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY, timeout: 20_000 });

const DAY_IN_MS = 24 * 60 * 60 * 1000;

/**
 * Token the LLM is told to emit wherever the organization id belongs.
 * It is swapped for the real, quoted id before execution. An explicit token is
 * used so that ordinary empty strings or escaped quotes in the generated SQL
 * are never mistaken for the placeholder.
 */
const ORG_ID_PLACEHOLDER = '__ORG_ID__';

/** Matches the placeholder whether the LLM wrapped it in single or double quotes. */
const QUOTED_ORG_ID_PLACEHOLDER = new RegExp(`'${ORG_ID_PLACEHOLDER}'|"${ORG_ID_PLACEHOLDER}"`, 'g');

/** System prompt for the Text-to-SQL endpoint: minimal schema plus generation rules (one entry per line). */
const SCHEMA_CONTEXT = [
  'Tables disponibles (PostgreSQL, toutes filtrées par organizationId):',
  '- "User"(id, phone, name, language, activity, currentStreak, createdAt, deletedAt)',
  '- "Enrollment"(id, userId, trackId, status, currentDay, startedAt, completedAt, deletedAt)',
  '- "UserProgress"(id, userId, trackId, exerciseStatus, confidenceScore, iterationCount)',
  '- "Message"(id, userId, direction, content, createdAt)',
  '- "Track"(id, title, duration, language)',
  '- "CampaignHistory"(id, contactId, status, createdAt)',
  '- "UsageEvent"(id, feature, provider, tokensIn, tokensOut, costUsd, createdAt)',
  '- "WalletTransaction"(id, amount, type, balanceAfter, createdAt)',
  '- "Contact"(id, phoneNumber, name, language, tags, createdAt)',
  '',
  'Règles IMPÉRATIVES:',
  '1. Tu dois générer UNIQUEMENT un SELECT SQL valide.',
  `2. TOUJOURS inclure WHERE "organizationId" = '${ORG_ID_PLACEHOLDER}' dans la clause WHERE.`,
  '3. LIMIT 100 maximum.',
  '4. Utilise des alias lisibles pour les colonnes (AS).',
  '5. Ne génère PAS de CTE, sous-requêtes complexes, ou JOIN sur plus de 3 tables.',
  '6. Réponds UNIQUEMENT avec le SQL brut, sans markdown, sans explications.',
].join('\n');

/**
 * Patterns rejected in generated SQL. They are tested against the
 * whitespace-collapsed, upper-cased query, so no `i` flag is needed.
 */
const DANGEROUS_PATTERNS: readonly RegExp[] = [
  /\bUNION\b/,
  /\bINSERT\b/,
  /\bUPDATE\b/,
  /\bDELETE\b/,
  /\bDROP\b/,
  /\bEXEC\b/,
  /\bEXECUTE\b/,
  /\bTRUNCATE\b/,
  /\bALTER\b/,
  /\bCREATE\b/,
  /\bGRANT\b/,
  /\bREVOKE\b/,
  /\bINTO\b/, // SELECT ... INTO creates a table
  /--/,
  /\/\*/,
  /;/, // any statement separator left after the trailing one was stripped
];

/** Tables a generated query may reference in FROM / JOIN. */
const ALLOWED_TABLES: ReadonlySet<string> = new Set([
  'Message',
  'User',
  'Enrollment',
  'UserProgress',
  'Contact',
  'WalletTransaction',
  'AuditLog',
  'BusinessProfile',
  'KnowledgeBaseEntry',
  'TrackDay',
  'Track',
  'Campaign',
  'CampaignHistory',
  'UsageEvent',
]);

/** Captures the identifier following FROM / JOIN, with or without whitespace before a quoted name. */
const TABLE_REFERENCE = /\b(?:FROM|JOIN)\b\s*"?(\w+)"?/gi;

/**
 * Detects comma-separated FROM lists (`FROM "A" a, "B" b`). Only the first
 * table of such a list is seen by TABLE_REFERENCE, so these are rejected outright.
 */
const IMPLICIT_JOIN = /\b(?:FROM|JOIN)\b\s*"?\w+"?(?:\s+(?:AS\s+)?"?\w+"?)?\s*,/i;

/** Returns the first FROM/JOIN target that is not in ALLOWED_TABLES, if any. */
function findDisallowedTable(sql: string): string | undefined {
  for (const match of sql.matchAll(TABLE_REFERENCE)) {
    const table = match[1];
    if (table !== undefined && !ALLOWED_TABLES.has(table)) return table;
  }
  return undefined;
}

/**
 * Quotes a value as a SQL string literal by doubling embedded single quotes.
 * NOTE(review): assumes PostgreSQL's default `standard_conforming_strings = on`
 * (backslashes are not escape characters) — confirm for this database.
 */
function toSqlStringLiteral(value: string): string {
  return `'${value.replace(/'/g, "''")}'`;
}

/**
 * Makes a raw-query row JSON-serializable. Prisma raw queries return `bigint`
 * for 64-bit integers (e.g. COUNT(*)), which JSON.stringify refuses to encode.
 * Values in the safe-integer range become numbers; larger ones become strings.
 * Only top-level column values are converted.
 */
function toJsonSafeRow(row: Record<string, unknown>): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(row).map(([column, value]) => {
      if (typeof value !== 'bigint') return [column, value];
      const asNumber = Number(value);
      return [column, Number.isSafeInteger(asNumber) ? asNumber : value.toString()];
    }),
  );
}

/** Extracts a printable message from a caught value of unknown type. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Registers the analytics routes (`/usage`, `/pedagogy`, `/campaigns`, `/query`)
 * on the given Fastify instance. Every handler reads `req.organizationId` and
 * answers 400 when it is missing.
 */
export async function analyticsRoutes(fastify: FastifyInstance): Promise<void> {
  /**
   * GET /v1/analytics/usage
   * Returns volume statistics: messages, users, and recorded token consumption / cost.
   */
  fastify.get('/usage', async (req, reply) => {
    const organizationId = req.organizationId;
    if (!organizationId) {
      return reply.code(400).send({ error: 'Organization ID is required' });
    }

    try {
      const activeSince = new Date(Date.now() - DAY_IN_MS);
      const [
        totalMessages,
        inboundMessages,
        outboundMessages,
        totalUsers,
        activeUsersLast24h,
        usageAgg,
        usageByFeature,
      ] = await Promise.all([
        prisma.message.count({ where: { organizationId } }),
        prisma.message.count({ where: { organizationId, direction: 'INBOUND' } }),
        prisma.message.count({ where: { organizationId, direction: 'OUTBOUND' } }),
        prisma.user.count({ where: { organizationId } }),
        prisma.user.count({ where: { organizationId, lastActivityAt: { gte: activeSince } } }),
        // Real costs from UsageEvent table
        prisma.usageEvent.aggregate({
          where: { organizationId },
          _sum: { tokensIn: true, tokensOut: true, costUsd: true },
        }),
        prisma.usageEvent.groupBy({
          by: ['feature'],
          where: { organizationId },
          _sum: { costUsd: true, tokensIn: true, tokensOut: true },
          _count: { _all: true },
        }),
      ]);

      // Sums are null when the organization has no usage events yet.
      const realTokensIn = usageAgg._sum.tokensIn ?? 0;
      const realTokensOut = usageAgg._sum.tokensOut ?? 0;
      const realCostUsd = usageAgg._sum.costUsd ?? 0;

      return {
        messages: { total: totalMessages, inbound: inboundMessages, outbound: outboundMessages },
        users: { total: totalUsers, activeLast24h: activeUsersLast24h },
        costs: {
          tokensIn: realTokensIn,
          tokensOut: realTokensOut,
          totalTokens: realTokensIn + realTokensOut,
          totalUsd: realCostUsd,
          byFeature: usageByFeature.map((f: any) => ({
            feature: f.feature,
            calls: f._count._all,
            tokensIn: f._sum.tokensIn ?? 0,
            tokensOut: f._sum.tokensOut ?? 0,
            costUsd: f._sum.costUsd ?? 0,
          })),
        },
      };
    } catch (err) {
      logger.error({ err }, '[ANALYTICS] Usage fetch failed:');
      return reply.code(500).send({ error: 'Failed to fetch usage analytics' });
    }
  });

  /**
   * GET /v1/analytics/pedagogy
   * Returns pedagogical performance: completion rates and scores.
   */
  fastify.get('/pedagogy', async (req, reply) => {
    const organizationId = req.organizationId;
    if (!organizationId) {
      return reply.code(400).send({ error: 'Organization ID is required' });
    }

    try {
      // The two queries are independent, so they run concurrently.
      const [enrollments, scores] = await Promise.all([
        prisma.enrollment.findMany({
          where: { organizationId },
          select: { status: true, currentDay: true },
        }),
        prisma.userProgress.aggregate({
          where: { organizationId },
          _avg: { score: true },
          _max: { score: true },
        }),
      ]);

      const total = enrollments.length;
      let completed = 0;
      let active = 0;
      let progressSum = 0;
      for (const enrollment of enrollments) {
        if (enrollment.status === 'COMPLETED') completed += 1;
        else if (enrollment.status === 'ACTIVE') active += 1;
        progressSum += enrollment.currentDay;
      }
      const averageProgress = total > 0 ? progressSum / total : 0;

      return {
        completion: {
          total,
          completed,
          active,
          rate: total > 0 ? (completed / total) * 100 : 0,
        },
        performance: {
          averageProgressDays: averageProgress,
          averageScore: scores._avg.score ?? 0,
          maxScore: scores._max.score ?? 0,
        },
      };
    } catch (err) {
      logger.error({ err }, '[ANALYTICS] Pedagogy fetch failed:');
      return reply.code(500).send({ error: 'Failed to fetch pedagogical analytics' });
    }
  });

  /**
   * GET /v1/analytics/campaigns
   * Returns CRM campaign funnel: sent, delivered, read, failed.
   */
  fastify.get('/campaigns', async (req, reply) => {
    const organizationId = req.organizationId;
    if (!organizationId) {
      return reply.code(400).send({ error: 'Organization ID is required' });
    }

    try {
      const stats = await prisma.campaignHistory.groupBy({
        by: ['status'],
        where: { organizationId },
        _count: { _all: true },
      });

      // Pre-seeded so statuses with no rows still report 0.
      const counts: Record<string, number> = { SENT: 0, DELIVERED: 0, READ: 0, FAILED: 0 };
      stats.forEach((s: any) => {
        counts[s.status] = s._count._all;
      });

      // Each history row holds exactly one status, so a READ row is also a
      // delivered one: "delivered" figures below are DELIVERED + READ.
      const total = counts.SENT + counts.DELIVERED + counts.READ + counts.FAILED;
      const deliveredOrRead = counts.DELIVERED + counts.READ;

      return {
        summary: {
          total,
          sent: counts.SENT,
          delivered: counts.DELIVERED,
          read: counts.READ,
          failed: counts.FAILED,
          deliveryRate: total > 0 ? (deliveredOrRead / total) * 100 : 0,
          readRate: deliveredOrRead > 0 ? (counts.READ / deliveredOrRead) * 100 : 0,
        },
        funnel: [
          { name: 'Envoyés', value: total, fill: '#6366f1' },
          { name: 'Livrés', value: deliveredOrRead, fill: '#8b5cf6' },
          { name: 'Lus', value: counts.READ, fill: '#ec4899' },
        ],
      };
    } catch (err) {
      logger.error({ err }, '[ANALYTICS] Campaigns fetch failed:');
      return reply.code(500).send({ error: 'Failed to fetch campaign analytics' });
    }
  });

  /**
   * POST /v1/analytics/query
   * Natural language → SQL → results (Text-to-SQL).
   * Only SELECT queries are allowed and the organization id is injected server-side.
   *
   * NOTE(review): the regex checks below are best-effort filters over
   * LLM-generated SQL, not a security boundary (they cannot prove that every
   * referenced table is actually scoped to the organization). Consider running
   * this query through a read-only database role and/or row-level security.
   */
  fastify.post('/query', async (req, reply) => {
    const organizationId = req.organizationId;
    if (!organizationId) return reply.code(400).send({ error: 'Organization context required' });

    // The body may be absent or carry a non-string question; both must yield 400, not a crash.
    const { question, language = 'FR' } = (req.body ?? {}) as { question?: unknown; language?: string };
    if (typeof question !== 'string' || !question.trim()) {
      return reply.code(400).send({ error: 'Question is required' });
    }

    let sql: string;
    try {
      const completion = await openai.chat.completions.create({
        model: 'gpt-4o-mini',
        temperature: 0,
        messages: [
          { role: 'system', content: SCHEMA_CONTEXT },
          {
            role: 'user',
            content: `Génère le SQL pour répondre à cette question (langue interface: ${language}):\n"${question}"`,
          },
        ],
      });
      // A single trailing ';' is harmless and common in generated SQL; strip it
      // so that any ';' left over can be treated as a statement separator.
      sql = (completion.choices[0]?.message?.content ?? '').trim().replace(/;\s*$/, '');
    } catch (err: unknown) {
      logger.error({ err }, '[TEXT-SQL] LLM generation failed');
      return reply.code(503).send({ error: 'AI service unavailable' });
    }

    // Defense 1: UNION check on raw SQL before any normalization (catches mixed-case / whitespace tricks)
    if (/union/i.test(sql)) {
      logger.warn('[TEXT-SQL] UNION detected — query rejected');
      return reply.code(400).send({ error: 'UNION not allowed' });
    }

    // Defense 2: only allow a single simple SELECT — reject dangerous patterns
    const normalized = sql.replace(/\s+/g, ' ').trim().toUpperCase();
    if (!normalized.startsWith('SELECT')) {
      logger.warn('[TEXT-SQL] Non-SELECT query rejected');
      return reply.code(400).send({ error: 'Only SELECT queries are allowed' });
    }
    if (DANGEROUS_PATTERNS.some(pattern => pattern.test(normalized))) {
      logger.warn('[TEXT-SQL] Dangerous pattern detected — query rejected');
      return reply.code(400).send({ error: 'Query contains disallowed patterns' });
    }

    // Defense 3: allowlist of tables the org is permitted to query
    if (IMPLICIT_JOIN.test(sql)) {
      logger.warn('[TEXT-SQL] Comma-separated FROM list — query rejected');
      return reply.code(400).send({ error: 'Query contains disallowed patterns' });
    }
    const disallowedTable = findDisallowedTable(sql);
    if (disallowedTable) {
      logger.warn({ disallowedTable }, '[TEXT-SQL] Disallowed table reference — query rejected');
      return reply.code(400).send({ error: `Table not allowed: ${disallowedTable}` });
    }

    // Inject the real organization id in place of the placeholder. A replacer
    // function is used so '$' sequences in the id are never interpreted.
    const orgLiteral = toSqlStringLiteral(organizationId);
    const safeSql = sql.replace(QUOTED_ORG_ID_PLACEHOLDER, () => orgLiteral);

    // Verify the organization id literal is present as a safety net
    if (!safeSql.includes(orgLiteral)) {
      logger.warn({ organizationId }, '[TEXT-SQL] Query missing organizationId — rejected');
      return reply.code(400).send({ error: 'Generated query does not scope to the organization' });
    }

    try {
      const rawRows = await prisma.$queryRawUnsafe<Record<string, unknown>[]>(safeSql);
      const rows = rawRows.map(toJsonSafeRow);
      return { rows, count: rows.length };
    } catch (err: unknown) {
      logger.error({ err }, '[TEXT-SQL] Query execution failed');
      return reply.code(422).send({ error: 'Query execution failed', detail: errorMessage(err) });
    }
  });
}