File size: 9,592 Bytes
a9dc537
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
"""
Workflow execution and monitoring endpoints
"""

from fastapi import APIRouter, BackgroundTasks, HTTPException, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from typing import Dict, List
import uuid
from datetime import datetime
import asyncio
from loguru import logger

router = APIRouter()

class WorkflowRequest(BaseModel):
    """Request to start a workflow"""
    patent_id: str
    scenario: str = "patent_wakeup"

class WorkflowResponse(BaseModel):
    """Workflow execution response"""
    workflow_id: str
    status: str
    message: str

@router.post("/execute", response_model=WorkflowResponse)
async def execute_workflow(
    request: WorkflowRequest,
    background_tasks: BackgroundTasks
):
    """
    Start Patent Wake-Up workflow execution.

    Args:
        request: Workflow execution request

    Returns:
        Workflow ID for tracking progress
    """
    from api.main import app_state

    # Validate patent exists
    if request.patent_id not in app_state["patents"]:
        raise HTTPException(
            status_code=404,
            detail=f"Patent not found: {request.patent_id}"
        )

    # Generate workflow ID
    workflow_id = str(uuid.uuid4())

    # Initialize workflow state
    workflow_state = {
        "id": workflow_id,
        "patent_id": request.patent_id,
        "scenario": request.scenario,
        "status": "queued",
        "progress": 0,
        "current_step": None,
        "started_at": datetime.utcnow().isoformat(),
        "completed_at": None,
        "execution_time_seconds": None,
        "result": None,
        "error": None,
        "steps": []
    }

    app_state["workflows"][workflow_id] = workflow_state

    # Update patent status
    app_state["patents"][request.patent_id]["status"] = "analyzing"
    app_state["patents"][request.patent_id]["workflow_id"] = workflow_id

    logger.info(f"πŸš€ Starting workflow {workflow_id} for patent {request.patent_id}")

    # Execute workflow in background
    background_tasks.add_task(
        run_workflow,
        workflow_id,
        request.patent_id,
        request.scenario
    )

    return WorkflowResponse(
        workflow_id=workflow_id,
        status="queued",
        message="Workflow started successfully"
    )

async def run_workflow(workflow_id: str, patent_id: str, scenario: str):
    """
    Background task to execute workflow.

    Args:
        workflow_id: Unique workflow identifier
        patent_id: Patent to analyze
        scenario: Workflow scenario type
    """
    from api.main import app_state
    from src.workflow.langgraph_state import ScenarioType

    workflow_state = app_state["workflows"][workflow_id]
    patent = app_state["patents"][patent_id]

    start_time = datetime.utcnow()

    try:
        logger.info(f"πŸ“Š Executing workflow {workflow_id}...")

        # Update status
        workflow_state["status"] = "running"
        workflow_state["progress"] = 10
        workflow_state["current_step"] = "initializing"

        # Determine scenario
        scenario_map = {
            "patent_wakeup": ScenarioType.PATENT_WAKEUP
        }
        scenario_type = scenario_map.get(scenario, ScenarioType.PATENT_WAKEUP)

        # Execute Patent Wake-Up workflow
        logger.info(f"Analyzing patent: {patent['filename']}")

        workflow_state["current_step"] = "document_analysis"
        workflow_state["progress"] = 25

        result = await app_state["workflow"].run(
            task_description=f"Analyze patent: {patent['filename']} and create valorization roadmap",
            scenario=scenario_type,
            input_data={"patent_path": patent["path"]},
            task_id=workflow_id
        )

        # Calculate execution time
        end_time = datetime.utcnow()
        execution_time = (end_time - start_time).total_seconds()

        # Process result
        workflow_state["status"] = "completed"
        workflow_state["progress"] = 100
        workflow_state["current_step"] = "completed"
        workflow_state["completed_at"] = end_time.isoformat()
        workflow_state["execution_time_seconds"] = execution_time

        # Store detailed results
        workflow_state["result"] = {
            "success": result.success,
            "quality_score": result.quality_score,
            "iterations_used": result.iterations_used,
            "status_value": result.status.value,

            # Document Analysis
            "document_analysis": result.agent_outputs.get("document_analysis"),

            # Market Analysis
            "market_analysis": result.agent_outputs.get("market_analysis"),

            # Stakeholder Matches
            "matches": result.agent_outputs.get("matches", []),

            # Valorization Brief
            "brief": result.agent_outputs.get("brief"),

            # Executor summary
            "executor_output": result.agent_outputs.get("executor", {})
        }

        # Update patent status
        patent["status"] = "analyzed"

        logger.success(f"βœ… Workflow {workflow_id} completed in {execution_time:.1f}s")

    except Exception as e:
        logger.error(f"❌ Workflow {workflow_id} failed: {e}")

        workflow_state["status"] = "failed"
        workflow_state["error"] = str(e)
        workflow_state["completed_at"] = datetime.utcnow().isoformat()

        # Update patent status
        patent["status"] = "failed"

        import traceback
        traceback.print_exc()

