| """ |
| FastAPI endpoints for LedgerAgent integration |
| Connects voice/manual entries to autonomous agent processing |
| """ |
|
|
| from fastapi import APIRouter, Body, HTTPException, BackgroundTasks |
| from pydantic import BaseModel |
| from typing import Optional, List |
| import asyncio |
| from datetime import datetime |
|
|
| from ageents.ledgerAgent import ( |
| process_voice_entry, |
| process_manual_entry, |
| process_batch_entries, |
| VoiceEntryData, |
| ManualEntryData, |
| ) |
|
|
# Single router for this module: every endpoint below is mounted under
# /api/ledger and grouped under the "ledger-agent" tag.
router = APIRouter(
    prefix="/api/ledger",
    tags=["ledger-agent"],
)
|
|
| |
|
|
class VoiceEntryRequest(BaseModel):
    # Request body for POST /voice-entry: an entry the frontend has already
    # parsed from a voice recording. Documented with plain comments rather
    # than a docstring so the generated schema description is left untouched.

    # Raw transcript plus the values the client extracted from it.
    transcript: str
    category: str
    amount: float
    notes: str

    # Confidence score supplied by the client; falls back to 0.0 when omitted.
    # No range check is applied here.
    confidence: float = 0.0

    # Owner of the entry and the originating device (defaults to "voice-app").
    user_id: str
    device_id: str = "voice-app"
|
|
class ManualEntryRequest(BaseModel):
    # Request body for POST /manual-entry: the fields of the manual expense
    # form. Documented with plain comments rather than a docstring so the
    # generated schema description is left untouched.

    category: str
    amount: float

    # Accepted as a plain string; this model does not validate the format
    # (the endpoint's documented example sends "2024-01-15").
    date: str
    payment_method: str

    # Free-text notes are optional and default to an empty string.
    notes: str = ""

    # Owner of the entry and the originating device (defaults to "manual-form").
    user_id: str
    device_id: str = "manual-form"
|
|
class BatchEntryRequest(BaseModel):
    # Request body for POST /batch-entries. Documented with plain comments
    # rather than a docstring so the generated schema description is left
    # untouched.

    # Untyped dicts: this model does not validate the individual entries.
    # The endpoint's documented example tags each one with a "type" of
    # "voice" or "manual".
    entries: List[dict]

    # Single owner shared by every entry in the batch.
    user_id: str
|
|
class ProcessingResponse(BaseModel):
    # Response envelope declared as the response_model of the voice-entry and
    # manual-entry endpoints. Documented with plain comments rather than a
    # docstring so the generated schema description is left untouched.

    status: str

    # The endpoints in this file respond before the agent has finished, so
    # they leave entry_id unset.
    entry_id: Optional[str] = None
    message: str

    # Declared Optional because the default is None; the previous bare `str`
    # annotation contradicted its own default and advertised the field as a
    # non-nullable string in the generated schema.
    timestamp: Optional[str] = None

    class Config:
        # Sample payload shown in the generated API documentation.
        json_schema_extra = {
            "example": {
                "status": "success",
                "entry_id": "550e8400-e29b-41d4-a716-446655440000",
                "message": "Entry processed and synced",
                "timestamp": "2024-01-15T10:30:00Z"
            }
        }
|
|
| |
|
|
@router.post("/voice-entry", response_model=ProcessingResponse)
async def process_voice_entry_endpoint(
    request: VoiceEntryRequest,
    background_tasks: BackgroundTasks,
):
    """
    Process voice-parsed entry through LedgerAgent.

    Flow:
    1. Frontend records voice → parses category/amount
    2. User confirms entry (clicks "Confirm & Save")
    3. Frontend sends to this endpoint with parsed data + transcript
    4. Agent validates, determines entry type, saves to Firebase
    5. Frontend receives "expense-added" event and updates table

    The endpoint answers with status "processing" as soon as the entry has
    been queued; the agent itself runs as a background task.

    Errors: 400 if user_id is empty, 500 if the entry cannot be queued.

    Example request:
    ```json
    {
        "transcript": "khad 1500 rupay DAP 50kg",
        "category": "fertilizer",
        "amount": 1500,
        "notes": "DAP 50kg for cotton field",
        "confidence": 0.85,
        "user_id": "user_abc123",
        "device_id": "phone-safari"
    }
    ```
    """
    try:
        # The model guarantees user_id is a string, so this rejects "".
        if not request.user_id:
            raise HTTPException(status_code=400, detail="user_id is required")

        voice_data = VoiceEntryData(
            transcript=request.transcript,
            category=request.category,
            amount=request.amount,
            notes=request.notes,
            confidence=request.confidence,
            user_id=request.user_id,
            device_id=request.device_id,
            # NOTE(review): utcnow() yields a naive timestamp (no UTC offset)
            # and is deprecated since Python 3.12; switching to an aware
            # datetime would change the string format, so confirm with the
            # agent code before migrating.
            recorded_at=datetime.utcnow().isoformat(),
        )

        async def process_async():
            # Runs as a background task, so the HTTP response can no longer
            # carry a failure: report it on the server side and swallow it.
            try:
                result = await process_voice_entry(voice_data)
                print(f"✅ Voice entry {result.id} processed successfully")
            except Exception as e:
                print(f"❌ Background processing failed: {str(e)}")

        background_tasks.add_task(process_async)

        return ProcessingResponse(
            status="processing",
            message="Voice entry received. Agent is processing...",
            timestamp=datetime.utcnow().isoformat(),
        )

    except HTTPException:
        # Already a well-formed HTTP error (the 400 above); pass it through
        # instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(
            status_code=500,
            detail=f"Error processing voice entry: {str(e)}",
        ) from e
