| import axios from "axios"; |
| import config from "../config.js"; |
| import { getJob, updateJobStatus } from "./jobService.js"; |
| import { sendEvent, closeConnection } from "../lib/sse.js"; |
|
|
// Per-request timeout, in milliseconds, applied to every outbound axios call
// in this module (two minutes).
const AXIOS_TIMEOUT = 2 * 60 * 1000;
|
|
| |
| |
| |
| |
| |
/**
 * Polls a status endpoint with GET until its body reports a status other
 * than "processing", then returns that body.
 *
 * NOTE(review): any status other than "processing" is treated as done and
 * returned to the caller, including a failure status if the service emits
 * one. Callers must inspect the payload themselves. Confirm the service's
 * status vocabulary.
 *
 * @param {string} url - Status endpoint to poll.
 * @param {number} [intervalMs=2000] - Delay between consecutive polls, in ms.
 * @param {number} [maxAttempts=120] - Maximum number of GET requests to issue.
 * @returns {Promise<*>} The response body of the first non-"processing" poll.
 * @throws {Error} When all `maxAttempts` polls still report "processing".
 *   Errors from the HTTP request itself propagate unchanged.
 */
async function pollUntilReady(url, intervalMs = 2000, maxAttempts = 120) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const res = await axios.get(url, { timeout: AXIOS_TIMEOUT });
    if (res.data.status !== "processing") {
      return res.data;
    }
    // Only wait if another poll follows; sleeping after the final attempt
    // just delays the timeout error.
    if (attempt < maxAttempts) {
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  throw new Error(`Polling timed out after ${maxAttempts} attempts for ${url}`);
}
|
|
| |
| |
/**
 * Announces a pipeline stage over SSE, then runs its work function.
 *
 * If the work function rejects with an HTTP error whose body carries a truthy
 * `databricks_timeout`, a "failover" event is sent and the work function is
 * invoked exactly once more. The result (or error) of that second call is
 * returned (or thrown). Every other error is rethrown untouched.
 *
 * NOTE(review): the retry calls the very same `apiFn`, so any switch to the
 * DuckDB path announced in the event must happen on the service side. Confirm.
 *
 * @param {string} jobId - Job whose SSE stream receives the events.
 * @param {string} stageName - Stage label placed in the emitted events.
 * @param {number} percent - Progress percentage placed in the emitted events.
 * @param {() => Promise<*>} apiFn - Work for this stage; may be called twice.
 * @returns {Promise<*>} Whatever the successful `apiFn` call resolved to.
 */
async function callStage(jobId, stageName, percent, apiFn) {
  sendEvent(jobId, {
    type: "progress",
    stage: stageName,
    message: "",
    percent,
  });

  try {
    return await apiFn();
  } catch (err) {
    const databricksTimedOut = err.response?.data?.databricks_timeout;
    if (!databricksTimedOut) {
      throw err;
    }

    sendEvent(jobId, {
      type: "failover",
      stage: stageName,
      message:
        "Databricks latency detected. Failing over to local DuckDB execution...",
      percent,
    });
    return await apiFn();
  }
}
|
|
/** Document types forwarded to RAG ingestion; any other upload type is dropped. */
const VALID_RAG_DOC_TYPES = new Set([
  "annual_report",
  "rating_report",
  "legal_notice",
  "gst_filing",
  "gst_3b",
  "gst_2a",
  "gst_1",
]);

/** Renames scoring-service stress-scenario keys; unknown keys pass through lowercased. */
const SCENARIO_KEY_MAP = {
  revenue_shock: "revenue_shock",
  rate_hike_200bps: "rate_hike",
  gst_scrutiny: "gst_scrutiny",
};

/**
 * Buckets a free-text research finding into a severity by keyword.
 *
 * @param {string} finding - Finding text from the research agent.
 * @returns {"CRITICAL"|"HIGH"|"MEDIUM"|"LOW"} Severity bucket.
 */
