| import os |
| import sys |
| import shutil |
| import torch |
| from fastapi import FastAPI, UploadFile, File, HTTPException |
| from fastapi.responses import JSONResponse |
| from transformers import pipeline |
| from google import genai |
| from pydub import AudioSegment |
|
|
| |
| |
| |
# Model identifier handed to the transformers ASR pipeline below.
MODEL_ID = "MohamedRashad/Arabic-Whisper-CodeSwitching-Edition"

# Device index 0 when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = 0
else:
    device = "cpu"
print(f"Device set to use: {device}")

# Extra keyword arguments forwarded to the pipeline factory.
pipeline_kwargs = {"chunk_length_s": 30}

# The pipeline is built once at import time. A failed load is reported and
# leaves `asr_pipeline` as None rather than aborting the import.
try:
    print("Loading ASR pipeline (Whisper)...")
    asr_pipeline = pipeline(
        "automatic-speech-recognition",
        model=MODEL_ID,
        device=device,
        **pipeline_kwargs,
    )
except Exception as load_error:
    print(f"Error loading ASR pipeline: {load_error}.")
    asr_pipeline = None
else:
    print("Pipeline loaded successfully.")
|
|
| |
| |
| |
# The Gemini API key is read from the environment; a missing key only
# produces a warning here and leaves `client` as None.
API_KEY = os.environ.get("GEMINI_API_KEY")
if not API_KEY:
    print("Warning: GEMINI_API_KEY not set. Summarization will fail.")

# Create the client only when a key is present; any construction failure
# is reported and also results in `client` being None.
client = None
if API_KEY:
    try:
        client = genai.Client(api_key=API_KEY)
    except Exception as init_error:
        print(f"Failed to initialize Gemini Client: {init_error}")
        client = None

# Gemini model used for every generate_content call in this module.
MODEL_NAME = "gemini-2.5-flash"

# Per-chunk token budget and the character limit derived from it
# (tokens * 5 * 0.9, truncated to an int).
MAX_TOKENS_PER_CHUNK = 10000
CHUNK_SIZE_LIMIT = int(MAX_TOKENS_PER_CHUNK * 5 * 0.9)
|
|
| |
def _iter_text_units(text: str, limit: int):
    """Yield pieces of ``text`` in order, none longer than ``limit`` characters.

    The text is cut after sentence-ending punctuation (``.``, ``!``, ``?``,
    ``؟``) that is followed by whitespace, so the punctuation stays attached
    to its sentence and decimals such as ``3.14`` are left intact. A sentence
    that is itself longer than ``limit`` is broken into words, and a word
    longer than ``limit`` is sliced into ``limit``-sized pieces.
    """
    import re

    for sentence in re.split(r"(?<=[.!?؟])\s+", text.strip()):
        if not sentence:
            continue
        if len(sentence) <= limit:
            yield sentence
            continue
        # Oversized sentence (e.g. unpunctuated ASR output): fall back to
        # word granularity so the caller can still respect the limit.
        for word in sentence.split():
            for start in range(0, len(word), limit):
                yield word[start:start + limit]


def split_text_into_chunks(text: str, max_chars: "int | None" = None) -> list[str]:
    """Split ``text`` into chunks that each fit within a character limit.

    Sentences are packed greedily, joined by a single space, and a new chunk
    is started whenever adding the next sentence would exceed the limit.
    The original punctuation is preserved; nothing is appended to the text.

    Args:
        text: The text to split. Empty or whitespace-only text yields ``[]``.
        max_chars: Maximum length of each chunk in characters. When ``None``
            (the default) the module-level ``CHUNK_SIZE_LIMIT`` is used.

    Returns:
        The chunks in reading order; every chunk has ``len(chunk) <= limit``.

    Raises:
        ValueError: If the effective limit is smaller than 1.
    """
    limit = CHUNK_SIZE_LIMIT if max_chars is None else max_chars
    if limit < 1:
        raise ValueError("max_chars must be a positive integer")

    chunks: list[str] = []
    current = ""
    for unit in _iter_text_units(text, limit):
        candidate = f"{current} {unit}" if current else unit
        if len(candidate) <= limit:
            current = candidate
        else:
            # `current` is non-empty here: a lone unit never exceeds `limit`.
            chunks.append(current)
            current = unit
    if current:
        chunks.append(current)
    return chunks
