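# Hugging Face page header (scrape residue, kept as a comment so the file parses):
# uploaded by jonathanjordan21 -- commit 32c4beb "Update app.py" (verified)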
import os
import uuid
import json
import requests
from datetime import datetime, timedelta, timezone
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Header, Request, status
from sqlalchemy.orm import Session
from database import SessionLocal
from models import Interview, Base
from sqlalchemy import create_engine
from dotenv import load_dotenv
from pydantic import BaseModel
from enum import Enum
from typing import Optional, Dict, List
from fastapi.responses import Response, JSONResponse
from google.cloud import storage
from google.oauth2 import service_account
# Load variables from a .env file into the process environment before any of
# the os.getenv() calls below read them.
load_dotenv()
app = FastAPI()
# Fixed UTC+7 offset; used to timestamp uploaded attachment names.
UTC_7 = timezone(timedelta(hours=7))
# Prefix of the interview links built by generate_link().
BASE_URL = os.getenv("BASE_URL")
# Bearer token compared against the Authorization header by the endpoints that
# are not scoped to a single interview.
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
# Key sent to the OpenAI API by get_realtime_session().
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# gcp_private_key = os.getenv("GCP_PRIVATE_KEY")
# NOTE(review): os.getenv() returns None when GCP_CREDENTIAL is unset, and
# json.loads(None) raises TypeError, so the module fails at import time in
# that case. The value is expected to be a JSON document (service-account info).
GCP_SERVICE_ACCOUNT = json.loads(os.getenv("GCP_CREDENTIAL"))
# GCP_SERVICE_ACCOUNT["private_key"] = gcp_private_key
GCP_BUCKET_NAME = os.getenv("GCP_BUCKET_NAME")
# Build the Cloud Storage client once at import time; every upload/download
# endpoint shares this single bucket handle.
credentials = service_account.Credentials.from_service_account_info(GCP_SERVICE_ACCOUNT)
client = storage.Client(credentials=credentials)
bucket = client.bucket(GCP_BUCKET_NAME)
def get_db():
    """Yield a database session and close it when the generator finishes.

    The session is created from ``SessionLocal``. The ``finally`` clause
    closes it whether the consumer completes normally, raises, or closes
    the generator early.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# class TokenRequest(BaseModel):
# token: str
class InterviewStatus(str, Enum):
    """Allowed values for an interview's ``status`` field.

    The ``str`` mixin makes each member compare equal to its string value,
    so members and raw strings can be compared directly.
    """

    OPEN = "OPEN"
    # The value contains a space, unlike the member name.
    IN_PROGRESS = "IN PROGRESS"
    FINISHED = "FINISHED"
    EXPIRED = "EXPIRED"
class InterviewUpdate(BaseModel):
    """Pydantic request body used to create or update an interview.

    Every field is optional. ``status`` defaults to ``OPEN``; all other
    fields default to ``None``.
    """

    # Lifecycle state of the interview.
    status: Optional[InterviewStatus] = InterviewStatus.OPEN
    start_date: Optional[datetime] = None
    transcript: Optional[list] = None
    summary: Optional[str] = None
    recording_url: Optional[str] = None
    prompt: Optional[str] = None
    expired_date: Optional[datetime] = None
    # Per-interview bearer token checked by the interview-scoped endpoints.
    token: Optional[str] = None
    duration: Optional[int] = None
@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
    """Render any ``HTTPException`` as the API's JSON error envelope.

    The HTTP status code is taken from the exception and repeated in the
    body's ``code`` field; the exception's ``detail`` is passed through.
    """
    error_body = {
        "code": exc.status_code,
        "message": "An error occurred",
        "detail": exc.detail,
    }
    return JSONResponse(status_code=exc.status_code, content=error_body)
def get_realtime_session():
    """Request a client secret for an OpenAI realtime session.

    Returns:
        The decoded JSON body of the OpenAI response.

    Raises:
        HTTPException: 502 when the request cannot be completed (connection
            error, timeout); otherwise the upstream status code and body
            when OpenAI answers with anything other than 200.
    """
    url = "https://api.openai.com/v1/realtime/client_secrets"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "session": {
            "type": "realtime",
            "model": "gpt-realtime",
        }
    }
    try:
        # requests has no default timeout: without one a stalled upstream
        # connection would block this worker indefinitely.
        response = requests.post(url, headers=headers, json=payload, timeout=30)
    except requests.RequestException as exc:
        # Report transport failures as a gateway error instead of an
        # unhandled exception (generic 500).
        raise HTTPException(
            status_code=502,
            detail="Failed to reach the OpenAI Realtime API",
        ) from exc

    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail=response.text)
    return response.json()
@app.post("/interview/{id}/file")
async def upload_file_interview(
    id: str,
    file: UploadFile = File(...),
    authorization: str = Header(...),
    db: Session = Depends(get_db),
):
    """Upload an attachment on behalf of a specific interview.

    The caller must send ``Authorization: <scheme> <token>`` where the token
    matches the one stored on the interview row.

    Args:
        id: Interview identifier from the URL path.
        file: Uploaded file; its extension is kept in the stored name.
        authorization: Raw ``Authorization`` header.
        db: Session injected by FastAPI. Using ``Depends(get_db)`` lets the
            framework run ``get_db``'s cleanup, so the session is closed.

    Returns:
        Envelope with the blob path, bare file name, public URL and the
        UTC+7 timestamp embedded in the name.

    Raises:
        HTTPException: 404 if the interview does not exist, 401 if the
            header is malformed or the token does not match.
    """
    # NOTE(review): the DB query and the GCS upload are blocking calls inside
    # an ``async def`` handler; consider a plain ``def`` like the other routes.
    interview = db.query(Interview).filter_by(id=id).first()
    if not interview:
        raise HTTPException(status_code=404, detail="Interview not found")

    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != interview.token:
        raise HTTPException(status_code=401, detail="Unauthorized: Invalid token")

    # splitext returns "" when there is no extension, so a dot-less name no
    # longer ends up reused as its own "extension"; ``or ""`` covers uploads
    # that carry no filename at all.
    ext = os.path.splitext(file.filename or "")[1]
    cur_datetime = datetime.now(UTC_7).strftime("%Y%m%d_%H_%M_%S")
    filename = f"tenants/joss/attachment/{cur_datetime}_{uuid.uuid4()}{ext}"

    blob = bucket.blob(filename)
    blob.upload_from_file(file.file, content_type=file.content_type)

    return {
        "code": 201,
        "message": "Request was successful.",
        "data": {
            "file_location": filename,
            "filename": filename.rsplit("/", 1)[-1],
            "url": f"https://storage.googleapis.com/{bucket.name}/{filename}",
            "datetime": cur_datetime,
        },
    }
@app.post("/file")
async def upload_file(file: UploadFile = File(...), authorization: str = Header(...)):
    """Upload an attachment using the service-wide access token.

    Args:
        file: Uploaded file; its extension is kept in the stored name.
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the token equal to ``ACCESS_TOKEN``.

    Returns:
        Envelope with the blob path, bare file name and public URL.

    Raises:
        HTTPException: 401 if the header is malformed or the token is wrong.
    """
    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != ACCESS_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid token")

    # splitext returns "" when there is no extension, so a dot-less name no
    # longer ends up reused as its own "extension"; ``or ""`` covers uploads
    # that carry no filename at all.
    ext = os.path.splitext(file.filename or "")[1]
    filename = f"tenants/joss/attachment/{uuid.uuid4()}{ext}"

    blob = bucket.blob(filename)
    blob.upload_from_file(file.file, content_type=file.content_type)

    return {
        "code": 201,
        "message": "Request was successful.",
        "data": {
            "file_location": filename,
            "filename": filename.rsplit("/", 1)[-1],
            "url": f"https://storage.googleapis.com/{bucket.name}/{filename}",
        },
    }
@app.get("/file_name/{filename}")
def get_filename(filename: str, authorization: str = Header(...)):
    """Return the storage path that corresponds to an attachment name.

    Args:
        filename: Bare attachment name from the URL path.
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the token equal to ``ACCESS_TOKEN``.

    Returns:
        ``{"filename": "tenants/joss/attachment/<filename>"}``.

    Raises:
        HTTPException: 401 if the header is malformed or the token is wrong.
    """
    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != ACCESS_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid token")

    return {"filename": f"tenants/joss/attachment/{filename}"}
@app.get("/file/{filename}")
def get_file(filename: str, authorization: str = Header(...)):
    """Download an attachment from the bucket and return its bytes.

    Args:
        filename: Bare attachment name from the URL path; it is looked up
            under ``tenants/joss/attachment/``.
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the token equal to ``ACCESS_TOKEN``.

    Returns:
        A ``Response`` carrying the blob content, served inline with the
        blob's content type (``application/octet-stream`` when unknown).

    Raises:
        HTTPException: 401 if the header is malformed or the token is wrong,
            404 if the blob does not exist.
    """
    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != ACCESS_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid token")

    print("File Name:", filename)
    filename = f"tenants/joss/attachment/{filename}"

    blob = bucket.blob(filename)
    if not blob.exists():
        raise HTTPException(status_code=404, detail="File not found")

    file_data = blob.download_as_bytes()
    return Response(
        content=file_data,
        media_type=blob.content_type or "application/octet-stream",
        # NOTE(review): the header carries the full blob path, not just the
        # attachment name -- confirm that is what clients expect.
        headers={"Content-Disposition": f'inline; filename="{filename}"'},
    )
@app.post("/generate_link")
def generate_link(
    payload: Optional[InterviewUpdate] = None,
    authorization: str = Header(...),
    db: Session = Depends(get_db),
):
    """Create an interview row and return its public link.

    Args:
        payload: Optional initial field values. A missing or ``null`` body
            is treated as an empty ``InterviewUpdate``.
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the token equal to ``ACCESS_TOKEN``.
        db: Session injected by FastAPI. Using ``Depends(get_db)`` lets the
            framework run ``get_db``'s cleanup, so the session is closed.

    Returns:
        Envelope with the interview URL, expiry date, status and token.

    Raises:
        HTTPException: 401 if the header is malformed or the token is wrong.
    """
    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != ACCESS_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Build a fresh model per request rather than sharing one instance as the
    # parameter default; this also covers an explicit ``null`` body, which
    # previously crashed on attribute access.
    if payload is None:
        payload = InterviewUpdate()

    # Links expire after 7 days unless the caller chose a date.
    # NOTE(review): utcnow() is naive while a client-supplied expired_date may
    # carry an offset -- confirm how the column stores it.
    expired_date = payload.expired_date or (datetime.utcnow() + timedelta(days=7))

    interview = Interview(
        expired_date=expired_date,
        start_date=payload.start_date,
        status=payload.status.value if payload.status else InterviewStatus.OPEN.value,
        transcript=payload.transcript,
        summary=payload.summary,
        recording_url=payload.recording_url,
        prompt=payload.prompt,
        duration=payload.duration,
        url=None,  # filled in below, once the row has an id
        token=payload.token if payload.token else uuid.uuid4().hex,
    )
    db.add(interview)
    db.commit()
    db.refresh(interview)  # interview.id is available from here on

    # The public link embeds the row id.
    url = f"{BASE_URL}/{interview.id}"
    interview.url = url
    db.commit()
    db.refresh(interview)

    # NOTE(review): the body reports 201 but the route does not set
    # status_code, so the HTTP status is FastAPI's default 200.
    return {
        "code": 201,
        "message": "Request was successful.",
        "data": {
            "url": url,
            "expired_date": expired_date,
            "status": interview.status,
            "token": interview.token,
        },
    }
@app.get("/interview/{id}")
def read_interview(id: str, db: Session = Depends(get_db)):
    """Return the stored fields of a single interview.

    Args:
        id: Interview identifier from the URL path.
        db: Session injected by FastAPI. Using ``Depends(get_db)`` lets the
            framework run ``get_db``'s cleanup, so the session is closed.

    Returns:
        Envelope whose ``data`` holds the interview's fields (the token is
        not included).

    Raises:
        HTTPException: 404 if the interview does not exist.
    """
    # NOTE(review): this route performs no token or expiry check, yet it
    # returns the transcript and summary -- confirm that is intended.
    interview = db.query(Interview).filter_by(id=id).first()
    if not interview:
        raise HTTPException(status_code=404, detail="Interview not found")

    return {
        "code": 200,
        "message": "Request was successful.",
        "data": {
            "id": interview.id,
            "url": interview.url,
            "expired_date": interview.expired_date,
            "start_date": interview.start_date,
            "status": interview.status,
            "duration": interview.duration,
            "transcript": interview.transcript,
            "summary": interview.summary,
            "recording_url": interview.recording_url,
            "prompt": interview.prompt,
        },
    }
@app.get("/interview/")
def read_all_interviews(
    authorization: str = Header(...),
    db: Session = Depends(get_db),
):
    """List every interview.

    Args:
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the token equal to ``ACCESS_TOKEN``.
        db: Session injected by FastAPI. Using ``Depends(get_db)`` lets the
            framework run ``get_db``'s cleanup, so the session is closed.

    Returns:
        Envelope whose ``data`` is a list with one dict per interview.

    Raises:
        HTTPException: 401 if the header is malformed or the token is wrong.
    """
    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != ACCESS_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid token")

    result = [
        {
            "id": interview.id,
            "url": interview.url,
            "expired_date": interview.expired_date,
            "start_date": interview.start_date,
            "status": interview.status,
            "duration": interview.duration,
            "transcript": interview.transcript,
            "summary": interview.summary,
            "recording_url": interview.recording_url,
            "prompt": interview.prompt,
        }
        for interview in db.query(Interview).all()
    ]

    return {
        "code": 200,
        "message": "Request was successful.",
        "data": result,
    }
# --- Start interview ---
# @app.post("/start_interview/{id}")
@app.post("/interview/{id}/start_interview")
def start_interview(
    id: str,
    response: Response,
    authorization: str = Header(...),
    db: Session = Depends(get_db),
):
    """Mark an interview as started and hand out a realtime client secret.

    Args:
        id: Interview identifier from the URL path.
        response: Response object, used to set status 202 when the interview
            is already finished.
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the interview's own token.
        db: Session injected by FastAPI. Using ``Depends(get_db)`` lets the
            framework run ``get_db``'s cleanup, so the session is closed.

    Returns:
        For a finished interview, a 202 envelope with ``data`` set to
        ``None``. Otherwise a 200 envelope with the interview's fields plus
        ``ephemeral_key``.

    Raises:
        HTTPException: 404 for an unknown id, 401 for a malformed header or
            wrong token, 400 if the interview has already started or has
            expired; errors from ``get_realtime_session`` propagate.
    """
    # A header without a space used to raise IndexError (HTTP 500); it is now
    # rejected by the token check below.
    auth_parts = authorization.split(" ")
    interview_token = auth_parts[1] if len(auth_parts) > 1 else None

    interview = db.query(Interview).filter_by(id=id).first()
    if not interview:
        raise HTTPException(status_code=404, detail="Invalid interview Id")
    if interview_token is None or interview_token != interview.token:
        raise HTTPException(status_code=401, detail="Unauthorized: Invalid token")

    if interview.status == InterviewStatus.FINISHED:
        # A finished interview is reported with 202 and no data, not an error.
        response.status_code = status.HTTP_202_ACCEPTED
        return {
            "code": 202,
            "message": "Request was successful.",
            "data": None,
        }

    if interview.start_date:
        raise HTTPException(status_code=400, detail="Already started")

    if interview.expired_date and datetime.utcnow() > interview.expired_date:
        # Persist the expiry so later reads show the EXPIRED status.
        interview.status = InterviewStatus.EXPIRED.value
        db.commit()
        db.refresh(interview)
        raise HTTPException(status_code=400, detail="Interview expired")

    # Fetch the key before touching the row: if the upstream call fails, the
    # interview stays unstarted and can be retried.
    ephemeral_key = get_realtime_session()["value"]

    interview.start_date = datetime.utcnow()
    interview.status = InterviewStatus.IN_PROGRESS.value
    db.commit()
    db.refresh(interview)

    return {
        "code": 200,
        "message": "Request was successful.",
        "data": {
            "id": interview.id,
            "url": interview.url,
            "expired_date": interview.expired_date,
            "start_date": interview.start_date,
            "status": interview.status,
            "duration": interview.duration,
            "transcript": interview.transcript,
            "summary": interview.summary,
            "recording_url": interview.recording_url,
            "prompt": interview.prompt,
            "ephemeral_key": ephemeral_key,
        },
    }
@app.patch("/interview/{id}")
def update_interview(
    id: str,
    payload: InterviewUpdate,
    authorization: str = Header(...),
    db: Session = Depends(get_db),
):
    """Apply a partial update to an interview.

    Only the fields present in the request body are written.

    Args:
        id: Interview identifier from the URL path.
        payload: Fields to change.
        authorization: Raw ``Authorization`` header, expected as
            ``<scheme> <token>`` with the interview's own token.
        db: Session injected by FastAPI. Using ``Depends(get_db)`` lets the
            framework run ``get_db``'s cleanup, so the session is closed.

    Returns:
        Envelope whose ``data`` holds the interview's fields after the update.

    Raises:
        HTTPException: 404 for an unknown id, 401 for a malformed header or
            wrong token, 400 for an invalid status value.
    """
    interview = db.query(Interview).filter_by(id=id).first()
    if not interview:
        raise HTTPException(status_code=404, detail="Interview not found")

    # A header without a space used to raise IndexError (HTTP 500); treat it
    # as a failed authentication instead.
    auth_parts = authorization.split(" ")
    if len(auth_parts) < 2 or auth_parts[1] != interview.token:
        raise HTTPException(status_code=401, detail="Unauthorized: Invalid token")

    # exclude_unset keeps fields the client did not send out of the update.
    for key, value in payload.dict(exclude_unset=True).items():
        if key == "status":
            if value not in InterviewStatus._value2member_map_:
                raise HTTPException(status_code=400, detail=f"Invalid status: {value}")
            # Store the plain string, as generate_link and start_interview
            # do, instead of the enum member.
            value = InterviewStatus(value).value
        setattr(interview, key, value)

    db.commit()
    db.refresh(interview)

    return {
        "code": 200,
        "message": "Request was successful.",
        "data": {
            "id": interview.id,
            "url": interview.url,
            "expired_date": interview.expired_date,
            "start_date": interview.start_date,
            "status": interview.status,
            "duration": interview.duration,
            "transcript": interview.transcript,
            "summary": interview.summary,
            "recording_url": interview.recording_url,
            "prompt": interview.prompt,
        },
    }