function classifyFindingSeverity(finding) {
  const text = finding.toLowerCase();
  // "ed" (Enforcement Directorate) must match as a whole word. The previous
  // substring test for "ed " fired on ordinary past-tense words ("listed ",
  // "reported ") and made nearly every finding CRITICAL.
  if (/nclt|\bed\b|cbi|fraud|arrest/.test(text)) return "CRITICAL";
  if (/default|npa|downgrad/.test(text)) return "HIGH";
  if (/rating|compliance/.test(text)) return "MEDIUM";
  return "LOW";
}

/** Projects the scoring result onto the score-breakdown shape stored with the job. */
function buildScoreBreakdown(scoringResult) {
  return {
    final_score: scoringResult.final_score,
    decision: scoringResult.decision,
    loan_limit_crore: scoringResult.loan_limit_crore,
    interest_rate_pct: scoringResult.interest_rate_pct,
    interest_rate_str: scoringResult.interest_rate_pct
      ? `${scoringResult.interest_rate_pct}%`
      : null,
    decision_reason: scoringResult.decision_reason,
    layer1_rule_based: scoringResult.layer1_score,
    // Falls back to the raw gap between final and rule-based score when the
    // service does not report a capped deviation.
    layer2_ml_refinement:
      scoringResult.capped_deviation ??
      scoringResult.final_score - scoringResult.layer1_score,
    model_1_financial_health:
      scoringResult.weighted_financial_health ??
      scoringResult.score_financial_health,
    model_2_credit_behaviour:
      scoringResult.weighted_credit_behaviour ??
      scoringResult.score_credit_behaviour,
    model_3_external_risk:
      scoringResult.weighted_external_risk ??
      scoringResult.score_external_risk,
    model_4_text_risk:
      scoringResult.weighted_text_signals ?? scoringResult.score_text_signals,
    confidence: scoringResult.distribution_anomaly ? "ANOMALY_CAPPED" : "HIGH",
  };
}

/** Converts `shap_drivers` into display rows; `impact` is the negated SHAP value. */
function buildShapValues(scoringResult) {
  return (scoringResult.shap_drivers || []).map((s) => ({
    feature: s.human_label || s.feature,
    value: s.shap_value,
    impact: -(s.shap_value || 0),
    source: s.feature,
  }));
}

/**
 * Flattens the stress-test map into an array, skipping `_`-prefixed metadata
 * keys and null entries (a null entry previously crashed the whole pipeline).
 */
function buildStressResults(stressResults, baseDecision) {
  return Object.entries(stressResults || {})
    .filter(([key, sr]) => !key.startsWith("_") && sr != null)
    .map(([key, sr]) => ({
      scenario: SCENARIO_KEY_MAP[key.toLowerCase()] || key.toLowerCase(),
      flipped: sr.flipped || false,
      original_decision: baseDecision,
      stressed_decision: sr.decision,
      stressed_score: sr.stressed_score,
      recommendation: sr.action,
    }));
}

/** Maps entity-graph nodes/edges from the graph service into the stored shape. */
function mapEntityGraph(graphResult) {
  const nodes = (graphResult.nodes || []).map((n) => ({
    id: n.id,
    name: n.label || n.name || "Unknown",
    type: (n.type || "company").toLowerCase(),
    risk_level: n.is_flagged ? "HIGH" : n.is_borrower ? "MEDIUM" : "LOW",
    historical_match: n.flag_type === "HISTORICAL_REJECTION_MATCH",
  }));
  const edges = (graphResult.edges || []).map((e) => ({
    source: e.source,
    target: e.target,
    relationship: e.label || e.type,
    // `??` keeps a genuine 0 amount instead of collapsing it to null.
    amount_crore: e.properties?.amount_crore ?? null,
    is_probable_match: e.properties?.confidence === "PROBABLE_MATCH",
  }));
  return { nodes, edges };
}

