Spaces:
Running
Running
| import os | |
| import time | |
| import tempfile | |
| import subprocess | |
| import json | |
| import re | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| # ===================================================== | |
| # CONFIGURAÇÃO | |
| # ===================================================== | |
# Shared secret expected in the X-API-Key header; an empty value leaves the
# API unprotected (see check_api_key).
API_KEY = os.getenv("API_KEY", "")

# Upload limits enforced by the transcription endpoint.
MAX_FILE_SIZE_MB = 5 * 1024
MAX_DURATION_SEC = 8000

# Accepted upload extensions (lower-case, without the dot): video containers
# first, audio-only formats second.
VALID_EXTENSIONS = {
    "mp4", "mkv", "avi", "mov", "wmv", "flv", "webm", "m4v",
} | {
    "mp3", "wav", "ogg", "m4a", "aac", "wma", "flac",
}

# Whisper model names a client may request, and the one used by default.
VALID_MODELS = ["tiny", "base", "small", "medium"]
DEFAULT_MODEL = "base"

# Common Brazilian-Portuguese fixes: regex pattern -> replacement text.
# Entries are applied in insertion order by apply_ptbr_corrections.
PTBR_CORRECTIONS = {
    r"\bpijão\b": "pijamão",
    r"\bpijao\b": "pijamão",
    r"\bta\b": "tá",
    r"\bvc\b": "você",
    r"\btô\b": "tô",
    r"\bné\b": "né",
    r"\bdto\b": "direito",
    r"\besqdo\b": "esquerdo",
}
| # ===================================================== | |
| # APP FLASK | |
| # ===================================================== | |
# The single Flask application object for this service.
app = Flask(__name__)

# Enable CORS on every path, accepting requests from any origin.
CORS(
    app,
    resources={
        r"/*": {"origins": "*"},
    },
)
| # ===================================================== | |
| # CACHE DE MODELOS | |
| # ===================================================== | |
# Whisper models loaded so far, keyed by model name.
_models = {}


def get_model(name="base"):
    """Return the Whisper model called *name*, loading it on first use.

    A model is loaded at most once per process: later calls with the same
    name return the instance stored in ``_models``. The model is placed on
    CUDA when ``torch.cuda.is_available()`` is true, otherwise on the CPU.
    """
    import whisper
    import torch

    cached = _models.get(name)
    if name in _models:
        return cached

    print(f"[INFO] Carregando modelo '{name}'...")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    _models[name] = whisper.load_model(name, device=device)
    print(f"[INFO] Modelo '{name}' carregado no dispositivo: {device}")
    return _models[name]
| # ===================================================== | |
| # FUNÇÕES AUXILIARES | |
| # ===================================================== | |
def format_timestamp(seconds):
    """Render a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        seconds: Offset from the start of the media, in seconds (int or
            float). Negative values are clamped to zero.

    Returns:
        The timestamp string, e.g. ``"01:01:01,500"`` for ``3661.5``.
    """
    # Convert to whole milliseconds once, up front. Rounding here (instead of
    # truncating the fractional part) keeps 1.001 s as ",001" despite float
    # representation error, and lets 59.9996 s carry into the next minute
    # rather than producing an out-of-range field.
    total_ms = max(0, int(round(seconds * 1000)))
    total_secs, millis = divmod(total_ms, 1000)
    total_mins, secs = divmod(total_secs, 60)
    hrs, mins = divmod(total_mins, 60)
    return f"{hrs:02d}:{mins:02d}:{secs:02d},{millis:03d}"
def generate_srt(segments):
    """Build SRT subtitle text from transcription segments.

    Each segment is a mapping with ``"start"`` and ``"end"`` offsets (passed
    to ``format_timestamp``) and a ``"text"`` string. Cues are numbered from
    1 and separated by a blank line. An empty input yields an empty string.
    """

    def render(number, cue):
        # One cue: index line, time range line, caption line.
        begin = format_timestamp(cue["start"])
        finish = format_timestamp(cue["end"])
        caption = cue["text"].strip()
        return f"{number}\n{begin} --> {finish}\n{caption}\n"

    return "\n".join(
        render(number, cue) for number, cue in enumerate(segments, start=1)
    )
