# TerraSyncra_backend/app/main.py
#
# FastAPI entry point for the TerraSyncra AI backend. It wires the HTTP and
# WebSocket routes to the agent functions imported from ``app.agents`` and
# schedules the RAG updater on startup.

import os
import sys
import logging
import uuid
import asyncio
import json
import base64
from typing import Optional

from fastapi import FastAPI, Body, UploadFile, File, Form, WebSocket, WebSocketDisconnect
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

# Make the project root importable so the ``app.*`` imports below resolve even
# when this file is executed directly (``python app/main.py``).
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)

from app.tasks.rag_updater import schedule_updates
from app.utils import config
from app.agents.crew_pipeline import run_pipeline
from app.agents.soil_agent import analyze_soil
from app.agents.disease_agent import classify_disease_from_image, classify_disease_from_text
from app.agents.live_voice_agent import handle_live_voice_websocket

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO
)
# Module logger; records propagate to the root handler configured above.
logger = logging.getLogger(__name__)

app = FastAPI(
    title="TerraSyncra AI Backend",
    description="Backend service for TerraSyncra AI with RAG updates, multilingual support, expert AI pipeline, soil analysis, disease detection, and live voice interactions",
    version="1.4.0"
)

# NOTE(review): when config.ALLOWED_ORIGINS is not defined the fallback is
# ["*"] combined with allow_credentials=True, which is very permissive.
# Confirm that an explicit origin list is configured for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=getattr(config, "ALLOWED_ORIGINS", ["*"]),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# NOTE(review): FastAPI documents ``on_event`` as deprecated in favour of
# lifespan handlers; kept here to avoid changing startup semantics.
@app.on_event("startup")
def startup_event():
    """Log startup and schedule the RAG update task."""
    logger.info("Starting TerraSyncra AI backend...")
    schedule_updates()


@app.get("/")
def home():
    """Health check endpoint."""
    return {
        "status": "TerraSyncra AI backend running",
        "version": "1.4.0",
        "vectorstore_path": config.VECTORSTORE_PATH
    }


@app.post("/ask")
def ask_farmbot(
    query: str = Body(..., embed=True),
    session_id: Optional[str] = Body(None, embed=True)
):
    """
    Ask TerraSyncra AI a farming-related question.
    - Supports Hausa, Igbo, Yoruba, Swahili, Amharic, and English.
    - Automatically detects user language, translates if needed,
      and returns response in the same language.
    - Maintains separate conversation memory per session_id.
    """
    # ``session_id`` is Optional so that a client sending an explicit JSON
    # null is treated the same as a client omitting the field.
    if not session_id:
        session_id = str(uuid.uuid4())  # assign new session if missing

    logger.info("Received query: %s [session_id=%s]", query, session_id)
    answer_data = run_pipeline(query, session_id=session_id)

    detected_lang = answer_data.get("detected_language", "Unknown")
    logger.info("Detected language: %s", detected_lang)

    return {
        "query": query,
        "answer": answer_data.get("answer"),
        # Fall back to the id used for this request so a freshly generated
        # session id is never lost when the pipeline does not echo it back.
        "session_id": answer_data.get("session_id") or session_id,
        "detected_language": detected_lang
    }


@app.post("/analyze-soil")
def analyze_soil_endpoint(
    report_data: str = Body(..., embed=True, description="Soil report or lab results text"),
    location: Optional[str] = Body(None, embed=True, description="Field location (e.g., state name)"),
    crop_type: Optional[str] = Body(None, embed=True, description="Intended crop type"),
    field_size: Optional[str] = Body(None, embed=True, description="Field size (e.g., '2 hectares')"),
    previous_crops: Optional[str] = Body(None, embed=True, description="Previous crops grown"),
    additional_notes: Optional[str] = Body(None, embed=True, description="Additional field information")
):
    """
    Expert soil analysis endpoint.

    Accepts soil report data and optional field information.
    Returns comprehensive soil analysis and recommendations using Gemini 3 Flash.
    """
    logger.info("Received soil analysis request")

    # Forward only the optional fields the caller actually supplied; empty
    # strings and None are dropped.
    optional_fields = {
        "location": location,
        "crop_type": crop_type,
        "field_size": field_size,
        "previous_crops": previous_crops,
        "additional_notes": additional_notes,
    }
    field_data = {key: value for key, value in optional_fields.items() if value}

    # The agent receives None rather than an empty dict when nothing was given.
    result = analyze_soil(report_data, field_data if field_data else None)
    return result