/** Normalizes the research-agent payload, defaulting every missing field. */
function buildResearchForFrontend(researchFindings) {
  const raw = researchFindings || {};
  const sources = raw.sources || [];
  const keyFindings = (raw.key_findings || []).map((finding, idx) => {
    // Structured findings pass through untouched; plain strings get a
    // keyword severity and the source at the same index, if there is one.
    if (typeof finding !== "string") return finding;
    return {
      severity: classifyFindingSeverity(finding),
      finding,
      source_url: sources[idx] || null,
      // NOTE(review): string findings are marked verified unconditionally — confirm intended.
      is_verified: true,
    };
  });

  return {
    promoter_risk: raw.promoter_risk || "LOW",
    litigation_risk: raw.litigation_risk || "NONE",
    sector_risk: raw.sector_risk || "NEUTRAL",
    sector_sentiment_score: raw.sector_sentiment_score ?? 0,
    key_findings: keyFindings,
    news_articles: (raw.news_articles || []).map((a) => ({
      title: a.title || "",
      url: a.url || "",
      snippet: (a.snippet || "").slice(0, 300),
      category: a.category || "",
    })),
    rejected_findings: (raw.rejected_findings || []).map((rf) => ({
      title: rf.title || "",
      url: rf.url || "",
      reason: rf.reason || "",
      confidence_band: rf.confidence_band || "DISCARDED",
    })),
  };
}

/** Picks CAM sections from whichever field the CAM service populated. */
function buildCamSections(camResult) {
  return (
    camResult.cam_sections ||
    camResult.sections || {
      forensic_accountant:
        camResult.accountant_output?.content || camResult.cam_text || "",
      compliance_officer: camResult.compliance_output?.content || "",
      chief_risk_officer: camResult.cro_output?.content || "",
    }
  );
}

/**
 * Runs the full credit-analysis pipeline for one job, streaming progress
 * events over SSE and persisting the final status.
 *
 * Stages run strictly in sequence:
 * Go PDF parse → Go fraud features → OCR (first PDF only) → NER report →
 * RAG ingest/extract → entity graph → research agent → scoring (which also
 * yields the stress tests) → CAM generation.
 *
 * Stage errors are not rethrown. They are reported as an SSE `error` event
 * and the job is marked "failed". Once the job has been persisted as
 * "completed", a later failure (e.g. sending the `complete` event) is only
 * logged and never downgrades the job. The SSE connection is closed on every
 * exit path.
 *
 * @param {string} jobId - Job identifier, used in service URLs and tmp paths.
 * @returns {Promise<void>}
 */
