"""Streaming transcription and summarization API.

Pipeline: an uploaded audio file is cut into 30-second chunks, each chunk is
transcribed with a Whisper ASR pipeline, the raw text is corrected by Gemini
in roughly five-minute batches, and the corrected transcript is finally
summarized with a map/reduce prompt strategy.
"""

import os
import re
import shutil
import sys
import tempfile

import torch
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from transformers import pipeline
from google import genai
from pydub import AudioSegment

# ==============================================================================
# I. ASR Setup (Whisper)
# ==============================================================================
MODEL_ID = "MohamedRashad/Arabic-Whisper-CodeSwitching-Edition"
device = 0 if torch.cuda.is_available() else "cpu"
print(f"Device set to use: {device}")

try:
    print("Loading ASR pipeline (Whisper)...")
    pipeline_kwargs = {"chunk_length_s": 30}
    asr_pipeline = pipeline(
        "automatic-speech-recognition",
        model=MODEL_ID,
        device=device,
        **pipeline_kwargs,
    )
    print("Pipeline loaded successfully.")
except Exception as e:
    # Keep the module importable; the endpoint reports the missing model.
    print(f"Error loading ASR pipeline: {e}.")
    asr_pipeline = None

# ==============================================================================
# II. Summarization & Correction Setup (Gemini)
# ==============================================================================
API_KEY = os.environ.get("GEMINI_API_KEY")
if not API_KEY:
    print("Warning: GEMINI_API_KEY not set. Summarization will fail.")

try:
    client = genai.Client(api_key=API_KEY) if API_KEY else None
except Exception as e:
    print(f"Failed to initialize Gemini Client: {e}")
    client = None

MODEL_NAME = "gemini-2.5-flash"
MAX_TOKENS_PER_CHUNK = 10000
# Character budget per chunk, derived from the token budget with a 10% margin.
CHUNK_SIZE_LIMIT = int(MAX_TOKENS_PER_CHUNK * 5 * 0.9)

# Audio is transcribed in 30-second pieces; raw text is sent to Gemini for
# correction once about five minutes of audio has been buffered.
CHUNK_DURATION_MS = 30 * 1000
BUFFER_LIMIT_MS = 5 * 60 * 1000

# Whitespace that follows sentence-ending punctuation (Latin or Arabic "؟").
_SENTENCE_BOUNDARY = re.compile(r"(?<=[.!?؟])\s+")

_CORRECTION_PROMPT_TEMPLATE = """
You are an expert text editor. Your task is to take raw, unpunctuated text, often from a Speech-to-Text (ASR) system, and correct it.

Perform the following actions:
1. **Fix Spelling and Grammar:** Correct all spelling, syntax, and grammatical errors.
2. **Add Punctuation:** Insert all necessary punctuation (periods, commas, question marks, etc.) to make the text readable and clear.
3. **Fix Transliterated Terms (CRITICAL):** If you find English technical terms written in Arabic letters (e.g., "داتا بيز", "ديب ليرنينج", "نتورك"), you MUST convert them back to their correct English spelling (e.g., "Database", "Deep Learning", "Network") inside the text.
4. **Preserve Content:** DO NOT add, delete, or change any core meaning or factual information. Only correct the form.

CRITICAL INSTRUCTION: Output ONLY the corrected text. DO NOT include any conversational filler, introductions, or conclusions such as "Here is the corrected text", "إليك تصحيح النص", or similar phrases. Just return the text itself.

Raw Text to Correct:
---
{raw_text}
"""

_PARTIAL_SUMMARY_PROMPT_TEMPLATE = """
You are an expert summarizer. Summarize the following text into **clear, key bullet points**.
Do not leave out any essential information. The summary must be in the same language as the source text.

CRITICAL INSTRUCTION: Output ONLY the bullet points. DO NOT include any conversational filler like "Here is the summary" or "إليك التلخيص".

Source Text:
---
{chunk}
"""

_FINAL_SUMMARY_PROMPT_TEMPLATE = """
You are a professional text summarizer. {prompt_type} into clear, comprehensive **Bullet Points**.
Use **round bullet points (•)** for the list items.

Language Instructions:
1. **If the majority of the input text was in English:** The final summary must be **strictly in English**.
2. **If the majority of the input text was in Arabic (including dialects):** The final summary must be **in Formal Arabic**, while **strictly preserving all foreign technical terms (English) exactly as they are** without translation.

CRITICAL INSTRUCTION: Output ONLY the final bullet points. DO NOT include any conversational filler, greetings, or phrases like "Here is the summary", "إليك التلخيص النهائي", etc.

Input:
---
{input_for_final_prompt}
"""


