Spaces:
Sleeping
Sleeping
| /** | |
| * GitHub Sync Service | |
| * | |
| * Handles synchronization of issues and PRs from GitHub to the database. | |
| * Uses ETag caching for efficient API usage (5,000 req/hr limit). | |
| * Uses Promise.allSettled for resilient parallel fetching. | |
| */ | |
| import { Octokit } from "@octokit/rest"; | |
| import { db } from "@/db"; | |
| import { issues, repositories, triageData, users } from "@/db/schema"; | |
| import { eq, and, inArray } from "drizzle-orm"; | |
| import { v4 as uuidv4 } from "uuid"; | |
| import { GitHubIssue, AuthorAssociation } from "@/lib/types/github"; | |
| import { publishIssueCreated, publishIssueUpdated, publishIssueDeleted, publishSyncComplete, isAblyConfigured } from "@/lib/ably-client"; | |
| // Sync interval: 5 minutes | |
| export const SYNC_INTERVAL_MS = 300000; | |
| // ============================================================================= | |
| // Types | |
| // ============================================================================= | |
| interface SyncResult { | |
| repoId: string; | |
| repoName: string; | |
| success: boolean; | |
| created: number; | |
| updated: number; | |
| deleted: number; | |
| skipped: boolean; // True if 304 Not Modified | |
| error?: string; | |
| } | |
| interface SyncStats { | |
| reposProcessed: number; | |
| reposSkipped: number; // Repos that returned 304 Not Modified | |
| issuesUpdated: number; | |
| issuesDeleted: number; | |
| errors: string[]; | |
| } | |
| interface SyncOptions { | |
| role?: 'MAINTAINER' | 'CONTRIBUTOR'; | |
| username?: string; // For contributor-specific filtering | |
| } | |
| // ============================================================================= | |
| // Issue Deletion | |
| // ============================================================================= | |
| export async function deleteIssue(issueId: string): Promise<void> { | |
| // Triage data will cascade delete due to foreign key | |
| await db.delete(triageData).where(eq(triageData.issueId, issueId)); | |
| await db.delete(issues).where(eq(issues.id, issueId)); | |
| } | |
| export async function deleteIssueByGithubId(githubIssueId: number): Promise<void> { | |
| const issue = await db.select() | |
| .from(issues) | |
| .where(eq(issues.githubIssueId, githubIssueId)) | |
| .limit(1); | |
| if (issue[0]) { | |
| await deleteIssue(issue[0].id); | |
| } | |
| } | |
| // ============================================================================= | |
| // Get Issues for Repository | |
| // ============================================================================= | |
| export async function getIssuesByRepoId(repoId: string) { | |
| return db.select({ | |
| id: issues.id, | |
| githubIssueId: issues.githubIssueId, | |
| number: issues.number, | |
| title: issues.title, | |
| body: issues.body, | |
| authorName: issues.authorName, | |
| repoId: issues.repoId, | |
| repoName: issues.repoName, | |
| owner: issues.owner, | |
| repo: issues.repo, | |
| htmlUrl: issues.htmlUrl, | |
| state: issues.state, | |
| isPR: issues.isPR, | |
| authorAssociation: issues.authorAssociation, | |
| headSha: issues.headSha, | |
| updatedAt: issues.updatedAt, | |
| createdAt: issues.createdAt, | |
| }).from(issues).where(eq(issues.repoId, repoId)); | |
| } | |
| // ============================================================================= | |
| // Update Repository ETag | |
| // ============================================================================= | |
| async function updateRepositoryEtag(repoId: string, etag: string | null): Promise<void> { | |
| await db.update(repositories) | |
| .set({ | |
| etag: etag, | |
| lastSyncedAt: new Date().toISOString() | |
| }) | |
| .where(eq(repositories.id, repoId)); | |
| } | |
| async function getRepositoryEtag(repoId: string): Promise<string | null> { | |
| const repo = await db.select({ etag: repositories.etag }) | |
| .from(repositories) | |
| .where(eq(repositories.id, repoId)) | |
| .limit(1); | |
| return repo[0]?.etag || null; | |
| } | |
| // ============================================================================= | |
| // Single Repository Sync (with ETag Caching) | |
| // ============================================================================= | |
| async function syncRepository( | |
| octokit: Octokit, | |
| repoId: string, | |
| owner: string, | |
| repo: string, | |
| options: SyncOptions = {} | |
| ): Promise<SyncResult> { | |
| const repoName = `${owner}/${repo}`; | |
| try { | |
| // Get stored ETag for conditional request | |
| const storedEtag = await getRepositoryEtag(repoId); | |
| // Build headers for conditional request | |
| const headers: Record<string, string> = {}; | |
| if (storedEtag) { | |
| headers['If-None-Match'] = storedEtag; | |
| } | |
| // Fetch open issues with conditional request | |
| let openItems: GitHubIssue[] = []; | |
| let newEtag: string | null = null; | |
| try { | |
| const response = await octokit.issues.listForRepo({ | |
| owner, | |
| repo, | |
| state: "open", | |
| per_page: 100, | |
| headers, | |
| }); | |
| // Get ETag from response headers | |
| newEtag = response.headers.etag || null; | |
| // If we got data, paginate for the rest | |
| openItems = response.data as GitHubIssue[]; | |
| // Paginate if there are more items | |
| if (response.data.length === 100) { | |
| const remainingItems = await octokit.paginate(octokit.issues.listForRepo, { | |
| owner, | |
| repo, | |
| state: "open", | |
| per_page: 100, | |
| page: 2, // Start from page 2 | |
| }); | |
| openItems = [...openItems, ...(remainingItems as GitHubIssue[])]; | |
| } | |
| } catch (error: any) { | |
| // Check for 304 Not Modified | |
| if (error.status === 304) { | |
| console.log(`[Sync] ${repoName}: No changes (304 Not Modified)`); | |
| // Update last synced timestamp even for 304 | |
| await db.update(repositories) | |
| .set({ lastSyncedAt: new Date().toISOString() }) | |
| .where(eq(repositories.id, repoId)); | |
| return { repoId, repoName, success: true, created: 0, updated: 0, deleted: 0, skipped: true }; | |
| } | |
| throw error; | |
| } | |
| // Update stored ETag if we got a new one | |
| if (newEtag) { | |
| await updateRepositoryEtag(repoId, newEtag); | |
| } | |
| // Get current issues in DB for this repo | |
| const rawDbIssues = await getIssuesByRepoId(repoId); | |
| // Deduplicate DB issues by number (same PR may have been inserted multiple times) | |
| const dbIssuesByNumber = new Map<number, typeof rawDbIssues[0]>(); | |
| const duplicateIds: string[] = []; | |
| for (const i of rawDbIssues) { | |
| if (dbIssuesByNumber.has(i.number)) { | |
| duplicateIds.push(i.id); // mark as duplicate for cleanup | |
| } else { | |
| dbIssuesByNumber.set(i.number, i); | |
| } | |
| } | |
| // Clean up existing duplicates in DB | |
| for (const id of duplicateIds) { | |
| await db.delete(issues).where(eq(issues.id, id)); | |
| } | |
| const dbIssues = Array.from(dbIssuesByNumber.values()); | |
| // Track GitHub issue numbers we've seen from the open list | |
| const openNumbers = new Set<number>(); | |
| // Track potential mentors to update roles | |
| const potentialMentors = new Set<string>(); | |
| let created = 0; | |
| let updated = 0; | |
| let deleted = 0; | |
| // Helper to check for mentor status | |
| const checkMentorStatus = (item: any) => { | |
| const assoc = item.author_association; | |
| if (assoc === "OWNER" || assoc === "MEMBER" || assoc === "COLLABORATOR") { | |
| if (item.user?.login) { | |
| potentialMentors.add(item.user.login); | |
| } | |
| } | |
| }; | |
| // For Contributors: filter to only their authored items (PRs and issues) | |
| const shouldIncludeItem = (item: GitHubIssue): boolean => { | |
| if (options.role === 'CONTRIBUTOR' && options.username) { | |
| // Contributors see their own authored PRs and issues | |
| return item.user.login === options.username; | |
| } | |
| // Maintainers see everything | |
| return true; | |
| }; | |
| // Process open items | |
| for (const ghItem of openItems) { | |
| openNumbers.add(ghItem.number); | |
| checkMentorStatus(ghItem); | |
| // Skip items that don't match role filter | |
| if (!shouldIncludeItem(ghItem)) { | |
| continue; | |
| } | |
| const isPR = !!ghItem.pull_request; | |
| const existingIssue = dbIssuesByNumber.get(ghItem.number); | |
| if (existingIssue) { | |
| // Update if state changed or other fields | |
| if (existingIssue.state !== ghItem.state || | |
| existingIssue.title !== ghItem.title || | |
| existingIssue.authorAssociation !== ghItem.author_association) { | |
| await db.update(issues) | |
| .set({ | |
| state: ghItem.state, | |
| title: ghItem.title, | |
| body: ghItem.body || null, | |
| authorAssociation: ghItem.author_association, | |
| }) | |
| .where(eq(issues.id, existingIssue.id)); | |
| updated++; | |
| if (isAblyConfigured()) { | |
| await publishIssueUpdated({ | |
| id: existingIssue.id, | |
| githubIssueId: ghItem.id, | |
| number: ghItem.number, | |
| title: ghItem.title, | |
| repoName, | |
| owner, | |
| repo, | |
| isPR, | |
| state: ghItem.state, | |
| }); | |
| } | |
| } | |
| } else { | |
| // Create new issue | |
| const newId = uuidv4(); | |
| await db.insert(issues).values({ | |
| id: newId, | |
| githubIssueId: ghItem.id, | |
| number: ghItem.number, | |
| title: ghItem.title, | |
| body: ghItem.body || null, | |
| authorName: ghItem.user.login, | |
| repoId, | |
| repoName, | |
| owner, | |
| repo, | |
| htmlUrl: ghItem.html_url, | |
| state: ghItem.state, | |
| isPR, | |
| authorAssociation: ghItem.author_association, | |
| createdAt: new Date().toISOString(), | |
| }).onConflictDoNothing(); | |
| created++; | |
| if (isAblyConfigured()) { | |
| await publishIssueCreated({ | |
| id: newId, | |
| githubIssueId: ghItem.id, | |
| number: ghItem.number, | |
| title: ghItem.title, | |
| repoName, | |
| owner, | |
| repo, | |
| isPR, | |
| state: ghItem.state, | |
| }); | |
| } | |
| } | |
| } | |
| // ========================================================================= | |
| // STATE RECONCILIATION: Delete issues/PRs that are closed on GitHub | |
| // If an issue is in our DB but NOT in GitHub's open list, delete it | |
| // ========================================================================= | |
| for (const dbIssue of dbIssues) { | |
| // If issue is in DB but NOT in GitHub's open list, it's closed - delete it | |
| if (!openNumbers.has(dbIssue.number)) { | |
| await db.delete(issues) | |
| .where(eq(issues.id, dbIssue.id)); | |
| deleted++; | |
| if (isAblyConfigured()) { | |
| await publishIssueDeleted({ | |
| id: dbIssue.id, | |
| githubIssueId: dbIssue.githubIssueId, | |
| number: dbIssue.number, | |
| title: dbIssue.title, | |
| repoName, | |
| owner, | |
| repo, | |
| isPR: dbIssue.isPR, | |
| state: 'closed', | |
| }); | |
| } | |
| console.log(`[Sync] ${repoName}: Deleted #${dbIssue.number} (closed on GitHub)`); | |
| } | |
| } | |
| // Update Mentor Roles | |
| if (potentialMentors.size > 0) { | |
| for (const username of potentialMentors) { | |
| // Determine if user exists and upgrade role if eligible | |
| // We only upgrade if role is null or CONTRIBUTOR. We don't touch MAINTAINER. | |
| const user = await db.select().from(users).where(eq(users.username, username)).limit(1); | |
| if (user.length > 0) { | |
| const currentRole = user[0].role; | |
| if (!currentRole || currentRole === 'CONTRIBUTOR') { | |
| await db.update(users) | |
| .set({ role: 'MENTOR' }) | |
| .where(eq(users.id, user[0].id)); | |
| } | |
| } | |
| } | |
| } | |
| return { repoId, repoName, success: true, created, updated, deleted, skipped: false }; | |
| } catch (error) { | |
| const errorMessage = error instanceof Error ? error.message : String(error); | |
| console.error(`Sync error for ${repoName}:`, errorMessage); | |
| return { repoId, repoName, success: false, created: 0, updated: 0, deleted: 0, skipped: false, error: errorMessage }; | |
| } | |
| } | |
| // ============================================================================= | |
| // Full Sync (All Repositories) | |
| // ============================================================================= | |
| export async function runFullSync( | |
| userId: string, | |
| accessToken: string, | |
| options: SyncOptions = {} | |
| ): Promise<SyncStats> { | |
| const octokit = new Octokit({ auth: accessToken }); | |
| // Get all repositories for this user | |
| const userRepos = await db.select() | |
| .from(repositories) | |
| .where(eq(repositories.userId, userId)); | |
| if (userRepos.length === 0) { | |
| return { reposProcessed: 0, reposSkipped: 0, issuesUpdated: 0, issuesDeleted: 0, errors: [] }; | |
| } | |
| // Sync all repos in parallel using Promise.allSettled | |
| const syncPromises = userRepos.map(repo => { | |
| // repo.name may contain full path like "owner/repo", extract just the repo name | |
| const repoNameOnly = repo.name.includes('/') ? repo.name.split('/')[1] : repo.name; | |
| return syncRepository(octokit, repo.id, repo.owner, repoNameOnly, options); | |
| }); | |
| const results = await Promise.allSettled(syncPromises); | |
| // Aggregate results | |
| const stats: SyncStats = { | |
| reposProcessed: 0, | |
| reposSkipped: 0, | |
| issuesUpdated: 0, | |
| issuesDeleted: 0, | |
| errors: [], | |
| }; | |
| for (const result of results) { | |
| if (result.status === "fulfilled") { | |
| const syncResult = result.value; | |
| stats.reposProcessed++; | |
| if (syncResult.skipped) { | |
| stats.reposSkipped++; | |
| } else { | |
| stats.issuesUpdated += syncResult.created + syncResult.updated; | |
| stats.issuesDeleted += syncResult.deleted; | |
| } | |
| if (!syncResult.success && syncResult.error) { | |
| stats.errors.push(`${syncResult.repoName}: ${syncResult.error}`); | |
| } | |
| } else { | |
| stats.errors.push(`Sync failed: ${result.reason}`); | |
| } | |
| } | |
| // Publish sync complete event | |
| if (isAblyConfigured()) { | |
| try { | |
| await publishSyncComplete({ | |
| reposProcessed: stats.reposProcessed, | |
| issuesUpdated: stats.issuesUpdated, | |
| issuesDeleted: stats.issuesDeleted, | |
| }); | |
| } catch (ablyError) { | |
| console.error("Failed to publish sync complete:", ablyError); | |
| } | |
| } | |
| console.log(`[Sync] Complete: ${stats.reposProcessed} repos, ${stats.reposSkipped} skipped (304), ${stats.issuesUpdated} updated, ${stats.issuesDeleted} deleted`); | |
| return stats; | |
| } | |
| // ============================================================================= | |
| // Sync Single Repository | |
| // ============================================================================= | |
| export async function syncSingleRepository( | |
| accessToken: string, | |
| repoId: string, | |
| owner: string, | |
| repo: string, | |
| options: SyncOptions = {} | |
| ): Promise<SyncResult> { | |
| const octokit = new Octokit({ auth: accessToken }); | |
| const result = await syncRepository(octokit, repoId, owner, repo, options); | |
| // ===== Auto-trigger RAG indexing after successful sync ===== | |
| if (result.success && (result.created > 0 || result.updated > 0)) { | |
| try { | |
| const aiEngineUrl = process.env.AI_ENGINE_URL || "http://localhost:7860"; | |
| const repoName = `${owner}/${repo}`; | |
| // Check if we need to index (avoid redundant indexing) | |
| const checkResponse = await fetch( | |
| `${aiEngineUrl}/rag/check-index?repo_name=${encodeURIComponent(repoName)}`, | |
| { | |
| headers: { | |
| "X-API-Key": process.env.API_KEY || "", | |
| "Content-Type": "application/json" | |
| } | |
| } | |
| ); | |
| if (checkResponse.ok) { | |
| const checkData = await checkResponse.json(); | |
| const chunkCount = checkData.chunk_count || 0; | |
| if (chunkCount < 100) { | |
| console.log(`[Sync] Triggering RAG indexing for ${repoName} (current chunks: ${chunkCount})`); | |
| // Fire-and-forget indexing (don't await to avoid blocking sync) | |
| fetch(`${aiEngineUrl}/rag/index`, { | |
| method: "POST", | |
| headers: { | |
| "Content-Type": "application/json", | |
| "X-API-Key": process.env.API_KEY || "", | |
| }, | |
| body: JSON.stringify({ | |
| repo_name: repoName, | |
| github_access_token: accessToken, | |
| }), | |
| }).catch((err) => { | |
| console.error(`[Sync] RAG indexing failed for ${repoName}:`, err.message); | |
| }); | |
| } else { | |
| console.log(`[Sync] Skipping RAG indexing for ${repoName}, already has ${chunkCount} chunks`); | |
| } | |
| } | |
| } catch (err) { | |
| // Log but don't fail the sync if RAG indexing check errors | |
| console.error("[Sync] RAG index check failed:", err); | |
| } | |
| } | |
| return result; | |
| } | |
| // ============================================================================= | |
| // Contributor Sync - Fetch all issues/PRs authored by this user from GitHub | |
| // ============================================================================= | |
| export async function runContributorSync( | |
| userId: string, | |
| username: string, | |
| accessToken: string | |
| ): Promise<SyncStats> { | |
| const octokit = new Octokit({ auth: accessToken }); | |
| const stats: SyncStats = { | |
| reposProcessed: 0, | |
| reposSkipped: 0, | |
| issuesUpdated: 0, | |
| issuesDeleted: 0, | |
| errors: [], | |
| }; | |
| try { | |
| // Search for all open issues and PRs authored by this user | |
| const [prsResponse, issuesResponse] = await Promise.all([ | |
| octokit.search.issuesAndPullRequests({ | |
| q: `author:${username} is:pr is:open`, | |
| per_page: 100, | |
| sort: "updated", | |
| order: "desc", | |
| }), | |
| octokit.search.issuesAndPullRequests({ | |
| q: `author:${username} is:issue is:open`, | |
| per_page: 100, | |
| sort: "updated", | |
| order: "desc", | |
| }) | |
| ]); | |
| const allGitHubItems = [...prsResponse.data.items, ...issuesResponse.data.items]; | |
| // Get existing issues for this contributor from DB | |
| const rawExistingIssues = await db.select() | |
| .from(issues) | |
| .where(eq(issues.authorName, username)); | |
| // Deduplicate by number+repoId, keeping first occurrence | |
| const existingByKey = new Map<string, typeof rawExistingIssues[0]>(); | |
| const contribDupIds: string[] = []; | |
| for (const i of rawExistingIssues) { | |
| const key = `${i.repoId}:${i.number}`; | |
| if (existingByKey.has(key)) { | |
| contribDupIds.push(i.id); | |
| } else { | |
| existingByKey.set(key, i); | |
| } | |
| } | |
| for (const id of contribDupIds) { | |
| await db.delete(issues).where(eq(issues.id, id)); | |
| } | |
| const existingIssues = Array.from(existingByKey.values()); | |
| // Cache for repo lookups/creates | |
| const repoCache = new Map<string, string>(); | |
| // Helper to get or create a repository entry | |
| const getOrCreateRepo = async (owner: string, repo: string, repoName: string): Promise<string> => { | |
| const cacheKey = repoName; | |
| if (repoCache.has(cacheKey)) { | |
| return repoCache.get(cacheKey)!; | |
| } | |
| // Check if repo exists in DB | |
| const existingRepo = await db.select({ id: repositories.id }) | |
| .from(repositories) | |
| .where(eq(repositories.name, repoName)) | |
| .limit(1); | |
| if (existingRepo[0]) { | |
| repoCache.set(cacheKey, existingRepo[0].id); | |
| return existingRepo[0].id; | |
| } | |
| // Create a new repository entry for contributor tracking | |
| const newRepoId = uuidv4(); | |
| await db.insert(repositories).values({ | |
| id: newRepoId, | |
| githubRepoId: 0, // Placeholder - we don't have the actual GitHub repo ID from search | |
| name: repoName, | |
| owner: owner, | |
| userId: userId, // Associate with the contributor | |
| createdAt: new Date().toISOString(), | |
| }).onConflictDoNothing(); | |
| repoCache.set(cacheKey, newRepoId); | |
| return newRepoId; | |
| }; | |
| // Track seen numbers per repo for reconciliation | |
| const openKeys = new Set<string>(); | |
| // Process each GitHub item | |
| for (const item of allGitHubItems) { | |
| const repoUrl = item.repository_url || ""; | |
| const repoMatch = repoUrl.match(/repos\/([^/]+)\/([^/]+)/); | |
| const owner = repoMatch?.[1] || ""; | |
| const repo = repoMatch?.[2] || ""; | |
| const repoName = `${owner}/${repo}`; | |
| const isPR = !!item.pull_request; | |
| // Get or create the repository entry (needed for key lookup) | |
| const repoId = await getOrCreateRepo(owner, repo, repoName); | |
| const key = `${repoId}:${item.number}`; | |
| openKeys.add(key); | |
| const existing = existingByKey.get(key); | |
| if (existing) { | |
| // Update if changed | |
| if (existing.state !== item.state || existing.title !== item.title) { | |
| await db.update(issues) | |
| .set({ | |
| state: item.state, | |
| title: item.title, | |
| body: item.body || null, | |
| }) | |
| .where(eq(issues.id, existing.id)); | |
| stats.issuesUpdated++; | |
| } | |
| } else { | |
| // Create new issue | |
| const newId = uuidv4(); | |
| await db.insert(issues).values({ | |
| id: newId, | |
| githubIssueId: item.id, | |
| number: item.number, | |
| title: item.title, | |
| body: item.body || null, | |
| authorName: item.user?.login || username, | |
| repoId: repoId, | |
| repoName, | |
| owner, | |
| repo, | |
| htmlUrl: item.html_url, | |
| state: item.state || "open", | |
| isPR, | |
| authorAssociation: item.author_association, | |
| createdAt: new Date().toISOString(), | |
| }).onConflictDoNothing(); | |
| stats.issuesUpdated++; | |
| } | |
| } | |
| // Delete issues that are no longer open on GitHub | |
| for (const existing of existingIssues) { | |
| const key = `${existing.repoId}:${existing.number}`; | |
| if (!openKeys.has(key)) { | |
| await db.delete(issues).where(eq(issues.id, existing.id)); | |
| stats.issuesDeleted++; | |
| } | |
| } | |
| console.log(`[ContributorSync] ${username}: ${stats.issuesUpdated} updated, ${stats.issuesDeleted} deleted`); | |
| } catch (error) { | |
| const errorMessage = error instanceof Error ? error.message : String(error); | |
| console.error(`[ContributorSync] Error for ${username}:`, errorMessage); | |
| stats.errors.push(errorMessage); | |
| } | |
| return stats; | |
| } | |
| // ============================================================================= | |
| // Maintainer Sync (All open issues/PRs) | |
| // ============================================================================= | |
| export async function runMaintainerSync( | |
| userId: string, | |
| accessToken: string | |
| ): Promise<SyncStats> { | |
| return runFullSync(userId, accessToken, { | |
| role: 'MAINTAINER', | |
| }); | |
| } | |
| // ============================================================================= | |
| // Reconcile Single Issue - Check specific issue state on GitHub | |
| // ============================================================================= | |
| interface IssueReconcileResult { | |
| success: boolean; | |
| owner: string; | |
| repo: string; | |
| issueNumber: number; | |
| githubState: 'open' | 'closed' | 'not_found'; | |
| dbState: 'open' | 'closed' | 'not_found'; | |
| action: 'none' | 'deleted' | 'updated' | 'created'; | |
| message: string; | |
| } | |
| /** | |
| * Reconcile a specific issue by checking its state on GitHub | |
| * and updating the database accordingly. | |
| * | |
| * This is useful for immediately syncing a specific issue | |
| * (e.g., cosmicMagnetar/openTriage#1) without waiting for full sync. | |
| */ | |
| export async function reconcileSingleIssue( | |
| accessToken: string, | |
| owner: string, | |
| repo: string, | |
| issueNumber: number | |
| ): Promise<IssueReconcileResult> { | |
| const octokit = new Octokit({ auth: accessToken }); | |
| const repoName = `${owner}/${repo}`; | |
| try { | |
| // 1. Check issue state on GitHub | |
| let githubIssue: GitHubIssue | null = null; | |
| let githubState: 'open' | 'closed' | 'not_found' = 'not_found'; | |
| try { | |
| const response = await octokit.issues.get({ | |
| owner, | |
| repo, | |
| issue_number: issueNumber, | |
| }); | |
| githubIssue = response.data as GitHubIssue; | |
| githubState = githubIssue.state as 'open' | 'closed'; | |
| } catch (error: any) { | |
| if (error.status === 404) { | |
| githubState = 'not_found'; | |
| } else { | |
| throw error; | |
| } | |
| } | |
| // 2. Check current state in database | |
| const dbIssue = await db.select() | |
| .from(issues) | |
| .where( | |
| and( | |
| eq(issues.owner, owner), | |
| eq(issues.repo, repo), | |
| eq(issues.number, issueNumber) | |
| ) | |
| ) | |
| .limit(1); | |
| const existingIssue = dbIssue[0]; | |
| const dbState: 'open' | 'closed' | 'not_found' = existingIssue | |
| ? (existingIssue.state as 'open' | 'closed') | |
| : 'not_found'; | |
| // 3. Reconcile based on states | |
| let action: 'none' | 'deleted' | 'updated' | 'created' = 'none'; | |
| let message = ''; | |
| if (githubState === 'closed' || githubState === 'not_found') { | |
| // Issue is closed or doesn't exist on GitHub - delete from DB | |
| if (existingIssue) { | |
| await db.delete(issues).where(eq(issues.id, existingIssue.id)); | |
| action = 'deleted'; | |
| message = `Issue #${issueNumber} is ${githubState} on GitHub - removed from database`; | |
| // Publish deletion event | |
| if (isAblyConfigured()) { | |
| await publishIssueDeleted({ | |
| id: existingIssue.id, | |
| githubIssueId: existingIssue.githubIssueId, | |
| number: issueNumber, | |
| title: existingIssue.title, | |
| repoName, | |
| owner, | |
| repo, | |
| isPR: existingIssue.isPR, | |
| state: 'closed', | |
| }); | |
| } | |
| console.log(`[Reconcile] ${repoName}#${issueNumber}: Deleted (${githubState} on GitHub)`); | |
| } else { | |
| message = `Issue #${issueNumber} is ${githubState} on GitHub and not in database - no action needed`; | |
| } | |
| } else if (githubState === 'open' && githubIssue) { | |
| // Issue is open on GitHub | |
| if (existingIssue) { | |
| // Update existing issue | |
| if (existingIssue.state !== 'open' || existingIssue.title !== githubIssue.title) { | |
| await db.update(issues) | |
| .set({ | |
| state: 'open', | |
| title: githubIssue.title, | |
| body: githubIssue.body || null, | |
| }) | |
| .where(eq(issues.id, existingIssue.id)); | |
| action = 'updated'; | |
| message = `Issue #${issueNumber} updated to match GitHub state`; | |
| } else { | |
| message = `Issue #${issueNumber} is already in sync`; | |
| } | |
| } else { | |
| // Issue is open on GitHub but not in DB - would need repo context to create | |
| message = `Issue #${issueNumber} is open on GitHub but not tracked. Run full sync to add it.`; | |
| } | |
| } | |
| return { | |
| success: true, | |
| owner, | |
| repo, | |
| issueNumber, | |
| githubState, | |
| dbState, | |
| action, | |
| message, | |
| }; | |
| } catch (error) { | |
| const errorMessage = error instanceof Error ? error.message : String(error); | |
| console.error(`[Reconcile] Error for ${repoName}#${issueNumber}:`, errorMessage); | |
| return { | |
| success: false, | |
| owner, | |
| repo, | |
| issueNumber, | |
| githubState: 'not_found', | |
| dbState: 'not_found', | |
| action: 'none', | |
| message: `Error reconciling issue: ${errorMessage}`, | |
| }; | |
| } | |
| } | |
| /** | |
| * Reconcile the critical tracked issue: cosmicMagnetar/openTriage#1 | |
| * This ensures the issue state is immediately synced from GitHub. | |
| */ | |
| export async function reconcileOpenTriageIssue1(accessToken: string): Promise<IssueReconcileResult> { | |
| return reconcileSingleIssue(accessToken, 'cosmicMagnetar', 'openTriage', 1); | |
| } | |
| /** | |
| * Fetch user's PRs directly from repos they've contributed to using the Pulls API. | |
| * This bypasses the GitHub Search API indexing delay. | |
| * | |
| * We check repos where the user already has PRs tracked, plus repos from recent activity. | |
| */ | |
| export async function syncContributorPRsDirect( | |
| userId: string, | |
| username: string, | |
| accessToken: string | |
| ): Promise<{ added: number; updated: number; repos: string[] }> { | |
| const octokit = new Octokit({ auth: accessToken }); | |
| const result = { added: 0, updated: 0, repos: [] as string[] }; | |
| try { | |
| // Get unique repos where this user already has tracked issues | |
| const existingIssues = await db.select({ | |
| owner: issues.owner, | |
| repo: issues.repo, | |
| repoId: issues.repoId, | |
| }) | |
| .from(issues) | |
| .where(eq(issues.authorName, username)); | |
| const uniqueRepos = new Map<string, { owner: string; repo: string; repoId: string | null }>(); | |
| for (const issue of existingIssues) { | |
| const key = `${issue.owner}/${issue.repo}`; | |
| if (!uniqueRepos.has(key) && issue.owner && issue.repo) { | |
| uniqueRepos.set(key, { | |
| owner: issue.owner, | |
| repo: issue.repo, | |
| repoId: issue.repoId, | |
| }); | |
| } | |
| } | |
| // Also fetch recent activity to find new repos where user has PRs | |
| try { | |
| const events = await octokit.activity.listPublicEventsForUser({ | |
| username, | |
| per_page: 50, | |
| }); | |
| for (const event of events.data) { | |
| if (event.type === 'PullRequestEvent' && event.repo?.name) { | |
| const [owner, repo] = event.repo.name.split('/'); | |
| const key = event.repo.name; | |
| if (!uniqueRepos.has(key) && owner && repo) { | |
| uniqueRepos.set(key, { owner, repo, repoId: null }); | |
| } | |
| } | |
| } | |
| } catch (eventError) { | |
| // Ignore event fetch errors - just use existing repos | |
| console.log(`[DirectSync] Could not fetch events for ${username}`); | |
| } | |
| // Helper to get or create repo | |
| const getOrCreateRepo = async (owner: string, repo: string, repoName: string): Promise<string> => { | |
| const existingRepo = await db.select({ id: repositories.id }) | |
| .from(repositories) | |
| .where(eq(repositories.name, repoName)) | |
| .limit(1); | |
| if (existingRepo[0]) { | |
| return existingRepo[0].id; | |
| } | |
| const newRepoId = uuidv4(); | |
| await db.insert(repositories).values({ | |
| id: newRepoId, | |
| githubRepoId: 0, | |
| name: repoName, | |
| owner: owner, | |
| userId: userId, | |
| createdAt: new Date().toISOString(), | |
| }).onConflictDoNothing(); | |
| return newRepoId; | |
| }; | |
| // For each repo, fetch open PRs by this author directly from Pulls API | |
| for (const [repoName, repoInfo] of uniqueRepos) { | |
| try { | |
| const prs = await octokit.pulls.list({ | |
| owner: repoInfo.owner, | |
| repo: repoInfo.repo, | |
| state: 'open', | |
| per_page: 30, | |
| }); | |
| // Filter to only this user's PRs | |
| const userPRs = prs.data.filter(pr => pr.user?.login === username); | |
| for (const pr of userPRs) { | |
| // Get or create the repository entry first (needed for number+repoId check) | |
| const repoId = repoInfo.repoId || await getOrCreateRepo(repoInfo.owner, repoInfo.repo, repoName); | |
| // Check if PR already exists by number+repoId (true unique identifier) | |
| const existing = await db.select({ id: issues.id, state: issues.state }) | |
| .from(issues) | |
| .where(and(eq(issues.number, pr.number), eq(issues.repoId, repoId))) | |
| .limit(1); | |
| if (existing[0]) { | |
| // Update state if needed | |
| if (existing[0].state !== pr.state) { | |
| await db.update(issues) | |
| .set({ state: pr.state, title: pr.title, body: pr.body || null }) | |
| .where(eq(issues.id, existing[0].id)); | |
| result.updated++; | |
| } | |
| } else { | |
| // Create new PR entry | |
| const newId = uuidv4(); | |
| await db.insert(issues).values({ | |
| id: newId, | |
| githubIssueId: pr.id, | |
| number: pr.number, | |
| title: pr.title, | |
| body: pr.body || null, | |
| authorName: username, | |
| repoId: repoId, | |
| repoName, | |
| owner: repoInfo.owner, | |
| repo: repoInfo.repo, | |
| htmlUrl: pr.html_url, | |
| state: pr.state || 'open', | |
| isPR: true, | |
| authorAssociation: pr.author_association as AuthorAssociation, | |
| createdAt: new Date().toISOString(), | |
| }).onConflictDoNothing(); | |
| result.added++; | |
| } | |
| } | |
| if (userPRs.length > 0) { | |
| result.repos.push(repoName); | |
| } | |
| } catch (repoError) { | |
| // Skip repos we can't access (private, deleted, etc.) | |
| console.log(`[DirectSync] Skipping ${repoName}: ${repoError instanceof Error ? repoError.message : String(repoError)}`); | |
| } | |
| } | |
| console.log(`[DirectSync] ${username}: ${result.added} added, ${result.updated} updated from ${result.repos.length} repos`); | |
| } catch (error) { | |
| console.error(`[DirectSync] Error for ${username}:`, error); | |
| } | |
| return result; | |
| } | |