# subapi/utils/helpers.py
# Provenance: copied from a Hugging Face file view
# (author: habulaj; commit 0af7dfd "Create utils/helpers.py", verified; 6.26 kB).
import re
import json
import time
import requests
from typing import Optional
from fastapi import HTTPException
from gemini_webapi.constants import Model
class ProcessLogger:
    """Console logger that prefixes every message with timing information.

    Each line printed by :meth:`log` carries the wall-clock time, the agent
    name in upper case, the record id, the seconds elapsed since the logger
    was created and the seconds elapsed since the previous ``log`` call.

    Attributes are plain instance fields:
        agent_name: label printed (upper-cased) in every prefix.
        record_id: identifier printed after ``#`` in every prefix.
        start_time: ``time.time()`` captured at construction.
        last_step_time: ``time.time()`` captured by the most recent ``log``.
    """

    def __init__(self, agent_name: str, record_id: str = "N/A") -> None:
        """Store the labels, start both timers and print a start-up line."""
        self.agent_name = agent_name
        self.record_id = record_id
        self.start_time = time.time()
        self.last_step_time = self.start_time
        # NOTE(review): the leading characters of this message look like a
        # mis-encoded emoji; left untouched because it is runtime output.
        self.log("馃殌 Iniciando processo...")

    def log(self, message: str) -> None:
        """Print ``message`` with the timing prefix and reset the step timer.

        Args:
            message: Text appended after the prefix and timing summary.
        """
        # Imported here because ``datetime`` is not among the module-level
        # imports of this file.
        import datetime

        abs_time = datetime.datetime.now().strftime("%H:%M:%S")
        curr_time = time.time()
        rel_time = curr_time - self.start_time
        step_time = curr_time - self.last_step_time
        # The next call measures its step from this moment.
        self.last_step_time = curr_time

        prefix = f"[{abs_time}][{self.agent_name.upper()}][#{self.record_id}]"
        timing = f"(total: {rel_time:.1f}s | passo: {step_time:.1f}s)"
        print(f"{prefix} {timing} {message}")
def _fix_srt_timestamp(timestamp):
    """Coerce a loosely formatted timestamp towards ``HH:MM:SS,mmm``.

    Only the start of the string is inspected (``re.match``); a value that
    fits none of the known shapes is returned stripped but otherwise as-is.
    """
    timestamp = timestamp.strip()
    # Already complete: HH:MM:SS,mmm
    if re.match(r"\d{2}:\d{2}:\d{2},\d{3}", timestamp):
        return timestamp
    # Missing hours: MM:SS,mmm
    if re.match(r"\d{2}:\d{2},\d{3}", timestamp):
        return f"00:{timestamp}"
    # Missing hours and single-digit minutes: M:SS,mmm
    if re.match(r"\d{1}:\d{2},\d{3}", timestamp):
        parts = timestamp.split(":")
        return f"00:{parts[0].zfill(2)}:{parts[1]}"
    # Seconds and milliseconds only: S,mmm or SS,mmm
    if re.match(r"\d{1,2},\d{3}", timestamp):
        seconds_ms = timestamp.split(",")
        return f"00:00:{seconds_ms[0].zfill(2)},{seconds_ms[1]}"
    # Colon used instead of a comma before the milliseconds: HH:MM:<digits>
    if re.match(r"\d{2}:\d{2}:\d{3}", timestamp):
        parts = timestamp.split(":")
        if len(parts) == 3:
            h, m, s_ms = parts
            if len(s_ms) == 3:
                # Exactly three digits are taken as milliseconds.
                return f"{h}:{m}:00,{s_ms}"
            if len(s_ms) >= 4:
                # The last three digits are milliseconds, the rest seconds.
                s, ms = s_ms[:-3], s_ms[-3:]
                return f"{h}:{m}:{s.zfill(2)},{ms}"
    return timestamp


def clean_and_validate_srt(srt_content):
    """Extract, renumber and normalise SRT cues found in ``srt_content``.

    Steps, in order:
      1. If the text contains a Markdown code fence, keep only the content
         of the first fenced block (optionally tagged ``srt``).
      2. Drop anything before the first ``<number>`` line that is followed
         by a full ``HH:MM:SS,mmm`` timestamp.
      3. Parse every ``number / start --> end / text`` cue, skip cues whose
         text is empty, repair the timestamps and number the remaining cues
         consecutively from 1.
      4. When a cue has more than two non-empty text lines, keep the first
         line and join the remaining ones with spaces on a second line.

    Args:
        srt_content: Raw text expected to contain SRT cues. ``None`` or an
            empty string yields an empty result.

    Returns:
        The rebuilt SRT text with cues separated by one blank line, or an
        empty string when no cue with text was found.
    """
    if not srt_content:
        return ""

    if "```" in srt_content:
        fenced = re.search(
            r"```(?:srt)?\n(.*?)```", srt_content, re.DOTALL | re.IGNORECASE
        )
        if fenced:
            srt_content = fenced.group(1).strip()

    first_cue = re.search(
        r"^\s*\d+\s*\n\d{2}:\d{2}:\d{2},\d{3}", srt_content, re.MULTILINE
    )
    if first_cue:
        srt_content = srt_content[first_cue.start():]

    cue_pattern = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!^\d+\s*\n).+\n?)*)",
        re.MULTILINE,
    )

    cues = []
    for _num, start, end, text in cue_pattern.findall(srt_content):
        text = text.strip()
        if not text:
            continue
        text_lines = [line.strip() for line in text.split('\n') if line.strip()]
        if len(text_lines) > 2:
            text = text_lines[0] + '\n' + ' '.join(text_lines[1:])
        # Number by cues actually emitted so skipped (empty) cues leave no
        # gaps in the sequence.
        index = len(cues) + 1
        cues.append(
            f"{index}\n{_fix_srt_timestamp(start)} --> {_fix_srt_timestamp(end)}\n{text}"
        )
    return "\n\n".join(cues)
