| import re |
| import json |
| import time |
| import requests |
| from typing import Optional |
| from fastapi import HTTPException |
| from gemini_webapi.constants import Model |
|
|
class ProcessLogger:
    """Console logger that stamps every message with agent, record and timings.

    Each line printed by :meth:`log` has the form::

        [HH:MM:SS][AGENT][#record_id] (total: X.Xs | passo: Y.Ys) message

    where ``total`` is the time elapsed since the logger was created and
    ``passo`` is the time elapsed since the previous call to :meth:`log`.
    """

    def __init__(self, agent_name: str, record_id: str = "N/A"):
        """Store the identifiers, start the timers and print a start-up line.

        Args:
            agent_name: Name shown (upper-cased) in the log prefix.
            record_id: Identifier shown after ``#`` in the log prefix.
        """
        self.agent_name = agent_name
        self.record_id = record_id
        self.start_time = time.time()
        self.last_step_time = self.start_time
        self.log("🚀 Iniciando processo...")

    def log(self, message: str):
        """Print ``message`` with the timing prefix and reset the step timer.

        Args:
            message: Text appended after the prefix and timing information.
        """
        # A single clock reading feeds both the wall-clock stamp and the
        # elapsed-time figures, so they can never disagree with each other.
        curr_time = time.time()
        abs_time = time.strftime("%H:%M:%S", time.localtime(curr_time))
        rel_time = curr_time - self.start_time
        step_time = curr_time - self.last_step_time
        self.last_step_time = curr_time

        prefix = f"[{abs_time}][{self.agent_name.upper()}][#{self.record_id}]"
        timing = f"(total: {rel_time:.1f}s | passo: {step_time:.1f}s)"

        print(f"{prefix} {timing} {message}")
|
|
def clean_and_validate_srt(srt_content):
    """Extract, repair and renumber SRT subtitle cues found in ``srt_content``.

    Processing steps:

    1. If the text contains a Markdown code fence, keep only the content of
       the first fenced block (optionally tagged ``srt``).
    2. Drop any preamble that precedes the first line pair that looks like a
       cue header (an index line followed by an ``HH:MM:SS,mmm`` timestamp).
    3. Parse every ``index / start --> end / text`` cue, normalise the
       timestamps to ``HH:MM:SS,mmm`` where a known malformed shape is
       recognised, and collapse cue text to at most two lines (the first
       line, then all remaining lines joined with spaces).
    4. Skip cues without text and renumber the remaining cues sequentially
       from 1.

    Args:
        srt_content: Raw text that is expected to contain SRT cues.

    Returns:
        The cleaned SRT document as a string with cues separated by one blank
        line, or ``""`` when the input is empty/``None`` or has no usable cue.
    """
    if not srt_content:
        return ""

    if "```" in srt_content:
        fenced = re.search(
            r"```(?:srt)?\n(.*?)```", srt_content, re.DOTALL | re.IGNORECASE
        )
        if fenced:
            srt_content = fenced.group(1).strip()

    first_block = re.search(
        r"^\s*\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}", srt_content, re.MULTILINE
    )
    if first_block:
        srt_content = srt_content[first_block.start():]

    # index line, "start --> end" line, then text lines up to the next line
    # that consists only of digits (taken to be the next cue index).
    cue_pattern = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)",
        re.MULTILINE,
    )

    def corrigir_timestamp(timestamp):
        """Coerce a recognised malformed timestamp into ``HH:MM:SS,mmm``."""
        timestamp = timestamp.strip()
        # Already well-formed.
        if re.match(r"\d{2}:\d{2}:\d{2},\d{3}", timestamp):
            return timestamp
        # MM:SS,mmm -> missing hours.
        if re.match(r"\d{2}:\d{2},\d{3}", timestamp):
            return f"00:{timestamp}"
        # M:SS,mmm -> missing hours and a one-digit minute.
        if re.match(r"\d{1}:\d{2},\d{3}", timestamp):
            parts = timestamp.split(":")
            return f"00:{parts[0].zfill(2)}:{parts[1]}"
        # SS,mmm or S,mmm -> seconds only.
        if re.match(r"\d{1,2},\d{3}", timestamp):
            seconds_ms = timestamp.split(",")
            return f"00:00:{seconds_ms[0].zfill(2)},{seconds_ms[1]}"
        # HH:MM:<digits> -> last field lacks the comma separator.
        if re.match(r"\d{2}:\d{2}:\d{3}", timestamp):
            parts = timestamp.split(":")
            if len(parts) == 3:
                h, m, s_ms = parts
                if len(s_ms) == 3:
                    return f"{h}:{m}:00,{s_ms}"
                elif len(s_ms) >= 4:
                    s, ms = s_ms[:-3], s_ms[-3:]
                    return f"{h}:{m}:{s.zfill(2)},{ms}"
        # Unknown shape: leave untouched.
        return timestamp

    cues = []
    for _num, start, end, text in cue_pattern.findall(srt_content):
        text_lines = [line.strip() for line in text.split("\n") if line.strip()]
        if not text_lines:
            continue
        if len(text_lines) > 2:
            text_lines = [text_lines[0], " ".join(text_lines[1:])]
        # Number by position among the cues actually kept, so dropping an
        # empty cue never leaves a gap in the sequence.
        index = len(cues) + 1
        header = f"{index}\n{corrigir_timestamp(start)} --> {corrigir_timestamp(end)}"
        cues.append(header + "\n" + "\n".join(text_lines))

    return "\n\n".join(cues)
