"""Helpers for logging, SRT cleanup, resilient downloads and JSON extraction."""

import datetime
import json
import re
import time
from email.utils import parsedate_to_datetime
from typing import Optional

import requests
from fastapi import HTTPException
from gemini_webapi.constants import Model

# Fenced code block, optionally tagged "srt", as produced by chat-style models.
_CODE_FENCE_RE = re.compile(r"```(?:srt)?\n(.*?)```", re.DOTALL | re.IGNORECASE)
# First "index line + full timestamp" pair; everything before it is discarded.
_FIRST_CUE_RE = re.compile(r"^\s*\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}", re.MULTILINE)
# One cue: index, start, end, then text lines up to the next digits-only line.
_CUE_RE = re.compile(
    r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)",
    re.MULTILINE,
)
# Milliseconds separated by a dot instead of the comma SRT requires.
_DOT_MILLIS_RE = re.compile(r"\.(\d{3})$")

# Backslash sequences that are legal inside a JSON string.
_VALID_JSON_ESCAPE_RE = re.compile(r'\\(["\\\/bfnrt]|u[0-9a-fA-F]{4})')
# Every backslash sequence (valid or not) found in the text being repaired.
_BACKSLASH_SEQUENCE_RE = re.compile(r'\\u[0-9a-fA-F]{4}|\\.', re.DOTALL)


class ProcessLogger:
    """Print progress messages prefixed with agent, record id and timings.

    Each message shows the wall-clock time, the seconds elapsed since the
    logger was created and the seconds elapsed since the previous message.
    """

    def __init__(self, agent_name: str, record_id: str = "N/A"):
        """Store the identifiers, start the timers and log a start message.

        Args:
            agent_name: Name shown (upper-cased) in every message prefix.
            record_id: Identifier shown after ``#`` in every message prefix.
        """
        self.agent_name = agent_name
        self.record_id = record_id
        self.start_time = time.time()
        self.last_step_time = self.start_time
        self.log("🚀 Iniciando processo...")

    def log(self, message: str) -> None:
        """Print ``message`` with its prefix and reset the per-step timer."""
        abs_time = datetime.datetime.now().strftime("%H:%M:%S")
        curr_time = time.time()
        rel_time = curr_time - self.start_time
        step_time = curr_time - self.last_step_time
        self.last_step_time = curr_time

        prefix = f"[{abs_time}][{self.agent_name.upper()}][#{self.record_id}]"
        timing = f"(total: {rel_time:.1f}s | passo: {step_time:.1f}s)"
        print(f"{prefix} {timing} {message}")


def _fix_timestamp(timestamp: str) -> str:
    """Coerce a loosely formatted timestamp towards ``HH:MM:SS,mmm``.

    Values that match none of the known malformed shapes are returned
    stripped but otherwise unchanged.
    """
    timestamp = timestamp.strip()
    # "00:00:01.000" style: only the millisecond separator is wrong.
    timestamp = _DOT_MILLIS_RE.sub(r",\1", timestamp)

    if re.match(r"\d{2}:\d{2}:\d{2},\d{3}", timestamp):
        return timestamp
    if re.match(r"\d{2}:\d{2},\d{3}", timestamp):
        # MM:SS,mmm -> hours are missing.
        return f"00:{timestamp}"
    if re.match(r"\d{1}:\d{2},\d{3}", timestamp):
        # M:SS,mmm -> hours are missing and minutes need padding.
        parts = timestamp.split(":")
        return f"00:{parts[0].zfill(2)}:{parts[1]}"
    if re.match(r"\d{1,2},\d{3}", timestamp):
        # S,mmm or SS,mmm -> hours and minutes are missing.
        seconds_ms = timestamp.split(",")
        return f"00:00:{seconds_ms[0].zfill(2)},{seconds_ms[1]}"
    if re.match(r"\d{2}:\d{2}:\d{3}", timestamp):
        # Last field holds the milliseconds, optionally preceded by seconds.
        parts = timestamp.split(":")
        if len(parts) == 3:
            h, m, s_ms = parts
            if len(s_ms) == 3:
                return f"{h}:{m}:00,{s_ms}"
            if len(s_ms) >= 4:
                s, ms = s_ms[:-3], s_ms[-3:]
                return f"{h}:{m}:{s.zfill(2)},{ms}"
    return timestamp


def clean_and_validate_srt(srt_content):
    """Extract and normalise SRT cues from possibly noisy text.

    The input may be wrapped in a Markdown code fence and may carry text
    before the first cue. Cues without text are dropped, cues with more than
    two text lines are folded into two lines, timestamps are repaired and the
    remaining cues are renumbered sequentially starting at 1.

    Args:
        srt_content: Raw text expected to contain SRT cues.

    Returns:
        The rebuilt SRT text, or ``""`` when the input is empty or no cue
        could be recognised.
    """
    if not srt_content:
        return ""

    if "```" in srt_content:
        fenced = _CODE_FENCE_RE.search(srt_content)
        if fenced:
            srt_content = fenced.group(1).strip()

    first_cue = _FIRST_CUE_RE.search(srt_content)
    if first_cue:
        srt_content = srt_content[first_cue.start():]

    cues = []
    for _num, start, end, text in _CUE_RE.findall(srt_content):
        text = text.strip()
        if not text:
            continue

        text_lines = [line.strip() for line in text.split('\n') if line.strip()]
        if len(text_lines) > 2:
            text = text_lines[0] + '\n' + ' '.join(text_lines[1:])

        # Number by emitted cues so skipped (empty) cues leave no gaps.
        index = len(cues) + 1
        cues.append(
            f"{index}\n{_fix_timestamp(start)} --> {_fix_timestamp(end)}\n{text}"
        )

    return "\n\n".join(cues)