def download_file_with_retry(url: str, max_retries: int = 3, timeout: int = 300, logger: Optional[ProcessLogger] = None):
    """Fetch ``url`` as a streamed GET, retrying on errors and HTTP 429.

    Before every attempt after the first, the function sleeps
    ``2 ** attempt`` seconds. On HTTP 429 it additionally waits for the
    number of seconds given by the ``Retry-After`` header (falling back to
    ``(2 ** attempt) * 5`` when the header is absent or not an integer)
    before trying again.

    Args:
        url: Address to download.
        max_retries: Maximum number of attempts.
        timeout: Value passed as ``timeout`` to ``requests.get``.
        logger: Optional logger; retry attempts are reported through it.

    Returns:
        The open, streamed ``requests`` response. The caller is responsible
        for consuming and closing it.

    Raises:
        HTTPException: Status 400 when the last attempt fails, or when every
            attempt was answered with HTTP 429.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'Accept': '*/*',
    }
    last_attempt = max_retries - 1

    for attempt in range(max_retries):
        response = None
        try:
            if attempt > 0:
                if logger:
                    logger.log(f"馃攧 Tentativa {attempt + 1} de download...")
                time.sleep(2 ** attempt)

            response = requests.get(url, headers=headers, timeout=timeout, stream=True)

            if response.status_code == 429:
                default_wait = (2 ** attempt) * 5
                try:
                    wait_time = int(response.headers.get('Retry-After', default_wait))
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP-date; use the backoff.
                    wait_time = default_wait
                # The body is never read, so release the streamed connection.
                response.close()
                if attempt < last_attempt:
                    # No point in waiting when there is no attempt left.
                    time.sleep(max(0, wait_time))
                continue

            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            # Covers HTTPError from raise_for_status as well as connection
            # and timeout errors; 429 never reaches this point.
            if response is not None:
                response.close()
            if attempt == last_attempt:
                raise HTTPException(status_code=400, detail=str(e)) from e

    raise HTTPException(status_code=400, detail="Falha ao baixar arquivo")
def extract_json_from_text(text: str):
    """Best-effort extraction of a JSON value embedded in free-form text.

    The candidate is located by, in order: taking the content of a
    ```` ```json ```` fence (or of the first generic fence), then slicing
    from the first ``{`` or ``[`` (whichever comes first) to the last
    matching closing character. Parsing is then attempted three times:

      1. on the candidate exactly as found, so valid JSON is never altered;
      2. after removing trailing commas and stray ``)`` before a closing
         bracket/brace;
      3. after additionally doubling backslashes that do not start a valid
         JSON escape sequence.

    Args:
        text: Text that may contain JSON. Non-string input yields ``None``.

    Returns:
        The decoded JSON value, or ``None`` (after printing a warning) when
        every attempt fails.
    """
    if not isinstance(text, str):
        return None

    text = text.strip()
    if "```json" in text:
        text = text.split("```json")[1].split("```")[0].strip()
    elif "```" in text:
        parts = text.split("```")
        if len(parts) >= 2:
            text = parts[1].strip()

    start_idx_dict = text.find('{')
    start_idx_list = text.find('[')
    if start_idx_dict != -1 and (start_idx_list == -1 or start_idx_dict < start_idx_list):
        end_idx = text.rfind('}')
        if end_idx != -1:
            text = text[start_idx_dict:end_idx + 1]
    elif start_idx_list != -1:
        end_idx = text.rfind(']')
        if end_idx != -1:
            text = text[start_idx_list:end_idx + 1]

    # Try the untouched candidate first: the regex repairs below also match
    # inside string values and would corrupt JSON that is already valid.
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        pass  # fall through to the repair passes

    # Final cleanup: drop trailing commas before a closing bracket/brace.
    text = re.sub(r',\s*([\]}])', r'\1', text)
    # Repair stray closing parentheses placed before a closing brace/bracket.
    text = re.sub(r'"\s*\)\s*([\]}])', r'"\1', text)
    text = re.sub(r'\}\s*\)\s*([\]}])', r'}\1', text)

    try:
        return json.loads(text)
    except (ValueError, RecursionError) as e:
        def repair_backslash(match):
            # Keep valid JSON escapes; double the backslash of anything else.
            full_match = match.group(0)
            if re.match(r'\\(["\\\/bfnrt]|u[0-9a-fA-F]{4})', full_match):
                return full_match
            return '\\\\' + full_match[1:]

        text_repaired = re.sub(
            r'\\u[0-9a-fA-F]{4}|\\.', repair_backslash, text, flags=re.DOTALL
        )
        try:
            return json.loads(text_repaired)
        except (ValueError, RecursionError) as e2:
            print(f"鈿狅笍 Falha ao decodificar JSON ap贸s reparo. Erro original: {e} | Erro p贸s-reparo: {e2}")
            return None
def get_gemini_model(model_name: str):
    """Pick the ``Model`` member that corresponds to ``model_name``.

    Matching is case-insensitive and substring based. A name containing
    ``"thinking"`` selects the flash-thinking model; otherwise a name
    containing ``"pro"`` selects the pro model; anything else, including an
    empty or ``None`` name, selects the flash model.
    """
    requested = (model_name or "flash").lower()
    wants_thinking = "thinking" in requested

    # Neither keyword present: use the flash default.
    if not wants_thinking and "pro" not in requested:
        return Model.G_3_FLASH_AI_FREE

    # "thinking" takes precedence over "pro" when both appear.
    if wants_thinking:
        return Model.G_3_FLASH_THINKING_AI_FREE
    return Model.G_3_PRO_AI_FREE