SignApp / src /sign_app /api.py
SearingShot's picture
Deploy SignApp main app
bb5dd0a
Raw
History Blame Contribute Delete
7.13 kB
import os
import shutil
from contextlib import asynccontextmanager
from pathlib import Path
import requests
from dotenv import load_dotenv
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from pymongo import MongoClient
load_dotenv()
MONGODB_URI = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
WHISPER_MODEL_SIZE = os.getenv("WHISPER_MODEL", "small")
WHISPER_API_URL = os.getenv("WHISPER_API_URL", "").strip().rstrip("/")
DISFLUENCY_API_URL = os.getenv("DISFLUENCY_API_URL", "").strip().rstrip("/")
REMOTE_API_TIMEOUT = int(os.getenv("REMOTE_API_TIMEOUT", "300"))
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
UI_DIR = Path(__file__).parent / "ui"
client = MongoClient(MONGODB_URI)
db = client["SignApp"]
sign_rules_col = db["sign_rules"]
fingerspell_col = db["fingerspelling"]
_whisper_model = None
_disfluency_fn = None
def _auth_headers() -> dict[str, str]:
if not HF_TOKEN:
return {}
return {"Authorization": f"Bearer {HF_TOKEN}"}
def get_whisper():
global _whisper_model
if _whisper_model is None:
import whisper
_whisper_model = whisper.load_model(WHISPER_MODEL_SIZE)
return _whisper_model
def get_disfluency_fn():
global _disfluency_fn
if _disfluency_fn is None:
from .disfluency.inference import remove_disfluency
_disfluency_fn = remove_disfluency
return _disfluency_fn
def transcribe_audio(file_path: Path) -> dict:
if WHISPER_API_URL:
with file_path.open("rb") as audio_file:
response = requests.post(
f"{WHISPER_API_URL}/transcribe/",
headers=_auth_headers(),
files={"file": (file_path.name, audio_file, "audio/webm")},
timeout=REMOTE_API_TIMEOUT,
)
response.raise_for_status()
data = response.json()
return {
"text": data.get("text", ""),
"language": data.get("language", "en"),
}
whisper_model = get_whisper()
result = whisper_model.transcribe(str(file_path), language="en")
return {
"text": result["text"],
"language": result["language"],
}
def clean_disfluency(text: str) -> str:
if DISFLUENCY_API_URL:
response = requests.post(
f"{DISFLUENCY_API_URL}/clean/",
headers=_auth_headers(),
json={"text": text},
timeout=REMOTE_API_TIMEOUT,
)
response.raise_for_status()
data = response.json()
return data.get("cleaned_text", "").strip()
return get_disfluency_fn()(text)
@asynccontextmanager
async def lifespan(app: FastAPI):
if not WHISPER_API_URL:
print("Loading local Whisper model on startup...")
get_whisper()
else:
print(f"Using remote Whisper API: {WHISPER_API_URL}")
if not DISFLUENCY_API_URL:
print("Loading local disfluency model on startup...")
get_disfluency_fn()
else:
print(f"Using remote disfluency API: {DISFLUENCY_API_URL}")
print("SignApp startup complete.")
yield
app = FastAPI(title="SignApp", version="0.1.0", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from .sign_language_text.gloss_converter import convert_to_sign_gloss
class TextInput(BaseModel):
text: str
def build_sign_sequence(gloss_tokens: list[str]) -> list[dict]:
"""Look up each gloss token in MongoDB sign_rules, fall back to fingerspelling."""
sign_sequence = []
for word in gloss_tokens:
rule = sign_rules_col.find_one({"sign": word})
if rule:
sign_sequence.append(
{
"type": "sign",
"gloss": word,
"handshape": rule["handshape"],
"location": rule["location"],
"movement": rule["movement"],
"expression": rule.get("expression", "neutral"),
}
)
else:
for letter in word:
finger = fingerspell_col.find_one({"letter": letter.upper()})
if finger:
sign_sequence.append(
{
"type": "fingerspell",
"letter": letter.upper(),
"handshape": finger["handshape"],
"location": "neutral_space",
"movement": finger.get("movement") or "none",
}
)
return sign_sequence
def text_pipeline(text: str) -> dict:
cleaned_text = clean_disfluency(text)
sign_friendly_text = convert_to_sign_gloss(cleaned_text)
sign_sequence = build_sign_sequence(sign_friendly_text)
return {
"cleaned_transcription": cleaned_text,
"sign_friendly_text": sign_friendly_text,
"sign_sequence": sign_sequence,
}
@app.get("/health")
def health():
return {
"status": "ok",
"whisper": "remote" if WHISPER_API_URL else "local",
"disfluency": "remote" if DISFLUENCY_API_URL else "local",
}
@app.post("/voice-to-text/")
def voice_to_text_endpoint(file: UploadFile = File(...)):
"""Full pipeline: audio -> transcription -> gloss -> sign sequence."""
file_path = UPLOAD_DIR / (file.filename or "recording.webm")
try:
with file_path.open("wb") as audio_file:
shutil.copyfileobj(file.file, audio_file)
transcription_result = transcribe_audio(file_path)
transcription = transcription_result["text"]
language = transcription_result["language"]
result = text_pipeline(transcription)
return {
"language": language,
"raw_transcription": transcription,
**result,
}
except requests.RequestException as exc:
raise HTTPException(status_code=502, detail=f"Remote model service failed: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
finally:
if file_path.exists():
file_path.unlink()
@app.post("/text-to-sign/")
def text_to_sign_endpoint(body: TextInput):
"""Text-only pipeline: text -> gloss -> sign sequence."""
text = body.text.strip()
if not text:
raise HTTPException(status_code=400, detail="Text is empty")
try:
return text_pipeline(text)
except requests.RequestException as exc:
raise HTTPException(status_code=502, detail=f"Remote model service failed: {exc}") from exc
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
@app.get("/")
def serve_ui():
return FileResponse(UI_DIR / "index.html")
app.mount("/", StaticFiles(directory=str(UI_DIR)), name="ui")