File size: 5,872 Bytes
f695e34
7b2787b
 
 
 
f695e34
 
7b2787b
 
f695e34
 
7b2787b
 
85e43f2
7b2787b
85e43f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
---
title: FlowGraph
emoji: πŸ”„
colorFrom: blue
colorTo: purple
sdk: docker
pinned: false
license: mit
app_port: 7860
---

# FlowGraph

A lightweight, async-first workflow orchestration engine for building agent pipelines in Python.

![Python](https://img.shields.io/badge/Python-3.9+-blue?style=flat-square)
![FastAPI](https://img.shields.io/badge/FastAPI-0.104+-green?style=flat-square)
![License](https://img.shields.io/badge/License-MIT-yellow?style=flat-square)

A minimal but powerful graph-based workflow engine similar to [LangGraph](https://github.com/langchain-ai/langgraph). Define sequences of steps (nodes), connect them with edges, maintain shared state, and run workflows via REST APIs.

**Live Demo:** https://kbsss-flowgraph.hf.space/docs

---

## Features

| Feature | Description |
|---------|-------------|
| Nodes | Python functions that read and modify shared state |
| Edges | Define which node runs after which |
| Branching | Conditional routing based on state values |
| Looping | Run nodes repeatedly until conditions are met |
| Async | Full async/await support for scalability |
| WebSocket | Real-time execution streaming |
| Visualization | Auto-generated Mermaid diagrams |

---

## Quick Start

### With Docker (Recommended)

```bash
git clone https://github.com/kbss0000/flowgraph.git
cd flowgraph
docker compose up -d
curl http://localhost:8000/health
```

### Without Docker

```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python run.py
```

**Access Points:**
- API: http://localhost:8000
- Swagger Docs: http://localhost:8000/docs

---

## API Reference

### Graph Endpoints

| Method | Endpoint | Description |
|--------|----------|-------------|
| `POST` | `/graph/create` | Create a new workflow graph |
| `GET` | `/graph/` | List all graphs |
| `GET` | `/graph/{graph_id}` | Get graph details + Mermaid diagram |
| `POST` | `/graph/run` | Execute a graph |
| `GET` | `/graph/state/{run_id}` | Get execution state |

### Tool Endpoints

| Method | Endpoint | Description |
|--------|----------|-------------|
| `GET` | `/tools/` | List all registered tools |
| `POST` | `/tools/register` | Register a new tool dynamically |

### WebSocket

| Endpoint | Description |
|----------|-------------|
| `/ws/run/{graph_id}` | Execute with real-time streaming |

---

## Sample Workflow: Code Review

The included demo workflow analyzes Python code quality:

```
Extract Functions -> Check Complexity -> Detect Issues --+--> END (pass)
                                                         |
                                                         +--> Improve -> (loop back)
```

### Try It

```bash
curl -X POST "https://kbsss-flowgraph.hf.space/graph/run" \
  -H "Content-Type: application/json" \
  -d '{
    "graph_id": "code-review-demo",
    "initial_state": {
      "code": "def hello():\n    print(\"world\")",
      "quality_threshold": 6.0
    }
  }'
```

---

## Architecture

```
flowgraph/
β”œβ”€β”€ app/
β”‚   β”œβ”€β”€ main.py              # FastAPI entry point
β”‚   β”œβ”€β”€ config.py            # Configuration
β”‚   β”œβ”€β”€ api/
β”‚   β”‚   β”œβ”€β”€ schemas.py       # Pydantic models
β”‚   β”‚   └── routes/
β”‚   β”‚       β”œβ”€β”€ graph.py     # Graph CRUD + execution
β”‚   β”‚       β”œβ”€β”€ tools.py     # Tool management
β”‚   β”‚       └── websocket.py # Real-time streaming
β”‚   β”œβ”€β”€ engine/
β”‚   β”‚   β”œβ”€β”€ state.py         # Immutable state management
β”‚   β”‚   β”œβ”€β”€ node.py          # Node definitions + decorators
β”‚   β”‚   β”œβ”€β”€ graph.py         # Graph structure + validation
β”‚   β”‚   └── executor.py      # Async workflow executor
β”‚   β”œβ”€β”€ tools/
β”‚   β”‚   β”œβ”€β”€ registry.py      # Tool registry
β”‚   β”‚   └── builtin.py       # Built-in tools
β”‚   β”œβ”€β”€ workflows/
β”‚   β”‚   └── code_review.py   # Demo workflow
β”‚   └── storage/
β”‚       └── memory.py        # In-memory storage
β”œβ”€β”€ tests/                   # Test suite
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ docker-compose.yml
└── requirements.txt
```

---

## Design Decisions

| Decision | Rationale |
|----------|-----------|
| Immutable state | Predictable flow, easier debugging, clear state transitions |
| Async-first | Scalability for long-running or I/O-bound workflows |
| Tool registry | Decouples node logic from handlers, enables dynamic registration |
| Named conditions | Clean serialization, human-readable graph definitions |
| In-memory storage | Simplicity first; easily swappable for Redis/PostgreSQL |
| Max iterations | Safety guard against infinite loops |

---

## Testing

```bash
# Run tests in Docker
docker compose exec workflow-engine pytest tests/ -v

# Run locally
pytest tests/ -v
```

---

## What I Would Improve

With more time, I would add:

1. Persistent Storage - PostgreSQL/Redis for production
2. Parallel Execution - Run independent nodes concurrently
3. Checkpointing - Save/restore execution state
4. Retry Logic - Automatic retry on node failures
5. Metrics - Prometheus/Grafana integration
6. Authentication - API key / JWT support
7. Visual Editor - Web UI for building workflows

---

## Creating Custom Workflows

### 1. Define a Node Handler

```python
from app.tools.registry import register_tool

@register_tool("my_processor")
def my_processor(data: str) -> dict:
    return {"result": data.upper()}
```

### 2. Create via API

```json
POST /graph/create
{
  "name": "my_workflow",
  "nodes": [
    {"name": "step1", "handler": "my_processor"},
    {"name": "step2", "handler": "another_tool"}
  ],
  "edges": {"step1": "step2"},
  "entry_point": "step1"
}
```

### 3. Run It

```json
POST /graph/run
{
  "graph_id": "returned_graph_id",
  "initial_state": {"data": "hello"}
}
```

---

## License

MIT License - see [LICENSE](LICENSE) for details.