cb-engine / app /engine.py
Godswill-IoT's picture
Upload 12 files
ec66988 verified
"""
Core Orchestration Logic - Course Builder Engine
"""
from typing import Dict, Any, List
from app.contracts import EngineRequest, EngineResponse, ErrorDetail, InputItem
from app.service_client import ServiceClient, ServiceError
from app.config import config
class CourseBuilderEngine:
    """
    Orchestrator engine that combines multiple services to build a course:

    1. Transcription (Audio/Video)
    2. Vision (Images - Future)
    3. Ingestion (Course Structure Generation)
    """

    def __init__(self):
        """Create the downstream service client and read the engine name from config."""
        self.client = ServiceClient()
        self.engine_name = config.ENGINE_NAME

    async def run(self, request: EngineRequest) -> EngineResponse:
        """Standard engine entrypoint.

        Dispatches on ``request.action``; ``"create_course"`` is the only
        supported action.

        Returns:
            The response built by ``_create_course`` on success, an
            ``INVALID_ACTION`` error response for any other action, or an
            ``ENGINE_ERROR`` error response when an exception is raised while
            handling the request.
        """
        try:
            if request.action != "create_course":
                return self._error_response(
                    request,
                    "INVALID_ACTION",
                    f"Action '{request.action}' not supported. Use 'create_course'",
                )
            return await self._create_course(request)
        except Exception as e:
            # Some exceptions carry no message (str(e) == ""); fall back to the
            # exception class name so the caller never gets a blank detail.
            detail = str(e) or type(e).__name__
            return self._error_response(request, "ENGINE_ERROR", detail)

    async def _create_course(self, request: EngineRequest) -> EngineResponse:
        """Orchestrate the course creation flow.

        Collects input items from three places, in this order: the direct
        ``request.input.text``, the entries of ``request.input.items``
        (serialized with ``model_dump()``), and the items returned by the
        transcription service when ``request.input.refs`` holds a ``media_id``.
        The collected items are then sent to the ingestion service.

        Returns:
            A success response carrying the ingestion ``result``, or a
            ``NO_INPUT`` error response when nothing was collected.

        Raises:
            ValueError: If a service response has no ``"result"`` field.
        """
        # Defensive: treat a missing (None) options/items/refs the same as an
        # empty one instead of failing with an AttributeError/TypeError.
        options = request.options or {}
        items = []

        # Support both direct text and items
        if request.input.text:
            items.append({
                "type": "text",
                "text": request.input.text,
                "lang": options.get("lang", "en"),
            })

        items.extend(item.model_dump() for item in (request.input.items or []))

        # If we have media references in refs that need processing
        media_id = (request.input.refs or {}).get("media_id")
        if media_id:
            # Assume it's audio/video for now as per current main.py logic
            # This is a bit simplified; real logic would detect kind
            trans = await self.client.run_transcription(
                media_id=media_id,
                action="transcribe.video",  # Default to video for robust extraction
                actor_id=request.actor.user_id,
                opts=request.options,
            )
            transcript = self._result_payload(trans, "Transcription")
            # `or []` also covers an explicit `"items": None` in the payload.
            items.extend(transcript.get("items") or [])

        if not items:
            return self._error_response(
                request, "NO_INPUT", "No text or items provided to build a course"
            )

        # Ingest
        course_result = await self.client.run_ingestion(
            items=items,
            actor_id=request.actor.user_id,
            ctx=request.context,
            opts=request.options,
        )

        return EngineResponse(
            request_id=request.request_id,
            ok=True,
            status="success",
            engine=self.engine_name,
            action=request.action,
            result=self._result_payload(course_result, "Ingestion"),
            messages=[
                "Orchestration complete",
                "Aggregated multiple inputs into ingestion engine",
                "Course generated successfully",
            ],
        )

    @staticmethod
    def _result_payload(response, service: str):
        """Return ``response["result"]`` or raise a descriptive ``ValueError``.

        A bare ``KeyError`` stringifies to just ``'result'``, which is useless
        as an error detail; this names the service that misbehaved instead.
        """
        try:
            return response["result"]
        except (KeyError, TypeError) as exc:
            raise ValueError(
                f"{service} service response has no 'result' field"
            ) from exc

    def _error_response(self, request: EngineRequest, code: str, detail: str) -> EngineResponse:
        """Build a failed ``EngineResponse`` for ``request`` with the given error code and detail."""
        return EngineResponse(
            request_id=request.request_id,
            ok=False,
            status="error",
            engine=self.engine_name,
            action=request.action,
            error=ErrorDetail(code=code, detail=detail),
        )