|
|
""" |
|
|
Code Review Workflow Implementation. |
|
|
|
|
|
This is the sample workflow demonstrating the workflow engine capabilities: |
|
|
1. Extract functions from code |
|
|
2. Check complexity |
|
|
3. Detect issues |
|
|
4. Suggest improvements |
|
|
5. Loop until quality_score >= threshold |
|
|
""" |
|
|
|
|
|
from typing import Any, Dict |
|
|
import logging |
|
|
|
|
|
from app.engine.graph import Graph, END |
|
|
from app.engine.node import node, NodeType |
|
|
from app.tools.builtin import ( |
|
|
extract_functions, |
|
|
calculate_complexity, |
|
|
detect_issues, |
|
|
suggest_improvements, |
|
|
quality_check, |
|
|
) |
|
|
from app.tools.registry import tool_registry |
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@node(name="extract_node", description="Extract functions from the input code") |
|
|
def extract_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Extract function definitions from the code. |
|
|
|
|
|
Input state requires: |
|
|
- code: str - The Python source code to analyze |
|
|
|
|
|
Updates state with: |
|
|
- functions: List[dict] - Extracted function information |
|
|
- function_count: int - Number of functions found |
|
|
""" |
|
|
code = state.get("code", "") |
|
|
result = extract_functions(code) |
|
|
state.update(result) |
|
|
logger.info(f"Extracted {result.get('function_count', 0)} functions") |
|
|
return state |
|
|
|
|
|
|
|
|
@node(name="complexity_node", description="Calculate code complexity metrics") |
|
|
def complexity_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Calculate complexity metrics for the code. |
|
|
|
|
|
Uses state: |
|
|
- code: str - Source code |
|
|
- functions: List[dict] - Previously extracted functions |
|
|
|
|
|
Updates state with: |
|
|
- lines_of_code: int |
|
|
- cyclomatic_complexity: int |
|
|
- complexity_score: int (1-10) |
|
|
""" |
|
|
code = state.get("code", "") |
|
|
functions = state.get("functions", []) |
|
|
result = calculate_complexity(code, functions) |
|
|
state.update(result) |
|
|
logger.info(f"Complexity score: {result.get('complexity_score', 0)}") |
|
|
return state |
|
|
|
|
|
|
|
|
@node(name="issues_node", description="Detect code quality issues") |
|
|
def issues_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Detect code quality issues and calculate quality score. |
|
|
|
|
|
Uses state: |
|
|
- code: str - Source code |
|
|
- functions: List[dict] - Extracted functions |
|
|
- complexity_score: int - From complexity check |
|
|
|
|
|
Updates state with: |
|
|
- issues: List[dict] - Detected issues |
|
|
- issue_count: int |
|
|
- quality_score: float (1-10) |
|
|
""" |
|
|
code = state.get("code", "") |
|
|
functions = state.get("functions", []) |
|
|
complexity_score = state.get("complexity_score") |
|
|
|
|
|
result = detect_issues(code, functions, complexity_score) |
|
|
state.update(result) |
|
|
|
|
|
logger.info( |
|
|
f"Found {result.get('issue_count', 0)} issues, " |
|
|
f"quality score: {result.get('quality_score', 0)}" |
|
|
) |
|
|
return state |
|
|
|
|
|
|
|
|
@node(name="improve_node", description="Generate improvement suggestions") |
|
|
def improve_node(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Generate improvement suggestions based on detected issues. |
|
|
|
|
|
Uses state: |
|
|
- issues: List[dict] - Detected issues |
|
|
- functions: List[dict] - Extracted functions |
|
|
- quality_score: float - Current quality score |
|
|
|
|
|
Updates state with: |
|
|
- suggestions: List[dict] - Improvement suggestions |
|
|
- suggestion_count: int |
|
|
- potential_quality_score: float - Score after improvements |
|
|
""" |
|
|
issues = state.get("issues", []) |
|
|
functions = state.get("functions", []) |
|
|
quality_score = state.get("quality_score", 5.0) |
|
|
|
|
|
result = suggest_improvements(issues, functions, quality_score) |
|
|
state.update(result) |
|
|
|
|
|
|
|
|
|
|
|
improvement = min(0.5, result.get("suggestion_count", 0) * 0.2) |
|
|
state["quality_score"] = min(10, quality_score + improvement) |
|
|
|
|
|
logger.info( |
|
|
f"Generated {result.get('suggestion_count', 0)} suggestions, " |
|
|
f"quality improved to {state['quality_score']}" |
|
|
) |
|
|
return state |
|
|
|
|
|
|
|
|
|
|
|
def _wrapper_handler(handler_func): |
|
|
"""Create a wrapper that works with tool registry.""" |
|
|
def wrapper(state: Dict[str, Any]) -> Dict[str, Any]: |
|
|
return handler_func(state) |
|
|
wrapper.__name__ = handler_func.__name__ |
|
|
wrapper.__doc__ = handler_func.__doc__ |
|
|
return wrapper |
|
|
|
|
|
tool_registry.add(_wrapper_handler(extract_node), name="extract_node", description="Extract functions from code") |
|
|
tool_registry.add(_wrapper_handler(complexity_node), name="complexity_node", description="Calculate complexity") |
|
|
tool_registry.add(_wrapper_handler(issues_node), name="issues_node", description="Detect quality issues") |
|
|
tool_registry.add(_wrapper_handler(improve_node), name="improve_node", description="Suggest improvements") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def quality_meets_threshold(state: Dict[str, Any]) -> str: |
|
|
""" |
|
|
Routing condition: check if quality meets threshold. |
|
|
|
|
|
Returns: |
|
|
- "pass" if quality_score >= quality_threshold |
|
|
- "fail" if more improvement needed |
|
|
""" |
|
|
quality_score = state.get("quality_score", 0) |
|
|
threshold = state.get("quality_threshold", 7.0) |
|
|
|
|
|
if quality_score >= threshold: |
|
|
logger.info(f"Quality {quality_score} meets threshold {threshold}") |
|
|
return "pass" |
|
|
else: |
|
|
logger.info(f"Quality {quality_score} below threshold {threshold}") |
|
|
return "fail" |
|
|
|
|
|
|
|
|
def always_loop(state: Dict[str, Any]) -> str: |
|
|
"""Always return to issues check after improvement.""" |
|
|
return "continue" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_code_review_workflow( |
|
|
max_iterations: int = 5, |
|
|
quality_threshold: float = 7.0 |
|
|
) -> Graph: |
|
|
""" |
|
|
Create a Code Review workflow graph. |
|
|
|
|
|
Workflow flow: |
|
|
``` |
|
|
extract → complexity → issues ─┬─→ END (if pass) |
|
|
│ |
|
|
└─→ improve → issues (loop if fail) |
|
|
``` |
|
|
|
|
|
Args: |
|
|
max_iterations: Maximum improvement loops |
|
|
quality_threshold: Minimum quality score to pass |
|
|
|
|
|
Returns: |
|
|
Configured Graph instance |
|
|
""" |
|
|
graph = Graph( |
|
|
name="Code Review Workflow", |
|
|
description=( |
|
|
"Analyzes Python code for quality issues and suggests improvements. " |
|
|
f"Loops until quality score >= {quality_threshold} or max {max_iterations} iterations." |
|
|
), |
|
|
max_iterations=max_iterations, |
|
|
) |
|
|
|
|
|
|
|
|
graph.add_node("extract", handler=extract_node, description="Extract functions from code") |
|
|
graph.add_node("complexity", handler=complexity_node, description="Calculate complexity") |
|
|
graph.add_node("issues", handler=issues_node, description="Detect quality issues") |
|
|
graph.add_node("improve", handler=improve_node, description="Suggest improvements") |
|
|
|
|
|
|
|
|
graph.add_edge("extract", "complexity") |
|
|
graph.add_edge("complexity", "issues") |
|
|
|
|
|
|
|
|
graph.add_conditional_edge( |
|
|
"issues", |
|
|
quality_meets_threshold, |
|
|
{"pass": END, "fail": "improve"} |
|
|
) |
|
|
|
|
|
|
|
|
graph.add_conditional_edge( |
|
|
"improve", |
|
|
always_loop, |
|
|
{"continue": "issues"} |
|
|
) |
|
|
|
|
|
|
|
|
graph.set_entry_point("extract") |
|
|
|
|
|
return graph |
|
|
|
|
|
|
|
|
async def register_code_review_workflow(): |
|
|
""" |
|
|
Register a pre-built Code Review workflow in storage. |
|
|
|
|
|
This makes the workflow available immediately via the API |
|
|
without needing to create it first. |
|
|
""" |
|
|
from app.storage.memory import graph_storage |
|
|
|
|
|
workflow = create_code_review_workflow() |
|
|
|
|
|
await graph_storage.save( |
|
|
graph_id="code-review-demo", |
|
|
name="Code Review Demo", |
|
|
definition=workflow.to_dict(), |
|
|
) |
|
|
|
|
|
logger.info("Registered Code Review workflow with ID: code-review-demo") |
|
|
return workflow |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def run_code_review_demo(): |
|
|
""" |
|
|
Demo function showing how to run the code review workflow. |
|
|
|
|
|
Usage: |
|
|
import asyncio |
|
|
from app.workflows.code_review import run_code_review_demo |
|
|
asyncio.run(run_code_review_demo()) |
|
|
""" |
|
|
from app.engine.executor import execute_graph |
|
|
|
|
|
|
|
|
sample_code = ''' |
|
|
def calculate_total(items): |
|
|
total = 0 |
|
|
for item in items: |
|
|
if item.price > 0: |
|
|
if item.quantity > 0: |
|
|
if item.discount: |
|
|
total += item.price * item.quantity * (1 - item.discount) |
|
|
else: |
|
|
total += item.price * item.quantity |
|
|
return total |
|
|
|
|
|
def process_data(data): |
|
|
result = [] |
|
|
for i in range(len(data)): |
|
|
if data[i] > 100: |
|
|
result.append(data[i] * 2) |
|
|
else: |
|
|
result.append(data[i]) |
|
|
print(result) |
|
|
return result |
|
|
|
|
|
|
|
|
def helper(): |
|
|
x = 42 |
|
|
return x * 1000 |
|
|
''' |
|
|
|
|
|
|
|
|
workflow = create_code_review_workflow(max_iterations=3, quality_threshold=6.0) |
|
|
|
|
|
|
|
|
initial_state = { |
|
|
"code": sample_code, |
|
|
"quality_threshold": 6.0, |
|
|
} |
|
|
|
|
|
|
|
|
print("Starting Code Review...") |
|
|
result = await execute_graph(workflow, initial_state) |
|
|
|
|
|
|
|
|
print(f"\nExecution Status: {result.status.value}") |
|
|
print(f"Total Duration: {result.total_duration_ms:.2f}ms") |
|
|
print(f"Iterations: {result.iterations}") |
|
|
print(f"\nFinal Quality Score: {result.final_state.get('quality_score', 'N/A')}") |
|
|
print(f"Issues Found: {result.final_state.get('issue_count', 'N/A')}") |
|
|
print(f"\nSuggestions:") |
|
|
for suggestion in result.final_state.get("suggestions", []): |
|
|
print(f" - [{suggestion['priority']}] {suggestion['suggestion']}") |
|
|
|
|
|
return result |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import asyncio |
|
|
asyncio.run(run_code_review_demo()) |
|
|
|