Spaces:
Sleeping
Sleeping
| """ | |
| Core Orchestration Logic - Course Builder Engine | |
| """ | |
| from typing import Dict, Any, List | |
| from app.contracts import EngineRequest, EngineResponse, ErrorDetail, InputItem | |
| from app.service_client import ServiceClient, ServiceError | |
| from app.config import config | |
| class CourseBuilderEngine: | |
| """ | |
| Orchestrator Engine that combines multiple services to build a course: | |
| 1. Transcription (Audio/Video) | |
| 2. Vision (Images - Future) | |
| 3. Ingestion (Course Structure Generation) | |
| """ | |
| def __init__(self): | |
| self.client = ServiceClient() | |
| self.engine_name = config.ENGINE_NAME | |
| async def run(self, request: EngineRequest) -> EngineResponse: | |
| """Standard engine entrypoint""" | |
| try: | |
| if request.action == "create_course": | |
| return await self._create_course(request) | |
| else: | |
| return self._error_response( | |
| request, | |
| "INVALID_ACTION", | |
| f"Action '{request.action}' not supported. Use 'create_course'" | |
| ) | |
| except Exception as e: | |
| return self._error_response(request, "ENGINE_ERROR", str(e)) | |
| async def _create_course(self, request: EngineRequest) -> EngineResponse: | |
| """Orchestrate the course creation flow""" | |
| items = [] | |
| # Support both direct text and items | |
| if request.input.text: | |
| items.append({ | |
| "type": "text", | |
| "text": request.input.text, | |
| "lang": request.options.get("lang", "en") | |
| }) | |
| for item in request.input.items: | |
| items.append(item.model_dump()) | |
| # If we have media references in refs that need processing | |
| media_id = request.input.refs.get("media_id") | |
| if media_id: | |
| # Assume it's audio/video for now as per current main.py logic | |
| # This is a bit simplified; real logic would detect kind | |
| trans = await self.client.run_transcription( | |
| media_id=media_id, | |
| action="transcribe.video", # Default to video for robust extraction | |
| actor_id=request.actor.user_id, | |
| opts=request.options | |
| ) | |
| items.extend(trans["result"].get("items", [])) | |
| if not items: | |
| return self._error_response(request, "NO_INPUT", "No text or items provided to build a course") | |
| # Ingest | |
| course_result = await self.client.run_ingestion( | |
| items=items, | |
| actor_id=request.actor.user_id, | |
| ctx=request.context, | |
| opts=request.options | |
| ) | |
| return EngineResponse( | |
| request_id=request.request_id, | |
| ok=True, | |
| status="success", | |
| engine=self.engine_name, | |
| action=request.action, | |
| result=course_result["result"], | |
| messages=[ | |
| "Orchestration complete", | |
| "Aggregated multiple inputs into ingestion engine", | |
| "Course generated successfully" | |
| ] | |
| ) | |
| def _error_response(self, request: EngineRequest, code: str, detail: str) -> EngineResponse: | |
| return EngineResponse( | |
| request_id=request.request_id, | |
| ok=False, | |
| status="error", | |
| engine=self.engine_name, | |
| action=request.action, | |
| error=ErrorDetail(code=code, detail=detail) | |
| ) | |