|
|
""" |
|
|
FinEE API - FastAPI Backend |
|
|
============================ |
|
|
|
|
|
RESTful API for financial entity extraction with: |
|
|
- Single/batch extraction endpoints |
|
|
- RAG-enhanced extraction |
|
|
- PDF/Image processing |
|
|
- Multi-turn chat |
|
|
- Analytics |
|
|
|
|
|
Author: Ranjit Behera |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Optional, Any |
|
|
from pathlib import Path |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, UploadFile, File, BackgroundTasks |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import JSONResponse |
|
|
from pydantic import BaseModel, Field |
|
|
import uvicorn |
|
|
|
|
|
|
|
|
# Configure the root logger at import time with INFO as the threshold.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExtractionRequest(BaseModel):
    """Single message extraction request."""

    # Required: the raw text to run extraction on.
    message: str = Field(..., description="Bank SMS or email to extract from")
    # RAG lookup is on unless the caller switches it off.
    use_rag: bool = Field(
        default=True,
        description="Use RAG for context-aware extraction",
    )
    # LLM usage is opt-in.
    use_llm: bool = Field(
        default=False,
        description="Use LLM for complex cases",
    )
|
|
|
|
|
|
|
|
class BatchExtractionRequest(BaseModel):
    """Batch extraction request."""

    # Required: the messages to process.
    messages: List[str] = Field(..., description="List of messages to extract")
    # Same switches as the single-message request, with the same defaults.
    use_rag: bool = Field(default=True)
    use_llm: bool = Field(default=False)
|
|
|
|
|
|
|
|
class ExtractionResult(BaseModel):
    """Extraction result."""

    # Every entity is optional and defaults to None; only `confidence`
    # has a numeric default. Field order is kept as-is.

    # --- transaction core ---
    amount: Optional[float] = Field(default=None)
    type: Optional[str] = Field(default=None)
    account: Optional[str] = Field(default=None)
    bank: Optional[str] = Field(default=None)
    date: Optional[str] = Field(default=None)
    time: Optional[str] = Field(default=None)
    reference: Optional[str] = Field(default=None)

    # --- counterparty ---
    merchant: Optional[str] = Field(default=None)
    beneficiary: Optional[str] = Field(default=None)
    vpa: Optional[str] = Field(default=None)

    # --- classification ---
    category: Optional[str] = Field(default=None)
    is_p2m: Optional[bool] = Field(default=None)

    # --- account state ---
    balance: Optional[float] = Field(default=None)
    status: Optional[str] = Field(default=None)

    # --- scoring ---
    confidence: float = Field(default=0.0)
|
|
|
|
|
|
|
|
class ExtractionResponse(BaseModel):
    """API response for extraction."""

    # Required outcome flag; everything else is optional.
    success: bool = Field(...)
    # Extracted entities.
    data: Optional[ExtractionResult] = Field(default=None)
    # Text the extraction was run on.
    raw_text: Optional[str] = Field(default=None)
    # RAG lookup output, when one was performed.
    rag_context: Optional[Dict] = Field(default=None)
    # Handler duration in milliseconds.
    processing_time_ms: float = Field(default=0)
    # Error text for failed requests.
    error: Optional[str] = Field(default=None)
|
|
|
|
|
|
|
|
class ChatMessage(BaseModel):
    """Chat message."""

    # Speaker of the turn; the value itself is not validated here.
    role: str = Field(..., description="'user' or 'assistant'")
    # Text of the turn (required).
    content: str = Field(...)
|
|
|
|
|
|
|
|
class ChatRequest(BaseModel):
    """Chat request for multi-turn analysis."""

    # Conversation turns (required).
    messages: List[ChatMessage] = Field(...)
    # Free-form context dict; omitted means None.
    context: Optional[Dict] = Field(default=None)
|
|
|
|
|
|
|
|
class AnalyticsRequest(BaseModel):
    """Analytics request."""

    # Transactions to aggregate, as plain dicts (required).
    transactions: List[Dict] = Field(...)
    # Reporting period label; defaults to "month".
    period: Optional[str] = Field(default="month")
    # Transaction key to group on; defaults to "category".
    group_by: Optional[str] = Field(default="category")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Application object that every route decorator in this module attaches to.
# Interactive docs are served at /docs and /redoc.
app = FastAPI(
    title="FinEE API",
    description="Financial Entity Extraction API for Indian Banking",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
|
|
|
|
|
|
|
|
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive setup; FastAPI's CORS documentation says origins should be
# listed explicitly when credentials are allowed. Confirm this is intended
# before exposing the service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
|
|
# Lazily created singletons. They stay None until the matching getter
# (get_extractor / get_rag_engine) has built an instance.
_extractor = None
_rag_engine = None
|
|
|
|
|
|
|
|
def get_extractor():
    """Return the shared extractor, building it on the first call.

    Tries to import ``finee.FinancialExtractor`` and construct it with
    ``use_llm=False``. If that raises ``ImportError``, a ``MockExtractor``
    is used instead. The instance is stored in the module-level
    ``_extractor`` and returned by later calls.
    """
    global _extractor

    # Fast path: already built.
    if _extractor is not None:
        return _extractor

    try:
        from finee import FinancialExtractor

        instance = FinancialExtractor(use_llm=False)
        logger.info("Extractor initialized")
    except ImportError:
        logger.warning("FinEE not installed, using mock extractor")
        instance = MockExtractor()

    _extractor = instance
    return _extractor
|
|
|
|
|
|
|
|
def get_rag_engine():
    """Return the shared RAG engine, or ``None`` when it cannot be imported.

    The first successful call stores a ``finee.rag.RAGEngine`` in the
    module-level ``_rag_engine``. When the import fails with ``ImportError``
    a warning is logged and ``None`` is returned; nothing is cached in that
    case, so the next call tries again.
    """
    global _rag_engine

    # Fast path: already built.
    if _rag_engine is not None:
        return _rag_engine

    try:
        from finee.rag import RAGEngine

        engine = RAGEngine()
        logger.info("RAG engine initialized")
    except ImportError:
        logger.warning("RAG not available")
        engine = None

    _rag_engine = engine
    return _rag_engine
|
|
|
|
|
|
|
|
class MockExtractor:
    """Minimal regex/keyword extractor with the same ``extract`` entry point.

    Only two keys are ever produced: ``amount`` (float) and ``type``
    (``'debit'`` or ``'credit'``). Keys are omitted when nothing matches.
    """

    # Keywords are matched as substrings of the lower-cased text.
    _DEBIT_WORDS = ('debit', 'debited', 'paid', 'spent')
    _CREDIT_WORDS = ('credit', 'credited', 'received')

    def extract(self, text: str) -> Dict:
        """Extract ``amount`` and ``type`` from ``text``.

        Args:
            text: Message to scan.

        Returns:
            Dict with ``amount`` when an ``Rs``-prefixed number is found and
            ``type`` when a debit or credit keyword is found. Debit keywords
            are checked first, so they win when both kinds are present.
        """
        import re

        result = {}

        # At least one digit is required, so a stray "Rs ," no longer matches
        # and crashes on float(''). One or two decimals are accepted, so
        # "Rs 100.5" yields 100.5 rather than a truncated 100.0.
        amount_match = re.search(r'Rs\.?\s*(,*\d[\d,]*(?:\.\d{1,2})?)', text)
        if amount_match:
            result['amount'] = float(amount_match.group(1).replace(',', ''))

        # Lower-case once instead of once per keyword.
        lowered = text.lower()
        if any(word in lowered for word in self._DEBIT_WORDS):
            result['type'] = 'debit'
        elif any(word in lowered for word in self._CREDIT_WORDS):
            result['type'] = 'credit'

        return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Health check."""
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized timestamp keeps exactly the
    # same naive ISO-8601 shape as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "status": "healthy",
        "service": "FinEE API",
        "version": "2.0.0",
        "timestamp": now_utc.isoformat(),
    }