|
|
def correct_and_format_text(raw_text: str) -> str:
    """Correct spelling/grammar and add punctuation to ``raw_text`` using Gemini.

    Args:
        raw_text: Raw transcript text to clean up.

    Returns:
        ``""`` when ``raw_text`` is empty or whitespace-only. Otherwise the
        model's corrected text, or ``raw_text`` unchanged when the Gemini
        client is not initialized, the request raises, or the model returns
        no text. The result is therefore always a ``str``.
    """
    if not raw_text.strip():
        return ""
    if client is None:
        return raw_text

    correction_prompt = f"""
You are an expert text editor. Your task is to take raw, unpunctuated text, often from a Speech-to-Text (ASR) system, and correct it.

Perform the following actions:
1. **Fix Spelling and Grammar:** Correct all spelling, syntax, and grammatical errors.
2. **Add Punctuation:** Insert all necessary punctuation (periods, commas, question marks, etc.) to make the text readable and clear.
3. **Fix Transliterated Terms (CRITICAL):** If you find English technical terms written in Arabic letters (e.g., "داتا بيز", "ديب ليرنينج", "نتورك"), you MUST convert them back to their correct English spelling (e.g., "Database", "Deep Learning", "Network") inside the text.
4. **Preserve Content:** DO NOT add, delete, or change any core meaning or factual information. Only correct the form.

CRITICAL INSTRUCTION:
Output ONLY the corrected text. DO NOT include any conversational filler, introductions, or conclusions such as "Here is the corrected text", "إليك تصحيح النص", or similar phrases. Just return the text itself.

Raw Text to Correct:
---
{raw_text}
"""
    try:
        response = client.models.generate_content(model=MODEL_NAME, contents=correction_prompt)
        corrected = response.text
    except Exception as e:
        print(f"Correction Error: {e}")
        return raw_text

    # `response.text` can be None/empty when the model produced no text;
    # keep the raw transcript instead of handing None back to the caller.
    if not corrected or not corrected.strip():
        print("Correction Error: empty response from model; keeping raw text.")
        return raw_text
    return corrected
|
|
def smart_summarize_and_merge(text_to_summarize: str) -> str:
    """Summarize ``text_to_summarize`` with Gemini, chunking long input.

    Text longer than ``CHUNK_SIZE_LIMIT`` characters is split with
    ``split_text_into_chunks``; each chunk is summarized on its own and the
    partial summaries are then merged (or, for a single chunk, reviewed and
    formatted) by a final Gemini call.

    Args:
        text_to_summarize: The text to summarize.

    Returns:
        The final bullet-point summary with ``**`` markers removed, or one of
        the status strings ``"No text to summarize."``,
        ``"Error: Gemini client not initialized."``,
        ``"Failed to generate summary."`` or ``"Summary Error: ..."``.
        Errors are reported through the return value, never raised.
    """
    if not text_to_summarize.strip():
        return "No text to summarize."
    if client is None:
        return "Error: Gemini client not initialized."

    if len(text_to_summarize) > CHUNK_SIZE_LIMIT:
        chunks = split_text_into_chunks(text_to_summarize)
    else:
        chunks = [text_to_summarize]

    partial_summaries = []
    total_chunks = len(chunks)

    for chunk_index, chunk in enumerate(chunks, start=1):
        partial_prompt = f"""
You are an expert summarizer. Summarize the following text into **clear, key bullet points**.
Do not leave out any essential information. The summary must be in the same language as the source text.

CRITICAL INSTRUCTION:
Output ONLY the bullet points. DO NOT include any conversational filler like "Here is the summary" or "إليك التلخيص".

Source Text:
---
{chunk}
"""
        try:
            response = client.models.generate_content(model=MODEL_NAME, contents=partial_prompt)
            partial = response.text
        except Exception as e:
            # Best effort: a failed chunk is skipped, but the failure is
            # reported so a gap in the summary can be traced.
            print(f"Partial summary error for chunk {chunk_index}/{total_chunks}: {e}")
            continue

        # `response.text` can be None/empty; a None entry would break the
        # join below, so only keep real text.
        if partial and partial.strip():
            partial_summaries.append(partial)
        else:
            print(f"Partial summary for chunk {chunk_index}/{total_chunks} was empty; skipping.")

    if not partial_summaries:
        return "Failed to generate summary."

    if len(partial_summaries) > 1:
        combined_summaries = "\n\n--- Previous Chunk Summary ---\n\n".join(partial_summaries)
        input_for_final_prompt = combined_summaries
        prompt_type = "summarize the provided partial summaries"
    else:
        input_for_final_prompt = partial_summaries[0]
        prompt_type = "review and format the following summary"

    final_prompt = f"""
You are a professional text summarizer. {prompt_type} into clear, comprehensive **Bullet Points**.
Use **round bullet points (•)** for the list items.

Language Instructions:
1. **If the majority of the input text was in English:** The final summary must be **strictly in English**.
2. **If the majority of the input text was in Arabic (including dialects):** The final summary must be **in Formal Arabic**, while **strictly preserving all foreign technical terms (English) exactly as they are** without translation.

CRITICAL INSTRUCTION:
Output ONLY the final bullet points. DO NOT include any conversational filler, greetings, or phrases like "Here is the summary", "إليك التلخيص النهائي", etc.

Input:
---
{input_for_final_prompt}
"""
    try:
        response = client.models.generate_content(model=MODEL_NAME, contents=final_prompt)
        final_text = response.text
    except Exception as e:
        return f"Summary Error: {e}"

    # An empty final response is reported with the module's usual failure
    # message rather than an AttributeError text from calling .replace on None.
    if not final_text or not final_text.strip():
        return "Failed to generate summary."
    return final_text.replace('**', '')
