AgriTech / ledger_endpoints.py
Tahasaif3's picture
'code'
800595f
"""
FastAPI endpoints for LedgerAgent integration
Connects voice/manual entries to autonomous agent processing
"""
from fastapi import APIRouter, Body, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List
import asyncio
from datetime import datetime
from ageents.ledgerAgent import (
process_voice_entry,
process_manual_entry,
process_batch_entries,
VoiceEntryData,
ManualEntryData,
)
router = APIRouter(prefix="/api/ledger", tags=["ledger-agent"])
# ================== REQUEST MODELS ==================
class VoiceEntryRequest(BaseModel):
transcript: str
category: str
amount: float
notes: str
confidence: float = 0.0
user_id: str
device_id: str = "voice-app"
class ManualEntryRequest(BaseModel):
category: str
amount: float
date: str
payment_method: str
notes: str = ""
user_id: str
device_id: str = "manual-form"
class BatchEntryRequest(BaseModel):
entries: List[dict]
user_id: str
class ProcessingResponse(BaseModel):
status: str
entry_id: Optional[str] = None
message: str
timestamp: str = None
class Config:
json_schema_extra = {
"example": {
"status": "success",
"entry_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "Entry processed and synced",
"timestamp": "2024-01-15T10:30:00Z"
}
}
# ================== VOICE ENTRY ENDPOINT ==================
@router.post("/voice-entry", response_model=ProcessingResponse)
async def process_voice_entry_endpoint(
request: VoiceEntryRequest,
background_tasks: BackgroundTasks
):
"""
Process voice-parsed entry through LedgerAgent.
Flow:
1. Frontend records voice β†’ parses category/amount
2. User confirms entry (clicks "Confirm & Save")
3. Frontend sends to this endpoint with parsed data + transcript
4. Agent validates, determines entry type, saves to Firebase
5. Frontend receives "expense-added" event and updates table
Example request:
```json
{
"transcript": "khad 1500 rupay DAP 50kg",
"category": "fertilizer",
"amount": 1500,
"notes": "DAP 50kg for cotton field",
"confidence": 0.85,
"user_id": "user_abc123",
"device_id": "phone-safari"
}
```
"""
try:
# Validate user exists
if not request.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
voice_data = VoiceEntryData(
transcript=request.transcript,
category=request.category,
amount=request.amount,
notes=request.notes,
confidence=request.confidence,
user_id=request.user_id,
device_id=request.device_id,
recorded_at=datetime.utcnow().isoformat(),
)
# Process asynchronously in background
async def process_async():
try:
result = await process_voice_entry(voice_data)
print(f"βœ… Voice entry {result.id} processed successfully")
except Exception as e:
print(f"❌ Background processing failed: {str(e)}")
background_tasks.add_task(process_async)
return ProcessingResponse(
status="processing",
message="Voice entry received. Agent is processing...",
timestamp=datetime.utcnow().isoformat(),
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error processing voice entry: {str(e)}"
)
# ================== MANUAL ENTRY ENDPOINT ==================
@router.post("/manual-entry", response_model=ProcessingResponse)
async def process_manual_entry_endpoint(
request: ManualEntryRequest,
background_tasks: BackgroundTasks
):
"""
Process manually submitted expense form through LedgerAgent.
Flow:
1. User fills ExpenseManualForm in frontend
2. Frontend calls this endpoint with form data
3. Agent validates and processes
4. Entry saved to Firebase and synced automatically
Example request:
```json
{
"category": "seeds",
"amount": 5000,
"date": "2024-01-15",
"payment_method": "mobile",
"notes": "Wheat seeds from dealer",
"user_id": "user_abc123"
}
```
"""
try:
if not request.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
manual_data = ManualEntryData(
category=request.category,
amount=request.amount,
date=request.date,
payment_method=request.payment_method,
notes=request.notes,
user_id=request.user_id,
device_id=request.device_id,
)
async def process_async():
try:
result = await process_manual_entry(manual_data)
print(f"βœ… Manual entry {result.id} processed successfully")
except Exception as e:
print(f"❌ Background processing failed: {str(e)}")
background_tasks.add_task(process_async)
return ProcessingResponse(
status="processing",
message="Entry received. Agent is processing and saving...",
timestamp=datetime.utcnow().isoformat(),
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error processing manual entry: {str(e)}"
)
# ================== BATCH PROCESSING ENDPOINT ==================
@router.post("/batch-entries", response_model=dict)
async def process_batch_entries_endpoint(
request: BatchEntryRequest,
background_tasks: BackgroundTasks
):
"""
Process multiple entries in batch (e.g., offline sync).
Useful when user was offline and accumulated entries locally,
then reconnects and wants to sync all at once.
Example request:
```json
{
"user_id": "user_abc123",
"entries": [
{
"type": "voice",
"transcript": "khad 1500 rupay",
"category": "fertilizer",
"amount": 1500,
"notes": "DAP",
"confidence": 0.85
},
{
"type": "manual",
"category": "fuel",
"amount": 2000,
"date": "2024-01-15",
"payment_method": "cash",
"notes": "Diesel"
}
]
}
```
"""
try:
if not request.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
if not request.entries:
raise HTTPException(status_code=400, detail="No entries provided")
async def process_async():
try:
results = await process_batch_entries(request.entries, request.user_id)
print(f"βœ… Batch processed: {len(results)} entries")
except Exception as e:
print(f"❌ Batch processing failed: {str(e)}")
background_tasks.add_task(process_async)
return {
"status": "processing",
"batch_size": len(request.entries),
"message": f"Processing {len(request.entries)} entries...",
"timestamp": datetime.utcnow().isoformat(),
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error processing batch: {str(e)}"
)
# ================== HEALTH CHECK ==================
@router.get("/health")
async def health_check():
"""Check if agent is ready to process entries."""
return {
"status": "ready",
"service": "LedgerAgent",
"timestamp": datetime.utcnow().isoformat(),
}
# ================== EXPORT ==================
__all__ = ["router"]