# Terrasyncra / app/main.py
# Author: nexusbert
# Initial TerraSyncra AI deployment - CPU optimized with lazy loading and Qwen 1.8B model
# Commit: 9ebe82e
# TerraSyncra_backend/app/main.py
import os
import sys
import logging
import uuid
import asyncio
import json
import base64
from fastapi import FastAPI, Body, UploadFile, File, Form, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
import uvicorn
# Make the project root importable so the `app.*` absolute imports below
# resolve regardless of the working directory the server is launched from.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if BASE_DIR not in sys.path:
    sys.path.insert(0, BASE_DIR)
from app.tasks.rag_updater import schedule_updates
from app.utils import config
from app.agents.crew_pipeline import run_pipeline
from app.agents.soil_agent import analyze_soil
from app.agents.disease_agent import classify_disease_from_image, classify_disease_from_text
from app.agents.live_voice_agent import handle_live_voice_websocket
# Root logger: timestamped, leveled messages shared by all endpoints.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO
)

# FastAPI application instance; title/description/version feed the OpenAPI docs.
app = FastAPI(
    title="TerraSyncra AI Backend",
    description="Backend service for TerraSyncra AI with RAG updates, multilingual support, expert AI pipeline, soil analysis, disease detection, and live voice interactions",
    version="1.4.0"
)

# CORS: origins come from config when defined, otherwise every origin is allowed.
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True for credentialed requests — confirm the intended
# origin list is set in config for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=getattr(config, "ALLOWED_ORIGINS", ["*"]),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
def startup_event():
    """Log startup and schedule background RAG updates (see app.tasks.rag_updater)."""
    # NOTE(review): @app.on_event is deprecated in recent FastAPI versions in
    # favor of lifespan handlers — still functional, but consider migrating.
    logging.info("Starting TerraSyncra AI backend...")
    schedule_updates()
@app.get("/")
def home():
    """Health check: report service status, version, and vector store path."""
    payload = {
        "status": "TerraSyncra AI backend running",
        "version": "1.4.0",
        "vectorstore_path": config.VECTORSTORE_PATH,
    }
    return payload
@app.post("/ask")
def ask_farmbot(
    query: str = Body(..., embed=True),
    session_id: str = Body(None, embed=True)
):
    """
    Ask TerraSyncra AI a farming-related question.

    - Supports Hausa, Igbo, Yoruba, Swahili, Amharic, and English.
    - Automatically detects user language, translates if needed,
      and returns response in the same language.
    - Maintains separate conversation memory per session_id.

    Args:
        query: The user's question, in any supported language.
        session_id: Optional conversation identifier; a fresh UUID is
            generated when missing so each conversation keeps its own memory.

    Returns:
        dict with the original query, the pipeline's answer, the session_id,
        and the detected language.
    """
    if not session_id:
        session_id = str(uuid.uuid4())  # assign new session if missing
    logging.info(f"Received query: {query} [session_id={session_id}]")
    answer_data = run_pipeline(query, session_id=session_id)
    detected_lang = answer_data.get("detected_language", "Unknown")
    logging.info(f"Detected language: {detected_lang}")
    return {
        "query": query,
        "answer": answer_data.get("answer"),
        # Fall back to the locally generated id so the client always receives
        # a usable session_id even if the pipeline omits it from its result.
        "session_id": answer_data.get("session_id") or session_id,
        "detected_language": detected_lang
    }
@app.post("/analyze-soil")
def analyze_soil_endpoint(
    report_data: str = Body(..., embed=True, description="Soil report or lab results text"),
    location: Optional[str] = Body(None, embed=True, description="Field location (e.g., state name)"),
    crop_type: Optional[str] = Body(None, embed=True, description="Intended crop type"),
    field_size: Optional[str] = Body(None, embed=True, description="Field size (e.g., '2 hectares')"),
    previous_crops: Optional[str] = Body(None, embed=True, description="Previous crops grown"),
    additional_notes: Optional[str] = Body(None, embed=True, description="Additional field information")
):
    """
    Expert soil analysis endpoint.

    Accepts soil report text plus optional field context and returns the
    analysis produced by the soil agent.

    Args:
        report_data: Soil report or lab results text (required).
        location: Field location, e.g. a state name.
        crop_type: Intended crop type.
        field_size: Field size, e.g. "2 hectares".
        previous_crops: Previous crops grown on the field.
        additional_notes: Any additional field information.

    Returns:
        The soil agent's analysis result, passed through unchanged.
    """
    logging.info("Received soil analysis request")
    # Keep only the optional fields the caller actually supplied; empty
    # strings are treated the same as missing values (truthiness filter),
    # matching the original per-field `if` checks.
    optional_fields = {
        "location": location,
        "crop_type": crop_type,
        "field_size": field_size,
        "previous_crops": previous_crops,
        "additional_notes": additional_notes,
    }
    field_data = {key: value for key, value in optional_fields.items() if value}
    # Pass None rather than an empty dict when no context was given, to
    # preserve the soil agent's original calling convention.
    result = analyze_soil(report_data, field_data if field_data else None)
    return result