|
|
| |
| |
| |
# FastAPI application object on which the audio-processing route is registered.
app = FastAPI(
    title="Streaming Transcription & Summarization API",
)
|
|
@app.post("/process-audio/")
async def process_audio_api(file: UploadFile = File(...)):
    """Transcribe an uploaded audio file, correct the text and summarize it.

    The upload is written to a private temporary directory, decoded with
    pydub and cut into 30-second pieces. Each piece is exported as WAV and
    run through ``asr_pipeline``; the raw text is buffered and sent to
    ``correct_and_format_text`` every 5 minutes of audio and at the end.
    The corrected transcript is then passed to ``smart_summarize_and_merge``
    when it is longer than 50 characters.

    Args:
        file: The uploaded audio file.

    Returns:
        A ``JSONResponse`` with the keys ``"transcript"`` and ``"summary"``.

    Raises:
        HTTPException: 500 when the ASR pipeline failed to load, 400 when the
            upload cannot be decoded as audio.
    """
    # NOTE(review): everything below is blocking work inside an `async def`,
    # so the event loop is held for the whole request. It is left that way
    # on purpose: moving it to a thread pool would let several requests hit
    # the shared ASR pipeline concurrently — confirm that is safe first.
    import tempfile

    if asr_pipeline is None:
        raise HTTPException(status_code=500, detail="Error: Transcription model not loaded.")

    print(f"Starting processing for uploaded file: {file.filename}")

    CHUNK_DURATION_MS = 30 * 1000
    BUFFER_LIMIT_MS = 5 * 60 * 1000

    # The client-supplied filename is untrusted (and may be missing): never
    # use it as a path. Only its extension is kept, restricted to
    # alphanumerics, because the decoder can use it as a format hint.
    _, raw_suffix = os.path.splitext(os.path.basename(file.filename or ""))
    suffix = "".join(ch for ch in raw_suffix if ch.isalnum() or ch == ".")

    corrected_segments = []

    # A per-request temporary directory avoids name collisions between
    # requests and is removed on every exit path, including exceptions.
    with tempfile.TemporaryDirectory(prefix="process_audio_") as work_dir:
        upload_path = os.path.join(work_dir, f"upload{suffix}")
        with open(upload_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        try:
            audio = AudioSegment.from_file(upload_path)
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Error loading audio file: {e}") from e

        total_duration = len(audio)
        pending_texts = []  # raw ASR text not yet sent for correction
        chunk_path = os.path.join(work_dir, "chunk.wav")  # reused for every chunk

        for start_ms in range(0, total_duration, CHUNK_DURATION_MS):
            chunk_num = (start_ms // CHUNK_DURATION_MS) + 1

            chunk_audio = audio[start_ms:start_ms + CHUNK_DURATION_MS]
            # export() hands back the output file object; close it so the
            # handle is not left open while the pipeline reads the file.
            chunk_audio.export(chunk_path, format="wav").close()

            try:
                asr_result = asr_pipeline(chunk_path, return_timestamps=True)
                raw_text = asr_result['text'].strip()
                if raw_text:
                    pending_texts.append(raw_text)
            except Exception as e:
                # Best effort: a failed chunk is reported and skipped.
                print(f"Error in ASR for chunk {chunk_num}: {e}")

            # Flush the buffer at every 5-minute boundary and at the end.
            current_position_ms = start_ms + len(chunk_audio)
            at_buffer_boundary = current_position_ms % BUFFER_LIMIT_MS == 0
            at_end_of_audio = current_position_ms >= total_duration
            if (at_buffer_boundary or at_end_of_audio) and pending_texts:
                buffer_text = " ".join(pending_texts)
                print(f"Buffer reached {current_position_ms/60000:.2f} mins. Sending to Gemini for correction...")
                # Fall back to the raw text if the corrector returns nothing.
                corrected_chunk = correct_and_format_text(buffer_text) or buffer_text
                corrected_segments.append(corrected_chunk.strip())
                pending_texts = []

    print("All chunks processed. Starting summarization...")

    transcript = "\n\n".join(corrected_segments).strip()
    if len(transcript) > 50:
        final_summary = smart_summarize_and_merge(transcript)
    else:
        final_summary = "Text too short to summarize."

    return JSONResponse(content={
        "transcript": transcript,
        "summary": final_summary.strip()
    })