flowgraph / app /storage /memory.py
kbsss's picture
Upload folder using huggingface_hub
7b2787b verified
"""
In-Memory Storage for Workflow Engine.
Provides asyncio-lock-guarded storage for graphs and execution runs (safe across coroutines on one event loop; not safe across OS threads).
Can be easily replaced with a database implementation.
"""
import asyncio
import copy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
@dataclass
class StoredGraph:
    """Record holding one saved graph definition and its timestamps."""

    graph_id: str
    name: str
    definition: Dict[str, Any]
    # Both timestamps default to the (naive, local) time of construction.
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """
        Build a plain-dict view of this graph.

        Returns:
            Dict with ``graph_id``, ``name`` and ``definition`` passed
            through as-is (``definition`` is the same object, not a copy)
            and both timestamps rendered via ``datetime.isoformat()``.
        """
        payload: Dict[str, Any] = dict(
            graph_id=self.graph_id,
            name=self.name,
            definition=self.definition,
        )
        payload["created_at"] = self.created_at.isoformat()
        payload["updated_at"] = self.updated_at.isoformat()
        return payload
@dataclass
class StoredRun:
    """Record tracking a single execution of a graph."""

    run_id: str
    graph_id: str
    status: str
    initial_state: Dict[str, Any]
    current_state: Dict[str, Any] = field(default_factory=dict)
    final_state: Optional[Dict[str, Any]] = None
    execution_log: List[Dict[str, Any]] = field(default_factory=list)
    current_node: Optional[str] = None
    iteration: int = 0
    # Defaults to the (naive, local) time of construction.
    started_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Build a plain-dict view of this run.

        Returns:
            Dict containing every field. State dicts and the execution log
            are passed through by reference (not copied); ``started_at`` is
            rendered via ``isoformat()`` and ``completed_at`` likewise, or
            ``None`` when it is not set.
        """
        plain_fields = (
            "run_id",
            "graph_id",
            "status",
            "initial_state",
            "current_state",
            "final_state",
            "execution_log",
            "current_node",
            "iteration",
        )
        payload: Dict[str, Any] = {key: getattr(self, key) for key in plain_fields}
        payload["started_at"] = self.started_at.isoformat()
        if self.completed_at:
            payload["completed_at"] = self.completed_at.isoformat()
        else:
            payload["completed_at"] = None
        payload["error"] = self.error
        return payload
class GraphStorage:
    """
    In-memory registry of workflow graphs keyed by graph ID.

    Supports creation, retrieval, update, deletion and listing. Each
    coroutine method holds an ``asyncio.Lock`` while it touches the
    underlying dict; that lock coordinates coroutines and is not an
    OS-thread lock.
    """

    def __init__(self):
        self._lock = asyncio.Lock()
        self._graphs: Dict[str, StoredGraph] = {}

    async def save(self, graph_id: str, name: str, definition: Dict[str, Any]) -> StoredGraph:
        """
        Store a graph definition under ``graph_id``.

        Any graph already stored under the same ID is replaced by a newly
        constructed record.

        Args:
            graph_id: Unique graph identifier
            name: Graph name
            definition: Graph definition dict

        Returns:
            The newly stored graph record
        """
        async with self._lock:
            record = StoredGraph(graph_id=graph_id, name=name, definition=definition)
            self._graphs[graph_id] = record
            return record

    async def get(self, graph_id: str) -> Optional[StoredGraph]:
        """Return the graph stored under ``graph_id``, or None if absent."""
        async with self._lock:
            try:
                return self._graphs[graph_id]
            except KeyError:
                return None

    async def update(self, graph_id: str, definition: Dict[str, Any]) -> Optional[StoredGraph]:
        """
        Replace the definition of an existing graph and refresh ``updated_at``.

        Returns:
            The updated record, or None when no graph has that ID.
        """
        async with self._lock:
            record = self._graphs.get(graph_id)
            if record is None:
                return None
            record.definition = definition
            record.updated_at = datetime.now()
            return record

    async def delete(self, graph_id: str) -> bool:
        """Remove a graph; return True if it existed, False otherwise."""
        async with self._lock:
            if graph_id not in self._graphs:
                return False
            self._graphs.pop(graph_id)
            return True

    async def list_all(self) -> List[StoredGraph]:
        """Return a new list containing every stored graph record."""
        async with self._lock:
            return [*self._graphs.values()]

    async def exists(self, graph_id: str) -> bool:
        """Report whether a graph is stored under ``graph_id``."""
        async with self._lock:
            present = graph_id in self._graphs
            return present

    def __len__(self) -> int:
        """Number of stored graphs (read without taking the lock)."""
        return len(self._graphs)
class RunStorage:
    """
    In-memory storage for execution runs, keyed by run ID.

    Stores run state, allowing updates and queries for ongoing and
    completed runs. Each coroutine method holds an ``asyncio.Lock`` while
    it touches the underlying dict; that lock coordinates coroutines and
    is not an OS-thread lock.
    """

    def __init__(self):
        self._runs: Dict[str, StoredRun] = {}
        self._lock = asyncio.Lock()

    async def create(
        self,
        run_id: str,
        graph_id: str,
        initial_state: Dict[str, Any]
    ) -> StoredRun:
        """
        Create a new run in the "pending" status.

        Any run already stored under the same ID is replaced.

        Args:
            run_id: Unique run identifier
            graph_id: Associated graph ID
            initial_state: Initial state data

        Returns:
            The stored run

        Raises:
            Whatever ``copy.deepcopy`` raises if ``initial_state`` contains
            a value that cannot be deep-copied.
        """
        async with self._lock:
            stored = StoredRun(
                run_id=run_id,
                graph_id=graph_id,
                status="pending",
                initial_state=initial_state,
                # Deep copy so nested dicts/lists in the working state are
                # independent objects; a shallow copy would let in-place
                # edits of current_state leak into the recorded initial_state.
                current_state=copy.deepcopy(initial_state),
            )
            self._runs[run_id] = stored
            return stored

    async def get(self, run_id: str) -> Optional[StoredRun]:
        """Get a run by ID, or None if no such run is stored."""
        async with self._lock:
            return self._runs.get(run_id)

    async def update_state(
        self,
        run_id: str,
        current_state: Dict[str, Any],
        current_node: Optional[str] = None,
        iteration: Optional[int] = None
    ) -> Optional[StoredRun]:
        """
        Update the current state of a run and mark it "running".

        Args:
            run_id: Run to update
            current_state: New state; stored by reference, not copied
            current_node: If not None, recorded as the run's current node
            iteration: If not None, recorded as the run's iteration counter

        Returns:
            The updated run, or None when no run has that ID.
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.current_state = current_state
            # NOTE(review): status is set to "running" unconditionally, so an
            # update arriving after complete()/fail() flips the run back to
            # "running" — confirm that is intended.
            stored.status = "running"
            if current_node is not None:
                stored.current_node = current_node
            if iteration is not None:
                stored.iteration = iteration
            return stored

    async def add_log_entry(
        self,
        run_id: str,
        entry: Dict[str, Any]
    ) -> Optional[StoredRun]:
        """
        Append an entry to the run's execution log.

        Returns:
            The run, or None when no run has that ID.
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.execution_log.append(entry)
            return stored

    async def complete(
        self,
        run_id: str,
        final_state: Dict[str, Any],
        execution_log: List[Dict[str, Any]]
    ) -> Optional[StoredRun]:
        """
        Mark a run as completed and stamp ``completed_at``.

        The supplied ``execution_log`` replaces the run's stored log,
        including any entries previously added via ``add_log_entry``.

        Returns:
            The updated run, or None when no run has that ID.
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.status = "completed"
            stored.final_state = final_state
            stored.execution_log = execution_log
            stored.completed_at = datetime.now()
            return stored

    async def fail(
        self,
        run_id: str,
        error: str,
        final_state: Optional[Dict[str, Any]] = None
    ) -> Optional[StoredRun]:
        """
        Mark a run as failed, record the error and stamp ``completed_at``.

        ``final_state`` is always assigned, so omitting it resets the run's
        stored final state to None.

        Returns:
            The updated run, or None when no run has that ID.
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.status = "failed"
            stored.error = error
            stored.final_state = final_state
            stored.completed_at = datetime.now()
            return stored

    async def list_all(self) -> List[StoredRun]:
        """Return a new list containing every stored run."""
        async with self._lock:
            return list(self._runs.values())

    async def list_by_graph(self, graph_id: str) -> List[StoredRun]:
        """Return the runs whose ``graph_id`` equals the given graph ID."""
        async with self._lock:
            return [r for r in self._runs.values() if r.graph_id == graph_id]

    async def delete(self, run_id: str) -> bool:
        """Delete a run; return True if it existed, False otherwise."""
        async with self._lock:
            if run_id in self._runs:
                del self._runs[run_id]
                return True
            return False

    def __len__(self) -> int:
        """Number of stored runs (read without taking the lock)."""
        return len(self._runs)
# Global storage instances.
# Constructed once when this module is first imported; every importer of
# these names shares the same GraphStorage / RunStorage objects, whose
# contents live only in this process's memory.
graph_storage = GraphStorage()
run_storage = RunStorage()