Spaces:
Sleeping
Sleeping
| import logging | |
| from fastapi import APIRouter, HTTPException | |
| from backend.ai.analyzer import analyze_business_description, analyze_custom_task | |
| from backend.ai.workflow_builder import build_workflow_definition | |
| from backend.ai.workflow_suggester import suggest_workflow_options | |
| from backend.engine.compiler import compile_workflow | |
| from backend.engine.executor import WorkflowExecutor | |
| from backend.integrations.file_parser import parse_uploaded_payload | |
| from backend.models.schemas import ( | |
| AnalyzeRequest, | |
| BuildWorkflowRequest, | |
| BuildWorkflowResponse, | |
| CustomTaskRequest, | |
| DeployRequest, | |
| DeploymentResponse, | |
| EscalationReplyRequest, | |
| EscalationResponse, | |
| FileUploadRequest, | |
| FileUploadResponse, | |
| OwnerStatusResponse, | |
| StopAutomationRequest, | |
| StopAutomationResponse, | |
| WorkflowSuggestionRequest, | |
| ) | |
| from backend.storage.database import db | |
# Router object for this module's endpoints.
router = APIRouter()
# Single executor instance shared by the handlers in this module.
executor = WorkflowExecutor()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def analyze(request: AnalyzeRequest) -> dict:
    """Analyze a business description and store the result on the owner record.

    Args:
        request: Carries the owner id, owner email and the description text.

    Returns:
        The value returned by ``analyze_business_description``.

    Raises:
        HTTPException: 500 if the analysis call raises.
    """
    try:
        result = analyze_business_description(request.description)
    except Exception as err:
        logger.exception("Analyze request failed")
        raise HTTPException(status_code=500, detail=f"Analyze failed: {err}") from err

    # Only reached after a successful analysis: update the record returned by
    # ensure_owner and write it back.
    record = db.ensure_owner(request.owner_id, request.owner_email)
    record["business_description"] = request.description
    record["business_analysis"] = result
    db.save_owner(record)

    return result
