Fayza38's picture
Upload 3 files
1bb94e9 verified
# =========================================
# 1. IMPORTS
# =========================================
import asyncio
import os
import json
import uuid
import cloudinary
import cloudinary.uploader
import firebase_admin
from firebase_admin import credentials, firestore
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from gradio_client import Client
from google.cloud.firestore_v1.base_query import FieldFilter
import edge_tts
from dotenv import load_dotenv
# =========================================
# 2. INITIALIZATIONS
# =========================================
if not firebase_admin._apps:
fb_json = os.getenv("FIREBASE_JSON")
if fb_json:
cred_dict = json.loads(fb_json)
cred = credentials.Certificate(cred_dict)
else:
cred = credentials.Certificate("serviceAccountKey.json")
firebase_admin.initialize_app(cred)
db = firestore.client()
# Load environment variables
load_dotenv()
# Cloudinary Configuration
cloudinary.config(
cloud_name=os.getenv("CLOUD_NAME"),
api_key=os.getenv("API_KEY"),
api_secret=os.getenv("API_SECRET"),
secure=True
)
app = FastAPI(title="AI Question Service")
HF_SPACE = "Fayza38/Question_and_answer_model"
client = None
# =========================================
# 3. MODELS & CONSTANTS
# =========================================
TECH_CATEGORIES = {0: "Security",
1: "BackEnd",
2: "Networking",
3: "FrontEnd",
4: "DataEngineering",
5: "WebDevelopment",
6: "FullStack",
7: "VersionControl",
8: "SystemDesign",
9: "MachineLearning",
10: "LanguagesAndFrameworks",
11: "DatabaseSystems",
12: "ArtificialIntelligence",
13: "SoftwareTesting",
14: "DistributedSystems",
15: "DevOps",
16: "LowLevelSystems",
17: "DatabaseAndSql",
18: "GeneralProgramming",
19: "DataStructures",
20: "Algorithms"}
DIFFICULTY_MAP = {0: "Easy", 1: "Intermediate", 2: "Hard"}
SESSION_TYPE_MAP = {0: "Technical", 1: "Behavioral"}
class GenerateSessionRequest(BaseModel):
sessionId: str
sessionType: int
difficultyLevel: int = 0
trackName: int
class CleanupRequest(BaseModel):
audioUrls: list[str]
# =========================================
# 4. STARTUP EVENT
# =========================================
@app.on_event("startup")
async def startup_event():
global client
max_retries = 5
retry_delay = 10
print("Connecting to Hugging Face Space...")
for i in range(max_retries):
try:
client = Client(HF_SPACE)
print("Connected Successfully!")
break
except Exception as e:
print(f"Connection attempt {i+1} failed. Retrying in {retry_delay}s...")
if i < max_retries - 1: await asyncio.sleep(retry_delay)
# =========================================
# 5. HELPERS
# =========================================
async def generate_audio(text, filename):
try:
# Rate is set to -10% to make the voice slightly slower and clearer
communicate = edge_tts.Communicate(text, "en-US-GuyNeural", rate="-15%")
await communicate.save(filename)
# Upload to Cloudinary
upload_result = cloudinary.uploader.upload(
filename,
resource_type="video",
folder="interview_audio"
)
if os.path.exists(filename): os.remove(filename)
return upload_result["secure_url"]
except Exception as e:
print(f"Audio Generation Error: {e}")
if os.path.exists(filename): os.remove(filename)
return None
async def safe_generate(prompt, retries=3):
if client is None: raise Exception("Gradio Client not initialized")
for attempt in range(retries):
try:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: client.predict(prompt=prompt, api_name="/generate_questions"))
except Exception as e:
if attempt == retries - 1: raise e
await asyncio.sleep(2)
def parse_question_output(raw_output: str):
if not raw_output: return None, None
text = raw_output.split("assistant")[-1].strip() if "assistant" in raw_output else raw_output
if "Q:" in text and "A:" in text:
try:
parts = text.split("A:")
q = parts[0].replace("Q:", "").strip()
a = parts[1].split("<|im_end|>")[0].strip()
return q, a
except: return None, None
return None, None
# =========================================
# 6. REFILL & PREFILL LOGIC
# =========================================
async def refill_specific_pool(track_id: int, difficulty: int, count: int, session_type: int = 0):
global client
while client is None: await asyncio.sleep(5)
# Technical (0) vs Behavioral (1)
if session_type == 1:
prompt = "Generate ONE unique behavioral interview question (soft skills, situational). Format: Q: [Question] A: [Answer]"
track_text = "Behavioral"
level_text = "General"
else:
track_text = TECH_CATEGORIES.get(track_id)
level_text = DIFFICULTY_MAP.get(difficulty)
prompt = f"Generate ONE unique {track_text} interview question for {level_text} level. Format: Q: [Question] A: [Answer]"
success_count = 0
while success_count < count:
try:
raw_output = await safe_generate(prompt)
q_text, a_text = parse_question_output(raw_output)
if q_text and a_text:
filename = f"{uuid.uuid4()}.mp3"
audio_url = await generate_audio(q_text, filename)
if audio_url:
db.collection("questions_pool").add({
"session_type": session_type,
"track_id": track_id if session_type == 0 else -1,
"difficulty": difficulty if session_type == 0 else 0,
"questionText": q_text,
"questionIdealAnswer": a_text,
"audio_url": audio_url,
"created_at": firestore.SERVER_TIMESTAMP
})
success_count += 1
print(f"[{success_count}/{count}] Refilled: {track_text}")
await asyncio.sleep(2)
except Exception as e:
print(f"Error in refill: {e}")
await asyncio.sleep(5)
# =========================================
# 6. ENDPOINTS
# =========================================
@app.post("/generate-session")
async def generate_session(request: GenerateSessionRequest, background_tasks: BackgroundTasks):
t_id, diff = request.trackName, request.difficultyLevel
s_type = request.sessionType # 0: Technical, 1: Behavioral
# Query based on the new session types (0 or 1)
query = db.collection("questions_pool").where(filter=FieldFilter("session_type", "==", s_type))
if s_type == 0: # Technical
query = query.where(filter=FieldFilter("track_id", "==", t_id)) \
.where(filter=FieldFilter("difficulty", "==", diff))
docs_query = query.limit(10).get()
final_questions = []
for index, doc in enumerate(docs_query, start=1):
data = doc.to_dict()
final_questions.append({
"question_id": index,
"text": data["questionText"],
"expected_answer": data["questionIdealAnswer"],
"audio_url": data.get("audio_url", "")
})
# Delete after fetching to ensure questions are unique for next users
db.collection("questions_pool").document(doc.id).delete()
# Maintenance task to keep the pool full
async def maintain_stock():
agg_query = query.count()
current_count = agg_query.get()[0][0].value
target = 50
if current_count < target:
await refill_specific_pool(t_id, diff, target - current_count, session_type=s_type)
background_tasks.add_task(maintain_stock)
if not final_questions:
raise HTTPException(status_code=503, detail="Pool empty for this type.")
return {"session_id": request.sessionId, "questions": final_questions}
@app.get("/system-cleanup")
async def system_cleanup(background_tasks: BackgroundTasks):
"""Scan and delete all questions with missing or invalid audio URLs"""
def run_cleanup():
print("Starting System Cleanup...")
# Get all documents in the pool
docs = db.collection("questions_pool").get()
deleted_count = 0
for doc in docs:
data = doc.to_dict()
# Check if audio_url is missing, None, or empty string
if not data.get("audio_url") or data.get("audio_url") == "":
db.collection("questions_pool").document(doc.id).delete()
deleted_count += 1
print(f"Cleanup finished! Deleted {deleted_count} broken questions.")
background_tasks.add_task(run_cleanup)
return {"message": "Cleanup started in background. Check your console/logs."}
@app.post("/cleanup-audio")
async def cleanup_audio(request: CleanupRequest, background_tasks: BackgroundTasks):
def delete_job(urls):
for url in urls:
try:
public_id = "interview_audio/" + url.split('/')[-1].split('.')[0]
cloudinary.uploader.destroy(public_id, resource_type="video")
print(f"Deleted: {public_id}")
except Exception: pass
background_tasks.add_task(delete_job, request.audioUrls)
return {"message": "Cleanup started"}
# @app.get("/trigger-full-prefill")
# async def trigger_full_prefill(background_tasks: BackgroundTasks):
# """Prefills 30 questions for every track and every difficulty level"""
# async def full_prefill_task():
# for t_id in TECH_CATEGORIES.keys():
# for diff in DIFFICULTY_MAP.keys():
# print(f"Starting full prefill for Track {t_id}, Level {diff}")
# await refill_specific_pool(t_id, diff, 30)
# background_tasks.add_task(full_prefill_task)
# return {"message": "Full system prefill started in background (30 questions per track/level)"}
#?##############################################################################
# @app.get("/trigger-behavioral-prefill")
# async def trigger_behavioral_prefill(background_tasks: BackgroundTasks):
# """Prefills 30 Behavioral questions (No track or difficulty needed)"""
# async def run_behavioral_task():
# print("Starting Behavioral questions prefill...")
# await refill_specific_pool(track_id=0, difficulty=0, count=30, session_type=2)
# print("Finished prefilling 30 Behavioral questions!")
# background_tasks.add_task(run_behavioral_task)
# return {"message": "Behavioral prefill (30 questions) started in background."}
@app.get("/health")
async def health(): return {"status": "running", "hf_connected": client is not None}
#?##########################################################################
# @app.get("/final-migration-fix")
# async def final_migration_fix(background_tasks: BackgroundTasks):
# def run_fix():
# print("๐Ÿ”„ Starting Final Data Fix...")
# docs = db.collection("questions_pool").get()
# updated_count = 0
# for doc in docs:
# data = doc.to_dict()
# updates = {}
# # 1. ุชุตุญูŠุญ ุงู„ู€ session_type (Technical: 0, Behavioral: 1)
# # ู„ูˆ ูƒุงู† 1 (ู‚ุฏูŠู…) ุฎู„ูŠู‡ 0ุŒ ูˆู„ูˆ ูƒุงู† 2 (ู‚ุฏูŠู…) ุฎู„ูŠู‡ 1
# curr_type = data.get("session_type")
# if curr_type == 1: updates["session_type"] = 0
# elif curr_type == 2: updates["session_type"] = 1
# # 2. ุชุตุญูŠุญ ุงู„ู€ difficulty (Easy: 0, Intermediate: 1, Hard: 2)
# # ุงู„ุฃุณุฆู„ุฉ ุงู„ู‚ุฏูŠู…ุฉ ูƒุงู†ุช 1 ูˆ 2 ูˆ 3ุŒ ู‡ู†ู†ู‚ุต ู…ู†ู‡ุง 1
# curr_diff = data.get("difficulty")
# if curr_diff in [1, 2, 3]:
# updates["difficulty"] = curr_diff - 1
# if updates:
# db.collection("questions_pool").document(doc.id).update(updates)
# updated_count += 1
# print(f"โœ… Final Fix Done! Updated {updated_count} questions.")
# background_tasks.add_task(run_fix)
# return {"message": "Final migration started. Your pool will be ready in a minute!"}