@app.post("/detect-disease-image")
async def detect_disease_image(
    image: UploadFile = File(..., description="Image file of plant or animal showing disease symptoms"),
    query: Optional[str] = Form(None, description="Optional text query or description")
):
    """
    Disease detection from image upload.

    Accepts image file and optional text query.
    Returns disease classification and treatment recommendations using Gemini 2.0 Flash Exp.

    Supports: JPEG, PNG, and other image formats.
    """
    logger.info("Received disease detection request (image: %s)", image.filename)

    # Read image bytes
    image_bytes = await image.read()
    image_mime_type = image.content_type or "image/jpeg"

    # The classifier is a plain (non-awaited) call; run it in the threadpool
    # so it does not block the event loop while this coroutine waits for it.
    result = await run_in_threadpool(
        classify_disease_from_image, image_bytes, image_mime_type, query
    )
    return result


@app.post("/detect-disease-text")
def detect_disease_text(
    description: str = Body(..., embed=True, description="Text description of disease symptoms or condition"),
    language: Optional[str] = Body("en", embed=True, description="Language code (en, ig, ha, yo)")
):
    """
    Disease detection from text/voice description.

    Accepts text description of symptoms.
    Returns disease classification and treatment recommendations using Gemini 2.0 Flash Exp.

    Supports multilingual input (English, Igbo, Hausa, Yoruba).
    """
    logger.info("Received disease detection request (text, language: %s)", language)

    result = classify_disease_from_text(description, language)
    return result


@app.websocket("/live-voice")
async def live_voice_websocket(websocket: WebSocket):
    """
    WebSocket endpoint for live voice interaction with TerraSyncra.

    Supports:
    - Real-time bidirectional audio streaming
    - Optional image upload at session start for disease detection
    - Multilingual voice input/output (Igbo, Hausa, Yoruba, English)

    Protocol:
    1. Client connects via WebSocket
    2. Client can optionally send an image first (as JSON with base64 encoded image)
       Format: {"type": "image", "data": "base64_string", "mime_type": "image/jpeg"}
    3. Client streams audio chunks as raw bytes (PCM format, 16kHz, mono, 16-bit)
       OR as JSON: {"type": "audio", "data": "base64_string"}
    4. Server streams audio responses back as raw bytes
    5. Server may send JSON messages for status/transcripts:
       - {"type": "connected", "message": "..."}
       - {"type": "image_sent", "message": "..."}
       - {"type": "transcript", "text": "..."}
       - {"type": "error", "message": "..."}

    Audio format: PCM, 16kHz sample rate, mono channel, 16-bit depth
    """
    await websocket.accept()
    logger.info("WebSocket connection established for live voice")

    # Start live voice session (will handle image/audio internally)
    await handle_live_voice_websocket(websocket)


@app.post("/live-voice-start")
async def live_voice_start(
    image: Optional[UploadFile] = File(None, description="Optional image to analyze with voice"),
    use_disease_mode: bool = Form(True, description="Focus on disease detection if True")
):
    """
    Initialize a live voice session (alternative to WebSocket for HTTP-based clients).

    Returns session configuration that can be used with Gemini Live API directly.
    Note: For full bidirectional streaming, use the WebSocket endpoint /live-voice instead.
    """
    logger.info("Live voice session initialization requested")

    # Defaults used when no image accompanies the request.
    image_bytes = None
    image_mime_type = "image/jpeg"

    if image:
        image_bytes = await image.read()
        image_mime_type = image.content_type or "image/jpeg"
        logger.info("Image uploaded: %s, type: %s", image.filename, image_mime_type)

    # Imported at call time, as in the original code.
    # NOTE(review): the reason for the deferred import is not visible here.
    from app.agents.live_voice_agent import create_live_voice_session

    result = await create_live_voice_session(image_bytes, image_mime_type, use_disease_mode)
    return result


if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=getattr(config, "PORT", 7860),
        reload=bool(getattr(config, "DEBUG", False))
    )