# uml/agents/analysis/sequence_analysis.py
# Mohammed Foud
# Add application file
# 5a2d62e
from base_agent import BaseAgent
from models import AgentState
from typing import AsyncGenerator
import json
class SequenceAnalysis(BaseAgent):
    """Agent step that requests a sequence analysis of a project's key workflow.

    Calling an instance fills a fixed prompt with three entries taken from the
    agent state and relays whatever ``self._stream_process`` yields.
    ``_stream_process`` is not defined in this class (presumably inherited
    from ``BaseAgent`` -- confirm there for what ``output_key`` and
    ``step_name`` control).
    """

    async def __call__(self, state: AgentState) -> AsyncGenerator[str, None]:
        """Yield each chunk produced by ``self._stream_process`` for this step.

        Args:
            state: Agent state. Its ``"project_name"``,
                ``"project_description"`` and ``"actors_use_cases"`` entries
                are read and forwarded as keyword arguments matching the
                placeholders in the prompt template.

        Yields:
            Every chunk from ``self._stream_process``, unchanged and in order.
        """
        # The template text is flush-left on purpose: inside a triple-quoted
        # string any leading indentation would become part of the prompt.
        sequence_prompt = """
Conduct a comprehensive sequence analysis for the key workflow in:
Project: {project_name}
Description: {project_description}
Use Cases: {actors_use_cases}
Analysis Requirements:
1. Identify all participating components:
- Primary actors (initiators)
- Secondary systems/services
- Internal components
2. For each interaction step specify:
- Sender and receiver
- Message content/type
- Synchronization points
- Alternative/error flows
3. Include timing considerations where relevant
4. Note any parallel/concurrent activities
5. Highlight critical system boundaries
Format your response as:
### Workflow: [Workflow Name]
#### Components:
- [Component1] (type/role)
- [Component2] (type/role)
#### Main Flow:
1. [ComponentA] -> [ComponentB]: [Message/Purpose]
- Preconditions: [required state]
- Postconditions: [resulting state]
- Variations: [alternate paths]
2. [ComponentB] --> [ComponentA]: [Response]
...
#### Parallel Flows:
- Concurrent with step 2: [Description]
#### Error Handling:
- At step 3: [Error Condition] → [Recovery Path]
Example:
### Workflow: Order Processing
#### Components:
- Customer (primary actor)
- OrderService (core system)
- PaymentGateway (external service)
#### Main Flow:
1. Customer -> OrderService: SubmitOrder(cart)
- Preconditions: Cart not empty, user authenticated
- Postconditions: Order pending payment
- Variations: Invalid items → Rejection notice
2. OrderService -> PaymentGateway: ProcessPayment(details)
...
"""
        # Resolve the streaming helper before reading the state so lookups
        # happen in the same order as a single direct call expression.
        stream_step = self._stream_process

        # Values for the {placeholders} in the template, keyed by field name.
        template_fields = {
            "project_name": state["project_name"],
            "project_description": state["project_description"],
            "actors_use_cases": state["actors_use_cases"],
        }

        async for piece in stream_step(
            state=state,
            prompt_template=sequence_prompt,
            output_key="sequence_interactions",
            step_name="extract_sequence",
            **template_fields,
        ):
            yield piece