File size: 3,879 Bytes
7b2787b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
FlowGraph - FastAPI Application Entry Point.

A lightweight, async-first workflow orchestration engine for building agent pipelines.
"""

from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import logging

from app.config import settings
from app.api.routes import graph, tools, websocket
from app.workflows.code_review import register_code_review_workflow

# Import builtin tools to register them
import app.tools.builtin  # noqa: F401


# Configure logging
logging.basicConfig(
    level=getattr(logging, settings.LOG_LEVEL),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler."""
    # Startup
    logger.info(f"Starting {settings.APP_NAME} v{settings.APP_VERSION}")
    
    # Register the demo workflow
    await register_code_review_workflow()
    
    yield
    
    # Shutdown
    logger.info("Shutting down...")


# Create FastAPI application
app = FastAPI(
    title=settings.APP_NAME,
    description="""
## Workflow Engine API

A minimal but powerful workflow/graph engine for building agent workflows.

### Features
- **Nodes**: Python functions that read and modify shared state
- **Edges**: Define execution flow between nodes
- **Branching**: Conditional routing based on state values
- **Looping**: Support for iterative workflows
- **Real-time Updates**: WebSocket support for live execution streaming

### Quick Start
1. List available tools: `GET /tools`
2. Create a graph: `POST /graph/create`
3. Run the graph: `POST /graph/run`
4. Check execution state: `GET /graph/state/{run_id}`

### Demo Workflow
A pre-registered Code Review workflow is available with ID: `code-review-demo`
    """,
    version=settings.APP_VERSION,
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,
)


# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Include routers
app.include_router(graph.router)
app.include_router(tools.router)
app.include_router(websocket.router)


# ============================================================
# Root Endpoints
# ============================================================

@app.get("/", tags=["Root"])
async def root():
    """API root - returns basic info and links."""
    return {
        "name": settings.APP_NAME,
        "version": settings.APP_VERSION,
        "description": "A minimal workflow/graph engine for agent workflows",
        "docs": "/docs",
        "redoc": "/redoc",
        "endpoints": {
            "graphs": "/graph",
            "tools": "/tools",
            "websocket_run": "/ws/run/{graph_id}",
            "websocket_subscribe": "/ws/subscribe/{run_id}",
        },
        "demo_workflow": "code-review-demo",
    }


@app.get("/health", tags=["Root"])
async def health():
    """Health check endpoint."""
    from app.storage.memory import graph_storage, run_storage
    
    return {
        "status": "healthy",
        "version": settings.APP_VERSION,
        "graphs_count": len(graph_storage),
        "runs_count": len(run_storage),
    }


# ============================================================
# Error Handlers
# ============================================================

@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Global exception handler for unhandled errors."""
    logger.exception(f"Unhandled error: {exc}")
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal Server Error",
            "detail": str(exc) if settings.DEBUG else "An unexpected error occurred",
        },
    )