# --- Helper Functions ---

def split_text_into_chunks(text: str) -> list[str]:
    """Split *text* into chunks of at most ``CHUNK_SIZE_LIMIT`` characters.

    The text is cut at whitespace that follows sentence-ending punctuation,
    so the original punctuation is kept as written (decimals such as "3.14"
    are not broken apart and no extra periods are appended). A single
    sentence longer than the limit is hard-split so that no chunk exceeds
    the limit.

    Args:
        text: The text to split.

    Returns:
        The chunks in order; an empty list when *text* is blank.
    """
    chunks: list[str] = []
    current = ""

    for sentence in _SENTENCE_BOUNDARY.split(text.strip()):
        if not sentence:
            continue

        # A sentence that cannot fit in any chunk is cut at the size limit.
        while len(sentence) > CHUNK_SIZE_LIMIT:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(sentence[:CHUNK_SIZE_LIMIT])
            sentence = sentence[CHUNK_SIZE_LIMIT:]
        if not sentence:
            continue

        candidate = f"{current} {sentence}" if current else sentence
        if len(candidate) <= CHUNK_SIZE_LIMIT:
            current = candidate
        else:
            chunks.append(current)
            current = sentence

    if current:
        chunks.append(current)
    return chunks


def correct_and_format_text(raw_text: str) -> str:
    """Correct spelling/grammar and add punctuation using Gemini.

    Args:
        raw_text: Raw ASR output.

    Returns:
        The corrected text. Returns ``""`` for blank input, and falls back to
        *raw_text* unchanged when the Gemini client is unavailable, the call
        fails, or the response carries no text.
    """
    if not raw_text.strip():
        return ""
    if client is None:
        return raw_text

    correction_prompt = _CORRECTION_PROMPT_TEMPLATE.format(raw_text=raw_text)
    try:
        response = client.models.generate_content(
            model=MODEL_NAME, contents=correction_prompt
        )
    except Exception as e:
        print(f"Correction Error: {e}")
        return raw_text
    # response.text may be None (e.g. a blocked or empty response); callers
    # expect a string, so keep the uncorrected text rather than lose it.
    return response.text or raw_text


def smart_summarize_and_merge(text_to_summarize: str) -> str:
    """Summarize text of any length with a map/reduce strategy.

    Long input is split into chunks, each chunk is summarized on its own
    (map), and the partial summaries are merged by a final prompt (reduce).

    Args:
        text_to_summarize: The corrected transcript.

    Returns:
        The final bullet-point summary with ``**`` markers removed, or a
        human-readable message when nothing could be summarized.
    """
    if not text_to_summarize.strip():
        return "No text to summarize."
    if client is None:
        return "Error: Gemini client not initialized."

    # Split text if too long
    if len(text_to_summarize) > CHUNK_SIZE_LIMIT:
        chunks = split_text_into_chunks(text_to_summarize)
    else:
        chunks = [text_to_summarize]

    # Map Step: Summarize each chunk
    partial_summaries = []
    for index, chunk in enumerate(chunks, start=1):
        partial_prompt = _PARTIAL_SUMMARY_PROMPT_TEMPLATE.format(chunk=chunk)
        try:
            response = client.models.generate_content(
                model=MODEL_NAME, contents=partial_prompt
            )
        except Exception as e:
            # Best effort: one failed chunk must not abort the whole summary.
            print(f"Partial summary error for chunk {index}: {e}")
            continue
        # Skip empty/None responses so the join below only sees strings.
        if response.text:
            partial_summaries.append(response.text)

    if not partial_summaries:
        return "Failed to generate summary."

    # Reduce Step: Merge summaries
    if len(partial_summaries) > 1:
        input_for_final_prompt = "\n\n--- Previous Chunk Summary ---\n\n".join(
            partial_summaries
        )
        prompt_type = "summarize the provided partial summaries"
    else:
        input_for_final_prompt = partial_summaries[0]
        prompt_type = "review and format the following summary"

    final_prompt = _FINAL_SUMMARY_PROMPT_TEMPLATE.format(
        prompt_type=prompt_type,
        input_for_final_prompt=input_for_final_prompt,
    )
    try:
        response = client.models.generate_content(
            model=MODEL_NAME, contents=final_prompt
        )
    except Exception as e:
        return f"Summary Error: {e}"

    if not response.text:
        return "Failed to generate summary."
    return response.text.replace('**', '')


