import { replicationCache } from "@midday/cache/replication-cache";
import { teamPermissionsCache } from "@midday/cache/team-permissions-cache";
import type { DatabaseWithPrimary } from "@midday/db/client";
import { getUserTeamId } from "@midday/db/queries";
import { logger } from "@midday/logger";
import type { MiddlewareHandler } from "hono";
/**
 * Hono middleware that works around replication lag by choosing which
 * database handle the rest of the request sees.
 *
 * - Mutations (POST, PUT, PATCH, DELETE): call `replicationCache.set(teamId)`
 *   and switch to the primary-only handle.
 * - Queries (every other method): switch to the primary-only handle only while
 *   the value returned by `replicationCache.get(teamId)` is still later than
 *   `Date.now()`.
 *
 * Team resolution:
 * - OAuth sessions use the team carried by the token (`session.teamId`), not
 *   the user's current team.
 * - Other sessions read the user's team from `teamPermissionsCache` and fall
 *   back to `getUserTeamId`, caching the result.
 *
 * Team resolution for non-OAuth sessions is best-effort: a failing cache read
 * or database lookup is logged and the request continues. When no team can be
 * resolved, the database handle is left untouched and `teamId` is `null`.
 *
 * Before calling `next()` the middleware sets `db`, `session` and `teamId` on
 * the context.
 */
export const withPrimaryReadAfterWrite: MiddlewareHandler = async (c, next) => {
  // Get session and database from context
  const session = c.get("session");
  const db = c.get("db");

  // Determine operation type based on HTTP method
  const isMutation = ["POST", "PUT", "PATCH", "DELETE"].includes(c.req.method);

  let finalDb = db;

  // Switches to primary-only mode when the handle supports it. If
  // usePrimaryOnly doesn't exist, we're already using the primary DB.
  const switchToPrimary = () => {
    const dbWithPrimary = db as DatabaseWithPrimary;
    if (dbWithPrimary.usePrimaryOnly) {
      finalDb = dbWithPrimary.usePrimaryOnly();
    }
  };

  let teamId: string | null = null;

  if (session?.oauth) {
    // For OAuth sessions, use the token's team, not the user's current team
    teamId = session.teamId || null;
  } else if (session?.user?.id) {
    // For non-OAuth sessions, get the user's current team
    const userId = session.user.id;
    const cacheKey = `user:${userId}:team`;

    // A cache failure must not fail the request: treat it like a cache miss
    // and fall through to the database lookup below.
    try {
      teamId = (await teamPermissionsCache.get(cacheKey)) || null;
    } catch (error) {
      logger.warn("Failed to read cached user team", {
        userId,
        error: error instanceof Error ? error.message : "Unknown error",
      });
    }

    if (!teamId) {
      try {
        const userTeamId = await getUserTeamId(db, userId);
        if (userTeamId) {
          // Assign before caching so a failing cache write keeps the team.
          teamId = userTeamId;
          await teamPermissionsCache.set(cacheKey, userTeamId);
        }
      } catch (error) {
        logger.warn("Failed to fetch user team", {
          userId,
          error: error instanceof Error ? error.message : "Unknown error",
        });
      }
    }
  }

  if (teamId) {
    if (isMutation) {
      // For mutations, always use primary DB and update the team's entry in
      // the replication cache (presumably an expiry timestamp — the query
      // branch below compares the stored value against Date.now()).
      await replicationCache.set(teamId);
      switchToPrimary();
    } else {
      // For queries, use primary DB only if the team's timestamp exists and
      // hasn't expired yet.
      const timestamp = await replicationCache.get(teamId);
      if (timestamp && Date.now() < timestamp) {
        switchToPrimary();
      }
    }
  }

  // Set database and context in Hono context
  c.set("db", finalDb);
  c.set("session", session);
  c.set("teamId", teamId);

  await next();
};
|