Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, request, jsonify | |
| import google.generativeai as genai | |
| import os | |
| from PIL import Image | |
| import tempfile | |
| import PIL.Image | |
| import subprocess | |
| import logging | |
| from urllib.parse import urlparse | |
| import requests | |
| import time | |
| import ssl | |
| from logging.handlers import RotatingFileHandler | |
| app = Flask(__name__) | |
| # Configuration des logs | |
| log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| log_file = 'app.log' | |
| # Handler pour fichier avec rotation | |
| file_handler = RotatingFileHandler(log_file, maxBytes=10485760, backupCount=5) # 10MB par fichier, max 5 fichiers | |
| file_handler.setFormatter(log_formatter) | |
| # Handler pour console | |
| console_handler = logging.StreamHandler() | |
| console_handler.setFormatter(log_formatter) | |
| # Configuration du logger principal | |
| logger = logging.getLogger(__name__) | |
| logger.setLevel(logging.INFO) | |
| logger.addHandler(file_handler) | |
| logger.addHandler(console_handler) | |
| # Configuration de l'API Gemini | |
| token = os.environ.get("TOKEN") | |
| if not token: | |
| logger.error("Token API non trouvé dans les variables d'environnement") | |
| raise ValueError("Token API manquant") | |
| genai.configure(api_key=token) | |
| generation_config = { | |
| "temperature": 1, | |
| "max_output_tokens": 8192, | |
| } | |
| safety_settings = [ | |
| {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, | |
| ] | |
| def generale(): | |
| logger.info("Accès à la page principale") | |
| return render_template("generale.html") | |
| def upload_and_process_file(file_path): | |
| """Upload et traite un fichier avec l'API Gemini avec gestion des erreurs améliorée""" | |
| max_retries = 3 | |
| retry_delay = 2 # secondes | |
| for attempt in range(max_retries): | |
| try: | |
| logger.info(f"Tentative d'upload {attempt + 1}/{max_retries} pour {file_path}") | |
| # Vérification du fichier | |
| if not os.path.exists(file_path): | |
| logger.error(f"Fichier non trouvé: {file_path}") | |
| raise FileNotFoundError(f"Le fichier {file_path} n'existe pas") | |
| file_size = os.path.getsize(file_path) | |
| if file_size == 0: | |
| logger.error(f"Fichier vide: {file_path}") | |
| raise ValueError(f"Le fichier {file_path} est vide") | |
| # Upload du fichier | |
| uploaded_file = genai.upload_file(path=file_path) | |
| logger.info(f"Upload réussi: {uploaded_file.uri}") | |
| # Attente du traitement | |
| timeout = 300 # 5 minutes | |
| start_time = time.time() | |
| while uploaded_file.state.name == "PROCESSING": | |
| if time.time() - start_time > timeout: | |
| logger.error("Timeout pendant le traitement du fichier") | |
| raise TimeoutError("Timeout pendant le traitement du fichier") | |
| logger.debug(f"En attente du traitement... Temps écoulé: {int(time.time() - start_time)}s") | |
| time.sleep(10) | |
| uploaded_file = genai.get_file(uploaded_file.name) | |
| if uploaded_file.state.name == "FAILED": | |
| logger.error(f"Échec du traitement: {uploaded_file.state.name}") | |
| raise ValueError(f"Échec du traitement: {uploaded_file.state.name}") | |
| logger.info(f"Traitement terminé avec succès: {uploaded_file.uri}") | |
| return uploaded_file | |
| except ssl.SSLError as e: | |
| logger.error(f"Erreur SSL lors de l'upload (tentative {attempt + 1}): {e}") | |
| if attempt < max_retries - 1: | |
| time.sleep(retry_delay * (attempt + 1)) | |
| else: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Erreur lors de l'upload (tentative {attempt + 1}): {e}") | |
| if attempt < max_retries - 1: | |
| time.sleep(retry_delay * (attempt + 1)) | |
| else: | |
| raise | |
| def is_youtube_url(url): | |
| """Vérifie si l'URL est une URL YouTube""" | |
| parsed = urlparse(url) | |
| return any(domain in parsed.netloc for domain in ['youtube.com', 'youtu.be']) | |
| def download_youtube_video(url): | |
| """Télécharge une vidéo YouTube en utilisant yt-dlp avec une configuration adaptée aux conteneurs""" | |
| try: | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| output_template = os.path.join(temp_dir, '%(title)s.%(ext)s') | |
| # Configuration optimisée pour environnement conteneurisé | |
| command = [ | |
| 'yt-dlp', | |
| '--format', 'best[filesize<50M]', | |
| '--quiet', | |
| '--no-warnings', | |
| '--output', output_template, | |
| '--extract-audio', # Tenter d'abord l'extraction audio si la vidéo échoue | |
| '--format-sort', 'res,ext:mp4:m4a', # Prioriser les formats plus légers | |
| '--user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', | |
| '--referer', 'https://www.youtube.com/', | |
| '--geo-bypass', # Contourner les restrictions géographiques | |
| '--no-check-certificates', # Ignorer les erreurs de certificat | |
| '--force-ipv4', # Forcer IPv4 pour plus de stabilité | |
| '--ignore-errors', # Continuer en cas d'erreurs mineures | |
| '--no-playlist', # Éviter le téléchargement de playlists | |
| '--extractor-retries', '3', | |
| '--file-access-retries', '3', | |
| '--fragment-retries', '3', | |
| '--retry-sleep', '5', | |
| '--sleep-requests','1.5', | |
| '--min-sleep-interval','60', | |
| '--max-sleep-interval','90', | |
| url | |
| ] | |
| logger.info(f"Début du, téléchargement de la vidéo: {url}") | |
| try: | |
| # Vérifier la version de yt-dlp | |
| version_process = subprocess.run(['yt-dlp', '--version'], | |
| capture_output=True, | |
| text=True, | |
| check=True) | |
| logger.info(f"Version de yt-dlp: {version_process.stdout.strip()}") | |
| except subprocess.SubprocessError as e: | |
| logger.error(f"Erreur lors de la vérification de yt-dlp: {e}") | |
| raise RuntimeError("yt-dlp n'est pas correctement installé") | |
| # Premier essai avec la configuration standard | |
| process = subprocess.run( | |
| command, | |
| capture_output=True, | |
| text=True | |
| ) | |
| if process.returncode != 0: | |
| logger.warning(f"Premier essai échoué: {process.stderr}") | |
| logger.info("Tentative avec configuration alternative...") | |
| # Configuration alternative | |
| alternative_command = command.copy() | |
| alternative_command.extend([ | |
| '--format', 'worst', # Utiliser la qualité la plus basse | |
| '--prefer-insecure', # Préférer les connexions non-sécurisées si nécessaire | |
| '--socket-timeout', '30', | |
| '--external-downloader', 'aria2c', # Utiliser aria2c comme downloader alternatif | |
| '--external-downloader-args', 'aria2c:"--min-split-size=1M --max-connection-per-server=16 --max-concurrent-downloads=16 --split=16"' | |
| ]) | |
| process = subprocess.run( | |
| alternative_command, | |
| capture_output=True, | |
| text=True | |
| ) | |
| if process.returncode != 0: | |
| logger.error(f"Échec du téléchargement après tentatives alternatives: {process.stderr}") | |
| raise Exception(f"Échec du téléchargement: {process.stderr}") | |
| downloaded_files = os.listdir(temp_dir) | |
| if not downloaded_files: | |
| logger.error("Aucun fichier n'a été téléchargé") | |
| raise FileNotFoundError("Aucun fichier n'a été téléchargé") | |
| video_path = os.path.join(temp_dir, downloaded_files[0]) | |
| # Vérifier la taille du fichier | |
| file_size = os.path.getsize(video_path) | |
| if file_size == 0: | |
| raise ValueError("Le fichier téléchargé est vide") | |
| logger.info(f"Fichier téléchargé: {video_path} ({file_size} bytes)") | |
| # Copier le fichier vers un emplacement temporaire | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(video_path)[1]) | |
| with open(video_path, 'rb') as f: | |
| temp_file.write(f.read()) | |
| logger.info(f"Vidéo téléchargée avec succès: {temp_file.name}") | |
| return temp_file.name | |
| except Exception as e: | |
| logger.error(f"Erreur lors du téléchargement de la vidéo: {e}") | |
| return None | |
| def telecharger_pdf(url): | |
| """Télécharge un PDF et retourne le chemin du fichier""" | |
| try: | |
| logger.info(f"Début du téléchargement du PDF: {url}") | |
| response = requests.get(url) | |
| response.raise_for_status() | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file: | |
| temp_file.write(response.content) | |
| logger.info(f"PDF téléchargé avec succès: {temp_file.name}") | |
| return temp_file.name | |
| except Exception as e: | |
| logger.error(f"Erreur lors du téléchargement du PDF: {e}") | |
| return None | |
| def allowed_file(filename): | |
| """Vérifie si l'extension du fichier est autorisée""" | |
| ALLOWED_EXTENSIONS = {'pdf', 'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov', 'avi'} | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| def submit_question(): | |
| logger.info("Nouvelle soumission reçue") | |
| question = request.form.get('question') | |
| urls = request.form.getlist('urls') | |
| files = request.files.getlist('files') | |
| logger.info(f"Question reçue: {question}") | |
| logger.info(f"URLs reçues: {urls}") | |
| logger.info(f"Nombre de fichiers reçus: {len(files)}") | |
| content = [question] | |
| temp_files = [] | |
| try: | |
| # Traitement des fichiers uploadés | |
| for file in files: | |
| if file and allowed_file(file.filename): | |
| logger.info(f"Traitement du fichier: {file.filename}") | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[1]) as temp_file: | |
| file.save(temp_file.name) | |
| temp_files.append(temp_file.name) | |
| if file.filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')): | |
| content.append(PIL.Image.open(temp_file.name)) | |
| logger.info(f"Image ajoutée au contenu: {file.filename}") | |
| else: | |
| uploaded_file = upload_and_process_file(temp_file.name) | |
| content.append(uploaded_file) | |
| logger.info(f"Fichier uploadé et ajouté au contenu: {file.filename}") | |
| # Traitement des URLs | |
| for url in urls: | |
| logger.info(f"Traitement de l'URL: {url}") | |
| if is_youtube_url(url): | |
| video_path = download_youtube_video(url) | |
| if video_path: | |
| temp_files.append(video_path) | |
| uploaded_file = upload_and_process_file(video_path) | |
| content.append(uploaded_file) | |
| logger.info(f"Vidéo YouTube traitée et ajoutée au contenu: {url}") | |
| elif url.lower().endswith('.pdf'): | |
| pdf_path = telecharger_pdf(url) | |
| if pdf_path: | |
| temp_files.append(pdf_path) | |
| uploaded_file = upload_and_process_file(pdf_path) | |
| content.append(uploaded_file) | |
| logger.info(f"PDF téléchargé et ajouté au contenu: {url}") | |
| # Génération de contenu avec Gemini | |
| logger.info("Initialisation du modèle Gemini") | |
| model = genai.GenerativeModel( | |
| model_name="models/gemini-2.0-flash", | |
| safety_settings=safety_settings, | |
| system_instruction="Tu es un assistant intelligent. ton but est d'assister au mieux que tu peux. tu as été créé par Aenir et tu t'appelles Mariam." | |
| ) | |
| logger.info("Génération de la réponse") | |
| response = model.generate_content(content, request_options={"timeout": 600}) | |
| logger.info("Réponse générée avec succès") | |
| return jsonify({"response": response.text}) | |
| except Exception as e: | |
| logger.error(f"Erreur lors du traitement de la requête: {e}", exc_info=True) | |
| return jsonify({"error": str(e)}), 500 | |
| finally: | |
| # Nettoyage des fichiers temporaires | |
| for temp_file in temp_files: | |
| try: | |
| os.unlink(temp_file) | |
| logger.debug(f"Fichier temporaire supprimé: {temp_file}") | |
| except Exception as e: | |
| logger.error(f"Erreur lors de la suppression du fichier temporaire {temp_file}: {e}") | |
| if __name__ == '__main__': | |
| logger.info("Démarrage de l'application") | |
| app.run() |