@router.get("/{workflow_id}", response_model=Dict)
async def get_workflow(workflow_id: str):
    """
    Get workflow status and results.

    Args:
        workflow_id: Unique workflow identifier

    Returns:
        Workflow state including results if completed
    """
    from api.main import app_state

    if workflow_id not in app_state["workflows"]:
        raise HTTPException(
            status_code=404,
            detail=f"Workflow not found: {workflow_id}"
        )

    return app_state["workflows"][workflow_id]

@router.get("/", response_model=List[Dict])
async def list_workflows(
    status: str = None,
    limit: int = 100,
    offset: int = 0
):
    """
    List all workflows.

    Args:
        status: Filter by status (queued, running, completed, failed)
        limit: Maximum number of results
        offset: Pagination offset

    Returns:
        List of workflow states
    """
    from api.main import app_state

    workflows = list(app_state["workflows"].values())

    # Filter by status if provided
    if status:
        workflows = [w for w in workflows if w["status"] == status]

    # Sort by start time (newest first)
    workflows.sort(key=lambda x: x["started_at"], reverse=True)

    # Pagination
    workflows = workflows[offset:offset + limit]

    return workflows

@router.websocket("/{workflow_id}/stream")
async def stream_workflow(websocket: WebSocket, workflow_id: str):
    """
    WebSocket endpoint for real-time workflow updates.

    Args:
        websocket: WebSocket connection
        workflow_id: Workflow to stream
    """
    from api.main import app_state

    await websocket.accept()

    logger.info(f"πŸ“‘ WebSocket connected for workflow {workflow_id}")

    if workflow_id not in app_state["workflows"]:
        await websocket.send_json({"error": "Workflow not found"})
        await websocket.close()
        return

    try:
        # Send updates every second until workflow completes
        while True:
            workflow_state = app_state["workflows"].get(workflow_id)

            if not workflow_state:
                await websocket.send_json({"error": "Workflow removed"})
                break

            # Send current state
            await websocket.send_json(workflow_state)

            # Check if workflow is done
            if workflow_state["status"] in ["completed", "failed"]:
                logger.info(f"Workflow {workflow_id} finished, closing WebSocket")
                break

            # Wait before next update
            await asyncio.sleep(1)

    except WebSocketDisconnect:
        logger.info(f"WebSocket disconnected for workflow {workflow_id}")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        await websocket.close()

@router.get("/{workflow_id}/brief/download")
async def download_brief(workflow_id: str):
    """
    Download the generated valorization brief.

    Args:
        workflow_id: Workflow identifier

    Returns:
        PDF file
    """
    from api.main import app_state
    from fastapi.responses import FileResponse
    from pathlib import Path

    if workflow_id not in app_state["workflows"]:
        raise HTTPException(
            status_code=404,
            detail="Workflow not found"
        )

    workflow = app_state["workflows"][workflow_id]

    if workflow["status"] != "completed":
        raise HTTPException(
            status_code=400,
            detail="Workflow not yet completed"
        )

    # Get brief path
    result = workflow.get("result") or {}
    brief = result.get("brief") or {}
    pdf_path = brief.get("pdf_path") if isinstance(brief, dict) else None

    if not pdf_path:
        raise HTTPException(
            status_code=404,
            detail="Valorization brief not found"
        )

    file_path = Path(pdf_path)

    if not file_path.exists():
        raise HTTPException(
            status_code=404,
            detail="Brief file not found on disk"
        )

    return FileResponse(
        path=file_path,
        media_type="application/pdf",
        filename=file_path.name
    )