|
|
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Detailed health check."""
    # Reports whether each lazy singleton has been created so far. The
    # getters are not called, so this endpoint never triggers loading.
    component_status = {
        "extractor": _extractor is not None,
        "rag": _rag_engine is not None,
    }
    return {"status": "healthy", "components": component_status}
|
|
|
|
|
|
|
|
@app.post("/extract", response_model=ExtractionResponse)
async def extract(request: ExtractionRequest):
    """
    Extract financial entities from a single message.

    - **message**: The bank SMS, email, or notification text
    - **use_rag**: Enable RAG for context-aware extraction
    - **use_llm**: Use LLM for complex cases (slower but more accurate)
    """
    # The docstring above is kept verbatim because it doubles as the
    # endpoint description. Implementation notes:
    #   * Failures never propagate. Any exception is logged with its
    #     traceback and reported as ExtractionResponse(success=False, ...).
    #   * NOTE(review): request.use_llm is accepted but never read here.
    import time

    # perf_counter is monotonic, so the reported duration cannot go negative
    # or jump when the wall clock is adjusted.
    start = time.perf_counter()

    try:
        extractor = get_extractor()
        rag = get_rag_engine() if request.use_rag else None

        # Optional RAG lookup, flattened into a plain dict for the response.
        rag_context = None
        if rag:
            context = rag.retrieve(request.message)
            rag_context = {
                "merchant_info": context.merchant_info,
                "similar_transactions": context.similar_transactions,
                "category_hierarchy": context.category_hierarchy,
            }

        result = extractor.extract(request.message)

        # Backfill only what the extractor left empty; extracted values win.
        if rag_context and rag_context.get("merchant_info"):
            merchant_info = rag_context["merchant_info"]
            if not result.get("merchant"):
                result["merchant"] = merchant_info["name"]
            if not result.get("category"):
                result["category"] = merchant_info["category"]
            if "is_p2m" not in result:
                result["is_p2m"] = merchant_info["is_p2m"]

        processing_time = (time.perf_counter() - start) * 1000

        return ExtractionResponse(
            success=True,
            data=ExtractionResult(**result),
            raw_text=request.message,
            rag_context=rag_context,
            processing_time_ms=round(processing_time, 2),
        )

    except Exception as e:
        # logger.exception keeps the same message and adds the traceback.
        logger.exception("Extraction failed: %s", e)
        return ExtractionResponse(
            success=False,
            error=str(e),
            processing_time_ms=round((time.perf_counter() - start) * 1000, 2),
        )
|
|
|
|
|
|
|
|
@app.post("/extract/batch")
async def extract_batch(request: BatchExtractionRequest):
    """
    Extract entities from multiple messages.

    - **messages**: List of messages to process
    - Returns list of extraction results
    """
    # Messages are processed one after another through the single-message
    # handler, so each entry in `results` is an ExtractionResponse.
    results = [
        await extract(
            ExtractionRequest(
                message=text,
                use_rag=request.use_rag,
                use_llm=request.use_llm,
            )
        )
        for text in request.messages
    ]

    succeeded = [item for item in results if item.success]

    # The top-level flag is always True; per-message outcomes are in `results`.
    return {
        "success": True,
        "total": len(results),
        "successful": len(succeeded),
        "results": results,
    }
|
|
|
|
|
|
|
|
@app.post("/parse/pdf")
async def parse_pdf(file: UploadFile = File(...)):
    """
    Parse bank statement PDF and extract transactions.

    - **file**: PDF file of bank statement
    - Returns list of extracted transactions
    """
    # Case-insensitive extension check that tolerates a missing filename:
    # "STATEMENT.PDF" is accepted, and an upload without a name gets a 400
    # instead of crashing with AttributeError.
    filename = file.filename or ""
    if not filename.lower().endswith('.pdf'):
        raise HTTPException(400, "Only PDF files are supported")

    try:
        # The upload is read, but the bytes are not consumed yet because
        # parsing is still a stub (see "message" in the response).
        content = await file.read()

        transactions = []

        return {
            "success": True,
            "filename": file.filename,
            "transactions": transactions,
            "message": "PDF parsing not yet implemented",
        }

    except Exception as e:
        raise HTTPException(500, f"PDF parsing failed: {e}")
|
|
|
|
|
|
|
|
@app.post("/parse/image")
async def parse_image(file: UploadFile = File(...)):
    """
    Parse screenshot/image using OCR and extract entities.

    - **file**: Image file (PNG, JPG)
    - Returns extracted text and entities
    """
    allowed = ['.png', '.jpg', '.jpeg', '.webp']
    # `file.filename` may be None; Path(None) would raise TypeError and turn
    # into an unhandled 500. An empty name has no suffix, so it is rejected
    # with the regular 400 below.
    ext = Path(file.filename or "").suffix.lower()

    if ext not in allowed:
        raise HTTPException(400, f"Only {allowed} files are supported")

    try:
        # The upload is read, but the bytes are not consumed yet because
        # OCR is still a stub (see "message" in the response).
        content = await file.read()

        extracted_text = ""

        # With no OCR text, the extractor is skipped and entities stay empty.
        if extracted_text:
            extractor = get_extractor()
            result = extractor.extract(extracted_text)
        else:
            result = {}

        return {
            "success": True,
            "filename": file.filename,
            "extracted_text": extracted_text,
            "entities": result,
            "message": "Image OCR not yet implemented",
        }

    except Exception as e:
        raise HTTPException(500, f"Image parsing failed: {e}")
|
|
|
|
|
|
|
|
@app.post("/chat")
async def chat(request: ChatRequest):
    """
    Multi-turn chat for financial analysis.

    - **messages**: Conversation history
    - **context**: Optional transaction context
    """
    try:
        # Only the most recent user turn drives the reply.
        user_messages = [m for m in request.messages if m.role == "user"]
        if not user_messages:
            raise HTTPException(400, "No user message found")

        last_message = user_messages[-1].content

        intent = detect_intent(last_message)
        response = generate_response(intent, last_message, request.context)

        return {
            "success": True,
            "response": response,
            "intent": intent,
        }

    except HTTPException:
        # HTTPException is an Exception subclass. Without this clause the
        # deliberate 400 above was caught below and re-sent as a 500.
        raise
    except Exception as e:
        raise HTTPException(500, f"Chat failed: {e}") from e
|
|
|
|
|
|
|
|
@app.post("/analytics")
async def analytics(request: AnalyticsRequest):
    """
    Generate spending analytics from transactions.

    - **transactions**: List of extracted transactions
    - **period**: Time period (week, month, year)
    - **group_by**: Grouping (category, merchant, type)
    """
    # Implementation notes:
    #   * Only transactions whose type is "debit" add to the totals; a
    #     missing "type" key counts as debit. Every transaction is counted
    #     and listed in its group.
    #   * `period` is echoed back but is not used for filtering.
    try:
        transactions = request.transactions

        if not transactions:
            return {"success": True, "data": {}}

        groups = {}
        total = 0

        for txn in transactions:
            # An explicit None is treated like a missing key. Extraction
            # results carry None for fields that were not found, which
            # previously crashed on `+=` or produced a None bucket key.
            key = txn.get(request.group_by)
            if key is None:
                key = "other"
            amount = txn.get("amount")
            if amount is None:
                amount = 0
            txn_type = txn.get("type", "debit")

            bucket = groups.setdefault(
                key, {"total": 0, "count": 0, "transactions": []}
            )

            if txn_type == "debit":
                bucket["total"] += amount
                total += amount

            bucket["count"] += 1
            bucket["transactions"].append(txn)

        # Share of total debit spend per group; 0 when nothing was spent.
        for bucket in groups.values():
            bucket["percentage"] = round(bucket["total"] / total * 100, 1) if total > 0 else 0

        # Largest spend first.
        sorted_groups = dict(sorted(groups.items(), key=lambda x: x[1]["total"], reverse=True))

        return {
            "success": True,
            "period": request.period,
            "group_by": request.group_by,
            "total_spent": total,
            "transaction_count": len(transactions),
            "breakdown": sorted_groups,
        }

    except Exception as e:
        raise HTTPException(500, f"Analytics failed: {e}")
|
|
|
|
|
|
|
|
@app.get("/merchants")
async def list_merchants(
    category: Optional[str] = None,
    limit: int = 50
):
    """
    List known merchants from knowledge base.

    - **category**: Filter by category
    - **limit**: Max results
    """
    rag = get_rag_engine()

    if not rag:
        return {"success": False, "error": "RAG not available"}

    merchants = []
    for _name, merchant in rag.merchant_kb.merchants.items():
        # Check the cap before appending, so limit <= 0 yields an empty list
        # rather than one merchant.
        if len(merchants) >= limit:
            break

        if category and merchant.category != category:
            continue

        merchants.append({
            "name": merchant.name,
            "category": merchant.category,
            "vpa": merchant.vpa,
            "is_p2m": merchant.is_p2m,
        })

    return {
        "success": True,
        "count": len(merchants),
        "merchants": merchants,
    }
|
|
|
|
|
|
|
|
@app.get("/categories")
async def list_categories():
    """List available transaction categories."""
    # When finee.rag cannot be imported, answer with the same
    # "RAG not available" payload that /merchants uses, not a 500.
    try:
        from finee.rag import CategoryTaxonomy
    except ImportError:
        logger.warning("RAG not available")
        return {"success": False, "error": "RAG not available"}

    categories = [
        {
            "name": name,
            "parent": info.get("parent"),
            "children": info.get("children", []),
            "keywords": info.get("keywords", []),
        }
        for name, info in CategoryTaxonomy.TAXONOMY.items()
    ]

    return {
        "success": True,
        "categories": categories,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_intent(message: str) -> str:
    """Classify a chat message into a coarse intent by keyword lookup.

    The message is lower-cased and each rule's keywords are tested as plain
    substrings, in the order listed; the first rule with a hit wins.

    Args:
        message: Text of the user's message.

    Returns:
        One of ``"spending_query"``, ``"comparison"``,
        ``"category_breakdown"``, ``"extraction"``, or ``"general"`` when no
        keyword matches.
    """
    text = message.lower()

    # Order matters: earlier rules take precedence over later ones.
    rules = (
        ("spending_query", ('how much', 'total', 'spent', 'spending')),
        ("comparison", ('compare', 'vs', 'versus', 'difference')),
        ("category_breakdown", ('category', 'break', 'breakdown')),
        ("extraction", ('extract', 'parse', 'analyze')),
    )

    for intent, keywords in rules:
        for keyword in keywords:
            if keyword in text:
                return intent

    return "general"
|
|
|
|
|
|
|
|
def generate_response(intent: str, message: str, context: Optional[Dict]) -> str:
    """Generate chat response based on intent.

    Args:
        intent: Intent label; the values handled are ``"spending_query"``,
            ``"category_breakdown"``, ``"comparison"`` and ``"extraction"``.
            Anything else gets the generic help text.
        message: The user's message. It is not used to build the reply.
        context: Optional dict. For spending queries its ``"transactions"``
            entry (a list of dicts) is summed.

    Returns:
        The reply text.
    """
    if intent == "spending_query":
        transactions = context.get("transactions") if context else None
        # A None list is treated like a missing one and no longer raises.
        if context and "transactions" in context and transactions is not None:
            # Sum debit amounts only. `or 0` makes a None amount count as 0
            # where it previously raised TypeError.
            total = sum(
                (t.get("amount") or 0)
                for t in transactions
                if t.get("type") == "debit"
            )
            return f"Based on your transactions, you've spent ₹{total:,.2f}"
        return "Please share your transaction data for spending analysis."

    elif intent == "category_breakdown":
        return "I can break down your spending by category. Please share transaction data."

    elif intent == "comparison":
        return "To compare periods, please specify the time ranges you'd like to compare."

    elif intent == "extraction":
        return "Share a bank message and I'll extract the financial details."

    else:
        return "I can help you analyze transactions, extract entities, or provide spending insights. What would you like to know?"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_server(host: str = "0.0.0.0", port: int = 8000):
    """Hand the module-level ``app`` to ``uvicorn.run``.

    Args:
        host: Interface to bind; the default listens on all interfaces.
        port: TCP port to bind.
    """
    bind_options = {"host": host, "port": port}
    uvicorn.run(app, **bind_options)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: --host / --port choose the bind address,
    # --reload switches uvicorn to auto-reload mode.
    import argparse

    cli = argparse.ArgumentParser(description="FinEE API Server")
    cli.add_argument("--host", default="0.0.0.0", help="Host to bind")
    cli.add_argument("--port", type=int, default=8000, help="Port to bind")
    cli.add_argument("--reload", action="store_true", help="Enable auto-reload")
    options = cli.parse_args()

    if not options.reload:
        start_server(options.host, options.port)
    else:
        # In reload mode the app is passed as the import string "api:app",
        # which presumes this module is importable as `api`.
        uvicorn.run("api:app", host=options.host, port=options.port, reload=True)
|
|
|