flowgraph / README.md
kbsss's picture
Upload folder using huggingface_hub
85e43f2 verified
---
title: FlowGraph
emoji: πŸ”„
colorFrom: blue
colorTo: purple
sdk: docker
pinned: false
license: mit
app_port: 7860
---
# FlowGraph
A lightweight, async-first workflow orchestration engine for building agent pipelines in Python.
![Python](https://img.shields.io/badge/Python-3.9+-blue?style=flat-square)
![FastAPI](https://img.shields.io/badge/FastAPI-0.104+-green?style=flat-square)
![License](https://img.shields.io/badge/License-MIT-yellow?style=flat-square)
A minimal but powerful graph-based workflow engine similar to [LangGraph](https://github.com/langchain-ai/langgraph). Define sequences of steps (nodes), connect them with edges, maintain shared state, and run workflows via REST APIs.
**Live Demo:** https://kbsss-flowgraph.hf.space/docs
---
## Features
| Feature | Description |
|---------|-------------|
| Nodes | Python functions that read and modify shared state |
| Edges | Define which node runs after which |
| Branching | Conditional routing based on state values |
| Looping | Run nodes repeatedly until conditions are met |
| Async | Full async/await support for scalability |
| WebSocket | Real-time execution streaming |
| Visualization | Auto-generated Mermaid diagrams |
---
## Quick Start
### With Docker (Recommended)
```bash
git clone https://github.com/kbss0000/flowgraph.git
cd flowgraph
docker compose up -d
curl http://localhost:8000/health
```
### Without Docker
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python run.py
```
**Access Points:**
- API: http://localhost:8000
- Swagger Docs: http://localhost:8000/docs
---
## API Reference
### Graph Endpoints
| Method | Endpoint | Description |
|--------|----------|-------------|
| `POST` | `/graph/create` | Create a new workflow graph |
| `GET` | `/graph/` | List all graphs |
| `GET` | `/graph/{graph_id}` | Get graph details + Mermaid diagram |
| `POST` | `/graph/run` | Execute a graph |
| `GET` | `/graph/state/{run_id}` | Get execution state |
### Tool Endpoints
| Method | Endpoint | Description |
|--------|----------|-------------|
| `GET` | `/tools/` | List all registered tools |
| `POST` | `/tools/register` | Register a new tool dynamically |
### WebSocket
| Endpoint | Description |
|----------|-------------|
| `/ws/run/{graph_id}` | Execute with real-time streaming |
---
## Sample Workflow: Code Review
The included demo workflow analyzes Python code quality:
```
Extract Functions -> Check Complexity -> Detect Issues --+--> END (pass)
|
+--> Improve -> (loop back)
```
### Try It
```bash
curl -X POST "https://kbsss-flowgraph.hf.space/graph/run" \
-H "Content-Type: application/json" \
-d '{
"graph_id": "code-review-demo",
"initial_state": {
"code": "def hello():\n print(\"world\")",
"quality_threshold": 6.0
}
}'
```
---
## Architecture
```
flowgraph/
β”œβ”€β”€ app/
β”‚ β”œβ”€β”€ main.py # FastAPI entry point
β”‚ β”œβ”€β”€ config.py # Configuration
β”‚ β”œβ”€β”€ api/
β”‚ β”‚ β”œβ”€β”€ schemas.py # Pydantic models
β”‚ β”‚ └── routes/
β”‚ β”‚ β”œβ”€β”€ graph.py # Graph CRUD + execution
β”‚ β”‚ β”œβ”€β”€ tools.py # Tool management
β”‚ β”‚ └── websocket.py # Real-time streaming
β”‚ β”œβ”€β”€ engine/
β”‚ β”‚ β”œβ”€β”€ state.py # Immutable state management
β”‚ β”‚ β”œβ”€β”€ node.py # Node definitions + decorators
β”‚ β”‚ β”œβ”€β”€ graph.py # Graph structure + validation
β”‚ β”‚ └── executor.py # Async workflow executor
β”‚ β”œβ”€β”€ tools/
β”‚ β”‚ β”œβ”€β”€ registry.py # Tool registry
β”‚ β”‚ └── builtin.py # Built-in tools
β”‚ β”œβ”€β”€ workflows/
β”‚ β”‚ └── code_review.py # Demo workflow
β”‚ └── storage/
β”‚ └── memory.py # In-memory storage
β”œβ”€β”€ tests/ # Test suite
β”œβ”€β”€ Dockerfile
β”œβ”€β”€ docker-compose.yml
└── requirements.txt
```
---
## Design Decisions
| Decision | Rationale |
|----------|-----------|
| Immutable state | Predictable flow, easier debugging, clear state transitions |
| Async-first | Scalability for long-running or I/O-bound workflows |
| Tool registry | Decouples node logic from handlers, enables dynamic registration |
| Named conditions | Clean serialization, human-readable graph definitions |
| In-memory storage | Simplicity first; easily swappable for Redis/PostgreSQL |
| Max iterations | Safety guard against infinite loops |
---
## Testing
```bash
# Run tests in Docker
docker compose exec workflow-engine pytest tests/ -v
# Run locally
pytest tests/ -v
```
---
## What I Would Improve
With more time, I would add:
1. Persistent Storage - PostgreSQL/Redis for production
2. Parallel Execution - Run independent nodes concurrently
3. Checkpointing - Save/restore execution state
4. Retry Logic - Automatic retry on node failures
5. Metrics - Prometheus/Grafana integration
6. Authentication - API key / JWT support
7. Visual Editor - Web UI for building workflows
---
## Creating Custom Workflows
### 1. Define a Node Handler
```python
from app.tools.registry import register_tool
@register_tool("my_processor")
def my_processor(data: str) -> dict:
return {"result": data.upper()}
```
### 2. Create via API
```json
POST /graph/create
{
"name": "my_workflow",
"nodes": [
{"name": "step1", "handler": "my_processor"},
{"name": "step2", "handler": "another_tool"}
],
"edges": {"step1": "step2"},
"entry_point": "step1"
}
```
### 3. Run It
```json
POST /graph/run
{
"graph_id": "returned_graph_id",
"initial_state": {"data": "hello"}
}
```
---
## License
MIT License - see [LICENSE](LICENSE) for details.