def custom_task(request: CustomTaskRequest) -> dict:
    """Analyze a custom task in the context of the owner's stored data.

    Args:
        request: Carries the owner id and the custom task to analyze.

    Returns:
        The value returned by ``analyze_custom_task``.

    Raises:
        HTTPException: 404 if the owner lookup raises ``KeyError``; 500 if
            gathering the inputs or the analysis itself raises.
    """
    try:
        record = db.get_owner(request.owner_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    try:
        # Inputs are collected inside the try so that a failure while
        # gathering them is reported the same way as a failed analysis.
        description = record.get("business_description", "")
        workflows = db.list_workflows(request.owner_id)
        outcome = analyze_custom_task(
            business_description=description,
            existing_workflows=workflows,
            custom_task=request.custom_task,
        )
    except Exception as err:
        logger.exception("Custom task analysis failed")
        raise HTTPException(status_code=500, detail=f"Custom task analysis failed: {err}") from err

    return outcome
def suggest_workflows(request: WorkflowSuggestionRequest) -> dict:
    """Suggest workflow options for a task, using the owner's stored data.

    Args:
        request: Carries the owner id plus the task name, description and
            category.

    Returns:
        The value returned by ``suggest_workflow_options``.

    Raises:
        HTTPException: 404 if the owner lookup raises ``KeyError``; 500 if
            gathering the inputs or the suggestion call raises.
    """
    try:
        record = db.get_owner(request.owner_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    try:
        # Owners without a stored spreadsheet config are reported as
        # not connected.
        spreadsheet = record.get("spreadsheet_config", {"connected": False})
        files = db.list_data_files(request.owner_id)
        options = suggest_workflow_options(
            task_name=request.task_name,
            task_description=request.task_description,
            category=request.category,
            spreadsheet_info=spreadsheet,
            uploaded_files=files,
        )
    except Exception as err:
        logger.exception("Workflow suggestion failed")
        raise HTTPException(status_code=500, detail=f"Workflow suggestion failed: {err}") from err

    return options
def build_workflow(request: BuildWorkflowRequest) -> BuildWorkflowResponse:
    """Build a workflow definition for an owner and compile it.

    Args:
        request: The build request; passed through to
            ``build_workflow_definition`` together with the owner record.

    Returns:
        A ``BuildWorkflowResponse`` wrapping the compiled workflow.

    Raises:
        HTTPException: 404 if the owner lookup raises ``KeyError``; 500 if
            building or compiling raises.
    """
    try:
        record = db.get_owner(request.owner_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    try:
        definition = build_workflow_definition(request=request, owner=record)
        compiled_workflow = compile_workflow(definition)
    except Exception as err:
        logger.exception("Workflow build failed")
        raise HTTPException(status_code=500, detail=f"Workflow build failed: {err}") from err
    else:
        return BuildWorkflowResponse(workflow=compiled_workflow)
def deploy_workflow(request: DeployRequest) -> DeploymentResponse:
    """Compile and persist the requested workflows, then mark the owner live.

    Args:
        request: Carries the owner id and the workflows to deploy.

    Returns:
        A ``DeploymentResponse`` listing the values returned by
        ``db.save_workflow`` for each workflow.

    Raises:
        HTTPException: 404 if the owner lookup raises ``KeyError``; 500 if
            any workflow fails to compile.
    """
    try:
        owner = db.get_owner(request.owner_id)
    except KeyError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    # Compile every workflow before saving any of them, so a compile failure
    # cannot leave a partially persisted deployment behind. Failures are
    # logged and reported the same way build_workflow reports them.
    try:
        compiled_workflows = [
            compile_workflow(workflow.model_dump()) for workflow in request.workflows
        ]
    except Exception as exc:
        logger.exception("Workflow deployment failed")
        raise HTTPException(status_code=500, detail=f"Workflow deployment failed: {exc}") from exc

    deployments = [
        db.save_workflow(request.owner_id, compiled) for compiled in compiled_workflows
    ]

    owner["state"] = "live"
    db.save_owner(owner)
    return DeploymentResponse(
        status="deployed",
        workflows=deployments,
        message="All workflows are live.",
    )
def stop_automation(request: StopAutomationRequest) -> StopAutomationResponse:
    """Deactivate an owner's workflows and mark the owner as paused.

    Args:
        request: Carries the owner id.

    Returns:
        A ``StopAutomationResponse`` carrying the value returned by
        ``db.deactivate_workflows``.

    Raises:
        HTTPException: 404 if the owner lookup raises ``KeyError``.
    """
    try:
        record = db.get_owner(request.owner_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    # Workflows are deactivated first; the owner state is written afterwards.
    deactivated = db.deactivate_workflows(request.owner_id)

    record["state"] = "paused"
    db.save_owner(record)

    response = StopAutomationResponse(
        status="stopped",
        workflows=deactivated,
        message="Automation stopped.",
    )
    return response
def upload_data(request: FileUploadRequest) -> FileUploadResponse:
    """Parse an uploaded payload and store it as a data file for the owner.

    Args:
        request: Carries the owner id, filename, file type, purpose and
            content of the upload.

    Returns:
        A ``FileUploadResponse`` wrapping the value returned by
        ``db.save_data_file``.
    """
    payload = parse_uploaded_payload(
        request.filename,
        request.content,
        request.purpose,
    )
    stored = db.save_data_file(
        request.owner_id,
        request.filename,
        request.file_type,
        request.purpose,
        payload,
    )
    return FileUploadResponse(file=stored)
def reset_data() -> dict[str, str]:
    """Reset the shared store and return a fixed confirmation payload."""
    db.reset()
    confirmation = dict(status="ok", message="In-memory data reset.")
    return confirmation
def status(owner_id: str) -> OwnerStatusResponse:
    """Report an owner's record together with their workflows and execution logs.

    Args:
        owner_id: Identifier of the owner to look up.

    Returns:
        An ``OwnerStatusResponse`` built from the owner record and the values
        returned by ``db.list_workflows`` and ``db.list_execution_logs``.

    Raises:
        HTTPException: 404 if the owner lookup raises ``KeyError``.
    """
    try:
        record = db.get_owner(owner_id)
    except KeyError as missing:
        raise HTTPException(status_code=404, detail=str(missing)) from missing

    workflows = db.list_workflows(owner_id)
    executions = db.list_execution_logs(owner_id)
    return OwnerStatusResponse(
        owner=record,
        workflows=workflows,
        recent_executions=executions,
    )
def escalations(owner_id: str) -> dict:
    """Return the owner's escalations under the ``"items"`` key.

    Args:
        owner_id: Identifier of the owner whose escalations are listed.
    """
    items = db.list_escalations(owner_id)
    return {"items": items}
def escalation_reply(request: EscalationReplyRequest) -> EscalationResponse:
    """Resolve an escalation with the supplied response.

    Args:
        request: Carries the escalation id and the response to record.

    Returns:
        An ``EscalationResponse`` wrapping the value returned by
        ``db.resolve_escalation``.
    """
    return EscalationResponse(
        escalation=db.resolve_escalation(request.escalation_id, request.response),
    )
def simulate_run(owner_id: str, workflow_id: str, trigger: dict) -> dict:
    """Execute a stored workflow against a trigger and log the execution.

    Args:
        owner_id: Identifier of the owner the workflow belongs to.
        workflow_id: Identifier of the workflow to run.
        trigger: Trigger payload handed to the executor.

    Returns:
        The result returned by ``executor.execute``. Its ``"steps"`` and
        ``"outcome"`` entries (and optional ``"error"``) are also written to
        the execution log.

    Raises:
        HTTPException: 404 if the workflow lookup raises ``KeyError``.
    """
    # NOTE(review): assumes db.get_workflow signals an unknown owner/workflow
    # with KeyError, as db.get_owner does elsewhere in this module — confirm.
    # Only the lookup is guarded so a KeyError raised during execution or
    # logging is not misreported as "not found".
    try:
        workflow = db.get_workflow(owner_id, workflow_id)
    except KeyError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    result = executor.execute(workflow, trigger, db=db, owner_id=owner_id)
    db.save_execution_log(
        owner_id,
        workflow_id,
        trigger,
        result["steps"],
        result["outcome"],
        result.get("error"),
    )
    return result
def debug_groq() -> dict:
    """Send a fixed prompt through the LLM client and return its reply.

    Returns:
        A dict with ``"status"`` set to ``"ok"`` and the generated text under
        ``"response"``.

    Raises:
        HTTPException: 500 if the text generation call raises.
    """
    # Imported at call time rather than at module import time.
    from backend.ai.client import llm_client

    try:
        reply = llm_client.generate_text("Reply with exactly: Groq debug ok")
    except Exception as err:
        logger.exception("Groq debug call failed")
        raise HTTPException(status_code=500, detail=f"Groq debug failed: {err}") from err

    return {"status": "ok", "response": reply}