subapi / app.py
habulaj's picture
Update app.py
50984c5 verified
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from typing import Optional
import os
import tempfile
from pathlib import Path
import re
import requests
import time
from gemini_client import AsyncChatbot, Model, load_cookies
app = FastAPI(title="Gemini Chat API", description="API para interagir com Google Gemini")
# Inicializar chatbot globalmente
chatbot = None
async def update_cookie_if_needed(cookie_path: str, secure_1psid: str, secure_1psidts: str, additional_cookies: dict):
"""
Tenta atualizar o cookie __Secure-1PSIDTS se necessário.
Retorna o novo cookie ou o original se não precisar atualizar.
"""
# Não tentar atualizar proativamente - deixar o sistema fazer isso quando necessário
# Isso evita erros 401/404 quando o cookie já expirou
return secure_1psidts
async def init_chatbot(retry_count=0, max_retries=2):
"""
Inicializa o chatbot com os cookies de forma assíncrona.
Tenta atualizar cookies automaticamente se falhar.
"""
global chatbot
cookie_path = os.getenv("COOKIE_PATH", "cookies.json")
if not os.path.exists(cookie_path):
raise FileNotFoundError(f"Arquivo de cookies não encontrado: {cookie_path}")
try:
# Carregar cookies
secure_1psid, secure_1psidts, additional_cookies = load_cookies(cookie_path)
# Tentar atualizar cookie proativamente antes de inicializar
if retry_count == 0:
secure_1psidts = await update_cookie_if_needed(cookie_path, secure_1psid, secure_1psidts, additional_cookies)
# Criar AsyncChatbot diretamente
chatbot = await AsyncChatbot.create(
secure_1psid=secure_1psid,
secure_1psidts=secure_1psidts,
model=Model.G_2_5_PRO,
additional_cookies=additional_cookies,
cookie_path=cookie_path
)
print(f"Chatbot inicializado com sucesso usando {cookie_path}")
except (ValueError, PermissionError) as e:
error_str = str(e).lower()
# Se o erro é relacionado a cookie expirado, não tentar atualizar novamente
# O sistema já tentou atualizar automaticamente e falhou
print(f"Erro ao inicializar chatbot: {e}")
print(f"AVISO: Cookies podem estar expirados. Por favor, atualize manualmente os cookies em {cookie_path}")
print(f"Para atualizar: acesse https://gemini.google.com/app e copie os novos cookies __Secure-1PSID e __Secure-1PSIDTS")
raise
except Exception as e:
print(f"Erro ao inicializar chatbot: {e}")
raise
# Inicializar na startup
@app.on_event("startup")
async def startup_event():
await init_chatbot()
@app.get("/")
def root():
"""Endpoint raiz"""
return {"status": "ok", "message": "Gemini Chat API está funcionando"}
def srt_time_to_seconds(timestamp):
"""Converte timestamp SRT (HH:MM:SS,mmm) para segundos"""
try:
time_part, ms_part = timestamp.split(",")
h, m, s = map(int, time_part.split(":"))
ms = int(ms_part)
return h * 3600 + m * 60 + s + ms / 1000.0
except:
return 0.0
def seconds_to_srt_time(seconds):
"""Converte segundos para timestamp SRT (HH:MM:SS,mmm)"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
ms = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
def cut_srt_by_time(srt_content, start_time, end_time):
"""
Corta legendas SRT baseado em tempo de início e fim.
Ajusta os timestamps para começar do zero.
Parâmetros:
- srt_content: Conteúdo SRT original
- start_time: Tempo de início em segundos
- end_time: Tempo de fim em segundos
Retorna: SRT cortado e ajustado
"""
if start_time is None or end_time is None:
return srt_content
# Padrão para capturar legendas
pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)", re.MULTILINE)
matches = pattern.findall(srt_content)
filtered_subtitles = []
for num, start, end, text in matches:
start_seconds = srt_time_to_seconds(start.strip())
end_seconds = srt_time_to_seconds(end.strip())
# Verificar se a legenda está dentro do intervalo [start_time, end_time]
# Incluir legendas que se sobrepõem parcialmente
if end_seconds > start_time and start_seconds < end_time:
# Ajustar timestamps para começar do zero
new_start = max(0, start_seconds - start_time)
new_end = min(end_time - start_time, end_seconds - start_time)
# Garantir que new_end > new_start
if new_end > new_start:
filtered_subtitles.append({
'start': new_start,
'end': new_end,
'text': text.strip()
})
# Gerar SRT cortado
srt_cut = ""
for i, sub in enumerate(filtered_subtitles, 1):
start_srt = seconds_to_srt_time(sub['start'])
end_srt = seconds_to_srt_time(sub['end'])
srt_cut += f"{i}\n{start_srt} --> {end_srt}\n{sub['text']}\n\n"
return srt_cut.strip()
def clean_and_validate_srt(srt_content):
"""Limpa e valida conteúdo SRT seguindo o padrão do example.py"""
if "```" in srt_content:
# Remover markdown code blocks
parts = srt_content.split("```")
if len(parts) > 1:
# Pegar o conteúdo dentro dos blocos de código
for part in parts:
if "srt" in part.lower() or not part.strip().startswith("srt"):
srt_content = part.strip()
break
# Padrão mais flexível para capturar timestamps mal formatados
pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)", re.MULTILINE)
matches = pattern.findall(srt_content)
def corrigir_timestamp(timestamp):
timestamp = timestamp.strip()
# Se já está correto, retorna
if re.match(r"\d{2}:\d{2}:\d{2},\d{3}", timestamp):
return timestamp
# Formato: MM:SS,mmm -> HH:MM:SS,mmm
if re.match(r"\d{2}:\d{2},\d{3}", timestamp):
return f"00:{timestamp}"
# Formato: M:SS,mmm -> HH:MM:SS,mmm
if re.match(r"\d{1}:\d{2},\d{3}", timestamp):
parts = timestamp.split(":")
minutes = parts[0].zfill(2)
return f"00:{minutes}:{parts[1]}"
# Formato: SS,mmm -> HH:MM:SS,mmm
if re.match(r"\d{1,2},\d{3}", timestamp):
seconds_ms = timestamp.split(",")
seconds = seconds_ms[0].zfill(2)
return f"00:00:{seconds},{seconds_ms[1]}"
# Outros formatos problemáticos
if re.match(r"\d{2}:\d{2}:\d{3}", timestamp):
parts = timestamp.split(":")
if len(parts) == 3:
h, m, s_ms = parts
if len(s_ms) == 3:
return f"{h}:{m}:00,{s_ms}"
elif len(s_ms) >= 4:
s = s_ms[:-3]
ms = s_ms[-3:]
return f"{h}:{m}:{s.zfill(2)},{ms}"
return timestamp
srt_corrigido = ""
for i, (num, start, end, text) in enumerate(matches, 1):
text = text.strip()
if not text:
continue
# Verificar se a legenda tem mais de 2 linhas
text_lines = [line.strip() for line in text.split('\n') if line.strip()]
if len(text_lines) > 2:
# Limitar a 2 linhas, juntando as extras na segunda linha
text = text_lines[0] + '\n' + ' '.join(text_lines[1:])
start_corrigido = corrigir_timestamp(start)
end_corrigido = corrigir_timestamp(end)
srt_corrigido += f"{i}\n{start_corrigido} --> {end_corrigido}\n{text}\n\n"
return srt_corrigido.strip()
def download_file_with_retry(url: str, max_retries: int = 3, timeout: int = 300):
"""
Baixa arquivo com retry logic e tratamento de rate limiting.
Parâmetros:
- url: URL do arquivo
- max_retries: Número máximo de tentativas
- timeout: Timeout em segundos
Retorna: Response object do requests
"""
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
for attempt in range(max_retries):
try:
if attempt > 0:
# Backoff exponencial: 2^attempt segundos
wait_time = 2 ** attempt
print(f"⏳ Aguardando {wait_time}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})...")
time.sleep(wait_time)
print(f"📥 Tentativa {attempt + 1}/{max_retries} - Baixando arquivo de: {url}")
response = requests.get(url, headers=headers, timeout=timeout, stream=True)
# Tratar erro 429 (Too Many Requests)
if response.status_code == 429:
retry_after = response.headers.get('Retry-After')
if retry_after:
wait_time = int(retry_after)
print(f"⚠️ Rate limit atingido. Aguardando {wait_time}s conforme Retry-After header...")
time.sleep(wait_time)
elif attempt < max_retries - 1:
# Se não houver Retry-After, usar backoff exponencial
wait_time = (2 ** attempt) * 5 # 5s, 10s, 20s...
print(f"⚠️ Rate limit atingido. Aguardando {wait_time}s antes de tentar novamente...")
time.sleep(wait_time)
continue
else:
raise HTTPException(
status_code=429,
detail=f"Rate limit atingido após {max_retries} tentativas. Tente novamente mais tarde."
)
response.raise_for_status()
return response
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429 and attempt < max_retries - 1:
continue
elif attempt == max_retries - 1:
raise HTTPException(
status_code=400,
detail=f"Erro ao baixar arquivo após {max_retries} tentativas: {str(e)}"
)
else:
raise
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise HTTPException(
status_code=400,
detail=f"Erro ao baixar arquivo após {max_retries} tentativas: {str(e)}"
)
continue
raise HTTPException(
status_code=400,
detail=f"Falha ao baixar arquivo após {max_retries} tentativas"
)
@app.get("/subtitle")
async def generate_subtitle(
file: str,
context: Optional[str] = None,
start: Optional[float] = None,
end: Optional[float] = None
):
"""
Endpoint para gerar legendas SRT a partir de um arquivo (imagem, vídeo ou áudio).
Parâmetros:
- file: URL do arquivo (imagem, vídeo ou áudio)
- context: Contexto adicional opcional para a geração de legendas
- start: Tempo de início para cortar legendas (em segundos)
- end: Tempo de fim para cortar legendas (em segundos)
Retorna:
- Arquivo SRT formatado (cortado se start e end forem fornecidos)
"""
if chatbot is None:
raise HTTPException(status_code=500, detail="Chatbot não inicializado")
if not file:
raise HTTPException(status_code=400, detail="Parâmetro 'file' é obrigatório")
temp_file = None
try:
# Baixar arquivo da URL com retry
response = download_file_with_retry(file, max_retries=3, timeout=300)
# Determinar tipo de mídia e extensão
content_type = response.headers.get('content-type', '').lower()
file_extension = None
if 'video' in content_type:
file_extension = '.mp4'
media_type = 'video'
elif 'audio' in content_type:
file_extension = '.mp3'
media_type = 'audio'
elif 'image' in content_type:
# Determinar extensão da imagem
if 'jpeg' in content_type or 'jpg' in content_type:
file_extension = '.jpg'
elif 'png' in content_type:
file_extension = '.png'
elif 'gif' in content_type:
file_extension = '.gif'
elif 'webp' in content_type:
file_extension = '.webp'
else:
file_extension = '.jpg'
media_type = 'image'
else:
# Tentar inferir do URL
url_lower = file.lower()
if any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv']):
file_extension = '.mp4'
media_type = 'video'
elif any(ext in url_lower for ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a']):
file_extension = '.mp3'
media_type = 'audio'
elif any(ext in url_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']):
file_extension = Path(file).suffix or '.jpg'
media_type = 'image'
else:
raise HTTPException(status_code=400, detail="Tipo de arquivo não suportado. Use imagem, vídeo ou áudio.")
# Salvar arquivo temporariamente
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension)
for chunk in response.iter_content(chunk_size=8192):
if chunk:
temp_file.write(chunk)
temp_file.close()
print(f"✅ Arquivo baixado: {temp_file.name} (tipo: {media_type})")
# Preparar prompt (mesmo do example.py)
context_text = context.strip() if context else "N/A"
media_desc = 'áudio' if media_type in ['video', 'audio'] else 'conteúdo'
media_desc_final = 'VÍDEO' if media_type in ['video', 'audio'] else 'CONTEÚDO'
prompt = f"Gere uma legenda em formato SRT para este {media_desc} seguindo RIGOROSAMENTE todas as especificações do sistema.\n\nContexto adicional: {context_text}"
# System instruction (mesmo do example.py)
system_instruction = f"""FORMATO TÉCNICO OBRIGATÓRIO
Estrutura de cada bloco:
[número sequencial]
HH:MM:SS,mmm --> HH:MM:SS,mmm
[texto da legenda]
[linha em branco]
CRÍTICO - Formato de tempo:
- SEMPRE usar: HH:MM:SS,mmm (exemplo: 00:01:23,456)
- Vírgula (,) separando segundos de milissegundos
- Duas casas para horas, minutos e segundos
- Três casas para milissegundos
- Nunca omitir as horas, mesmo que sejam 00
PADRÃO NETFLIX - REGRAS DE TEXTO
Limitações de caracteres:
- Máximo 2 linhas por legenda
- Máximo 42 caracteres por linha (incluindo espaços e pontuação)
- Quebras de linha devem respeitar unidades semânticas (não partir palavras ou expressões)
Separação de falas:
- NUNCA misture falas de pessoas diferentes na mesma legenda
- Se houver mudança de locutor, SEMPRE crie um novo bloco numerado
- Única exceção: diálogos rápidos marcados com hífen (veja abaixo)
Uso de hífen (-):
Use APENAS para:
1. Diálogos alternados quando o timing impede separação:
- Vamos?
- Vamos!
2. Interrupções abruptas de fala
3. Falas sobrepostas simultâneas
NÃO use hífen para:
- Fala única de uma pessoa
- Marcação desnecessária de locutor
NATURALIDADE E EMOÇÃO
Idioma:
- Português brasileiro natural
- Adaptar gírias, expressões regionais e modo de falar brasileiro
- Evitar traduções literais ou formais demais
Expressão emocional:
- Gritos, ênfase forte: LETRAS MAIÚSCULAS
- Hesitação, pausa: reticências (...)
- Surpresa, exclamação: ponto de exclamação (!)
- Interrogação: ponto de interrogação (?)
- Nunca deixe frases importantes sem pontuação
- Exemplos:
- "João" → "João..." (hesitante)
- "João" → "João!" (chamando com urgência)
- "João" → "JOÃO!" (gritando)
SINCRONIA TEMPORAL
- Precisão de milissegundos
- Início da legenda: EXATAMENTE quando a fala começa
- Fim da legenda: quando a fala termina (mínimo 1 segundo de exibição)
- Respeitar pausas naturais entre falas
EXEMPLO DE FORMATAÇÃO PERFEITA
1
00:00:01,200 --> 00:00:04,000
Oi, tudo bem?
2
00:00:04,500 --> 00:00:06,800
Tudo ótimo, e você?
3
00:00:07,100 --> 00:00:09,500
- Quer almoçar comigo?
- Claro!
4
00:00:10,000 --> 00:00:12,300
QUE LEGAL!
5
00:00:12,800 --> 00:00:15,100
Não acredito que você aceitou...
INSTRUÇÕES FINAIS
- Retorne APENAS o arquivo SRT formatado
- Sem explicações, comentários ou textos adicionais
- Sem marcadores de código (```), apenas o conteúdo puro
- Numere sequencialmente a partir de 1
- Linha em branco entre cada bloco de legenda
TRADUZA TUDO DE IMPORTANTE NO {media_desc_final}, que tenha dialogo... Nunca deixe passar nada."""
# Adicionar system instruction ao prompt
full_prompt = f"{system_instruction}\n\n{prompt}"
# Enviar para o Gemini
print(f"🧠 Enviando {media_type} para o Gemini...")
# Determinar qual parâmetro usar baseado no tipo de mídia
if media_type == 'image':
response_gemini = await chatbot.ask(full_prompt, image=temp_file.name)
elif media_type == 'video':
response_gemini = await chatbot.ask(full_prompt, video=temp_file.name)
else: # audio
response_gemini = await chatbot.ask(full_prompt, audio=temp_file.name)
if response_gemini.get("error"):
raise HTTPException(
status_code=500,
detail=f"Erro ao gerar legendas: {response_gemini.get('content', 'Erro desconhecido')}"
)
# Extrair conteúdo SRT da resposta
raw_srt = response_gemini.get("content", "").strip()
if not raw_srt or len(raw_srt) < 10:
raise HTTPException(
status_code=500,
detail="Nenhuma legenda foi gerada - arquivo pode estar vazio ou inaudível"
)
# Limpar e validar SRT
print("📝 Processando formato SRT...")
srt_cleaned = clean_and_validate_srt(raw_srt)
if not srt_cleaned or len(srt_cleaned.strip()) < 10:
raise HTTPException(
status_code=500,
detail="Falha ao processar formato SRT - resposta inválida"
)
# Aplicar corte de legendas se start e end forem fornecidos
if start is not None and end is not None:
print(f"✂️ Cortando legendas: {start}s - {end}s")
srt_cleaned = cut_srt_by_time(srt_cleaned, start, end)
if not srt_cleaned or len(srt_cleaned.strip()) < 10:
raise HTTPException(
status_code=500,
detail="Nenhuma legenda encontrada no intervalo especificado"
)
# Retornar SRT em JSON
return JSONResponse(
content={
"srt": srt_cleaned,
"success": True,
"media_type": media_type
}
)
except HTTPException:
raise
except requests.RequestException as e:
raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao gerar legendas: {str(e)}")
finally:
# Limpar arquivo temporário
if temp_file and os.path.exists(temp_file.name):
try:
os.unlink(temp_file.name)
except:
pass