async function runPipeline(jobId) {
  let stage = "INIT";
  let percent = 5;
  // Set once "completed" is persisted so later notification failures cannot
  // overwrite it with "failed".
  let completed = false;

  try {
    // --- INIT: load the job and validate its uploads ---
    sendEvent(jobId, {
      type: "progress",
      stage,
      message: "Job initialized. Validating uploaded files...",
      percent,
    });

    const job = await getJob(jobId);
    if (!job) throw new Error("Job not found in database");
    if (!job.files || job.files.length === 0) {
      throw new Error("No uploaded files found for this job");
    }

    const tmpPath = `${config.sharedTmpPath}/${jobId}`;
    // Filled by the OCR stage; read by the NER report, graph and research stages.
    let entities = {};
    // NOTE(review): accumulated from OCR and RAG output but never included in
    // analysisResult — confirm whether it should be.
    let financialJson = {};

    // --- GO_PDF ---
    stage = "GO_PDF";
    percent = 10;
    // The parse response is not read here. NOTE(review): later stages are
    // presumably fed from files written under tmpPath — confirm.
    await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message: "Go service parsing PDFs concurrently via goroutines...",
        percent,
      });
      const res = await axios.post(
        `${config.goServiceUrl}/parse`,
        { job_id: jobId, tmp_path: tmpPath },
        { timeout: AXIOS_TIMEOUT },
      );
      return res.data;
    });

    // --- GO_FRAUD ---
    stage = "GO_FRAUD";
    percent = 16;
    const fraudFeatures = await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "Fraud math engine running GST-Bank variance analysis in 4ms...",
        percent,
      });
      const res = await axios.post(
        `${config.goServiceUrl}/fraud`,
        { job_id: jobId, tmp_path: tmpPath },
        { timeout: AXIOS_TIMEOUT },
      );
      return res.data;
    });

    // --- AI_OCR: only the first PDF upload is processed ---
    stage = "AI_OCR";
    percent = 22;
    const pdfFile = job.files.find((f) =>
      f.original_name.toLowerCase().endsWith(".pdf"),
    );

    if (pdfFile) {
      await callStage(jobId, stage, percent, async () => {
        sendEvent(jobId, {
          type: "progress",
          stage,
          message:
            "DeepSeek-OCR processing document pages, classifying and extracting...",
          percent,
        });

        // Path follows the `<file_type>__<original_name>` naming under tmpPath.
        const filePath = `${tmpPath}/${pdfFile.file_type}__${pdfFile.original_name}`;
        await axios.post(
          `${config.aiServiceUrl}/api/v1/process-document`,
          {
            job_id: jobId,
            file_path: filePath,
            doc_type: pdfFile.file_type || "annual_report",
          },
          { timeout: AXIOS_TIMEOUT },
        );

        const ocrResult = await pollUntilReady(
          `${config.aiServiceUrl}/api/v1/status/${jobId}`,
          2000,
          90,
        );

        if (ocrResult.result?.entity_extraction) {
          entities = ocrResult.result.entity_extraction;
        }
        if (ocrResult.result?.financial_extraction) {
          financialJson = ocrResult.result.financial_extraction;
        }
        return ocrResult;
      });
    } else {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message: "No PDF files found. Skipping OCR stage.",
        percent,
      });
    }

    // --- AI_NER: report only; entities were extracted during OCR ---
    stage = "AI_NER";
    percent = 32;
    sendEvent(jobId, {
      type: "progress",
      stage,
      message: entities.company_name
        ? `NER complete — extracted entity: ${entities.company_name}`
        : "Entity extraction complete (from document processing pipeline).",
      percent,
    });

    // --- AI_RAG: ingest, then structured extraction ---
    stage = "AI_RAG";
    percent = 38;
    const ragData = await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "RAG module embedding document chunks into Qdrant vector store...",
        percent: 38,
      });

      const docTypes = [...new Set(job.files.map((f) => f.file_type))].filter(
        (t) => VALID_RAG_DOC_TYPES.has(t),
      );
      if (docTypes.length === 0) docTypes.push("annual_report");

      await axios.post(
        `${config.aiServiceUrl}/api/v1/rag/ingest`,
        {
          job_id: jobId,
          company_name: job.company_name,
          doc_types: docTypes,
        },
        { timeout: AXIOS_TIMEOUT },
      );
      await pollUntilReady(
        `${config.aiServiceUrl}/api/v1/rag/ingest-status/${jobId}`,
        2000,
        90,
      );

      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "Claude structured extraction running on RAG-retrieved chunks...",
        percent: 42,
      });
      await axios.post(
        `${config.aiServiceUrl}/api/v1/rag/extract`,
        { job_id: jobId },
        { timeout: AXIOS_TIMEOUT },
      );
      return pollUntilReady(
        `${config.aiServiceUrl}/api/v1/rag/extraction/${jobId}`,
        2000,
        90,
      );
    });

    if (ragData?.status === "ready") {
      financialJson = { ...financialJson, ...ragData };
    }

    // --- AI_GRAPH ---
    stage = "AI_GRAPH";
    percent = 50;
    const graphResult = await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "Building entity relationship graph to detect related-party anomalies...",
        percent,
      });

      const borrowerName =
        entities.company_name || job.company_name || "Unknown";
      await axios.post(
        `${config.aiServiceUrl}/api/v1/entity-graph/build`,
        {
          job_id: jobId,
          borrower_name: borrowerName,
          entity_extraction_path: `${tmpPath}/ocr_output.json`,
        },
        { timeout: AXIOS_TIMEOUT },
      );

      const graphData = await pollUntilReady(
        `${config.aiServiceUrl}/api/v1/entity-graph/${jobId}`,
        2000,
        90,
      );
      return {
        nodes: graphData.nodes || [],
        edges: graphData.edges || [],
      };
    });

    // --- AI_RESEARCH ---
    stage = "AI_RESEARCH";
    percent = 60;
    const researchFindings = await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "Research agent searching NCLT, eCourts, news, and regulatory databases...",
        percent,
      });

      const promoterNames = Array.isArray(entities.promoters)
        ? entities.promoters.filter((p) => p.name).map((p) => p.name)
        : [];

      await axios.post(
        `${config.aiServiceUrl}/api/v1/research-agent/run`,
        {
          job_id: jobId,
          company_name: job.company_name,
          promoter_names: promoterNames,
          industry: job.industry || null,
          cin: entities.cin || null,
        },
        { timeout: AXIOS_TIMEOUT },
      );
      return pollUntilReady(
        `${config.aiServiceUrl}/api/v1/research-agent/status/${jobId}`,
        3000,
        80,
      );
    });

    // --- AI_SCORING ---
    stage = "AI_SCORING";
    percent = 72;
    const scoringResult = await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "LightGBM 4-model ensemble computing risk score with SHAP explainability...",
        percent,
      });

      await axios.post(
        `${config.aiServiceUrl}/api/v1/scoring/run`,
        { job_id: jobId },
        { timeout: AXIOS_TIMEOUT },
      );
      const scoreData = await pollUntilReady(
        `${config.aiServiceUrl}/api/v1/scoring/result/${jobId}`,
        2000,
        90,
      );
      return scoreData.result || scoreData;
    });

    // --- AI_STRESS: no service call; results come with the scoring output ---
    stage = "AI_STRESS";
    percent = 82;
    sendEvent(jobId, {
      type: "progress",
      stage,
      message:
        "Stress scenarios extracted: Revenue Shock, Rate Hike, GST Scrutiny...",
      percent,
    });
    const stressResults = scoringResult.stress_tests || {};
    const structurallyFragile = scoringResult.structurally_fragile || false;

    // --- AI_CAM ---
    stage = "AI_CAM";
    percent = 90;
    const camResult = await callStage(jobId, stage, percent, async () => {
      sendEvent(jobId, {
        type: "progress",
        stage,
        message:
          "3-persona credit committee generating Credit Appraisal Memo...",
        percent,
      });

      await axios.post(
        `${config.aiServiceUrl}/api/v1/cam/generate`,
        { job_id: jobId },
        { timeout: AXIOS_TIMEOUT },
      );
      const camData = await pollUntilReady(
        `${config.aiServiceUrl}/api/v1/cam/result/${jobId}`,
        3000,
        60,
      );
      return camData.result || camData;
    });

    // --- COMPLETE: assemble, persist, notify ---
    stage = "COMPLETE";
    percent = 100;

    const graph = mapEntityGraph(graphResult);
    const analysisResult = {
      job_id: jobId,
      company_name: job.company_name,
      industry: job.industry || null,
      fraud_features: fraudFeatures,
      score_breakdown: buildScoreBreakdown(scoringResult),
      shap_values: buildShapValues(scoringResult),
      shap_by_model: scoringResult.shap_by_model || {},
      stress_results: buildStressResults(stressResults, scoringResult.decision),
      entity_nodes: graph.nodes,
      entity_edges: graph.edges,
      research_findings: buildResearchForFrontend(researchFindings),
      officer_notes_applied: false,
      officer_score_delta: 0,
      cam_generated: true,
      cam_text:
        camResult.cam_text || camResult.accountant_output?.content || "",
      cam_sections: buildCamSections(camResult),
      citations: camResult.citations || camResult.audit_trail || [],
      structurally_fragile: structurallyFragile,
      processing_time_seconds: null,
    };

    await updateJobStatus(jobId, "completed", analysisResult);
    completed = true;

    sendEvent(jobId, {
      type: "complete",
      stage: "COMPLETE",
      message: "Analysis complete. Credit decision ready.",
      percent: 100,
      data: analysisResult,
    });
  } catch (err) {
    if (completed) {
      // The result is already persisted; a failed notification must not
      // report (or record) the job as failed.
      console.error(
        `[pipeline] job ${jobId} completed but the completion event could not be sent:`,
        err,
      );
    } else {
      sendEvent(jobId, {
        type: "error",
        stage,
        message: err?.message || "Pipeline failed",
        percent,
      });
      // Best effort: a DB failure here must not mask the pipeline error.
      await updateJobStatus(jobId, "failed", null).catch((dbErr) => {
        console.error(
          `[pipeline] job ${jobId}: could not mark job as failed:`,
          dbErr,
        );
      });
    }
  } finally {
    closeConnection(jobId);
  }
}
|
|
| export { runPipeline }; |
|
|