well / app /models.py
zarox's picture
Upload 23 files
1c167a4 verified
import enum
import json
import uuid
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import DateTime, Float, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.database import Base
class JobStatus(str, enum.Enum):
    """String-valued enumeration of the states a job can be in.

    Every member is also a ``str`` whose value equals its name, so a member
    compares equal to the plain string (``JobStatus.ready == "ready"``).
    """

    # Initial state.
    pending = "pending"

    # In-progress states (presumably successive pipeline stages -- confirm
    # against the code that advances a job).
    processing = "processing"
    transcribing = "transcribing"
    analyzing_frames = "analyzing_frames"
    embedding = "embedding"

    # Remaining states (presumably terminal -- confirm against callers).
    ready = "ready"
    failed = "failed"
    cancelled = "cancelled"
def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Yields the same kind of value as ``datetime.utcnow()`` (UTC wall-clock
    time with ``tzinfo`` unset) without calling that function, which is
    deprecated since Python 3.12. The result is kept naive so values written
    by this helper stay comparable with rows stored previously.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Job(Base):
    """ORM model for a row in the ``jobs`` table.

    A job owns an ordered collection of ``TranscriptSegment`` rows through
    ``segments`` and stores two JSON documents as text, exposed in decoded
    form by the ``pipeline_timings`` and ``activity_log`` properties.
    """

    __tablename__ = "jobs"

    # Primary key: a freshly generated UUID4 rendered as a 36-character string.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Stored as a plain string (not an Enum column); defaults to the value of
    # ``JobStatus.pending``. Indexed.
    status: Mapped[str] = mapped_column(String(32), default=JobStatus.pending.value, index=True)

    # Required string columns (presumably describe the uploaded file -- confirm).
    original_filename: Mapped[str] = mapped_column(String(512))
    storage_path: Mapped[str] = mapped_column(String(1024))

    # Optional metadata; every column below is nullable.
    duration_seconds: Mapped[float | None] = mapped_column(Float, nullable=True)
    title: Mapped[str | None] = mapped_column(String(256), nullable=True)
    subject: Mapped[str | None] = mapped_column(String(128), nullable=True)
    thumbnail: Mapped[str | None] = mapped_column(String(512), nullable=True)
    progress_message: Mapped[str | None] = mapped_column(String(512), nullable=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)

    # JSON serialized to text; read through the properties of the same name
    # without the ``_json`` suffix.
    pipeline_timings_json: Mapped[str | None] = mapped_column(Text, nullable=True)
    activity_log_json: Mapped[str | None] = mapped_column(Text, nullable=True)

    # NOTE(review): names suggest options passed to a Whisper transcription
    # step -- confirm against the code that consumes them.
    whisper_language: Mapped[str | None] = mapped_column(String(32), nullable=True)
    whisper_task: Mapped[str] = mapped_column(String(16), default="transcribe")

    # Naive UTC timestamps; ``updated_at`` is refreshed on every ORM UPDATE.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=_utc_now)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=_utc_now, onupdate=_utc_now)

    # Child segments, loaded ordered by start position; ORM operations on the
    # job cascade to them and orphaned segments are deleted.
    segments: Mapped[list["TranscriptSegment"]] = relationship(
        back_populates="job",
        cascade="all, delete-orphan",
        order_by="TranscriptSegment.start_sec",
    )

    @property
    def pipeline_timings(self) -> dict[str, Any] | None:
        """Return ``pipeline_timings_json`` decoded as a dict.

        Returns:
            The decoded mapping, or ``None`` when the column is empty, does
            not contain valid JSON, or decodes to something other than a
            JSON object.
        """
        if not self.pipeline_timings_json:
            return None
        try:
            data = json.loads(self.pipeline_timings_json)
        except json.JSONDecodeError:
            # Best-effort accessor: a corrupt payload reads as "no timings".
            return None
        return data if isinstance(data, dict) else None

    @property
    def activity_log(self) -> list[dict[str, Any]]:
        """Return ``activity_log_json`` decoded as a list.

        Returns:
            The decoded list, or an empty list when the column is empty, does
            not contain valid JSON, or decodes to something other than a
            JSON array. Elements are not validated individually.
        """
        if not self.activity_log_json:
            return []
        try:
            data = json.loads(self.activity_log_json)
        except json.JSONDecodeError:
            # Best-effort accessor: a corrupt payload reads as an empty log.
            return []
        return data if isinstance(data, list) else []
class TranscriptSegment(Base):
    """ORM model for a row in the ``transcript_segments`` table.

    Each row holds a piece of text with a start and end position and belongs
    to a ``Job`` through ``job_id``.
    """

    __tablename__ = "transcript_segments"

    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
    )

    # Reference to the owning job. The foreign key is declared with
    # ON DELETE CASCADE, and the column is indexed.
    job_id: Mapped[str] = mapped_column(
        String(36),
        ForeignKey("jobs.id", ondelete="CASCADE"),
        index=True,
    )

    # Segment boundaries (names suggest seconds -- confirm the reference
    # point with the code that writes them). Only the start is indexed.
    start_sec: Mapped[float] = mapped_column(
        Float,
        index=True,
    )
    end_sec: Mapped[float] = mapped_column(
        Float,
    )

    # Text content of the segment.
    text: Mapped[str] = mapped_column(
        Text,
    )

    # Owning job; counterpart of ``Job.segments``.
    job: Mapped[Job] = relationship(
        back_populates="segments",
    )