@app.post("/detect-disease-image")
async def detect_disease_image(
    image: UploadFile = File(..., description="Image file of plant or animal showing disease symptoms"),
    query: Optional[str] = Form(None, description="Optional text query or description")
):
    """
    Disease detection from an uploaded image.

    Reads the uploaded file, determines its MIME type (defaulting to JPEG
    when the client sent none), and delegates classification and treatment
    recommendations to the disease agent. Supports JPEG, PNG, and other
    image formats.
    """
    logging.info(f"Received disease detection request (image: {image.filename})")
    payload = await image.read()  # raw uploaded image bytes
    mime_type = image.content_type or "image/jpeg"  # default when header is absent
    return classify_disease_from_image(payload, mime_type, query)
@app.post("/detect-disease-text")
def detect_disease_text(
    description: str = Body(..., embed=True, description="Text description of disease symptoms or condition"),
    language: Optional[str] = Body("en", embed=True, description="Language code (en, ig, ha, yo)")
):
    """
    Disease detection from a text/voice description.

    Delegates to the disease agent's text classifier; the optional language
    code (en, ig, ha, yo) tells the agent which language the description
    is written in, enabling multilingual input.
    """
    logging.info(f"Received disease detection request (text, language: {language})")
    return classify_disease_from_text(description, language)
@app.websocket("/live-voice")
async def live_voice_websocket(websocket: WebSocket):
    """
    WebSocket endpoint for live voice interaction with TerraSyncra.
    Supports:
    - Real-time bidirectional audio streaming
    - Optional image upload at session start for disease detection
    - Multilingual voice input/output (Igbo, Hausa, Yoruba, English)
    Protocol:
    1. Client connects via WebSocket
    2. Client can optionally send an image first (as JSON with base64 encoded image)
       Format: {"type": "image", "data": "base64_string", "mime_type": "image/jpeg"}
    3. Client streams audio chunks as raw bytes (PCM format, 16kHz, mono, 16-bit)
       OR as JSON: {"type": "audio", "data": "base64_string"}
    4. Server streams audio responses back as raw bytes
    5. Server may send JSON messages for status/transcripts:
       - {"type": "connected", "message": "..."}
       - {"type": "image_sent", "message": "..."}
       - {"type": "transcript", "text": "..."}
       - {"type": "error", "message": "..."}
    Audio format: PCM, 16kHz sample rate, mono channel, 16-bit depth
    """
    # Accept the connection here; all protocol handling (image, audio,
    # status messages) is delegated to the live voice agent.
    await websocket.accept()
    logging.info("WebSocket connection established for live voice")
    # Start live voice session (will handle image/audio internally)
    await handle_live_voice_websocket(websocket)
@app.post("/live-voice-start")
async def live_voice_start(
    image: Optional[UploadFile] = File(None, description="Optional image to analyze with voice"),
    use_disease_mode: bool = Form(True, description="Focus on disease detection if True")
):
    """
    Initialize a live voice session (alternative to WebSocket for HTTP-based clients).

    Returns session configuration that can be used with Gemini Live API directly.
    For full bidirectional streaming, use the WebSocket endpoint /live-voice instead.
    """
    logging.info("Live voice session initialization requested")
    payload = None
    image_mime_type = "image/jpeg"
    if image is not None:
        payload = await image.read()
        image_mime_type = image.content_type or "image/jpeg"
        logging.info(f"Image uploaded: {image.filename}, type: {image_mime_type}")
    # Imported lazily so the heavy voice-agent dependencies load on demand.
    from app.agents.live_voice_agent import create_live_voice_session
    return await create_live_voice_session(payload, image_mime_type, use_disease_mode)
if __name__ == "__main__":
    # Port and debug mode come from config when defined, with safe defaults
    # (7860 is the Hugging Face Spaces convention; reload only in debug).
    serve_port = getattr(config, "PORT", 7860)
    debug_mode = bool(getattr(config, "DEBUG", False))
    uvicorn.run("app.main:app", host="0.0.0.0", port=serve_port, reload=debug_mode)