import enum
import json
import uuid
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import DateTime, Float, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.database import Base


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replacement for ``datetime.utcnow()``, which is deprecated since
    Python 3.12. The tzinfo is stripped so the value stays naive, matching
    what the ``DateTime`` columns below (declared without ``timezone=True``)
    received before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class JobStatus(str, enum.Enum):
    """Status values for a :class:`Job`.

    The ``str`` mixin makes every member compare equal to its string value;
    ``Job.status`` uses ``JobStatus.pending.value`` as its column default.
    """

    pending = "pending"
    processing = "processing"
    transcribing = "transcribing"
    analyzing_frames = "analyzing_frames"
    embedding = "embedding"
    ready = "ready"
    failed = "failed"
    cancelled = "cancelled"


class Job(Base):
    """ORM model for a row in the ``jobs`` table.

    Owns an ordered collection of :class:`TranscriptSegment` rows and exposes
    two JSON text columns through read-only, decode-on-access properties.
    """

    __tablename__ = "jobs"

    # Primary key: a UUID4 rendered as a 36-character string, generated
    # client-side when the row is created.
    id: Mapped[str] = mapped_column(
        String(36), primary_key=True, default=lambda: str(uuid.uuid4())
    )
    # Stored as a plain string (not a DB enum); defaults to "pending".
    status: Mapped[str] = mapped_column(
        String(32), default=JobStatus.pending.value, index=True
    )
    original_filename: Mapped[str] = mapped_column(String(512))
    storage_path: Mapped[str] = mapped_column(String(1024))
    duration_seconds: Mapped[float | None] = mapped_column(Float, nullable=True)
    title: Mapped[str | None] = mapped_column(String(256), nullable=True)
    subject: Mapped[str | None] = mapped_column(String(128), nullable=True)
    thumbnail: Mapped[str | None] = mapped_column(String(512), nullable=True)
    progress_message: Mapped[str | None] = mapped_column(String(512), nullable=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Raw JSON text; read it through the ``pipeline_timings`` property.
    pipeline_timings_json: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Raw JSON text; read it through the ``activity_log`` property.
    activity_log_json: Mapped[str | None] = mapped_column(Text, nullable=True)
    whisper_language: Mapped[str | None] = mapped_column(String(32), nullable=True)
    whisper_task: Mapped[str] = mapped_column(String(16), default="transcribe")
    # Naive UTC timestamps; ``updated_at`` is refreshed on every UPDATE.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=_utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=_utcnow, onupdate=_utcnow
    )

    # Segments are loaded ordered by start time and are deleted with the job
    # (including segments removed from the collection).
    segments: Mapped[list["TranscriptSegment"]] = relationship(
        back_populates="job",
        cascade="all, delete-orphan",
        order_by="TranscriptSegment.start_sec",
    )

    @property
    def pipeline_timings(self) -> dict[str, Any] | None:
        """Return ``pipeline_timings_json`` decoded as a dict.

        Returns:
            The decoded dict, or ``None`` when the column is empty/NULL,
            holds invalid JSON, or decodes to something other than a dict.
        """
        if not self.pipeline_timings_json:
            return None
        try:
            data = json.loads(self.pipeline_timings_json)
        except json.JSONDecodeError:
            # Best-effort accessor: malformed stored JSON reads as "no data".
            return None
        return data if isinstance(data, dict) else None

    @property
    def activity_log(self) -> list[dict[str, Any]]:
        """Return ``activity_log_json`` decoded as a list.

        Returns:
            The decoded list, or an empty list when the column is empty/NULL,
            holds invalid JSON, or decodes to something other than a list.
            Element types are not validated.
        """
        if not self.activity_log_json:
            return []
        try:
            data = json.loads(self.activity_log_json)
        except json.JSONDecodeError:
            # Best-effort accessor: malformed stored JSON reads as "no entries".
            return []
        return data if isinstance(data, list) else []


class TranscriptSegment(Base):
    """ORM model for a row in the ``transcript_segments`` table.

    Each segment belongs to exactly one :class:`Job` via ``job_id``.
    """

    __tablename__ = "transcript_segments"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # The foreign key asks the database to cascade deletes from ``jobs``.
    job_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("jobs.id", ondelete="CASCADE"), index=True
    )
    # Indexed; also the ordering key for ``Job.segments``.
    start_sec: Mapped[float] = mapped_column(Float, index=True)
    end_sec: Mapped[float] = mapped_column(Float)
    text: Mapped[str] = mapped_column(Text)

    job: Mapped[Job] = relationship(back_populates="segments")