Spaces:
Runtime error
Runtime error
File size: 4,760 Bytes
3dcada4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from fastapi import FastAPI
from configs import get_settings
from models.database import get_db, Base, engine
from models.sessions import create_session, get_session, update_session_status, delete_session,get_all_sessions
from models.transcriptions import create_transcription, get_transcriptions_by_session, update_transcription_text, delete_transcription
db_router = APIRouter(
prefix=f"/{get_settings().APP_NAME}/{get_settings().APP_VARIENT}",
tags=["db"])
@db_router.post("/getallsessions/", response_model=List)
async def api_get_all_session(db: AsyncSession = Depends(get_db)):
session = await get_all_sessions(db)
sess=[]
for se in session:
sess.append(
{
"session_id":se.session_id,
"created_at":se.created_at,
"status":se.status,
})
return sess
#return {"sessions":f"{sess}"}
@db_router.post("/sessions/", response_model=dict)
async def api_create_session(session_id: str, db: AsyncSession = Depends(get_db)):
session = await create_session(db, session_id)
return {"session_id": session.session_id, "status": session.status, "created_at": session.created_at}
@db_router.get("/sessions/{session_id}", response_model=dict)
async def api_get_session(session_id: str, db: AsyncSession = Depends(get_db)):
session = await get_session(db, session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return {"session_id": session.session_id, "status": session.status, "created_at": session.created_at}
@db_router.patch("/sessions/{session_id}", response_model=dict)
async def api_update_session_status(session_id: str, status: str, db: AsyncSession = Depends(get_db)):
session = await update_session_status(db, session_id, status)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return {"session_id": session.session_id, "status": session.status}
@db_router.delete("/sessions/{session_id}", response_model=dict)
async def api_delete_session(session_id: str, db: AsyncSession = Depends(get_db)):
session = await delete_session(db, session_id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return {"detail": "Session deleted successfully"}
@db_router.post("/transcriptions/", response_model=dict)
async def api_create_transcription(session_id: str, chunk_number: int, text: str, language: str, db: AsyncSession = Depends(get_db)):
transcription = await create_transcription(db, session_id, chunk_number, text, language)
return {
"id": transcription.id,
"session_id": transcription.session_id,
"chunk_number": transcription.chunk_number,
"text": transcription.text,
"language": transcription.language,
"created_at": transcription.created_at
}
@db_router.get("/transcriptions/{session_id}", response_model=List[dict])
async def api_get_transcriptions(session_id: str, db: AsyncSession = Depends(get_db)):
transcriptions = await get_transcriptions_by_session(db, session_id)
if not transcriptions:
raise HTTPException(status_code=404, detail="Session not found")
return [
{
"id": t.id,
"session_id": t.session_id,
"chunk_number": t.chunk_number,
"text": t.text,
"language": t.language,
"created_at": t.created_at
} for t in transcriptions
]
@db_router.patch("/transcriptions/{transcription_id}", response_model=dict)
async def api_update_transcription(transcription_id: int, new_text: str, db: AsyncSession = Depends(get_db)):
transcription = await update_transcription_text(db, transcription_id, new_text)
if not transcription:
raise HTTPException(status_code=404, detail="Transcription not found")
return {
"id": transcription.id,
"text": transcription.text
}
@db_router.delete("/transcriptions/{transcription_id}", response_model=dict)
async def api_delete_transcription(transcription_id: int, db: AsyncSession = Depends(get_db)):
transcription = await delete_transcription(db, transcription_id)
if not transcription:
raise HTTPException(status_code=404, detail="Transcription not found")
return {"detail": "Transcription deleted successfully"}
async def init_models():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
def register_startup_events(app: FastAPI):
@app.on_event("startup")
async def startup_event():
await init_models() |