Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Depends, HTTPException | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from typing import List | |
| from fastapi import FastAPI | |
| from configs import get_settings | |
| from models.database import get_db, Base, engine | |
| from models.sessions import create_session, get_session, update_session_status, delete_session,get_all_sessions | |
| from models.transcriptions import create_transcription, get_transcriptions_by_session, update_transcription_text, delete_transcription | |
| db_router = APIRouter( | |
| prefix=f"/{get_settings().APP_NAME}/{get_settings().APP_VARIENT}", | |
| tags=["db"]) | |
| async def api_get_all_session(db: AsyncSession = Depends(get_db)): | |
| session = await get_all_sessions(db) | |
| sess=[] | |
| for se in session: | |
| sess.append( | |
| { | |
| "session_id":se.session_id, | |
| "created_at":se.created_at, | |
| "status":se.status, | |
| }) | |
| return sess | |
| #return {"sessions":f"{sess}"} | |
| async def api_create_session(session_id: str, db: AsyncSession = Depends(get_db)): | |
| session = await create_session(db, session_id) | |
| return {"session_id": session.session_id, "status": session.status, "created_at": session.created_at} | |
| async def api_get_session(session_id: str, db: AsyncSession = Depends(get_db)): | |
| session = await get_session(db, session_id) | |
| if not session: | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| return {"session_id": session.session_id, "status": session.status, "created_at": session.created_at} | |
| async def api_update_session_status(session_id: str, status: str, db: AsyncSession = Depends(get_db)): | |
| session = await update_session_status(db, session_id, status) | |
| if not session: | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| return {"session_id": session.session_id, "status": session.status} | |
| async def api_delete_session(session_id: str, db: AsyncSession = Depends(get_db)): | |
| session = await delete_session(db, session_id) | |
| if not session: | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| return {"detail": "Session deleted successfully"} | |
| async def api_create_transcription(session_id: str, chunk_number: int, text: str, language: str, db: AsyncSession = Depends(get_db)): | |
| transcription = await create_transcription(db, session_id, chunk_number, text, language) | |
| return { | |
| "id": transcription.id, | |
| "session_id": transcription.session_id, | |
| "chunk_number": transcription.chunk_number, | |
| "text": transcription.text, | |
| "language": transcription.language, | |
| "created_at": transcription.created_at | |
| } | |
| async def api_get_transcriptions(session_id: str, db: AsyncSession = Depends(get_db)): | |
| transcriptions = await get_transcriptions_by_session(db, session_id) | |
| if not transcriptions: | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| return [ | |
| { | |
| "id": t.id, | |
| "session_id": t.session_id, | |
| "chunk_number": t.chunk_number, | |
| "text": t.text, | |
| "language": t.language, | |
| "created_at": t.created_at | |
| } for t in transcriptions | |
| ] | |
| async def api_update_transcription(transcription_id: int, new_text: str, db: AsyncSession = Depends(get_db)): | |
| transcription = await update_transcription_text(db, transcription_id, new_text) | |
| if not transcription: | |
| raise HTTPException(status_code=404, detail="Transcription not found") | |
| return { | |
| "id": transcription.id, | |
| "text": transcription.text | |
| } | |
| async def api_delete_transcription(transcription_id: int, db: AsyncSession = Depends(get_db)): | |
| transcription = await delete_transcription(db, transcription_id) | |
| if not transcription: | |
| raise HTTPException(status_code=404, detail="Transcription not found") | |
| return {"detail": "Transcription deleted successfully"} | |
| async def init_models(): | |
| async with engine.begin() as conn: | |
| await conn.run_sync(Base.metadata.create_all) | |
| def register_startup_events(app: FastAPI): | |
| async def startup_event(): | |
| await init_models() |