|
|
import logging |
|
|
from fastapi import APIRouter, HTTPException |
|
|
from fastapi.concurrency import run_in_threadpool |
|
|
from app.schemas.payload import ExecutionRequest |
|
|
from app.services.rivergen import run_rivergen_flow |
|
|
|
|
|
|
|
|
# Module-level logger; named "api_execution" (not __name__) so these
# endpoint events can be filtered/configured as a group.
logger = logging.getLogger("api_execution")


# Router holding the execution endpoints; presumably mounted on the main
# FastAPI app elsewhere — TODO confirm against app startup code.
router = APIRouter(tags=["Execution"])
|
|
|
|
|
@router.post(
    "/execute",
    response_model=dict,
    summary="Execute AI Flow",
    description="Processes natural language prompts via the RiverGen Engine."
)
async def execute_prompt(request: ExecutionRequest):
    """
    Primary endpoint to process natural language prompts against data sources.

    The synchronous RiverGen flow is executed via ``run_in_threadpool`` so it
    never blocks the async event loop.

    Args:
        request: Validated execution payload. ``request_id`` is optional and
            used only for log correlation (falls back to ``"unknown"``).

    Returns:
        dict: The raw result dictionary produced by ``run_rivergen_flow``.

    Raises:
        HTTPException: 400 when the flow reports a logic error — the detail
            message includes the engine's ``last_feedback`` when available;
            500 on any unexpected exception.
    """
    request_id = request.request_id or "unknown"
    # Lazy %-style logging args: no string formatting cost when the level
    # is disabled. (Also repairs mojibake emoji prefixes in the originals.)
    logger.info("[API] Received execution request: %s", request_id)

    try:
        payload = request.model_dump()

        # run_rivergen_flow is synchronous; offload to a worker thread so
        # the event loop keeps serving other requests meanwhile.
        result = await run_in_threadpool(run_rivergen_flow, payload)

        # The flow signals failure either via an explicit status field or
        # by the mere presence of an "error" key.
        if result.get("status") == "error" or "error" in result:
            error_msg = result.get("error", "Unknown processing error")

            # Surface the engine's own feedback to the caller when present.
            last_feedback = result.get("last_feedback", "")
            if last_feedback:
                detailed_detail = f"{error_msg}\n\nREASON: {last_feedback}"
            else:
                detailed_detail = error_msg

            logger.warning("[API] Logic Error for %s: %s", request_id, error_msg)
            raise HTTPException(status_code=400, detail=detailed_detail)

        logger.info("[API] Success for %s", request_id)
        return result

    except HTTPException:
        # Re-raise our own 400s untouched so the generic handler below
        # does not wrap them as 500s.
        raise
    except Exception as e:
        logger.error("[API] System Crash for %s: %s", request_id, e, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Internal Server Error. Please contact support with Request ID: {request_id}"
        )