# =========================================
# 1. IMPORTS
# =========================================
import asyncio
import os
import json
import uuid
import cloudinary
import cloudinary.uploader
import firebase_admin
from firebase_admin import credentials, firestore
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from gradio_client import Client
from google.cloud.firestore_v1.base_query import FieldFilter
import edge_tts
from dotenv import load_dotenv

# =========================================
# 2. INITIALIZATIONS
# =========================================

# Load environment variables FIRST so that FIREBASE_JSON and the Cloudinary
# keys defined in a .env file are visible to the initializations below.
# (Previously load_dotenv() ran *after* os.getenv("FIREBASE_JSON") was read,
# so .env-provided Firebase credentials were silently ignored.)
load_dotenv()

if not firebase_admin._apps:
    fb_json = os.getenv("FIREBASE_JSON")
    if fb_json:
        # Credentials supplied inline as a JSON string (e.g. on a PaaS host)
        cred_dict = json.loads(fb_json)
        cred = credentials.Certificate(cred_dict)
    else:
        # Fall back to a local service-account file for development
        cred = credentials.Certificate("serviceAccountKey.json")
    firebase_admin.initialize_app(cred)

db = firestore.client()

# Cloudinary Configuration (generated question audio is stored here)
cloudinary.config(
    cloud_name=os.getenv("CLOUD_NAME"),
    api_key=os.getenv("API_KEY"),
    api_secret=os.getenv("API_SECRET"),
    secure=True
)

app = FastAPI(title="AI Question Service")

# Hugging Face Space hosting the question-generation model.
HF_SPACE = "Fayza38/Question_and_answer_model"
# Global Gradio client; populated by the startup event once connected.
client = None

# =========================================
# 3.
# MODELS & CONSTANTS
# =========================================
# Numeric track IDs used by the client app, mapped to human-readable labels.
TECH_CATEGORIES = {
    0: "Security",
    1: "BackEnd",
    2: "Networking",
    3: "FrontEnd",
    4: "DataEngineering",
    5: "WebDevelopment",
    6: "FullStack",
    7: "VersionControl",
    8: "SystemDesign",
    9: "MachineLearning",
    10: "LanguagesAndFrameworks",
    11: "DatabaseSystems",
    12: "ArtificialIntelligence",
    13: "SoftwareTesting",
    14: "DistributedSystems",
    15: "DevOps",
    16: "LowLevelSystems",
    17: "DatabaseAndSql",
    18: "GeneralProgramming",
    19: "DataStructures",
    20: "Algorithms",
}

# Difficulty codes used in requests and Firestore documents.
DIFFICULTY_MAP = {0: "Easy", 1: "Intermediate", 2: "Hard"}

# Session type codes used in requests and Firestore documents.
SESSION_TYPE_MAP = {0: "Technical", 1: "Behavioral"}


class GenerateSessionRequest(BaseModel):
    """Request body for POST /generate-session."""
    # Caller-chosen identifier; echoed back in the response.
    sessionId: str
    # 0 = Technical, 1 = Behavioral (see SESSION_TYPE_MAP).
    sessionType: int
    # 0 = Easy, 1 = Intermediate, 2 = Hard (see DIFFICULTY_MAP).
    difficultyLevel: int = 0
    # Key into TECH_CATEGORIES (only meaningful for technical sessions).
    trackName: int


class CleanupRequest(BaseModel):
    """Request body for POST /cleanup-audio."""
    # Cloudinary URLs whose audio assets should be removed.
    audioUrls: list[str]


# =========================================
# 4. STARTUP EVENT
# =========================================
@app.on_event("startup")
async def startup_event():
    """Connect the global Gradio client to the HF Space, retrying on failure."""
    global client
    max_retries = 5
    retry_delay = 10
    print("Connecting to Hugging Face Space...")
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            client = Client(HF_SPACE)
            print("Connected Successfully!")
            return
        except Exception:
            print(f"Connection attempt {attempt} failed. Retrying in {retry_delay}s...")
            # Don't sleep after the final failed attempt.
            if attempt < max_retries:
                await asyncio.sleep(retry_delay)

# =========================================
# 5.
# HELPERS
# =========================================
async def generate_audio(text, filename):
    """Synthesize *text* to an MP3 with edge-tts, upload it to Cloudinary and
    return the secure URL, or None on any failure.

    The local temp file *filename* is always removed afterwards.
    """
    try:
        # Rate is slowed down slightly to make the voice clearer to follow
        communicate = edge_tts.Communicate(text, "en-US-GuyNeural", rate="-15%")
        await communicate.save(filename)
        # Cloudinary stores audio under the "video" resource type
        upload_result = cloudinary.uploader.upload(
            filename,
            resource_type="video",
            folder="interview_audio"
        )
        return upload_result["secure_url"]
    except Exception as e:
        print(f"Audio Generation Error: {e}")
        return None
    finally:
        # Guaranteed cleanup of the temp file on both success and failure
        if os.path.exists(filename):
            os.remove(filename)


