metadata
title: FlowGraph
emoji: 🔄
colorFrom: blue
colorTo: purple
sdk: docker
pinned: false
license: mit
app_port: 7860

FlowGraph

A lightweight, async-first workflow orchestration engine for building agent pipelines in Python.


A minimal graph-based workflow engine similar to LangGraph. Define steps (nodes), connect them with edges, share state between them, and run workflows through a REST API.

Live Demo: https://kbsss-flowgraph.hf.space/docs


Features

| Feature | Description |
|---------|-------------|
| Nodes | Python functions that read and modify shared state |
| Edges | Define which node runs after which |
| Branching | Conditional routing based on state values |
| Looping | Run nodes repeatedly until conditions are met |
| Async | Full async/await support for scalability |
| WebSocket | Real-time execution streaming |
| Visualization | Auto-generated Mermaid diagrams |
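
To make the node, edge, branching, and looping features concrete, here is a minimal sketch of how such an engine can work. It is not FlowGraph's actual implementation: `run_graph`, the router functions, and the `count`/`target` state keys are all hypothetical names chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

State = Dict[str, Any]
Node = Callable[[State], Awaitable[State]]
# A router inspects the state and returns the next node name, or None to stop.
Router = Callable[[State], Optional[str]]


async def run_graph(
    nodes: Dict[str, Node],
    routers: Dict[str, Router],
    entry_point: str,
    state: State,
    max_iterations: int = 20,
) -> State:
    """Run one node at a time, following routers, until a router returns None."""
    current: Optional[str] = entry_point
    steps = 0
    while current is not None:
        if steps >= max_iterations:
            # Safety guard so a looping graph cannot run forever.
            raise RuntimeError(f"exceeded {max_iterations} iterations")
        state = await nodes[current](state)
        steps += 1
        router = routers.get(current)
        current = router(state) if router else None
    return state


async def increment(state: State) -> State:
    # Return a new dict rather than mutating the input.
    return {**state, "count": state["count"] + 1}


async def report(state: State) -> State:
    return {**state, "done": True}


def after_increment(state: State) -> Optional[str]:
    # Branch: loop back until the target is reached, then move on.
    return "increment" if state["count"] < state["target"] else "report"


def demo() -> State:
    return asyncio.run(
        run_graph(
            nodes={"increment": increment, "report": report},
            routers={"increment": after_increment},
            entry_point="increment",
            state={"count": 0, "target": 3},
        )
    )
```

Calling `demo()` runs `increment` three times, then `report`, and returns the final state. A node with no router ends the run, which is how the sketch models a terminal step.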

Quick Start

With Docker (Recommended)

git clone https://github.com/kbss0000/flowgraph.git
cd flowgraph
docker compose up -d
curl http://localhost:8000/health

Without Docker

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python run.py

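Access Points: in the Docker setup the API listens on http://localhost:8000, with the health check at /health. As on the live demo, the interactive API docs are served under /docs.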


API Reference

Graph Endpoints

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /graph/create | Create a new workflow graph |
| GET | /graph/ | List all graphs |
| GET | /graph/{graph_id} | Get graph details + Mermaid diagram |
| POST | /graph/run | Execute a graph |
| GET | /graph/state/{run_id} | Get execution state |
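
The graph details endpoint returns a Mermaid diagram. As a rough illustration of how such a diagram can be derived from a graph definition, the sketch below turns an edge mapping into Mermaid flowchart text. The function name `to_mermaid`, the node names, and the shape used for conditional edges (a mapping of label to target) are assumptions, not the project's actual format.

```python
from typing import Dict, List, Union

# An edge target is either one node name (unconditional) or a mapping of
# condition label -> node name (branching). This shape is an assumption.
EdgeTarget = Union[str, Dict[str, str]]


def to_mermaid(entry_point: str, edges: Dict[str, EdgeTarget]) -> str:
    """Render an edge mapping as Mermaid flowchart source."""
    lines: List[str] = ["graph TD", f"    START([start]) --> {entry_point}"]
    for source, target in edges.items():
        if isinstance(target, str):
            lines.append(f"    {source} --> {target}")
        else:
            # One labelled arrow per branch.
            for label, destination in target.items():
                lines.append(f"    {source} -->|{label}| {destination}")
    return "\n".join(lines)


# Hypothetical graph loosely modelled on a review loop.
diagram = to_mermaid(
    "extract",
    {
        "extract": "check",
        "check": {"pass": "finish", "fail": "improve"},
        "improve": "check",
    },
)
```

Printing `diagram` gives text that can be pasted into any Mermaid renderer. The terminal node is called `finish` here because a lowercase `end` is a reserved word in Mermaid flowcharts.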

Tool Endpoints

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /tools/ | List all registered tools |
| POST | /tools/register | Register a new tool dynamically |

WebSocket

| Endpoint | Description |
|----------|-------------|
| /ws/run/{graph_id} | Execute with real-time streaming |

Sample Workflow: Code Review

The included demo workflow analyzes Python code quality:

Extract Functions -> Check Complexity -> Detect Issues --+--> END (pass)
                                                         |
                                                         +--> Improve -> (loop back)

Try It

curl -X POST "https://kbsss-flowgraph.hf.space/graph/run" \
  -H "Content-Type: application/json" \
  -d '{
    "graph_id": "code-review-demo",
    "initial_state": {
      "code": "def hello():\n    print(\"world\")",
      "quality_threshold": 6.0
    }
  }'
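
The same request can be made from Python using only the standard library. This is a sketch: `build_run_request` and `send` are hypothetical helper names, and the response is printed as-is because its exact shape is not documented here.

```python
import json
import urllib.request
from typing import Any, Dict

BASE_URL = "https://kbsss-flowgraph.hf.space"


def build_run_request(
    graph_id: str,
    initial_state: Dict[str, Any],
    base_url: str = BASE_URL,
) -> urllib.request.Request:
    """Build the POST /graph/run request without sending it."""
    payload = {"graph_id": graph_id, "initial_state": initial_state}
    return urllib.request.Request(
        url=f"{base_url}/graph/run",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> Any:
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_run_request(
    "code-review-demo",
    {"code": 'def hello():\n    print("world")', "quality_threshold": 6.0},
)
```

Call `print(send(request))` to execute the workflow against the live demo, or pass `base_url="http://localhost:8000"` to target a local Docker instance.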

Architecture

flowgraph/
├── app/
│   ├── main.py              # FastAPI entry point
│   ├── config.py            # Configuration
│   ├── api/
│   │   ├── schemas.py       # Pydantic models
│   │   └── routes/
│   │       ├── graph.py     # Graph CRUD + execution
│   │       ├── tools.py     # Tool management
│   │       └── websocket.py # Real-time streaming
│   ├── engine/
│   │   ├── state.py         # Immutable state management
│   │   ├── node.py          # Node definitions + decorators
│   │   ├── graph.py         # Graph structure + validation
│   │   └── executor.py      # Async workflow executor
│   ├── tools/
│   │   ├── registry.py      # Tool registry
│   │   └── builtin.py       # Built-in tools
│   ├── workflows/
│   │   └── code_review.py   # Demo workflow
│   └── storage/
│       └── memory.py        # In-memory storage
├── tests/                   # Test suite
├── Dockerfile
├── docker-compose.yml
└── requirements.txt

Design Decisions

| Decision | Rationale |
|----------|-----------|
| Immutable state | Predictable flow, easier debugging, clear state transitions |
| Async-first | Scalability for long-running or I/O-bound workflows |
| Tool registry | Decouples node logic from handlers, enables dynamic registration |
| Named conditions | Clean serialization, human-readable graph definitions |
| In-memory storage | Simplicity first; easily swappable for Redis/PostgreSQL |
| Max iterations | Safety guard against infinite loops |
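
Two of these decisions, immutable state and named conditions, are easier to see in code. The sketch below is one possible shape, not the contents of `app/engine/state.py`: the class `WorkflowState`, the `CONDITIONS` table, the condition name `quality_met`, and the `quality_score` key are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Callable, Dict, Mapping


@dataclass(frozen=True)
class WorkflowState:
    """Read-only state; every update produces a new object."""

    data: Mapping[str, Any] = field(default_factory=lambda: MappingProxyType({}))

    def update(self, **changes: Any) -> "WorkflowState":
        # Copy-on-write: the previous state stays valid for debugging.
        return WorkflowState(MappingProxyType({**self.data, **changes}))


# Conditions are looked up by name, so a graph definition can refer to them
# with a plain string that serializes cleanly to JSON.
CONDITIONS: Dict[str, Callable[[WorkflowState], bool]] = {
    "quality_met": lambda s: s.data["quality_score"] >= s.data["quality_threshold"],
}


def next_node(state: WorkflowState, condition: str, if_true: str, if_false: str) -> str:
    """Pick the next node name from a named condition."""
    return if_true if CONDITIONS[condition](state) else if_false


before = WorkflowState().update(quality_score=4.0, quality_threshold=6.0)
after = before.update(quality_score=7.5)
```

Here `before` still holds the old score after `after` is created, so each transition can be inspected separately. With these values `next_node(before, "quality_met", "END", "improve")` selects `improve`, while the same call on `after` selects `END`.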

Testing

# Run tests in Docker
docker compose exec workflow-engine pytest tests/ -v

# Run locally
pytest tests/ -v

What I Would Improve

With more time, I would add:

  1. Persistent Storage - PostgreSQL/Redis for production
  2. Parallel Execution - Run independent nodes concurrently
  3. Checkpointing - Save/restore execution state
  4. Retry Logic - Automatic retry on node failures
  5. Metrics - Prometheus/Grafana integration
  6. Authentication - API key / JWT support
  7. Visual Editor - Web UI for building workflows

Creating Custom Workflows

1. Define a Node Handler

from app.tools.registry import register_tool

@register_tool("my_processor")
def my_processor(data: str) -> dict:
    return {"result": data.upper()}
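
For readers curious how a decorator-based registry like this can work, here is a self-contained stand-in. It is a sketch of the general pattern only: the real `app.tools.registry` module may differ, and the `_TOOLS` dictionary and `get_tool` helper are hypothetical names.

```python
from typing import Any, Callable, Dict

# Local stand-in for a tool registry; not the project's code.
_TOOLS: Dict[str, Callable[..., Any]] = {}


def register_tool(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Return a decorator that stores the function under the given name."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if name in _TOOLS:
            raise ValueError(f"tool {name!r} is already registered")
        _TOOLS[name] = func
        return func  # the function itself is left unchanged

    return decorator


def get_tool(name: str) -> Callable[..., Any]:
    """Look up a handler by the name used in a graph definition."""
    return _TOOLS[name]


@register_tool("my_processor")
def my_processor(data: str) -> dict:
    return {"result": data.upper()}


output = get_tool("my_processor")("hello")
```

Because nodes refer to handlers by string name, a graph definition stays plain JSON while the Python functions live in the registry.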

2. Create via API

POST /graph/create
{
  "name": "my_workflow",
  "nodes": [
    {"name": "step1", "handler": "my_processor"},
    {"name": "step2", "handler": "another_tool"}
  ],
  "edges": {"step1": "step2"},
  "entry_point": "step1"
}
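
Before posting a definition like the one above, it can help to check it locally. The sketch below shows the kind of consistency checks a graph engine typically performs. The function `validate_graph` and its specific rules are assumptions, not FlowGraph's documented validation, and only simple string edge targets are checked.

```python
from typing import Any, Dict, List


def validate_graph(definition: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means no problems were found."""
    errors: List[str] = []
    names = [node["name"] for node in definition.get("nodes", [])]
    known = set(names)
    if len(names) != len(known):
        errors.append("duplicate node names")
    entry = definition.get("entry_point")
    if entry not in known:
        errors.append(f"entry_point {entry!r} is not a defined node")
    for source, target in definition.get("edges", {}).items():
        if source not in known:
            errors.append(f"edge source {source!r} is not a defined node")
        # Only plain string targets are checked in this sketch.
        if isinstance(target, str) and target not in known:
            errors.append(f"edge target {target!r} is not a defined node")
    return errors


good = {
    "name": "my_workflow",
    "nodes": [
        {"name": "step1", "handler": "my_processor"},
        {"name": "step2", "handler": "another_tool"},
    ],
    "edges": {"step1": "step2"},
    "entry_point": "step1",
}
bad = {**good, "edges": {"step1": "step3"}, "entry_point": "missing"}
```

`validate_graph(good)` returns an empty list, while `validate_graph(bad)` reports the unknown entry point and the dangling edge target.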

3. Run It

POST /graph/run
{
  "graph_id": "returned_graph_id",
  "initial_state": {"data": "hello"}
}

License

MIT License - see LICENSE for details.