Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import uuid | |
| from datetime import datetime, timezone | |
| from celery import Celery | |
| from app.config import settings | |
| celery_app = Celery("inference_engine", broker=settings.REDIS_URL, backend=settings.REDIS_URL) | |
| celery_app.conf.update( | |
| task_serializer="json", | |
| accept_content=["json"], | |
| result_serializer="json", | |
| timezone="UTC", | |
| enable_utc=True, | |
| ) | |
| def run_inference_task(job_id: str, file_bytes_hex: str, filename: str, content_type: str, params: dict): | |
| import asyncio | |
| from app.database import AsyncSessionLocal | |
| from app.models.db_models import InferenceJob | |
| from app.preprocessing.registry import StepRegistry | |
| from app.preprocessing.pipeline import PreprocessingPipeline | |
| from app.preprocessing.config import PIPELINE_CONFIG | |
| from app.services.model_service import model_service | |
| from sqlalchemy import select | |
| async def _run(): | |
| async with AsyncSessionLocal() as db: | |
| result = await db.execute(select(InferenceJob).where(InferenceJob.id == job_id)) | |
| job = result.scalar_one_or_none() | |
| if not job: | |
| return | |
| job.status = "processing" | |
| await db.commit() | |
| try: | |
| file_bytes = bytes.fromhex(file_bytes_hex) | |
| steps = StepRegistry.discover_steps() | |
| config = {**PIPELINE_CONFIG, **params} | |
| pipeline = PreprocessingPipeline(steps, config) | |
| ctx = await pipeline.run(file_bytes, filename, content_type) | |
| predictions = await model_service.predict(ctx) | |
| result_data = { | |
| "predictions": predictions, | |
| "preprocessing": { | |
| "steps_completed": ctx.steps_completed, | |
| "step_timings": ctx.step_timings, | |
| "step_outputs": ctx.step_outputs, | |
| "total_preprocessing_ms": sum(ctx.step_timings.values()), | |
| "errors": ctx.errors, | |
| }, | |
| } | |
| job.status = "completed" | |
| job.result_json = json.dumps(result_data) | |
| job.completed_at = datetime.now(timezone.utc) | |
| except Exception as e: | |
| job.status = "failed" | |
| job.error_message = str(e) | |
| job.completed_at = datetime.now(timezone.utc) | |
| await db.commit() | |
| asyncio.run(_run()) | |