async def safe_generate(prompt, retries=3):
    """Call the HF Space with *prompt*, retrying up to *retries* times.

    The blocking gradio_client call runs in the default executor so the event
    loop is not blocked. Re-raises the last error if every attempt fails.
    """
    if client is None:
        # RuntimeError is still caught by callers' `except Exception` blocks
        raise RuntimeError("Gradio Client not initialized")
    for attempt in range(retries):
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,
                lambda: client.predict(prompt=prompt, api_name="/generate_questions")
            )
        except Exception:
            if attempt == retries - 1:
                raise  # bare raise preserves the original traceback
            await asyncio.sleep(2)


def parse_question_output(raw_output: str):
    """Extract a (question, answer) pair from the model's raw text.

    Expects "Q: ... A: ..." somewhere after the last "assistant" marker.
    Returns (None, None) whenever the output cannot be parsed.
    """
    if not raw_output:
        return None, None
    # Keep only the assistant's final turn when a chat transcript is returned
    text = raw_output.split("assistant")[-1].strip() if "assistant" in raw_output else raw_output
    if "Q:" in text and "A:" in text:
        # Split only on the FIRST "A:" so answers that themselves contain
        # "A:" are not truncated (the old unbounded split dropped that text).
        parts = text.split("A:", 1)
        q = parts[0].replace("Q:", "").strip()
        # The model may append an end-of-turn token; cut everything after it
        a = parts[1].split("<|im_end|>")[0].strip()
        return q, a
    return None, None


# =========================================
# 6. REFILL & PREFILL LOGIC
# =========================================
async def refill_specific_pool(track_id: int, difficulty: int, count: int, session_type: int = 0):
    """Generate *count* questions (text + TTS audio) and add them to the
    Firestore "questions_pool" collection.

    session_type 0 = Technical (uses track_id/difficulty);
    session_type 1 = Behavioral (track/difficulty stored as -1/0).
    Blocks until the global Gradio client has been connected.
    """
    global client
    # Wait for the startup event to finish connecting the client
    while client is None:
        await asyncio.sleep(5)

    # Technical (0) vs Behavioral (1)
    if session_type == 1:
        prompt = "Generate ONE unique behavioral interview question (soft skills, situational). Format: Q: [Question] A: [Answer]"
        track_text = "Behavioral"
        level_text = "General"
    else:
        track_text = TECH_CATEGORIES.get(track_id)
        level_text = DIFFICULTY_MAP.get(difficulty)
        prompt = f"Generate ONE unique {track_text} interview question for {level_text} level. Format: Q: [Question] A: [Answer]"

    success_count = 0
    while success_count < count:
        try:
            raw_output = await safe_generate(prompt)
            q_text, a_text = parse_question_output(raw_output)
            if q_text and a_text:
                filename = f"{uuid.uuid4()}.mp3"
                audio_url = await generate_audio(q_text, filename)
                if audio_url:
                    db.collection("questions_pool").add({
                        "session_type": session_type,
                        # Behavioral questions have no meaningful track/difficulty
                        "track_id": track_id if session_type == 0 else -1,
                        "difficulty": difficulty if session_type == 0 else 0,
                        "questionText": q_text,
                        "questionIdealAnswer": a_text,
                        "audio_url": audio_url,
                        "created_at": firestore.SERVER_TIMESTAMP
                    })
                    success_count += 1
                    print(f"[{success_count}/{count}] Refilled: {track_text}")
            # Small pause between generations to avoid hammering the Space
            await asyncio.sleep(2)
        except Exception as e:
            print(f"Error in refill: {e}")
            await asyncio.sleep(5)

# =========================================
# 7.
# ENDPOINTS
# =========================================
@app.post("/generate-session")
async def generate_session(request: GenerateSessionRequest, background_tasks: BackgroundTasks):
    """Serve up to 10 pre-generated questions for the requested session
    type / track / difficulty, delete them from the pool (so they are never
    reused), and schedule a background refill back up to the target stock.
    """
    t_id, diff = request.trackName, request.difficultyLevel
    s_type = request.sessionType  # 0: Technical, 1: Behavioral

    # Query based on the new session types (0 or 1)
    query = db.collection("questions_pool").where(filter=FieldFilter("session_type", "==", s_type))
    if s_type == 0:  # Technical questions are further filtered by track + level
        query = query.where(filter=FieldFilter("track_id", "==", t_id)) \
                     .where(filter=FieldFilter("difficulty", "==", diff))

    docs_query = query.limit(10).get()

    final_questions = []
    for index, doc in enumerate(docs_query, start=1):
        data = doc.to_dict()
        final_questions.append({
            "question_id": index,
            "text": data["questionText"],
            "expected_answer": data["questionIdealAnswer"],
            "audio_url": data.get("audio_url", "")
        })
        # Delete after fetching to ensure questions are unique for next users
        db.collection("questions_pool").document(doc.id).delete()

    # Maintenance task to keep the pool full
    async def maintain_stock():
        agg_query = query.count()
        current_count = agg_query.get()[0][0].value
        target = 50
        if current_count < target:
            await refill_specific_pool(t_id, diff, target - current_count, session_type=s_type)

    if not final_questions:
        # FastAPI drops BackgroundTasks when an HTTPException is raised, so a
        # task added via background_tasks would never run here and an empty
        # pool could never recover. Schedule the refill on the loop directly.
        asyncio.create_task(maintain_stock())
        raise HTTPException(status_code=503, detail="Pool empty for this type.")

    background_tasks.add_task(maintain_stock)
    return {"session_id": request.sessionId, "questions": final_questions}