def _retry_after_seconds(header_value, default):
    """Translate a ``Retry-After`` header into a non-negative delay in seconds.

    The header may carry either a number of seconds or an HTTP date; anything
    missing or unparseable yields ``default``.
    """
    if header_value is None:
        return default
    try:
        return max(0, int(header_value))
    except (TypeError, ValueError):
        pass
    try:
        retry_at = parsedate_to_datetime(header_value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=datetime.timezone.utc)
    now = datetime.datetime.now(datetime.timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())


def download_file_with_retry(url: str, max_retries: int = 3, timeout: int = 300, logger: Optional[ProcessLogger] = None):
    """GET ``url`` as a streamed response, retrying on failure.

    Before every attempt after the first the function sleeps ``2 ** attempt``
    seconds. A 429 response additionally waits for the delay announced in
    ``Retry-After`` (or ``(2 ** attempt) * 5`` seconds) before the next try.

    Args:
        url: Address to download.
        max_retries: Total number of attempts.
        timeout: Timeout passed to ``requests.get``.
        logger: Optional logger that receives a message for each retry.

    Returns:
        The successful ``requests`` response, opened with ``stream=True``;
        the caller is responsible for consuming/closing it.

    Raises:
        HTTPException: With status 400 when the last attempt fails or when
            all attempts are exhausted.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'Accept': '*/*'
    }

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            if attempt > 0:
                if logger:
                    logger.log(f"🔄 Tentativa {attempt + 1} de download...")
                time.sleep(2 ** attempt)

            response = requests.get(url, headers=headers, timeout=timeout, stream=True)

            if response.status_code == 429:
                wait_time = _retry_after_seconds(
                    response.headers.get('Retry-After'),
                    (2 ** attempt) * 5,
                )
                # Streamed responses hold their connection until closed.
                response.close()
                # Waiting is pointless when no further attempt will follow.
                if not is_last_attempt:
                    time.sleep(wait_time)
                continue

            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            # HTTPError is a RequestException, so status errors land here too.
            failed_response = getattr(e, "response", None)
            if failed_response is not None:
                failed_response.close()
            if is_last_attempt:
                raise HTTPException(status_code=400, detail=str(e)) from e

    raise HTTPException(status_code=400, detail="Falha ao baixar arquivo")


def _repair_backslash(match):
    """Return a valid JSON escape untouched; double the backslash otherwise."""
    full_match = match.group(0)
    if _VALID_JSON_ESCAPE_RE.match(full_match):
        return full_match
    return '\\\\' + full_match[1:]


def extract_json_from_text(text: str):
    """Find and decode the JSON document embedded in ``text``.

    The text may wrap the JSON in a Markdown code fence or surround it with
    prose. The candidate is parsed as-is first; only when that fails are the
    repairs applied (trailing commas, stray ``)`` before a closing bracket,
    invalid backslash escapes).

    Args:
        text: Raw text expected to contain a JSON object or array.

    Returns:
        The decoded value, or ``None`` when ``text`` is ``None`` or nothing
        could be decoded even after repair.
    """
    if text is None:
        return None

    text = text.strip()
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    elif "```" in text:
        parts = text.split("```")
        if len(parts) >= 2:
            text = parts[1].strip()

    # Keep only the span from the first opening bracket to the last matching
    # closing bracket of the same kind.
    start_idx_dict = text.find('{')
    start_idx_list = text.find('[')
    if start_idx_dict != -1 and (start_idx_list == -1 or start_idx_dict < start_idx_list):
        end_idx = text.rfind('}')
        if end_idx != -1:
            text = text[start_idx_dict:end_idx + 1]
    elif start_idx_list != -1:
        end_idx = text.rfind(']')
        if end_idx != -1:
            text = text[start_idx_list:end_idx + 1]

    # Already-valid JSON must not go through the textual repairs below: they
    # cannot tell structure from string content and would rewrite values
    # such as "x, ]".
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        pass

    # Final cleanup: drop trailing commas and other obvious model mistakes.
    text = re.sub(r',\s*([\]}])', r'\1', text)
    # Repair stray parentheses placed right before a closing brace/bracket.
    text = re.sub(r'"\s*\)\s*([\]}])', r'"\1', text)
    text = re.sub(r'\}\s*\)\s*([\]}])', r'}\1', text)

    try:
        return json.loads(text)
    except (ValueError, RecursionError) as e:
        text_repaired = _BACKSLASH_SEQUENCE_RE.sub(_repair_backslash, text)
        try:
            return json.loads(text_repaired)
        except (ValueError, RecursionError) as e2:
            print(f"⚠️ Falha ao decodificar JSON após reparo. Erro original: {e} | Erro pós-reparo: {e2}")
            return None


def get_gemini_model(model_name: Optional[str]):
    """Map a free-form model name onto a ``Model`` constant.

    Matching is case-insensitive and substring based: a name containing
    "thinking" wins over one containing "pro"; anything else, including an
    empty or missing name, selects the flash model.
    """
    model_name_lower = model_name.lower() if model_name else "flash"
    if "thinking" in model_name_lower:
        return Model.G_3_FLASH_THINKING_AI_FREE
    if "pro" in model_name_lower:
        return Model.G_3_PRO_AI_FREE
    return Model.G_3_FLASH_AI_FREE