flowgraph / app /workflows /code_review.py
kbsss's picture
Upload folder using huggingface_hub
7b2787b verified
"""
Code Review Workflow Implementation.
This is the sample workflow demonstrating the workflow engine capabilities:
1. Extract functions from code
2. Check complexity
3. Detect issues
4. Suggest improvements
5. Loop until quality_score >= threshold
"""
from typing import Any, Dict
import logging
from app.engine.graph import Graph, END
from app.engine.node import node, NodeType
from app.tools.builtin import (
extract_functions,
calculate_complexity,
detect_issues,
suggest_improvements,
quality_check,
)
from app.tools.registry import tool_registry
logger = logging.getLogger(__name__)
# ============================================================
# Node Handlers (using the @node decorator)
# ============================================================
@node(name="extract_node", description="Extract functions from the input code")
def extract_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract function definitions from the code.
Input state requires:
- code: str - The Python source code to analyze
Updates state with:
- functions: List[dict] - Extracted function information
- function_count: int - Number of functions found
"""
code = state.get("code", "")
result = extract_functions(code)
state.update(result)
logger.info(f"Extracted {result.get('function_count', 0)} functions")
return state
@node(name="complexity_node", description="Calculate code complexity metrics")
def complexity_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Calculate complexity metrics for the code.
Uses state:
- code: str - Source code
- functions: List[dict] - Previously extracted functions
Updates state with:
- lines_of_code: int
- cyclomatic_complexity: int
- complexity_score: int (1-10)
"""
code = state.get("code", "")
functions = state.get("functions", [])
result = calculate_complexity(code, functions)
state.update(result)
logger.info(f"Complexity score: {result.get('complexity_score', 0)}")
return state
@node(name="issues_node", description="Detect code quality issues")
def issues_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Detect code quality issues and calculate quality score.
Uses state:
- code: str - Source code
- functions: List[dict] - Extracted functions
- complexity_score: int - From complexity check
Updates state with:
- issues: List[dict] - Detected issues
- issue_count: int
- quality_score: float (1-10)
"""
code = state.get("code", "")
functions = state.get("functions", [])
complexity_score = state.get("complexity_score")
result = detect_issues(code, functions, complexity_score)
state.update(result)
logger.info(
f"Found {result.get('issue_count', 0)} issues, "
f"quality score: {result.get('quality_score', 0)}"
)
return state
@node(name="improve_node", description="Generate improvement suggestions")
def improve_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate improvement suggestions based on detected issues.
Uses state:
- issues: List[dict] - Detected issues
- functions: List[dict] - Extracted functions
- quality_score: float - Current quality score
Updates state with:
- suggestions: List[dict] - Improvement suggestions
- suggestion_count: int
- potential_quality_score: float - Score after improvements
"""
issues = state.get("issues", [])
functions = state.get("functions", [])
quality_score = state.get("quality_score", 5.0)
result = suggest_improvements(issues, functions, quality_score)
state.update(result)
# Simulate improvement by slightly increasing quality score
# In a real scenario, this would involve actual code modifications
improvement = min(0.5, result.get("suggestion_count", 0) * 0.2)
state["quality_score"] = min(10, quality_score + improvement)
logger.info(
f"Generated {result.get('suggestion_count', 0)} suggestions, "
f"quality improved to {state['quality_score']}"
)
return state
# Register node handlers as tools so they can be retrieved when rebuilding from storage
def _wrapper_handler(handler_func):
"""Create a wrapper that works with tool registry."""
def wrapper(state: Dict[str, Any]) -> Dict[str, Any]:
return handler_func(state)
wrapper.__name__ = handler_func.__name__
wrapper.__doc__ = handler_func.__doc__
return wrapper
tool_registry.add(_wrapper_handler(extract_node), name="extract_node", description="Extract functions from code")
tool_registry.add(_wrapper_handler(complexity_node), name="complexity_node", description="Calculate complexity")
tool_registry.add(_wrapper_handler(issues_node), name="issues_node", description="Detect quality issues")
tool_registry.add(_wrapper_handler(improve_node), name="improve_node", description="Suggest improvements")
# ============================================================
# Condition Functions
# ============================================================
def quality_meets_threshold(state: Dict[str, Any]) -> str:
"""
Routing condition: check if quality meets threshold.
Returns:
- "pass" if quality_score >= quality_threshold
- "fail" if more improvement needed
"""
quality_score = state.get("quality_score", 0)
threshold = state.get("quality_threshold", 7.0)
if quality_score >= threshold:
logger.info(f"Quality {quality_score} meets threshold {threshold}")
return "pass"
else:
logger.info(f"Quality {quality_score} below threshold {threshold}")
return "fail"
def always_loop(state: Dict[str, Any]) -> str:
"""Always return to issues check after improvement."""
return "continue"
# ============================================================
# Workflow Factory
# ============================================================
def create_code_review_workflow(
max_iterations: int = 5,
quality_threshold: float = 7.0
) -> Graph:
"""
Create a Code Review workflow graph.
Workflow flow:
```
extract → complexity → issues ─┬─→ END (if pass)
└─→ improve → issues (loop if fail)
```
Args:
max_iterations: Maximum improvement loops
quality_threshold: Minimum quality score to pass
Returns:
Configured Graph instance
"""
graph = Graph(
name="Code Review Workflow",
description=(
"Analyzes Python code for quality issues and suggests improvements. "
f"Loops until quality score >= {quality_threshold} or max {max_iterations} iterations."
),
max_iterations=max_iterations,
)
# Add nodes
graph.add_node("extract", handler=extract_node, description="Extract functions from code")
graph.add_node("complexity", handler=complexity_node, description="Calculate complexity")
graph.add_node("issues", handler=issues_node, description="Detect quality issues")
graph.add_node("improve", handler=improve_node, description="Suggest improvements")
# Add edges
graph.add_edge("extract", "complexity")
graph.add_edge("complexity", "issues")
# Conditional edge: issues → END or improve
graph.add_conditional_edge(
"issues",
quality_meets_threshold,
{"pass": END, "fail": "improve"}
)
# Loop back from improve to issues
graph.add_conditional_edge(
"improve",
always_loop,
{"continue": "issues"}
)
# Set entry point
graph.set_entry_point("extract")
return graph
async def register_code_review_workflow():
"""
Register a pre-built Code Review workflow in storage.
This makes the workflow available immediately via the API
without needing to create it first.
"""
from app.storage.memory import graph_storage
workflow = create_code_review_workflow()
await graph_storage.save(
graph_id="code-review-demo",
name="Code Review Demo",
definition=workflow.to_dict(),
)
logger.info("Registered Code Review workflow with ID: code-review-demo")
return workflow
# ============================================================
# Example Usage
# ============================================================
async def run_code_review_demo():
"""
Demo function showing how to run the code review workflow.
Usage:
import asyncio
from app.workflows.code_review import run_code_review_demo
asyncio.run(run_code_review_demo())
"""
from app.engine.executor import execute_graph
# Sample code to review
sample_code = '''
def calculate_total(items):
total = 0
for item in items:
if item.price > 0:
if item.quantity > 0:
if item.discount:
total += item.price * item.quantity * (1 - item.discount)
else:
total += item.price * item.quantity
return total
def process_data(data):
result = []
for i in range(len(data)):
if data[i] > 100:
result.append(data[i] * 2)
else:
result.append(data[i])
print(result)
return result
def helper():
x = 42
return x * 1000
'''
# Create workflow
workflow = create_code_review_workflow(max_iterations=3, quality_threshold=6.0)
# Initial state
initial_state = {
"code": sample_code,
"quality_threshold": 6.0,
}
# Execute
print("Starting Code Review...")
result = await execute_graph(workflow, initial_state)
# Print results
print(f"\nExecution Status: {result.status.value}")
print(f"Total Duration: {result.total_duration_ms:.2f}ms")
print(f"Iterations: {result.iterations}")
print(f"\nFinal Quality Score: {result.final_state.get('quality_score', 'N/A')}")
print(f"Issues Found: {result.final_state.get('issue_count', 'N/A')}")
print(f"\nSuggestions:")
for suggestion in result.final_state.get("suggestions", []):
print(f" - [{suggestion['priority']}] {suggestion['suggestion']}")
return result
if __name__ == "__main__":
import asyncio
asyncio.run(run_code_review_demo())