TaskWeaver / services /workflow_service.py
PocketSkye's picture
Initial deployment
0242ab2
Raw
History Blame Contribute Delete
4.66 kB
import json
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from models.workflow import Workflow
from models.workflow_schedule import WorkflowSchedule
from agents.workflow_generator.agent import (
WorkflowGeneratorAgent
)
from agents.workflow_generator.parser import (
parse_workflow
)
from agents.workflow_validator.agent import (
WorkflowValidatorAgent
)
class WorkflowService:
def __init__(self):
self.generator = WorkflowGeneratorAgent()
self.validator = WorkflowValidatorAgent()
# Calculate next execution time
def calculate_next_run(
self,
schedule_type: str,
run_time: str,
day_of_week: str = None
):
now = datetime.utcnow()
hour, minute = map(
int,
run_time.split(":")
)
# Daily schedule
if schedule_type == "daily":
next_run = now.replace(
hour=hour,
minute=minute,
second=0,
microsecond=0
)
if next_run <= now:
next_run += timedelta(days=1)
return next_run
# Weekly schedule
if schedule_type == "weekly":
days = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6
}
target_day = days[
day_of_week.lower()
]
current_day = now.weekday()
days_ahead = (
target_day - current_day
) % 7
next_run = now.replace(
hour=hour,
minute=minute,
second=0,
microsecond=0
) + timedelta(
days=days_ahead
)
if next_run <= now:
next_run += timedelta(days=7)
return next_run
def create_workflow(
self,
prompt: str,
user_id: int,
db: Session
):
# Generate workflow
raw_output = self.generator.generate(
prompt
)
print("\nRAW OUTPUT:")
print(raw_output)
# Parse workflow
workflow_json = parse_workflow(
raw_output
)
if "error" in workflow_json:
return {
"success": False,
"error": workflow_json["error"],
"raw_response": workflow_json[
"raw_response"
]
}
# Validate workflow
validation = self.validator.validate(
workflow_json
)
if not validation.get("valid"):
return {
"success": False,
"error": validation.get(
"message",
"Validation failed"
)
}
workflow_data = validation[
"workflow"
]
# Save workflow
workflow = Workflow(
user_id=user_id,
name=workflow_data[
"workflow_name"
],
prompt=prompt,
workflow_json=json.dumps(
workflow_data
)
)
db.add(workflow)
db.commit()
db.refresh(workflow)
# Save schedule if needed
if (
workflow_data[
"execution_type"
] == "scheduled"
and "schedule" in workflow_data
):
schedule_type = workflow_data[
"schedule"
]["type"]
run_time = workflow_data[
"schedule"
].get("time")
day_of_week = workflow_data[
"schedule"
].get("day_of_week")
# Calculate first execution time
next_run_at = self.calculate_next_run(
schedule_type,
run_time,
day_of_week
)
schedule = WorkflowSchedule(
workflow_id=workflow.id,
schedule_type=schedule_type,
run_time=run_time,
day_of_week=day_of_week,
next_run_at=next_run_at
)
db.add(schedule)
db.commit()
return {
"success": True,
"workflow_id": workflow.id,
"workflow_name": workflow.name,
"execution_type": workflow_data[
"execution_type"
]
}