Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import google.generativeai as genai | |
| from fastapi import FastAPI, File, UploadFile, Header, HTTPException, Request | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from typing import List, Optional | |
| import logging | |
| # Configuration du logging pour un meilleur débogage | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --- Configuration de l'application FastAPI --- | |
| app = FastAPI() | |
| # --- Modèles Pydantic pour la validation des données --- | |
| class DiarizationEntry(BaseModel): | |
| speaker: str | |
| timestamp: str | |
| text: str | |
| # --- Endpoint API pour la Diarisation --- | |
| async def diarize_audio( | |
| file: UploadFile = File(...), | |
| x_api_key: Optional[str] = Header(None, alias="X-API-KEY") | |
| ): | |
| """ | |
| Reçoit un fichier audio MP3 et une clé API, effectue la diarisation | |
| en utilisant l'API Gemini et retourne le résultat structuré. | |
| """ | |
| if not x_api_key: | |
| logger.warning("Tentative d'appel à l'API sans clé API.") | |
| raise HTTPException(status_code=400, detail="La clé API Gemini est requise dans l'en-tête X-API-KEY.") | |
| if not file or file.content_type != "audio/mpeg": | |
| logger.warning(f"Type de fichier invalide reçu: {file.content_type if file else 'None'}") | |
| raise HTTPException(status_code=400, detail="Un fichier au format MP3 (audio/mpeg) est requis.") | |
| logger.info(f"Début du traitement du fichier: {file.filename}") | |
| try: | |
| # Configure le client Gemini avec la clé fournie par l'utilisateur | |
| genai.configure(api_key=x_api_key) | |
| # Télécharge le fichier pour le rendre accessible à l'API Gemini | |
| logger.info("Téléchargement du fichier vers l'API Gemini...") | |
| audio_file = genai.upload_file( | |
| file.file, | |
| mime_type=file.content_type, | |
| display_name=file.filename | |
| ) | |
| logger.info("Fichier téléchargé avec succès.") | |
| # Le prompt pour guider le modèle Gemini | |
| prompt = """ | |
| Tu es un expert en analyse audio spécialisé dans la diarisation des locuteurs. | |
| Ta tâche est d'analyser le fichier audio fourni, d'identifier chaque locuteur distinct, de transcrire précisément ce qu'ils disent, et de fournir un horodatage pour chaque segment de parole. | |
| INSTRUCTIONS IMPORTANTES: | |
| 1. Identifie chaque locuteur unique et assigne-leur une étiquette claire et cohérente (ex: "Locuteur A", "Locuteur B"). | |
| 2. Pour chaque intervention, fournis un horodatage précis au format "HH:MM:SS". | |
| 3. Transcris le texte de chaque intervention. | |
| 4. Ta réponse doit être UNIQUEMENT un tableau JSON valide. N'ajoute AUCUN texte, explication, ou formatage markdown (comme ```json) avant ou après le tableau JSON. | |
| Le format de sortie doit être un tableau d'objets JSON, où chaque objet contient les clés suivantes : | |
| - "speaker": L'identifiant du locuteur (string). | |
| - "timestamp": L'horodatage de début (string, "HH:MM:SS"). | |
| - "text": La transcription textuelle (string). | |
| Exemple de sortie attendue : | |
| [ | |
| { | |
| "speaker": "Locuteur A", | |
| "timestamp": "00:00:05", | |
| "text": "Bonjour, comment ça va aujourd'hui ?" | |
| }, | |
| { | |
| "speaker": "Locuteur B", | |
| "timestamp": "00:00:08", | |
| "text": "Très bien, merci. Et vous ?" | |
| } | |
| ] | |
| Analyse maintenant le fichier audio fourni et retourne le résultat au format JSON spécifié. | |
| """ | |
| # Crée une instance du modèle et lance la génération | |
| logger.info("Appel au modèle Gemini pour la génération de contenu...") | |
| model = genai.GenerativeModel(model_name="gemini-2.5-pro") | |
| response = model.generate_content([prompt, audio_file]) | |
| logger.info("Réponse reçue du modèle Gemini.") | |
| # Nettoie et parse la réponse JSON | |
| cleaned_response_text = response.text.strip() | |
| # Tente de parser le JSON | |
| result = json.loads(cleaned_response_text) | |
| if not isinstance(result, list): | |
| logger.error("La réponse du modèle n'est pas une liste JSON valide.") | |
| raise ValueError("Le modèle n'a pas retourné une liste JSON.") | |
| logger.info(f"Traitement terminé avec succès. {len(result)} entrées de diarisation trouvées.") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Une erreur est survenue lors du traitement: {e}", exc_info=True) | |
| # Retourne une erreur claire au client | |
| error_detail = f"Erreur interne du serveur: {str(e)}" | |
| if "API key not valid" in str(e): | |
| error_detail = "La clé API fournie n'est pas valide. Veuillez vérifier votre clé." | |
| raise HTTPException(status_code=500, detail=error_detail) | |
| # --- Servir les fichiers statiques du frontend --- | |
| # Cette section permet au serveur Python de "montrer" votre page web (index.html, etc.) | |
| app.mount("/", StaticFiles(directory=".", html=True), name="static") | |
| async def catch_all_route(request: Request, full_path: str): | |
| """ | |
| Route "catch-all" pour renvoyer index.html, ce qui permet au routage | |
| côté client de fonctionner correctement, notamment après un rafraîchissement de la page. | |
| """ | |
| return FileResponse("index.html") |