def _preserve_case(found, replacement):
    """Return *replacement* re-cased to follow the capitalisation of *found*."""
    if len(found) > 1 and found.isupper():
        return replacement.upper()
    if found[:1].isupper():
        return replacement[:1].upper() + replacement[1:]
    return replacement


def apply_ptbr_corrections(text, corrections=None):
    """Apply regex-based Brazilian-Portuguese corrections to *text*.

    Patterns are matched case-insensitively and applied in the mapping's
    iteration order. The replacement follows the capitalisation of the word
    it replaces, so a sentence-initial "Vc" becomes "Você" rather than being
    forced to lower case.

    Args:
        text: The text to correct.
        corrections: Optional mapping of regex pattern -> replacement text.
            Defaults to the module-level ``PTBR_CORRECTIONS``.

    Returns:
        The corrected text.
    """
    if corrections is None:
        corrections = PTBR_CORRECTIONS

    corrected = text
    for pattern, replacement in corrections.items():
        corrected = re.sub(
            pattern,
            # Bind the replacement as a default so each callback is tied to
            # its own entry rather than to the loop variable.
            lambda match, fixed=replacement: _preserve_case(match.group(0), fixed),
            corrected,
            flags=re.IGNORECASE,
        )
    return corrected
def validate_file(file):
    """Check that an uploaded file is present and has a supported extension.

    Args:
        file: The uploaded file object; only its truthiness and its
            ``filename`` attribute are used.

    Returns:
        ``(True, "")`` when the file is acceptable, otherwise
        ``(False, message)`` with a user-facing error message.
    """
    if not file or file.filename == "":
        return False, "Nenhum arquivo enviado"

    name = file.filename
    if "." in name:
        # Only the text after the last dot counts as the extension.
        ext = name.rsplit(".", 1)[-1].lower()
    else:
        ext = ""

    if ext in VALID_EXTENSIONS:
        return True, ""

    allowed = ", ".join(sorted(VALID_EXTENSIONS))
    return False, f"Formato '.{ext}' não suportado. Use: {allowed}"
def check_api_key():
    """Return True when the current request is allowed to use the API.

    When ``API_KEY`` is empty the API is unprotected and every request is
    accepted. Otherwise the ``X-API-Key`` request header must equal
    ``API_KEY``.
    """
    import hmac

    if not API_KEY:
        return True

    supplied = request.headers.get("X-API-Key", "")
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed key is correct. Compare bytes because compare_digest rejects
    # non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode("utf-8"), API_KEY.encode("utf-8"))
