from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from typing import Optional import os import tempfile from pathlib import Path import re import requests import time from gemini_client import AsyncChatbot, Model, load_cookies app = FastAPI(title="Gemini Chat API", description="API para interagir com Google Gemini") # Inicializar chatbot globalmente chatbot = None async def update_cookie_if_needed(cookie_path: str, secure_1psid: str, secure_1psidts: str, additional_cookies: dict): """ Tenta atualizar o cookie __Secure-1PSIDTS se necessário. Retorna o novo cookie ou o original se não precisar atualizar. """ # Não tentar atualizar proativamente - deixar o sistema fazer isso quando necessário # Isso evita erros 401/404 quando o cookie já expirou return secure_1psidts async def init_chatbot(retry_count=0, max_retries=2): """ Inicializa o chatbot com os cookies de forma assíncrona. Tenta atualizar cookies automaticamente se falhar. """ global chatbot cookie_path = os.getenv("COOKIE_PATH", "cookies.json") if not os.path.exists(cookie_path): raise FileNotFoundError(f"Arquivo de cookies não encontrado: {cookie_path}") try: # Carregar cookies secure_1psid, secure_1psidts, additional_cookies = load_cookies(cookie_path) # Tentar atualizar cookie proativamente antes de inicializar if retry_count == 0: secure_1psidts = await update_cookie_if_needed(cookie_path, secure_1psid, secure_1psidts, additional_cookies) # Criar AsyncChatbot diretamente chatbot = await AsyncChatbot.create( secure_1psid=secure_1psid, secure_1psidts=secure_1psidts, model=Model.G_2_5_PRO, additional_cookies=additional_cookies, cookie_path=cookie_path ) print(f"Chatbot inicializado com sucesso usando {cookie_path}") except (ValueError, PermissionError) as e: error_str = str(e).lower() # Se o erro é relacionado a cookie expirado, não tentar atualizar novamente # O sistema já tentou atualizar automaticamente e falhou print(f"Erro ao inicializar chatbot: {e}") print(f"AVISO: Cookies podem estar expirados. Por favor, atualize manualmente os cookies em {cookie_path}") print(f"Para atualizar: acesse https://gemini.google.com/app e copie os novos cookies __Secure-1PSID e __Secure-1PSIDTS") raise except Exception as e: print(f"Erro ao inicializar chatbot: {e}") raise # Inicializar na startup @app.on_event("startup") async def startup_event(): await init_chatbot() @app.get("/") def root(): """Endpoint raiz""" return {"status": "ok", "message": "Gemini Chat API está funcionando"} def srt_time_to_seconds(timestamp): """Converte timestamp SRT (HH:MM:SS,mmm) para segundos""" try: time_part, ms_part = timestamp.split(",") h, m, s = map(int, time_part.split(":")) ms = int(ms_part) return h * 3600 + m * 60 + s + ms / 1000.0 except: return 0.0 def seconds_to_srt_time(seconds): """Converte segundos para timestamp SRT (HH:MM:SS,mmm)""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = int(seconds % 60) ms = int((seconds % 1) * 1000) return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}" def cut_srt_by_time(srt_content, start_time, end_time): """ Corta legendas SRT baseado em tempo de início e fim. Ajusta os timestamps para começar do zero. Parâmetros: - srt_content: Conteúdo SRT original - start_time: Tempo de início em segundos - end_time: Tempo de fim em segundos Retorna: SRT cortado e ajustado """ if start_time is None or end_time is None: return srt_content # Padrão para capturar legendas pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)", re.MULTILINE) matches = pattern.findall(srt_content) filtered_subtitles = [] for num, start, end, text in matches: start_seconds = srt_time_to_seconds(start.strip()) end_seconds = srt_time_to_seconds(end.strip()) # Verificar se a legenda está dentro do intervalo [start_time, end_time] # Incluir legendas que se sobrepõem parcialmente if end_seconds > start_time and start_seconds < end_time: # Ajustar timestamps para começar do zero new_start = max(0, start_seconds - start_time) new_end = min(end_time - start_time, end_seconds - start_time) # Garantir que new_end > new_start if new_end > new_start: filtered_subtitles.append({ 'start': new_start, 'end': new_end, 'text': text.strip() }) # Gerar SRT cortado srt_cut = "" for i, sub in enumerate(filtered_subtitles, 1): start_srt = seconds_to_srt_time(sub['start']) end_srt = seconds_to_srt_time(sub['end']) srt_cut += f"{i}\n{start_srt} --> {end_srt}\n{sub['text']}\n\n" return srt_cut.strip() def clean_and_validate_srt(srt_content): """Limpa e valida conteúdo SRT seguindo o padrão do example.py""" if "```" in srt_content: # Remover markdown code blocks parts = srt_content.split("```") if len(parts) > 1: # Pegar o conteúdo dentro dos blocos de código for part in parts: if "srt" in part.lower() or not part.strip().startswith("srt"): srt_content = part.strip() break # Padrão mais flexível para capturar timestamps mal formatados pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)", re.MULTILINE) matches = pattern.findall(srt_content) def corrigir_timestamp(timestamp): timestamp = timestamp.strip() # Se já está correto, retorna if re.match(r"\d{2}:\d{2}:\d{2},\d{3}", timestamp): return timestamp # Formato: MM:SS,mmm -> HH:MM:SS,mmm if re.match(r"\d{2}:\d{2},\d{3}", timestamp): return f"00:{timestamp}" # Formato: M:SS,mmm -> HH:MM:SS,mmm if re.match(r"\d{1}:\d{2},\d{3}", timestamp): parts = timestamp.split(":") minutes = parts[0].zfill(2) return f"00:{minutes}:{parts[1]}" # Formato: SS,mmm -> HH:MM:SS,mmm if re.match(r"\d{1,2},\d{3}", timestamp): seconds_ms = timestamp.split(",") seconds = seconds_ms[0].zfill(2) return f"00:00:{seconds},{seconds_ms[1]}" # Outros formatos problemáticos if re.match(r"\d{2}:\d{2}:\d{3}", timestamp): parts = timestamp.split(":") if len(parts) == 3: h, m, s_ms = parts if len(s_ms) == 3: return f"{h}:{m}:00,{s_ms}" elif len(s_ms) >= 4: s = s_ms[:-3] ms = s_ms[-3:] return f"{h}:{m}:{s.zfill(2)},{ms}" return timestamp srt_corrigido = "" for i, (num, start, end, text) in enumerate(matches, 1): text = text.strip() if not text: continue # Verificar se a legenda tem mais de 2 linhas text_lines = [line.strip() for line in text.split('\n') if line.strip()] if len(text_lines) > 2: # Limitar a 2 linhas, juntando as extras na segunda linha text = text_lines[0] + '\n' + ' '.join(text_lines[1:]) start_corrigido = corrigir_timestamp(start) end_corrigido = corrigir_timestamp(end) srt_corrigido += f"{i}\n{start_corrigido} --> {end_corrigido}\n{text}\n\n" return srt_corrigido.strip() def download_file_with_retry(url: str, max_retries: int = 3, timeout: int = 300): """ Baixa arquivo com retry logic e tratamento de rate limiting. Parâmetros: - url: URL do arquivo - max_retries: Número máximo de tentativas - timeout: Timeout em segundos Retorna: Response object do requests """ headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': '*/*', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1' } for attempt in range(max_retries): try: if attempt > 0: # Backoff exponencial: 2^attempt segundos wait_time = 2 ** attempt print(f"⏳ Aguardando {wait_time}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})...") time.sleep(wait_time) print(f"📥 Tentativa {attempt + 1}/{max_retries} - Baixando arquivo de: {url}") response = requests.get(url, headers=headers, timeout=timeout, stream=True) # Tratar erro 429 (Too Many Requests) if response.status_code == 429: retry_after = response.headers.get('Retry-After') if retry_after: wait_time = int(retry_after) print(f"⚠️ Rate limit atingido. Aguardando {wait_time}s conforme Retry-After header...") time.sleep(wait_time) elif attempt < max_retries - 1: # Se não houver Retry-After, usar backoff exponencial wait_time = (2 ** attempt) * 5 # 5s, 10s, 20s... print(f"⚠️ Rate limit atingido. Aguardando {wait_time}s antes de tentar novamente...") time.sleep(wait_time) continue else: raise HTTPException( status_code=429, detail=f"Rate limit atingido após {max_retries} tentativas. Tente novamente mais tarde." ) response.raise_for_status() return response except requests.exceptions.HTTPError as e: if e.response.status_code == 429 and attempt < max_retries - 1: continue elif attempt == max_retries - 1: raise HTTPException( status_code=400, detail=f"Erro ao baixar arquivo após {max_retries} tentativas: {str(e)}" ) else: raise except requests.exceptions.RequestException as e: if attempt == max_retries - 1: raise HTTPException( status_code=400, detail=f"Erro ao baixar arquivo após {max_retries} tentativas: {str(e)}" ) continue raise HTTPException( status_code=400, detail=f"Falha ao baixar arquivo após {max_retries} tentativas" ) @app.get("/subtitle") async def generate_subtitle( file: str, context: Optional[str] = None, start: Optional[float] = None, end: Optional[float] = None ): """ Endpoint para gerar legendas SRT a partir de um arquivo (imagem, vídeo ou áudio). Parâmetros: - file: URL do arquivo (imagem, vídeo ou áudio) - context: Contexto adicional opcional para a geração de legendas - start: Tempo de início para cortar legendas (em segundos) - end: Tempo de fim para cortar legendas (em segundos) Retorna: - Arquivo SRT formatado (cortado se start e end forem fornecidos) """ if chatbot is None: raise HTTPException(status_code=500, detail="Chatbot não inicializado") if not file: raise HTTPException(status_code=400, detail="Parâmetro 'file' é obrigatório") temp_file = None try: # Baixar arquivo da URL com retry response = download_file_with_retry(file, max_retries=3, timeout=300) # Determinar tipo de mídia e extensão content_type = response.headers.get('content-type', '').lower() file_extension = None if 'video' in content_type: file_extension = '.mp4' media_type = 'video' elif 'audio' in content_type: file_extension = '.mp3' media_type = 'audio' elif 'image' in content_type: # Determinar extensão da imagem if 'jpeg' in content_type or 'jpg' in content_type: file_extension = '.jpg' elif 'png' in content_type: file_extension = '.png' elif 'gif' in content_type: file_extension = '.gif' elif 'webp' in content_type: file_extension = '.webp' else: file_extension = '.jpg' media_type = 'image' else: # Tentar inferir do URL url_lower = file.lower() if any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv']): file_extension = '.mp4' media_type = 'video' elif any(ext in url_lower for ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a']): file_extension = '.mp3' media_type = 'audio' elif any(ext in url_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']): file_extension = Path(file).suffix or '.jpg' media_type = 'image' else: raise HTTPException(status_code=400, detail="Tipo de arquivo não suportado. Use imagem, vídeo ou áudio.") # Salvar arquivo temporariamente temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) for chunk in response.iter_content(chunk_size=8192): if chunk: temp_file.write(chunk) temp_file.close() print(f"✅ Arquivo baixado: {temp_file.name} (tipo: {media_type})") # Preparar prompt (mesmo do example.py) context_text = context.strip() if context else "N/A" media_desc = 'áudio' if media_type in ['video', 'audio'] else 'conteúdo' media_desc_final = 'VÍDEO' if media_type in ['video', 'audio'] else 'CONTEÚDO' prompt = f"Gere uma legenda em formato SRT para este {media_desc} seguindo RIGOROSAMENTE todas as especificações do sistema.\n\nContexto adicional: {context_text}" # System instruction (mesmo do example.py) system_instruction = f"""FORMATO TÉCNICO OBRIGATÓRIO Estrutura de cada bloco: [número sequencial] HH:MM:SS,mmm --> HH:MM:SS,mmm [texto da legenda] [linha em branco] CRÍTICO - Formato de tempo: - SEMPRE usar: HH:MM:SS,mmm (exemplo: 00:01:23,456) - Vírgula (,) separando segundos de milissegundos - Duas casas para horas, minutos e segundos - Três casas para milissegundos - Nunca omitir as horas, mesmo que sejam 00 PADRÃO NETFLIX - REGRAS DE TEXTO Limitações de caracteres: - Máximo 2 linhas por legenda - Máximo 42 caracteres por linha (incluindo espaços e pontuação) - Quebras de linha devem respeitar unidades semânticas (não partir palavras ou expressões) Separação de falas: - NUNCA misture falas de pessoas diferentes na mesma legenda - Se houver mudança de locutor, SEMPRE crie um novo bloco numerado - Única exceção: diálogos rápidos marcados com hífen (veja abaixo) Uso de hífen (-): Use APENAS para: 1. Diálogos alternados quando o timing impede separação: - Vamos? - Vamos! 2. Interrupções abruptas de fala 3. Falas sobrepostas simultâneas NÃO use hífen para: - Fala única de uma pessoa - Marcação desnecessária de locutor NATURALIDADE E EMOÇÃO Idioma: - Português brasileiro natural - Adaptar gírias, expressões regionais e modo de falar brasileiro - Evitar traduções literais ou formais demais Expressão emocional: - Gritos, ênfase forte: LETRAS MAIÚSCULAS - Hesitação, pausa: reticências (...) - Surpresa, exclamação: ponto de exclamação (!) - Interrogação: ponto de interrogação (?) - Nunca deixe frases importantes sem pontuação - Exemplos: - "João" → "João..." (hesitante) - "João" → "João!" (chamando com urgência) - "João" → "JOÃO!" (gritando) SINCRONIA TEMPORAL - Precisão de milissegundos - Início da legenda: EXATAMENTE quando a fala começa - Fim da legenda: quando a fala termina (mínimo 1 segundo de exibição) - Respeitar pausas naturais entre falas EXEMPLO DE FORMATAÇÃO PERFEITA 1 00:00:01,200 --> 00:00:04,000 Oi, tudo bem? 2 00:00:04,500 --> 00:00:06,800 Tudo ótimo, e você? 3 00:00:07,100 --> 00:00:09,500 - Quer almoçar comigo? - Claro! 4 00:00:10,000 --> 00:00:12,300 QUE LEGAL! 5 00:00:12,800 --> 00:00:15,100 Não acredito que você aceitou... INSTRUÇÕES FINAIS - Retorne APENAS o arquivo SRT formatado - Sem explicações, comentários ou textos adicionais - Sem marcadores de código (```), apenas o conteúdo puro - Numere sequencialmente a partir de 1 - Linha em branco entre cada bloco de legenda TRADUZA TUDO DE IMPORTANTE NO {media_desc_final}, que tenha dialogo... Nunca deixe passar nada.""" # Adicionar system instruction ao prompt full_prompt = f"{system_instruction}\n\n{prompt}" # Enviar para o Gemini print(f"🧠 Enviando {media_type} para o Gemini...") # Determinar qual parâmetro usar baseado no tipo de mídia if media_type == 'image': response_gemini = await chatbot.ask(full_prompt, image=temp_file.name) elif media_type == 'video': response_gemini = await chatbot.ask(full_prompt, video=temp_file.name) else: # audio response_gemini = await chatbot.ask(full_prompt, audio=temp_file.name) if response_gemini.get("error"): raise HTTPException( status_code=500, detail=f"Erro ao gerar legendas: {response_gemini.get('content', 'Erro desconhecido')}" ) # Extrair conteúdo SRT da resposta raw_srt = response_gemini.get("content", "").strip() if not raw_srt or len(raw_srt) < 10: raise HTTPException( status_code=500, detail="Nenhuma legenda foi gerada - arquivo pode estar vazio ou inaudível" ) # Limpar e validar SRT print("📝 Processando formato SRT...") srt_cleaned = clean_and_validate_srt(raw_srt) if not srt_cleaned or len(srt_cleaned.strip()) < 10: raise HTTPException( status_code=500, detail="Falha ao processar formato SRT - resposta inválida" ) # Aplicar corte de legendas se start e end forem fornecidos if start is not None and end is not None: print(f"✂️ Cortando legendas: {start}s - {end}s") srt_cleaned = cut_srt_by_time(srt_cleaned, start, end) if not srt_cleaned or len(srt_cleaned.strip()) < 10: raise HTTPException( status_code=500, detail="Nenhuma legenda encontrada no intervalo especificado" ) # Retornar SRT em JSON return JSONResponse( content={ "srt": srt_cleaned, "success": True, "media_type": media_type } ) except HTTPException: raise except requests.RequestException as e: raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=f"Erro ao gerar legendas: {str(e)}") finally: # Limpar arquivo temporário if temp_file and os.path.exists(temp_file.name): try: os.unlink(temp_file.name) except: pass