import json import logging import os import asyncio import tempfile from typing import List, Dict, Optional, Any, Callable import openai from botocore.exceptions import ClientError from core.config import settings from core.prompts import get_flashcard_system_prompt, get_flashcard_topic_prompt, get_flashcard_explanation_prompt from services.s3_service import s3_service logger = logging.getLogger(__name__) class FlashcardService: def __init__(self): self.openai_client = openai.OpenAI(api_key=settings.OPENAI_API_KEY) async def generate_flashcards( self, file_key: Optional[str] = None, text_input: Optional[str] = None, difficulty: str = "medium", quantity: str = "standard", topic: Optional[str] = None, language: str = "English", progress_callback: Optional[Callable[[int, str], None]] = None ) -> List[Dict[str, str]]: """ Generates flashcards from either an S3 PDF or direct text input. Uses asyncio.to_thread for all blocking I/O operations to enable parallel execution. Args: progress_callback: Optional callback function(progress: int, message: str) for progress updates """ try: if progress_callback: progress_callback(5, "Preparing prompts...") system_prompt = get_flashcard_system_prompt(difficulty, quantity, language) if topic: system_prompt += get_flashcard_topic_prompt(topic) if file_key: if progress_callback: progress_callback(15, "Downloading file from S3...") # Download PDF from S3 (non-blocking) tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") tmp_path = tmp.name tmp.close() try: # Use asyncio.to_thread for S3 download await asyncio.to_thread( s3_service.s3_client.download_file, settings.AWS_S3_BUCKET, file_key, tmp_path ) if progress_callback: progress_callback(30, "Uploading to OpenAI...") # Read file and upload to OpenAI (non-blocking) def upload_to_openai(): with open(tmp_path, "rb") as f: return self.openai_client.files.create( file=f, purpose="assistants" ) uploaded_file = await asyncio.to_thread(upload_to_openai) if progress_callback: progress_callback(45, "Generating flashcards with AI...") messages = [ {"role": "system", "content": system_prompt}, { "role": "user", "content": [ { "type": "file", "file": {"file_id": uploaded_file.id} } ] } ] # Call OpenAI API (non-blocking) response = await asyncio.to_thread( self.openai_client.chat.completions.create, model="gpt-4o-mini", messages=messages, temperature=0.7 ) if progress_callback: progress_callback(75, "Cleaning up...") # Clean up OpenAI file (non-blocking) await asyncio.to_thread( self.openai_client.files.delete, uploaded_file.id ) raw_content = response.choices[0].message.content finally: # Remove temp file (non-blocking) if os.path.exists(tmp_path): await asyncio.to_thread(os.remove, tmp_path) elif text_input: if progress_callback: progress_callback(20, "Generating flashcards with AI...") messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": text_input} ] # Call OpenAI API (non-blocking) response = await asyncio.to_thread( self.openai_client.chat.completions.create, model="gpt-4o-mini", messages=messages, temperature=0.7 ) raw_content = response.choices[0].message.content else: raise ValueError("Either file_key or text_input must be provided") if progress_callback: progress_callback(85, "Parsing results...") # Parse JSON if "```json" in raw_content: raw_content = raw_content.split("```json")[1].split("```")[0].strip() elif "```" in raw_content: raw_content = raw_content.split("```")[1].split("```")[0].strip() return json.loads(raw_content) except Exception as e: logger.error(f"Flashcard generation failed: {str(e)}") raise e async def generate_explanation(self, question: str, file_key: Optional[str] = None, language: str = "English") -> str: """ Generates a detailed explanation for a flashcard question. Uses asyncio.to_thread for all blocking I/O operations. """ try: explanation_prompt = get_flashcard_explanation_prompt(question, language) if file_key: tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") tmp_path = tmp.name tmp.close() try: # Download from S3 (non-blocking) await asyncio.to_thread( s3_service.s3_client.download_file, settings.AWS_S3_BUCKET, file_key, tmp_path ) # Upload to OpenAI (non-blocking) def upload_to_openai(): with open(tmp_path, "rb") as f: return self.openai_client.files.create(file=f, purpose="assistants") uploaded_file = await asyncio.to_thread(upload_to_openai) messages = [ {"role": "system", "content": explanation_prompt}, {"role": "user", "content": [{"type": "file", "file": {"file_id": uploaded_file.id}}]} ] # Call OpenAI API (non-blocking) response = await asyncio.to_thread( self.openai_client.chat.completions.create, model="gpt-4o-mini", messages=messages, temperature=0.3 ) # Clean up OpenAI file (non-blocking) await asyncio.to_thread( self.openai_client.files.delete, uploaded_file.id ) content = response.choices[0].message.content or "" # Clean up: remove newlines, markdown bolding, and extra spaces content = content.replace("\n", " ").replace("**", "").replace("__", "") content = " ".join(content.split()) return content finally: # Remove temp file (non-blocking) if os.path.exists(tmp_path): await asyncio.to_thread(os.remove, tmp_path) else: messages = [ {"role": "system", "content": explanation_prompt}, {"role": "user", "content": f"Please explain the question: {question}"} ] # Call OpenAI API (non-blocking) response = await asyncio.to_thread( self.openai_client.chat.completions.create, model="gpt-4o-mini", messages=messages, temperature=0.3 ) content = response.choices[0].message.content or "" # Clean up: remove newlines, markdown bolding, and extra spaces content = content.replace("\n", " ").replace("**", "").replace("__", "") content = " ".join(content.split()) return content except Exception as e: logger.error(f"Explanation generation failed: {str(e)}") raise e flashcard_service = FlashcardService()