@app.get("/system-cleanup")
async def system_cleanup(background_tasks: BackgroundTasks):
    """Scan and delete all questions with missing or invalid audio URLs"""
    def run_cleanup():
        print("Starting System Cleanup...")
        # Get all documents in the pool
        docs = db.collection("questions_pool").get()
        deleted_count = 0
        for doc in docs:
            data = doc.to_dict()
            # Falsy covers missing key, None and "" in one check
            if not data.get("audio_url"):
                db.collection("questions_pool").document(doc.id).delete()
                deleted_count += 1
        print(f"Cleanup finished! Deleted {deleted_count} broken questions.")
    background_tasks.add_task(run_cleanup)
    return {"message": "Cleanup started in background. Check your console/logs."}


@app.post("/cleanup-audio")
async def cleanup_audio(request: CleanupRequest, background_tasks: BackgroundTasks):
    """Best-effort deletion of the given Cloudinary audio assets, in background."""
    def delete_job(urls):
        for url in urls:
            try:
                # Cloudinary public id = folder + filename without extension
                public_id = "interview_audio/" + url.split('/')[-1].split('.')[0]
                cloudinary.uploader.destroy(public_id, resource_type="video")
                print(f"Deleted: {public_id}")
            except Exception:
                # A single malformed URL must not abort the whole batch
                pass
    background_tasks.add_task(delete_job, request.audioUrls)
    return {"message": "Cleanup started"}


# @app.get("/trigger-full-prefill")
# async def trigger_full_prefill(background_tasks: BackgroundTasks):
#     """Prefills 30 questions for every track and every difficulty level"""
#     async def full_prefill_task():
#         for t_id in TECH_CATEGORIES.keys():
#             for diff in DIFFICULTY_MAP.keys():
#                 print(f"Starting full prefill for Track {t_id}, Level {diff}")
#                 await refill_specific_pool(t_id, diff, 30)
#     background_tasks.add_task(full_prefill_task)
#     return {"message": "Full system prefill started in background (30 questions per track/level)"}

#?##############################################################################
# NOTE(review): this commented-out endpoint still uses the OLD session_type=2
# for Behavioral; the current mapping is 1 — update before re-enabling.
# @app.get("/trigger-behavioral-prefill")
# async def trigger_behavioral_prefill(background_tasks: BackgroundTasks):
#     """Prefills 30 Behavioral questions (No track or difficulty needed)"""
#     async def run_behavioral_task():
#         print("Starting Behavioral questions prefill...")
#         await refill_specific_pool(track_id=0, difficulty=0, count=30, session_type=2)
#         print("Finished prefilling 30 Behavioral questions!")
#     background_tasks.add_task(run_behavioral_task)
#     return {"message": "Behavioral prefill (30 questions) started in background."}


@app.get("/health")
async def health():
    """Liveness probe; also reports whether the HF Space client is connected."""
    return {"status": "running", "hf_connected": client is not None}
#?##########################################################################
# @app.get("/final-migration-fix")
# async def final_migration_fix(background_tasks: BackgroundTasks):
#     def run_fix():
#         print("🔄 Starting Final Data Fix...")
#         docs = db.collection("questions_pool").get()
#         updated_count = 0
#         for doc in docs:
#             data = doc.to_dict()
#             updates = {}
#             # 1. Fix the session_type (Technical: 0, Behavioral: 1)
#             # If it was 1 (old scheme) make it 0; if it was 2 (old scheme) make it 1
#             curr_type = data.get("session_type")
#             if curr_type == 1: updates["session_type"] = 0
#             elif curr_type == 2: updates["session_type"] = 1
#             # 2. Fix the difficulty (Easy: 0, Intermediate: 1, Hard: 2)
#             # Old questions used 1, 2 and 3, so subtract 1 from each
#             curr_diff = data.get("difficulty")
#             if curr_diff in [1, 2, 3]:
#                 updates["difficulty"] = curr_diff - 1
#             if updates:
#                 db.collection("questions_pool").document(doc.id).update(updates)
#                 updated_count += 1
#         print(f"✅ Final Fix Done! Updated {updated_count} questions.")
#     background_tasks.add_task(run_fix)
#     return {"message": "Final migration started. Your pool will be ready in a minute!"}