def _transcribe_and_correct(audio: AudioSegment, work_dir: str) -> str:
    """Transcribe *audio* chunk by chunk and return the corrected transcript.

    Each 30-second chunk is exported to a scratch WAV file inside *work_dir*
    and passed to the ASR pipeline. Raw text is buffered and sent to
    ``correct_and_format_text`` whenever about ``BUFFER_LIMIT_MS`` of audio
    has accumulated, and once more at the end of the audio.
    """
    total_duration = len(audio)
    corrected_segments: list[str] = []
    pending_texts: list[str] = []
    pending_ms = 0
    # One scratch file is reused for every chunk to keep disk usage flat.
    chunk_path = os.path.join(work_dir, "chunk.wav")

    for start_ms in range(0, total_duration, CHUNK_DURATION_MS):
        chunk_num = (start_ms // CHUNK_DURATION_MS) + 1

        # 1. Prepare Chunk for Whisper (30s)
        chunk_audio = audio[start_ms : start_ms + CHUNK_DURATION_MS]
        chunk_audio.export(chunk_path, format="wav")

        # 2. ASR (Speech-to-Text)
        try:
            asr_result = asr_pipeline(chunk_path, return_timestamps=True)
            raw_text = asr_result['text'].strip()
            if raw_text:
                pending_texts.append(raw_text)  # add the text to the buffer
        except Exception as e:
            # Best effort: a failed chunk is skipped, not fatal.
            print(f"Error in ASR for chunk {chunk_num}: {e}")

        # 3. Check Buffer (Is it 5 mins yet? Or is it the last chunk?)
        pending_ms += len(chunk_audio)
        current_position_ms = start_ms + len(chunk_audio)
        if pending_ms >= BUFFER_LIMIT_MS or current_position_ms >= total_duration:
            if pending_texts:
                print(f"Buffer reached {current_position_ms/60000:.2f} mins. Sending to Gemini for correction...")
                corrected = correct_and_format_text(" ".join(pending_texts)).strip()
                if corrected:
                    corrected_segments.append(corrected)
            pending_texts = []
            pending_ms = 0

    return "\n\n".join(corrected_segments)


# ==============================================================================
# III. API Endpoint Definition
# ==============================================================================
app = FastAPI(title="Streaming Transcription & Summarization API")


@app.post("/process-audio/")
async def process_audio_api(file: UploadFile = File(...)):
    """Transcribe, correct and summarize an uploaded audio file.

    Args:
        file: The uploaded audio file.

    Returns:
        A JSON response with the keys ``"transcript"`` and ``"summary"``.

    Raises:
        HTTPException: 500 when the ASR model failed to load at startup;
            400 when the upload cannot be decoded as audio.
    """
    # NOTE(review): the ASR and Gemini calls below are synchronous and will
    # block the event loop while a request is processed; consider offloading
    # them to a worker thread if concurrent requests matter.
    if asr_pipeline is None:
        raise HTTPException(status_code=500, detail="Error: Transcription model not loaded.")

    print(f"Starting processing for uploaded file: {file.filename}")

    # Only the extension of the client-supplied name is reused; the name
    # itself is untrusted and must never become part of a filesystem path.
    suffix = os.path.splitext(os.path.basename(file.filename or ""))[1]

    # A private per-request directory avoids collisions between concurrent
    # requests and guarantees cleanup on every exit path.
    with tempfile.TemporaryDirectory(prefix="process_audio_") as work_dir:
        upload_path = os.path.join(work_dir, f"upload{suffix}")
        with open(upload_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        try:
            audio = AudioSegment.from_file(upload_path)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Error loading audio file: {e}") from e

        # ---------------------------------------------------------
        # Phase 1: Processing Loop (Whisper ASR + 5-Min Gemini Buffer)
        # ---------------------------------------------------------
        transcript = _transcribe_and_correct(audio, work_dir)

    # ---------------------------------------------------------
    # Phase 2: Final Summarization
    # ---------------------------------------------------------
    print("All chunks processed. Starting summarization...")
    transcript = transcript.strip()
    if len(transcript) > 50:
        final_summary = smart_summarize_and_merge(transcript)
    else:
        final_summary = "Text too short to summarize."

    return JSONResponse(content={
        "transcript": transcript,
        "summary": final_summary.strip()
    })