def get_duration(filepath):
    """Return the media duration of *filepath* in seconds, using ffprobe.

    Runs ``ffprobe`` with JSON output and reads ``format.duration``. Any
    failure (ffprobe missing, timeout after 30 s, unparsable output, missing
    or non-numeric duration) yields ``0``.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        filepath,
    ]
    try:
        probe = subprocess.run(
            probe_cmd, capture_output=True, text=True, timeout=30
        )
        metadata = json.loads(probe.stdout)
        container = metadata.get("format", {})
        return float(container.get("duration", 0))
    except Exception:
        # Best effort: callers treat 0 as "duration unknown".
        return 0
| # ===================================================== | |
| # ROTAS | |
| # ===================================================== | |
# NOTE(review): no route decorator is visible on this handler in this view of
# the file — confirm where/how it is registered with the Flask app.
def index():
    """Return a JSON description of the service and its current state.

    The payload reports the compute device, the models that may be requested,
    the models already loaded, whether an API key is configured, and the
    upload limits.
    """
    import torch

    device = "cuda" if torch.cuda.is_available() else "cpu"
    payload = {
        "app": "TranscreVid API",
        "status": "online",
        "version": "2.1",
        "device": device,
        "models_available": VALID_MODELS,
        "models_loaded": list(_models.keys()),
        "protected": bool(API_KEY),
        "max_file_mb": MAX_FILE_SIZE_MB,
        "max_duration_sec": MAX_DURATION_SEC,
        "features": ["txt", "srt", "pt-br corrections"],
    }
    return jsonify(payload)
# NOTE(review): no route decorator is visible on this handler in this view of
# the file — confirm where/how it is registered with the Flask app.
def health():
    """Return a minimal JSON health report: status and compute device."""
    import torch

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    return jsonify({"status": "ok", "device": device})
def _resolve_language(raw):
    """Map the client's ``language`` form value to ``(language, is_ptbr, is_ptpt)``.

    The value is trimmed and lower-cased so that "PT-BR" is treated like
    "pt-br". Both Portuguese variants map to the language code "pt"; an empty
    value maps to ``None`` (no language forced).
    """
    normalized = raw.strip().lower()
    if normalized == "pt-br":
        return "pt", True, False
    if normalized == "pt-pt":
        return "pt", False, True
    return (normalized or None), False, False


def _build_transcribe_options(language, is_ptbr, is_ptpt):
    """Build the keyword arguments passed to ``model.transcribe``."""
    options = {"fp16": False}
    if language:
        options["language"] = language
    if is_ptbr:
        options["initial_prompt"] = (
            "Esta é uma transcrição em português brasileiro. "
            "Use vocabulário e expressões do Brasil. "
            "Exemplos: pijamão, camisetão, tá, né, você, pra, legal, beleza, "
            "carrinho, TikTok, confortável."
        )
    elif is_ptpt:
        options["initial_prompt"] = (
            "Esta é uma transcrição em português europeu. "
            "Use vocabulário e expressões de Portugal. "
            "Exemplos: fixe, giro, autocarro, telemóvel, pequeno-almoço, "
            "fantástico, espetacular."
        )
    return options


def _remove_temp_files(handles):
    """Delete the files behind the given temp-file handles, ignoring OS errors."""
    for handle in handles:
        if handle and os.path.exists(handle.name):
            try:
                os.unlink(handle.name)
            except OSError:
                pass


# NOTE(review): no route decorator is visible on this handler in this view of
# the file — confirm where/how it is registered with the Flask app.
def transcribe():
    """Transcribe an uploaded audio/video file and return the result as JSON.

    Request (multipart form):
        video: the media file (required).
        format: "txt" (default) or "srt".
        model: one of ``VALID_MODELS`` (default ``DEFAULT_MODEL``).
        language: optional language; "pt-br" / "pt-pt" select Portuguese with
            a variant-specific prompt. Matched case-insensitively.

    Responses:
        200 with the transcription and processing metadata.
        400 for a missing/unsupported file, bad format or model, or a file
            over the size/duration limits.
        401 when the API key check fails.
        500 when audio extraction or transcription fails.
        504 when ffmpeg exceeds its time limit.

    Temporary files are always removed before returning.
    """
    import torch

    if not check_api_key():
        return jsonify({"error": "API Key inválida"}), 401

    if "video" not in request.files:
        return jsonify({"error": "Envie um arquivo no campo 'video'"}), 400

    file = request.files["video"]
    valid, msg = validate_file(file)
    if not valid:
        return jsonify({"error": msg}), 400

    output_format = request.form.get("format", "txt").lower()
    model_name = request.form.get("model", DEFAULT_MODEL).lower()
    # Normalised the same way as format/model so "PT-BR" is not passed raw.
    language, is_ptbr, is_ptpt = _resolve_language(
        request.form.get("language", "")
    )

    if output_format not in ("txt", "srt"):
        return jsonify({"error": "Formato deve ser 'txt' ou 'srt'"}), 400

    if model_name not in VALID_MODELS:
        return jsonify({"error": f"Modelo '{model_name}' inválido. Use: {', '.join(VALID_MODELS)}"}), 400

    tmp_video = None
    tmp_audio = None

    try:
        start_time = time.time()

        # Persist the upload so ffprobe/ffmpeg can read it from disk.
        ext = file.filename.rsplit(".", 1)[-1].lower()
        tmp_video = tempfile.NamedTemporaryFile(
            delete=False, suffix=f".{ext}", dir="/tmp"
        )
        file.save(tmp_video.name)
        tmp_video.close()

        file_size_mb = os.path.getsize(tmp_video.name) / (1024 * 1024)
        if file_size_mb > MAX_FILE_SIZE_MB:
            return jsonify({"error": f"Arquivo muito grande ({file_size_mb:.0f} MB). Máximo: {MAX_FILE_SIZE_MB} MB"}), 400

        # get_duration returns 0 when probing fails, in which case this limit
        # is not enforced.
        duration = get_duration(tmp_video.name)
        if duration > MAX_DURATION_SEC:
            return jsonify({"error": f"Vídeo muito longo ({duration:.0f}s). Máximo: {MAX_DURATION_SEC}s"}), 400

        # Extract the audio track as 16 kHz mono 16-bit PCM WAV.
        tmp_audio = tempfile.NamedTemporaryFile(
            delete=False, suffix=".wav", dir="/tmp"
        )
        tmp_audio.close()

        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-i", tmp_video.name,
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "16000",
            "-ac", "1",
            tmp_audio.name,
        ]
        ffmpeg_result = subprocess.run(
            ffmpeg_cmd, capture_output=True, text=True, timeout=300
        )
        if ffmpeg_result.returncode != 0:
            return jsonify({"error": f"Erro ao extrair áudio: {ffmpeg_result.stderr[:500]}"}), 500

        model = get_model(model_name)
        transcribe_opts = _build_transcribe_options(language, is_ptbr, is_ptpt)
        whisper_result = model.transcribe(tmp_audio.name, **transcribe_opts)

        processing_time = time.time() - start_time

        segments = whisper_result.get("segments", [])
        full_text = whisper_result.get("text", "").strip()
        if is_ptbr:
            for seg in segments:
                seg["text"] = apply_ptbr_corrections(seg["text"])
            full_text = apply_ptbr_corrections(full_text)

        if output_format == "srt":
            transcription = generate_srt(segments)
            # Count words from segment text only, not SRT indices/timestamps.
            word_count = sum(len(s.get("text", "").split()) for s in segments)
        else:
            transcription = full_text
            word_count = len(transcription.split())

        # Report the variant the client asked for rather than plain "pt".
        if is_ptbr:
            detected_lang = "pt-br"
        elif is_ptpt:
            detected_lang = "pt-pt"
        else:
            detected_lang = whisper_result.get("language", "desconhecido")

        return jsonify({
            "transcription": transcription,
            "format": output_format,
            "language_detected": detected_lang,
            "duration_seconds": round(duration, 2),
            "processing_seconds": round(processing_time, 2),
            "word_count": word_count,
            "segments_count": len(segments),
            "speed": f"{duration / processing_time:.1f}x" if processing_time > 0 else "N/A",
            "model_used": model_name,
            "device": "cuda" if torch.cuda.is_available() else "cpu"
        })

    except subprocess.TimeoutExpired:
        return jsonify({"error": "Processamento demorou demais. Tente um vídeo menor."}), 504

    except Exception as e:
        return jsonify({"error": f"Erro interno: {str(e)}"}), 500

    finally:
        _remove_temp_files([tmp_video, tmp_audio])
| # ===================================================== | |
| # INICIAR SERVIDOR | |
| # ===================================================== | |
if __name__ == "__main__":
    # Startup banner.
    banner = "=" * 50
    print(banner)
    print(" TranscreVid API v2.1")
    print(banner)

    # Warm the default model before serving; a failure here is only reported,
    # the server still starts.
    try:
        print(f"[INIT] Carregando modelo '{DEFAULT_MODEL}'...")
        get_model(DEFAULT_MODEL)
        print("[INIT] Modelo carregado com sucesso!")
    except Exception as e:
        print(f"[WARN] Erro ao pre-carregar modelo: {e}")

    print("[INIT] Servidor iniciando na porta 7860...")
    app.run(host="0.0.0.0", port=7860, debug=False)