File size: 3,564 Bytes
0242ab2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json

from datetime import datetime

from fastapi import HTTPException
from sqlalchemy.orm import Session

from models.workflow import Workflow
from models.workflow_run import WorkflowRun

from executor.executor import (
    WorkflowExecutor
)

from browser.playwright_client import (
    PlaywrightClient
)

from agents.result_formatter.agent import (
    ResultFormatterAgent
)


class ExecutionService:
    """Execute stored workflows in a browser page and record each attempt.

    Every execution creates a ``WorkflowRun`` row that starts as "running"
    and is finalized as either "completed" or "failed".
    """

    @staticmethod
    def _finish_run(run, status):
        """Stamp ``run`` with its final status, completion time and duration.

        ``duration`` is stored in seconds (``timedelta.total_seconds()``).
        """
        run.status = status
        # NOTE(review): naive UTC is kept on purpose so the subtraction below
        # matches how started_at is written in run_workflow; confirm the
        # column type before switching to timezone-aware datetimes.
        run.completed_at = datetime.utcnow()
        run.duration = (
            run.completed_at - run.started_at
        ).total_seconds()

    # Used by API route
    async def run_workflow(
        self,
        workflow_id: int,
        user_id: int,
        db: Session
    ):
        """Run one workflow owned by ``user_id`` and persist the outcome.

        Args:
            workflow_id: Primary key of the workflow to run.
            user_id: Owner id; the workflow is only found when both match.
            db: SQLAlchemy session used to load the workflow and store the run.

        Returns:
            A dict with the keys ``success``, ``run_id``, ``workflow_id``,
            ``workflow_name``, ``status``, ``duration`` and ``results``
            (the formatter output).

        Raises:
            HTTPException: 404 when no matching workflow exists; 500 when
                anything fails after the run row has been created (invalid
                stored JSON, browser start-up, execution or formatting).
                In the 500 case the run is stored as "failed" together
                with the error message.
        """
        workflow = (
            db.query(Workflow)
            .filter(
                Workflow.id == workflow_id,
                Workflow.user_id == user_id,
            )
            .first()
        )

        if not workflow:
            raise HTTPException(
                status_code=404,
                detail="Workflow not found"
            )

        run = WorkflowRun(
            workflow_id=workflow.id,
            status="running",
            started_at=datetime.utcnow(),
        )

        db.add(run)
        db.commit()
        db.refresh(run)

        # Initialized up front so the cleanup in `finally` can tell whether
        # the browser was ever started.
        playwright = None
        browser = None

        try:
            # Parsing and browser start-up happen inside the try block so
            # that a failure here marks the run as "failed" instead of
            # leaving it in "running" forever.
            workflow_json = json.loads(workflow.workflow_json)

            client = PlaywrightClient()
            playwright, browser, page = await client.get_page()

            executor = WorkflowExecutor()

            print("WORKFLOW JSON:")
            print(workflow_json)

            results = await executor.execute(
                page,
                workflow_json,
                db,
                run.id
            )

            formatter = ResultFormatterAgent()
            formatted_result = formatter.format(
                workflow.prompt,
                workflow.name,
                json.dumps(results)
            )

            self._finish_run(run, "completed")
            run.result_json = formatted_result

            db.commit()

            return {
                "success": True,
                "run_id": run.id,
                "workflow_id": workflow.id,
                "workflow_name": workflow.name,
                "status": run.status,
                "duration": run.duration,
                "results": formatted_result
            }

        except Exception as e:

            import traceback
            traceback.print_exc()

            self._finish_run(run, "failed")
            run.error_message = str(e)

            db.commit()

            # Chain the original error so the root cause stays visible in
            # tracebacks of the HTTP error.
            raise HTTPException(
                status_code=500,
                detail=str(e)
            ) from e

        finally:
            # Nested try/finally: the Playwright driver must be stopped even
            # when closing the browser itself raises.
            try:
                if browser is not None:
                    await browser.close()
            finally:
                if playwright is not None:
                    await playwright.stop()

    # Used by scheduler worker
    async def run_scheduled_workflow(
        self,
        workflow_id: int,
        db: Session
    ):
        """Run a workflow on behalf of its owner, without a caller-supplied user.

        Looks the workflow up by id only, then delegates to ``run_workflow``
        with the workflow's own ``user_id``.

        Args:
            workflow_id: Primary key of the workflow to run.
            db: SQLAlchemy session passed through to ``run_workflow``.

        Returns:
            The ``run_workflow`` result dict, or ``None`` (after printing a
            message) when the workflow does not exist.

        Raises:
            HTTPException: Propagated from ``run_workflow``.
        """
        workflow = (
            db.query(Workflow)
            .filter(Workflow.id == workflow_id)
            .first()
        )

        if not workflow:
            print(
                f"Workflow {workflow_id} not found"
            )
            return

        return await self.run_workflow(
            workflow_id,
            workflow.user_id,
            db
        )