|
|
| |
|
|
@router.post("/manual-entry", response_model=ProcessingResponse)
async def process_manual_entry_endpoint(
    request: ManualEntryRequest,
    background_tasks: BackgroundTasks,
):
    """
    Process manually submitted expense form through LedgerAgent.

    Flow:
    1. User fills ExpenseManualForm in frontend
    2. Frontend calls this endpoint with form data
    3. Agent validates and processes
    4. Entry saved to Firebase and synced automatically

    The endpoint answers with status "processing" as soon as the entry has
    been queued; the agent itself runs as a background task.

    Errors: 400 if user_id is empty, 500 if the entry cannot be queued.

    Example request:
    ```json
    {
        "category": "seeds",
        "amount": 5000,
        "date": "2024-01-15",
        "payment_method": "mobile",
        "notes": "Wheat seeds from dealer",
        "user_id": "user_abc123"
    }
    ```
    """
    try:
        # The model guarantees user_id is a string, so this rejects "".
        if not request.user_id:
            raise HTTPException(status_code=400, detail="user_id is required")

        manual_data = ManualEntryData(
            category=request.category,
            amount=request.amount,
            date=request.date,
            payment_method=request.payment_method,
            notes=request.notes,
            user_id=request.user_id,
            device_id=request.device_id,
        )

        async def process_async():
            # Runs as a background task, so the HTTP response can no longer
            # carry a failure: report it on the server side and swallow it.
            try:
                result = await process_manual_entry(manual_data)
                print(f"✅ Manual entry {result.id} processed successfully")
            except Exception as e:
                print(f"❌ Background processing failed: {str(e)}")

        background_tasks.add_task(process_async)

        return ProcessingResponse(
            status="processing",
            message="Entry received. Agent is processing and saving...",
            # NOTE(review): utcnow() yields a naive timestamp (no UTC offset)
            # and is deprecated since Python 3.12; confirm what clients
            # expect before switching to an aware datetime.
            timestamp=datetime.utcnow().isoformat(),
        )

    except HTTPException:
        # Already a well-formed HTTP error (the 400 above); pass it through
        # instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(
            status_code=500,
            detail=f"Error processing manual entry: {str(e)}",
        ) from e
|
|
| |
|
|
@router.post("/batch-entries", response_model=dict)
async def process_batch_entries_endpoint(
    request: BatchEntryRequest,
    background_tasks: BackgroundTasks,
):
    """
    Process multiple entries in batch (e.g., offline sync).

    Useful when user was offline and accumulated entries locally,
    then reconnects and wants to sync all at once.

    The endpoint answers with status "processing" and the batch size as soon
    as the batch has been queued; the agent itself runs as a background task.

    Errors: 400 if user_id is empty or no entries are provided, 500 if the
    batch cannot be queued.

    Example request:
    ```json
    {
        "user_id": "user_abc123",
        "entries": [
            {
                "type": "voice",
                "transcript": "khad 1500 rupay",
                "category": "fertilizer",
                "amount": 1500,
                "notes": "DAP",
                "confidence": 0.85
            },
            {
                "type": "manual",
                "category": "fuel",
                "amount": 2000,
                "date": "2024-01-15",
                "payment_method": "cash",
                "notes": "Diesel"
            }
        ]
    }
    ```
    """
    try:
        # The model guarantees user_id is a string, so this rejects "".
        if not request.user_id:
            raise HTTPException(status_code=400, detail="user_id is required")

        # An empty list is valid for the model but pointless to queue.
        if not request.entries:
            raise HTTPException(status_code=400, detail="No entries provided")

        async def process_async():
            # Runs as a background task, so the HTTP response can no longer
            # carry a failure: report it on the server side and swallow it.
            try:
                results = await process_batch_entries(request.entries, request.user_id)
                print(f"✅ Batch processed: {len(results)} entries")
            except Exception as e:
                print(f"❌ Batch processing failed: {str(e)}")

        background_tasks.add_task(process_async)

        return {
            "status": "processing",
            "batch_size": len(request.entries),
            "message": f"Processing {len(request.entries)} entries...",
            # NOTE(review): utcnow() yields a naive timestamp (no UTC offset)
            # and is deprecated since Python 3.12; confirm what clients
            # expect before switching to an aware datetime.
            "timestamp": datetime.utcnow().isoformat(),
        }

    except HTTPException:
        # Already a well-formed HTTP error (one of the 400s above); pass it
        # through instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(
            status_code=500,
            detail=f"Error processing batch: {str(e)}",
        ) from e
|
|
| |
|
|
@router.get("/health")
async def health_check():
    """Check if agent is ready to process entries."""
    # The payload is static apart from the timestamp; no dependency of the
    # agent is actually probed here.
    checked_at = datetime.utcnow().isoformat()
    payload = {
        "status": "ready",
        "service": "LedgerAgent",
        "timestamp": checked_at,
    }
    return payload
|
|
| |
|
|
# Public API of this module: only the router is meant to be imported.
__all__ = [
    "router",
]