|
|
def download_file_with_retry(url: str, max_retries: int = 3, timeout: int = 300, logger: Optional[ProcessLogger] = None):
    """GET ``url`` as a streamed response, retrying on failure.

    Before every attempt after the first, the function logs the retry (when a
    ``logger`` is given) and sleeps ``2 ** attempt`` seconds. An HTTP 429
    response additionally waits for the ``Retry-After`` header value (or
    ``(2 ** attempt) * 5`` seconds when the header is absent or not an
    integer number of seconds) before the next attempt.

    Args:
        url: Address of the file to download.
        max_retries: Maximum number of attempts.
        timeout: Timeout in seconds passed to ``requests.get``.
        logger: Optional logger used to report retry attempts.

    Returns:
        The successful ``requests`` response, opened with ``stream=True``.
        The caller is responsible for consuming and closing it.

    Raises:
        HTTPException: With status 400 when the last attempt fails, or when
            all attempts are exhausted without a successful response.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'Accept': '*/*'
    }
    last_attempt = max_retries - 1

    for attempt in range(max_retries):
        if attempt > 0:
            if logger:
                logger.log(f"🔄 Tentativa {attempt + 1} de download...")
            time.sleep(2 ** attempt)

        response = None
        try:
            response = requests.get(url, headers=headers, timeout=timeout, stream=True)

            if response.status_code == 429:
                default_wait = (2 ** attempt) * 5
                try:
                    wait_time = max(0, int(response.headers.get('Retry-After')))
                except (TypeError, ValueError):
                    # Header missing, or given as an HTTP-date rather than
                    # a number of seconds: fall back to our own backoff.
                    wait_time = default_wait
                # With stream=True the connection stays checked out until the
                # response is closed; release it before retrying.
                response.close()
                if attempt < last_attempt:
                    time.sleep(wait_time)
                continue

            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            # HTTPError is a RequestException, so this covers both bad status
            # codes and transport-level failures.
            if response is not None:
                response.close()
            if attempt == last_attempt:
                raise HTTPException(status_code=400, detail=str(e)) from e

    raise HTTPException(status_code=400, detail="Falha ao baixar arquivo")
|
|
def extract_json_from_text(text: str):
    """Best-effort extraction of a JSON value embedded in free-form text.

    The function narrows ``text`` down to a JSON candidate and then tries
    progressively more aggressive parses:

    1. Strict ``json.loads`` on the candidate as-is.
    2. After structural repairs: trailing commas before ``]``/``}`` removed,
       and a stray ``)`` between a closing ``"``/``}`` and a closing bracket
       removed.
    3. After additionally doubling every backslash that does not start a
       valid JSON escape sequence.

    The candidate is taken from the first Markdown code fence when present
    (preferring a ``json``-tagged fence), then trimmed to the span from the
    first ``{`` or ``[`` (whichever comes first) to the last matching closer.

    Args:
        text: Raw text expected to contain JSON.

    Returns:
        The decoded JSON value, or ``None`` when ``text`` is not a string or
        no attempt succeeds (in which case a diagnostic line is printed).
    """
    if not isinstance(text, str):
        return None

    text = text.strip()
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    elif "```" in text:
        parts = text.split("```")
        if len(parts) >= 2:
            text = parts[1].strip()

    start_idx_dict = text.find('{')
    start_idx_list = text.find('[')

    if start_idx_dict != -1 and (start_idx_list == -1 or start_idx_dict < start_idx_list):
        end_idx = text.rfind('}')
        if end_idx != -1:
            text = text[start_idx_dict:end_idx + 1]
    elif start_idx_list != -1:
        end_idx = text.rfind(']')
        if end_idx != -1:
            text = text[start_idx_list:end_idx + 1]

    parse_errors = (ValueError, RecursionError)

    # Try the untouched candidate first: the regex repairs below cannot tell
    # string contents from structure, so running them on already-valid JSON
    # could silently alter string values such as "a, }".
    try:
        return json.loads(text)
    except parse_errors:
        pass  # fall through to the repair attempts

    # Structural repairs: trailing commas, then stray ")" before a closer.
    text = re.sub(r',\s*([\]}])', r'\1', text)
    text = re.sub(r'"\s*\)\s*([\]}])', r'"\1', text)
    text = re.sub(r'\}\s*\)\s*([\]}])', r'}\1', text)

    try:
        return json.loads(text)
    except parse_errors as exc:
        first_error = exc

    def repair_backslash(match):
        """Keep valid JSON escapes; double the backslash of any other pair."""
        full_match = match.group(0)
        if re.match(r'\\(["\\\/bfnrt]|u[0-9a-fA-F]{4})', full_match):
            return full_match
        return '\\\\' + full_match[1:]

    text_repaired = re.sub(r'\\u[0-9a-fA-F]{4}|\\.', repair_backslash, text, flags=re.DOTALL)

    try:
        return json.loads(text_repaired)
    except parse_errors as second_error:
        print(f"⚠️ Falha ao decodificar JSON após reparo. Erro original: {first_error} | Erro pós-reparo: {second_error}")
        return None
|
|
def get_gemini_model(model_name: str):
    """Map a free-form model name to a ``Model`` constant.

    The lookup is a case-insensitive substring test, checked in this order:
    a name containing ``"thinking"`` selects ``Model.G_3_FLASH_THINKING_AI_FREE``,
    otherwise one containing ``"pro"`` selects ``Model.G_3_PRO_AI_FREE``.
    Anything else, including an empty or ``None`` name, selects
    ``Model.G_3_FLASH_AI_FREE``.

    Args:
        model_name: Requested model name; may be empty or ``None``.

    Returns:
        The matching ``Model`` member.
    """
    requested = (model_name or "flash").lower()

    # "thinking" is tested before "pro" so it wins when both appear.
    if "thinking" in requested:
        return Model.G_3_FLASH_THINKING_AI_FREE
    if "pro" in requested:
        return Model.G_3_PRO_AI_FREE
    return Model.G_3_FLASH_AI_FREE