|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.responses import JSONResponse |
|
|
from typing import Optional |
|
|
import os |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
import re |
|
|
import requests |
|
|
import time |
|
|
|
|
|
from gemini_client import AsyncChatbot, Model, load_cookies |
|
|
|
|
|
app = FastAPI(title="Gemini Chat API", description="API para interagir com Google Gemini") |
|
|
|
|
|
|
|
|
chatbot = None |
|
|
|
|
|
async def update_cookie_if_needed(cookie_path: str, secure_1psid: str, secure_1psidts: str, additional_cookies: dict): |
|
|
""" |
|
|
Tenta atualizar o cookie __Secure-1PSIDTS se necessário. |
|
|
Retorna o novo cookie ou o original se não precisar atualizar. |
|
|
""" |
|
|
|
|
|
|
|
|
return secure_1psidts |
|
|
|
|
|
async def init_chatbot(retry_count=0, max_retries=2): |
|
|
""" |
|
|
Inicializa o chatbot com os cookies de forma assíncrona. |
|
|
Tenta atualizar cookies automaticamente se falhar. |
|
|
""" |
|
|
global chatbot |
|
|
cookie_path = os.getenv("COOKIE_PATH", "cookies.json") |
|
|
|
|
|
if not os.path.exists(cookie_path): |
|
|
raise FileNotFoundError(f"Arquivo de cookies não encontrado: {cookie_path}") |
|
|
|
|
|
try: |
|
|
|
|
|
secure_1psid, secure_1psidts, additional_cookies = load_cookies(cookie_path) |
|
|
|
|
|
|
|
|
if retry_count == 0: |
|
|
secure_1psidts = await update_cookie_if_needed(cookie_path, secure_1psid, secure_1psidts, additional_cookies) |
|
|
|
|
|
|
|
|
chatbot = await AsyncChatbot.create( |
|
|
secure_1psid=secure_1psid, |
|
|
secure_1psidts=secure_1psidts, |
|
|
model=Model.G_2_5_PRO, |
|
|
additional_cookies=additional_cookies, |
|
|
cookie_path=cookie_path |
|
|
) |
|
|
print(f"Chatbot inicializado com sucesso usando {cookie_path}") |
|
|
except (ValueError, PermissionError) as e: |
|
|
error_str = str(e).lower() |
|
|
|
|
|
|
|
|
print(f"Erro ao inicializar chatbot: {e}") |
|
|
print(f"AVISO: Cookies podem estar expirados. Por favor, atualize manualmente os cookies em {cookie_path}") |
|
|
print(f"Para atualizar: acesse https://gemini.google.com/app e copie os novos cookies __Secure-1PSID e __Secure-1PSIDTS") |
|
|
raise |
|
|
except Exception as e: |
|
|
print(f"Erro ao inicializar chatbot: {e}") |
|
|
raise |
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
await init_chatbot() |
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
"""Endpoint raiz""" |
|
|
return {"status": "ok", "message": "Gemini Chat API está funcionando"} |
|
|
|
|
|
def srt_time_to_seconds(timestamp): |
|
|
"""Converte timestamp SRT (HH:MM:SS,mmm) para segundos""" |
|
|
try: |
|
|
time_part, ms_part = timestamp.split(",") |
|
|
h, m, s = map(int, time_part.split(":")) |
|
|
ms = int(ms_part) |
|
|
return h * 3600 + m * 60 + s + ms / 1000.0 |
|
|
except: |
|
|
return 0.0 |
|
|
|
|
|
def seconds_to_srt_time(seconds): |
|
|
"""Converte segundos para timestamp SRT (HH:MM:SS,mmm)""" |
|
|
hours = int(seconds // 3600) |
|
|
minutes = int((seconds % 3600) // 60) |
|
|
secs = int(seconds % 60) |
|
|
ms = int((seconds % 1) * 1000) |
|
|
return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}" |
|
|
|
|
|
def cut_srt_by_time(srt_content, start_time, end_time): |
|
|
""" |
|
|
Corta legendas SRT baseado em tempo de início e fim. |
|
|
Ajusta os timestamps para começar do zero. |
|
|
|
|
|
Parâmetros: |
|
|
- srt_content: Conteúdo SRT original |
|
|
- start_time: Tempo de início em segundos |
|
|
- end_time: Tempo de fim em segundos |
|
|
|
|
|
Retorna: SRT cortado e ajustado |
|
|
""" |
|
|
if start_time is None or end_time is None: |
|
|
return srt_content |
|
|
|
|
|
|
|
|
pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)", re.MULTILINE) |
|
|
matches = pattern.findall(srt_content) |
|
|
|
|
|
filtered_subtitles = [] |
|
|
for num, start, end, text in matches: |
|
|
start_seconds = srt_time_to_seconds(start.strip()) |
|
|
end_seconds = srt_time_to_seconds(end.strip()) |
|
|
|
|
|
|
|
|
|
|
|
if end_seconds > start_time and start_seconds < end_time: |
|
|
|
|
|
new_start = max(0, start_seconds - start_time) |
|
|
new_end = min(end_time - start_time, end_seconds - start_time) |
|
|
|
|
|
|
|
|
if new_end > new_start: |
|
|
filtered_subtitles.append({ |
|
|
'start': new_start, |
|
|
'end': new_end, |
|
|
'text': text.strip() |
|
|
}) |
|
|
|
|
|
|
|
|
srt_cut = "" |
|
|
for i, sub in enumerate(filtered_subtitles, 1): |
|
|
start_srt = seconds_to_srt_time(sub['start']) |
|
|
end_srt = seconds_to_srt_time(sub['end']) |
|
|
srt_cut += f"{i}\n{start_srt} --> {end_srt}\n{sub['text']}\n\n" |
|
|
|
|
|
return srt_cut.strip() |
|
|
|
|
|
def clean_and_validate_srt(srt_content): |
|
|
"""Limpa e valida conteúdo SRT seguindo o padrão do example.py""" |
|
|
if "```" in srt_content: |
|
|
|
|
|
parts = srt_content.split("```") |
|
|
if len(parts) > 1: |
|
|
|
|
|
for part in parts: |
|
|
if "srt" in part.lower() or not part.strip().startswith("srt"): |
|
|
srt_content = part.strip() |
|
|
break |
|
|
|
|
|
|
|
|
pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)", re.MULTILINE) |
|
|
matches = pattern.findall(srt_content) |
|
|
|
|
|
def corrigir_timestamp(timestamp): |
|
|
timestamp = timestamp.strip() |
|
|
|
|
|
|
|
|
if re.match(r"\d{2}:\d{2}:\d{2},\d{3}", timestamp): |
|
|
return timestamp |
|
|
|
|
|
|
|
|
if re.match(r"\d{2}:\d{2},\d{3}", timestamp): |
|
|
return f"00:{timestamp}" |
|
|
|
|
|
|
|
|
if re.match(r"\d{1}:\d{2},\d{3}", timestamp): |
|
|
parts = timestamp.split(":") |
|
|
minutes = parts[0].zfill(2) |
|
|
return f"00:{minutes}:{parts[1]}" |
|
|
|
|
|
|
|
|
if re.match(r"\d{1,2},\d{3}", timestamp): |
|
|
seconds_ms = timestamp.split(",") |
|
|
seconds = seconds_ms[0].zfill(2) |
|
|
return f"00:00:{seconds},{seconds_ms[1]}" |
|
|
|
|
|
|
|
|
if re.match(r"\d{2}:\d{2}:\d{3}", timestamp): |
|
|
parts = timestamp.split(":") |
|
|
if len(parts) == 3: |
|
|
h, m, s_ms = parts |
|
|
if len(s_ms) == 3: |
|
|
return f"{h}:{m}:00,{s_ms}" |
|
|
elif len(s_ms) >= 4: |
|
|
s = s_ms[:-3] |
|
|
ms = s_ms[-3:] |
|
|
return f"{h}:{m}:{s.zfill(2)},{ms}" |
|
|
|
|
|
return timestamp |
|
|
|
|
|
srt_corrigido = "" |
|
|
for i, (num, start, end, text) in enumerate(matches, 1): |
|
|
text = text.strip() |
|
|
if not text: |
|
|
continue |
|
|
|
|
|
|
|
|
text_lines = [line.strip() for line in text.split('\n') if line.strip()] |
|
|
if len(text_lines) > 2: |
|
|
|
|
|
text = text_lines[0] + '\n' + ' '.join(text_lines[1:]) |
|
|
|
|
|
start_corrigido = corrigir_timestamp(start) |
|
|
end_corrigido = corrigir_timestamp(end) |
|
|
srt_corrigido += f"{i}\n{start_corrigido} --> {end_corrigido}\n{text}\n\n" |
|
|
|
|
|
return srt_corrigido.strip() |
|
|
|
|
|
def download_file_with_retry(url: str, max_retries: int = 3, timeout: int = 300): |
|
|
""" |
|
|
Baixa arquivo com retry logic e tratamento de rate limiting. |
|
|
|
|
|
Parâmetros: |
|
|
- url: URL do arquivo |
|
|
- max_retries: Número máximo de tentativas |
|
|
- timeout: Timeout em segundos |
|
|
|
|
|
Retorna: Response object do requests |
|
|
""" |
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', |
|
|
'Accept': '*/*', |
|
|
'Accept-Language': 'en-US,en;q=0.9', |
|
|
'Accept-Encoding': 'gzip, deflate, br', |
|
|
'Connection': 'keep-alive', |
|
|
'Upgrade-Insecure-Requests': '1' |
|
|
} |
|
|
|
|
|
for attempt in range(max_retries): |
|
|
try: |
|
|
if attempt > 0: |
|
|
|
|
|
wait_time = 2 ** attempt |
|
|
print(f"⏳ Aguardando {wait_time}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})...") |
|
|
time.sleep(wait_time) |
|
|
|
|
|
print(f"📥 Tentativa {attempt + 1}/{max_retries} - Baixando arquivo de: {url}") |
|
|
response = requests.get(url, headers=headers, timeout=timeout, stream=True) |
|
|
|
|
|
|
|
|
if response.status_code == 429: |
|
|
retry_after = response.headers.get('Retry-After') |
|
|
if retry_after: |
|
|
wait_time = int(retry_after) |
|
|
print(f"⚠️ Rate limit atingido. Aguardando {wait_time}s conforme Retry-After header...") |
|
|
time.sleep(wait_time) |
|
|
elif attempt < max_retries - 1: |
|
|
|
|
|
wait_time = (2 ** attempt) * 5 |
|
|
print(f"⚠️ Rate limit atingido. Aguardando {wait_time}s antes de tentar novamente...") |
|
|
time.sleep(wait_time) |
|
|
continue |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=429, |
|
|
detail=f"Rate limit atingido após {max_retries} tentativas. Tente novamente mais tarde." |
|
|
) |
|
|
|
|
|
response.raise_for_status() |
|
|
return response |
|
|
|
|
|
except requests.exceptions.HTTPError as e: |
|
|
if e.response.status_code == 429 and attempt < max_retries - 1: |
|
|
continue |
|
|
elif attempt == max_retries - 1: |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=f"Erro ao baixar arquivo após {max_retries} tentativas: {str(e)}" |
|
|
) |
|
|
else: |
|
|
raise |
|
|
except requests.exceptions.RequestException as e: |
|
|
if attempt == max_retries - 1: |
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=f"Erro ao baixar arquivo após {max_retries} tentativas: {str(e)}" |
|
|
) |
|
|
continue |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=400, |
|
|
detail=f"Falha ao baixar arquivo após {max_retries} tentativas" |
|
|
) |
|
|
|
|
|
@app.get("/subtitle") |
|
|
async def generate_subtitle( |
|
|
file: str, |
|
|
context: Optional[str] = None, |
|
|
start: Optional[float] = None, |
|
|
end: Optional[float] = None |
|
|
): |
|
|
""" |
|
|
Endpoint para gerar legendas SRT a partir de um arquivo (imagem, vídeo ou áudio). |
|
|
|
|
|
Parâmetros: |
|
|
- file: URL do arquivo (imagem, vídeo ou áudio) |
|
|
- context: Contexto adicional opcional para a geração de legendas |
|
|
- start: Tempo de início para cortar legendas (em segundos) |
|
|
- end: Tempo de fim para cortar legendas (em segundos) |
|
|
|
|
|
Retorna: |
|
|
- Arquivo SRT formatado (cortado se start e end forem fornecidos) |
|
|
""" |
|
|
if chatbot is None: |
|
|
raise HTTPException(status_code=500, detail="Chatbot não inicializado") |
|
|
|
|
|
if not file: |
|
|
raise HTTPException(status_code=400, detail="Parâmetro 'file' é obrigatório") |
|
|
|
|
|
temp_file = None |
|
|
try: |
|
|
|
|
|
response = download_file_with_retry(file, max_retries=3, timeout=300) |
|
|
|
|
|
|
|
|
content_type = response.headers.get('content-type', '').lower() |
|
|
file_extension = None |
|
|
|
|
|
if 'video' in content_type: |
|
|
file_extension = '.mp4' |
|
|
media_type = 'video' |
|
|
elif 'audio' in content_type: |
|
|
file_extension = '.mp3' |
|
|
media_type = 'audio' |
|
|
elif 'image' in content_type: |
|
|
|
|
|
if 'jpeg' in content_type or 'jpg' in content_type: |
|
|
file_extension = '.jpg' |
|
|
elif 'png' in content_type: |
|
|
file_extension = '.png' |
|
|
elif 'gif' in content_type: |
|
|
file_extension = '.gif' |
|
|
elif 'webp' in content_type: |
|
|
file_extension = '.webp' |
|
|
else: |
|
|
file_extension = '.jpg' |
|
|
media_type = 'image' |
|
|
else: |
|
|
|
|
|
url_lower = file.lower() |
|
|
if any(ext in url_lower for ext in ['.mp4', '.avi', '.mov', '.webm', '.mkv']): |
|
|
file_extension = '.mp4' |
|
|
media_type = 'video' |
|
|
elif any(ext in url_lower for ext in ['.mp3', '.wav', '.ogg', '.flac', '.aac', '.m4a']): |
|
|
file_extension = '.mp3' |
|
|
media_type = 'audio' |
|
|
elif any(ext in url_lower for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']): |
|
|
file_extension = Path(file).suffix or '.jpg' |
|
|
media_type = 'image' |
|
|
else: |
|
|
raise HTTPException(status_code=400, detail="Tipo de arquivo não suportado. Use imagem, vídeo ou áudio.") |
|
|
|
|
|
|
|
|
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
if chunk: |
|
|
temp_file.write(chunk) |
|
|
temp_file.close() |
|
|
|
|
|
print(f"✅ Arquivo baixado: {temp_file.name} (tipo: {media_type})") |
|
|
|
|
|
|
|
|
context_text = context.strip() if context else "N/A" |
|
|
media_desc = 'áudio' if media_type in ['video', 'audio'] else 'conteúdo' |
|
|
media_desc_final = 'VÍDEO' if media_type in ['video', 'audio'] else 'CONTEÚDO' |
|
|
|
|
|
prompt = f"Gere uma legenda em formato SRT para este {media_desc} seguindo RIGOROSAMENTE todas as especificações do sistema.\n\nContexto adicional: {context_text}" |
|
|
|
|
|
|
|
|
system_instruction = f"""FORMATO TÉCNICO OBRIGATÓRIO |
|
|
|
|
|
Estrutura de cada bloco: |
|
|
|
|
|
[número sequencial] |
|
|
HH:MM:SS,mmm --> HH:MM:SS,mmm |
|
|
[texto da legenda] |
|
|
[linha em branco] |
|
|
|
|
|
CRÍTICO - Formato de tempo: |
|
|
- SEMPRE usar: HH:MM:SS,mmm (exemplo: 00:01:23,456) |
|
|
- Vírgula (,) separando segundos de milissegundos |
|
|
- Duas casas para horas, minutos e segundos |
|
|
- Três casas para milissegundos |
|
|
- Nunca omitir as horas, mesmo que sejam 00 |
|
|
|
|
|
PADRÃO NETFLIX - REGRAS DE TEXTO |
|
|
|
|
|
Limitações de caracteres: |
|
|
- Máximo 2 linhas por legenda |
|
|
- Máximo 42 caracteres por linha (incluindo espaços e pontuação) |
|
|
- Quebras de linha devem respeitar unidades semânticas (não partir palavras ou expressões) |
|
|
|
|
|
Separação de falas: |
|
|
- NUNCA misture falas de pessoas diferentes na mesma legenda |
|
|
- Se houver mudança de locutor, SEMPRE crie um novo bloco numerado |
|
|
- Única exceção: diálogos rápidos marcados com hífen (veja abaixo) |
|
|
|
|
|
Uso de hífen (-): |
|
|
Use APENAS para: |
|
|
1. Diálogos alternados quando o timing impede separação: |
|
|
|
|
|
- Vamos? |
|
|
- Vamos! |
|
|
|
|
|
2. Interrupções abruptas de fala |
|
|
3. Falas sobrepostas simultâneas |
|
|
|
|
|
NÃO use hífen para: |
|
|
- Fala única de uma pessoa |
|
|
- Marcação desnecessária de locutor |
|
|
|
|
|
NATURALIDADE E EMOÇÃO |
|
|
|
|
|
Idioma: |
|
|
- Português brasileiro natural |
|
|
- Adaptar gírias, expressões regionais e modo de falar brasileiro |
|
|
- Evitar traduções literais ou formais demais |
|
|
|
|
|
Expressão emocional: |
|
|
- Gritos, ênfase forte: LETRAS MAIÚSCULAS |
|
|
- Hesitação, pausa: reticências (...) |
|
|
- Surpresa, exclamação: ponto de exclamação (!) |
|
|
- Interrogação: ponto de interrogação (?) |
|
|
- Nunca deixe frases importantes sem pontuação |
|
|
- Exemplos: |
|
|
- "João" → "João..." (hesitante) |
|
|
- "João" → "João!" (chamando com urgência) |
|
|
- "João" → "JOÃO!" (gritando) |
|
|
|
|
|
SINCRONIA TEMPORAL |
|
|
|
|
|
- Precisão de milissegundos |
|
|
- Início da legenda: EXATAMENTE quando a fala começa |
|
|
- Fim da legenda: quando a fala termina (mínimo 1 segundo de exibição) |
|
|
- Respeitar pausas naturais entre falas |
|
|
|
|
|
EXEMPLO DE FORMATAÇÃO PERFEITA |
|
|
|
|
|
1 |
|
|
00:00:01,200 --> 00:00:04,000 |
|
|
Oi, tudo bem? |
|
|
|
|
|
2 |
|
|
00:00:04,500 --> 00:00:06,800 |
|
|
Tudo ótimo, e você? |
|
|
|
|
|
3 |
|
|
00:00:07,100 --> 00:00:09,500 |
|
|
- Quer almoçar comigo? |
|
|
- Claro! |
|
|
|
|
|
4 |
|
|
00:00:10,000 --> 00:00:12,300 |
|
|
QUE LEGAL! |
|
|
|
|
|
5 |
|
|
00:00:12,800 --> 00:00:15,100 |
|
|
Não acredito que você aceitou... |
|
|
|
|
|
INSTRUÇÕES FINAIS |
|
|
|
|
|
- Retorne APENAS o arquivo SRT formatado |
|
|
- Sem explicações, comentários ou textos adicionais |
|
|
- Sem marcadores de código (```), apenas o conteúdo puro |
|
|
- Numere sequencialmente a partir de 1 |
|
|
- Linha em branco entre cada bloco de legenda |
|
|
|
|
|
TRADUZA TUDO DE IMPORTANTE NO {media_desc_final}, que tenha dialogo... Nunca deixe passar nada.""" |
|
|
|
|
|
|
|
|
full_prompt = f"{system_instruction}\n\n{prompt}" |
|
|
|
|
|
|
|
|
print(f"🧠 Enviando {media_type} para o Gemini...") |
|
|
|
|
|
|
|
|
if media_type == 'image': |
|
|
response_gemini = await chatbot.ask(full_prompt, image=temp_file.name) |
|
|
elif media_type == 'video': |
|
|
response_gemini = await chatbot.ask(full_prompt, video=temp_file.name) |
|
|
else: |
|
|
response_gemini = await chatbot.ask(full_prompt, audio=temp_file.name) |
|
|
|
|
|
if response_gemini.get("error"): |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail=f"Erro ao gerar legendas: {response_gemini.get('content', 'Erro desconhecido')}" |
|
|
) |
|
|
|
|
|
|
|
|
raw_srt = response_gemini.get("content", "").strip() |
|
|
|
|
|
if not raw_srt or len(raw_srt) < 10: |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail="Nenhuma legenda foi gerada - arquivo pode estar vazio ou inaudível" |
|
|
) |
|
|
|
|
|
|
|
|
print("📝 Processando formato SRT...") |
|
|
srt_cleaned = clean_and_validate_srt(raw_srt) |
|
|
|
|
|
if not srt_cleaned or len(srt_cleaned.strip()) < 10: |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail="Falha ao processar formato SRT - resposta inválida" |
|
|
) |
|
|
|
|
|
|
|
|
if start is not None and end is not None: |
|
|
print(f"✂️ Cortando legendas: {start}s - {end}s") |
|
|
srt_cleaned = cut_srt_by_time(srt_cleaned, start, end) |
|
|
if not srt_cleaned or len(srt_cleaned.strip()) < 10: |
|
|
raise HTTPException( |
|
|
status_code=500, |
|
|
detail="Nenhuma legenda encontrada no intervalo especificado" |
|
|
) |
|
|
|
|
|
|
|
|
return JSONResponse( |
|
|
content={ |
|
|
"srt": srt_cleaned, |
|
|
"success": True, |
|
|
"media_type": media_type |
|
|
} |
|
|
) |
|
|
|
|
|
except HTTPException: |
|
|
raise |
|
|
except requests.RequestException as e: |
|
|
raise HTTPException(status_code=400, detail=f"Erro ao baixar arquivo: {str(e)}") |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=f"Erro ao gerar legendas: {str(e)}") |
|
|
finally: |
|
|
|
|
|
if temp_file and os.path.exists(temp_file.name): |
|
|
try: |
|
|
os.unlink(temp_file.name) |
|
|
except: |
|
|
pass |
|
|
|