| |
| """ |
| ParaAI - Ferramentas Compartilhadas para Workers (v1.0) |
| ======================================================== |
| Centraliza todas as classes comuns utilizadas pelos orquestradores |
| batch e direct, eliminando duplicação de código. |
| |
| Classes: |
| - GitManager: Gerencia operações Git (clone, sync, commit, push) |
| - StateManager: Gerencia CSV de controle de chunks |
| - QueueManager: Carrega tarefas de chunks de entrada |
| - ResultCollector: Coleta resultados e gera chunks de saída |
| |
| Todas as classes utilizam parâmetros explícitos para máxima flexibilidade |
| e são agnósticas em relação à configuração (devem receber paths e constantes). |
| |
| Autor: Equipe ParaAI |
| Data: 2026-02-12 |
| """ |
|
|
| import os |
| import json |
| import shutil |
| import tarfile |
| import io |
| import logging |
| from pathlib import Path |
| from threading import Lock |
| from datetime import datetime |
| from typing import List, Dict, Optional |
|
|
| import pandas as pd |
| from git import Repo, GitCommandError |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
| class GitManager: |
| """ |
| Gerencia Git usando estratégia 'Force Sync'. |
| Encapsula autenticação, clonagem, sincronização e publicação. |
| """ |
|
|
| def __init__(self, token: str, repo_slug: str, branch: str, |
| repo_path: Path, work_path: Path): |
| self.token = token |
| self.repo_slug = repo_slug |
| self.branch = branch |
| self.repo_path = Path(repo_path) |
| self.work_path = Path(work_path) |
| self.lock = Lock() |
|
|
| |
| if "huggingface.co" in repo_slug or "hf.co" in repo_slug: |
| self.remote_url = f"https://user:{token}@huggingface.co/{repo_slug}.git" |
| else: |
| self.remote_url = f"https://oauth2:{token}@github.com/{repo_slug}.git" |
|
|
| logger.info(f"🔧 GitManager: Repo={repo_slug} | Branch={branch}") |
| self.repo = self._setup_repo() |
|
|
| def _setup_repo(self) -> Repo: |
| """Clona o repositório se não existir, ou abre o existente.""" |
| try: |
| if not self.repo_path.exists() or not any(self.repo_path.iterdir()): |
| logger.info(f"📥 Clonando {self.repo_slug} para {self.repo_path}...") |
| self.repo_path.mkdir(parents=True, exist_ok=True) |
| repo = Repo.clone_from(self.remote_url, self.repo_path, branch=self.branch, depth=1, single_branch=True) |
| else: |
| logger.info(f"📂 Repositório local encontrado em {self.repo_path}. Abrindo...") |
| repo = Repo(self.repo_path) |
| repo.remotes.origin.set_url(self.remote_url) |
|
|
| |
| with repo.config_writer() as config: |
| config.set_value("pull", "rebase", "true") |
| config.set_value("user", "email", "paraai-bot@automacao.com") |
| config.set_value("user", "name", "ParaAI Worker") |
| config.set_value("core", "askPass", "echo") |
| config.set_value("credential", "helper", "") |
|
|
| return repo |
|
|
| except GitCommandError as e: |
| logger.critical(f"❌ Falha crítica na configuração do Git: {e}", exc_info=True) |
| if self.repo_path.exists(): |
| shutil.rmtree(self.repo_path, ignore_errors=True) |
| raise |
|
|
| def sincronizar_com_remoto(self) -> bool: |
| """Reset → Clean → Pull (força sincronização).""" |
| with self.lock: |
| try: |
| logger.info("🔄 [Git] Resetando e limpando repositório local...") |
| self.repo.git.reset("--hard", "HEAD") |
| self.repo.git.clean("-fd") |
| logger.info("⬇️ [Git] Executando Pull (Rebase)...") |
| self.repo.remotes.origin.pull() |
| logger.info("✅ [Git] Sincronização concluída.") |
| return True |
| except GitCommandError as e: |
| logger.error(f"❌ [Git] Falha ao sincronizar: {e}") |
| return False |
|
|
| def copiar_arquivos_de_trabalho(self, arquivos_relativos: list) -> bool: |
| """Copia arquivos da pasta de trabalho para a pasta do repositório.""" |
| try: |
| count = 0 |
| for arquivo_rel in arquivos_relativos: |
| origem = self.work_path / arquivo_rel |
| destino = self.repo_path / arquivo_rel |
| if origem.exists(): |
| destino.parent.mkdir(parents=True, exist_ok=True) |
| shutil.copy2(origem, destino) |
| count += 1 |
| logger.info(f"📋 [Git] {count} arquivos copiados para área de stage.") |
| return True |
| except Exception as e: |
| logger.error(f"❌ [Git] Erro na cópia de arquivos: {e}") |
| return False |
|
|
| def publicar_alteracoes(self, arquivos_relativos: list, commit_msg: str) -> bool: |
| """ |
| Fluxo completo: Sync → Copy → Add → Commit → Push |
| """ |
| with self.lock: |
| try: |
| logger.info("🚀 [Git] Iniciando publicação...") |
|
|
| |
| logger.info("1️⃣ [Git] Sincronizando...") |
| self.repo.git.reset("--hard", "HEAD") |
| self.repo.git.clean("-fd") |
| self.repo.remotes.origin.pull() |
|
|
| |
| logger.info("2️⃣ [Git] Copiando arquivos...") |
| if not self.copiar_arquivos_de_trabalho(arquivos_relativos): |
| return False |
|
|
| |
| logger.info("3️⃣ [Git] Adicionando arquivos...") |
| self.repo.git.add(A=True) |
|
|
| if not self.repo.is_dirty(untracked_files=True): |
| logger.warning("⚠️ [Git] Nenhuma alteração detectada para commit.") |
| return True |
|
|
| |
| logger.info(f"4️⃣ [Git] Commit: {commit_msg}") |
| self.repo.index.commit(commit_msg) |
|
|
| |
| logger.info("5️⃣ [Git] Push...") |
| try: |
| self.repo.remotes.origin.push() |
| except GitCommandError as push_err: |
| logger.warning(f"⚠️ [Git] Push rejeitado — tentando rebase: {push_err}") |
| try: |
| self.repo.git.fetch("origin") |
| self.repo.git.rebase(f"origin/{self.branch}") |
| self.repo.remotes.origin.push() |
| logger.info("✅ [Git] Push após rebase ok.") |
| except GitCommandError as rebase_err: |
| logger.error(f"❌ [Git] Rebase falhou: {rebase_err}") |
| |
| try: |
| self.repo.git.rebase("--abort") |
| except Exception: |
| pass |
| raise |
|
|
| logger.info("✅ [Git] SUCESSO! Dados enviados.") |
| return True |
|
|
| except GitCommandError as e: |
| logger.error(f"❌ [Git] Falha no processo Git: {e}", exc_info=True) |
| return False |
| except Exception as e: |
| logger.error(f"❌ [Git] Erro inesperado: {e}", exc_info=True) |
| return False |
|
|
|
|
| |
| |
| |
| class StateManager: |
| """Gerencia o CSV de controle de chunks.""" |
|
|
| COLUMNS = [ |
| 'Id_chunk_data_gz', |
| 'Id_chunk_llm_gz', |
| 'dataini', |
| 'datafim', |
| 'total_ok', |
| 'lista_id_err', |
| 'status' |
| ] |
|
|
| DTYPE_MAP = { |
| 'Id_chunk_data_gz': 'string', |
| 'Id_chunk_llm_gz': 'string', |
| 'dataini': 'string', |
| 'datafim': 'string', |
| 'total_ok': 'Int64', |
| 'lista_id_err': 'string', |
| 'status': 'string' |
| } |
|
|
| def __init__(self, work_dir: Path, chunks_dir: Path): |
| """ |
| Args: |
| work_dir: Diretório de trabalho (onde o CSV será salvo). |
| chunks_dir: Diretório contendo os chunks de entrada (chunks_dados). |
| """ |
| self.state_csv_path = work_dir / "new_tracking_processamento_up.csv" |
| self.chunks_dir = chunks_dir |
| self._carregar_ou_criar_csv() |
| self.id_minimo = 1 |
|
|
| def _carregar_ou_criar_csv(self): |
| if self.state_csv_path.exists(): |
| self.df = pd.read_csv( |
| self.state_csv_path, |
| dtype={k: (str if v == 'string' else v) for k, v in self.DTYPE_MAP.items()} |
| ) |
| for col in ['Id_chunk_data_gz', 'Id_chunk_llm_gz', |
| 'dataini', 'datafim', 'lista_id_err', 'status']: |
| self.df[col] = self.df[col].astype('string') |
| else: |
| self.df = pd.DataFrame(columns=self.COLUMNS).astype(self.DTYPE_MAP) |
|
|
| def sincronizar_com_chunks_locais(self) -> bool: |
| """Adiciona ao CSV chunks que existem em disco mas não estão no CSV.""" |
| if not self.chunks_dir.exists(): |
| return False |
|
|
| chunks_no_csv = set(self.df['Id_chunk_data_gz'].dropna().astype(str)) |
| chunks_no_disco = [ |
| p.name.replace("chunk_dados_", "").split('.')[0] |
| for p in self.chunks_dir.glob("chunk_dados_*.tar.gz") |
| ] |
|
|
| novos = 0 |
| for chunk_id in sorted(chunks_no_disco): |
| if chunk_id not in chunks_no_csv: |
| novo = { |
| 'Id_chunk_data_gz': chunk_id, |
| 'Id_chunk_llm_gz': chunk_id, |
| 'dataini': '', |
| 'datafim': '', |
| 'total_ok': 0, |
| 'lista_id_err': '[]', |
| 'status': 'pendente' |
| } |
| self.df = pd.concat([self.df, pd.DataFrame([novo])], ignore_index=True) |
| self.df = self.df.astype(self.DTYPE_MAP) |
| novos += 1 |
|
|
| if novos > 0: |
| logger.info(f"➕ {novos} novo(s) chunk(s) adicionado(s) ao CSV.") |
| self.salvar_csv() |
| return True |
| return False |
|
|
| def selecionar_proximo_chunk(self) -> Optional[str]: |
| """Retorna o ID do próximo chunk pendente (ordem alfabética) a partir de um ID mínimo.""" |
| pendentes = self.df[ |
| (self.df['status'] == 'pendente') & |
| (self.df['Id_chunk_data_gz'].astype(int) >= self.id_minimo) |
| ] |
| if pendentes.empty: |
| return None |
| proximo = pendentes.sort_values('Id_chunk_data_gz').iloc[0]['Id_chunk_data_gz'] |
| logger.info(f"🎯 Próximo chunk: {proximo}") |
| return str(proximo) |
|
|
| def selecionar_proximo_chunkx(self) -> Optional[str]: |
| """Retorna o ID do próximo chunk pendente (ordem alfabética).""" |
| pendentes = self.df[self.df['status'] == 'pendente'] |
| if pendentes.empty: |
| return None |
| proximo = pendentes.sort_values('Id_chunk_data_gz').iloc[0]['Id_chunk_data_gz'] |
| logger.info(f"🎯 Próximo chunk: {proximo}") |
| return str(proximo) |
|
|
| def marcar_chunk_como_iniciado(self, chunk_id: str): |
| idx = self.df.index[self.df['Id_chunk_data_gz'] == chunk_id] |
| if not idx.empty: |
| self.df.loc[idx, 'dataini'] = datetime.now().isoformat() |
| self.df.loc[idx, 'status'] = 'em_processamento' |
| self.salvar_csv() |
|
|
| def marcar_chunk_como_finalizado(self, chunk_id: str, stats: dict): |
| idx = self.df.index[self.df['Id_chunk_data_gz'] == chunk_id] |
| if not idx.empty: |
| self.df.loc[idx, 'datafim'] = datetime.now().isoformat() |
| self.df.loc[idx, 'status'] = 'concluido' |
| self.df.loc[idx, 'total_ok'] = stats.get('total_ok', 0) |
| self.df.loc[idx, 'lista_id_err'] = json.dumps(stats.get('erros', [])) |
| self.salvar_csv() |
|
|
| def salvar_csv(self): |
| self.df.to_csv(self.state_csv_path, index=False) |
| logger.debug(f"💾 CSV salvo: {self.state_csv_path.name}") |
|
|
|
|
| |
| |
| |
| class QueueManager: |
| """Carrega tarefas a partir de um arquivo chunk_dados_*.tar.gz.""" |
|
|
| def __init__(self, repo_path: Path): |
| """ |
| Args: |
| repo_path: Diretório raiz do repositório clonado. |
| """ |
| self.chunks_dir = repo_path / "chunks_dados" |
|
|
| def carregar_tarefas_de_chunk(self, chunk_path: Path) -> List[Dict]: |
| """ |
| Extrai o JSONL dentro do tar.gz e retorna lista de tarefas. |
| Cada tarefa contém: id, chunk_id, dados_originais (registro completo). |
| Registros com menos de 7000 caracteres combinados (ementa+decisao+integra) |
| são filtrados aqui — mesmo critério do PromptBuilder — economizando |
| tokens e evitando que o worker receba tarefas que serão descartadas. |
| """ |
| if not chunk_path.exists(): |
| return [] |
|
|
| chunk_id = chunk_path.name.replace("chunk_dados_", "").split('.')[0] |
| tarefas = [] |
| skips = 0 |
|
|
| try: |
| with tarfile.open(chunk_path, 'r:gz') as tar: |
| jsonl_member = next( |
| (m for m in tar.getmembers() if m.name.endswith('.jsonl')), |
| None |
| ) |
| if not jsonl_member: |
| return [] |
| f = tar.extractfile(jsonl_member) |
| for line in f: |
| try: |
| reg = json.loads(line.decode('utf-8')) |
| id_reg = str(reg.get('Id', '')).strip() |
| if not id_reg: |
| continue |
|
|
| |
| ementa = reg.get('ementa', '') or '' |
| decisao = reg.get('decisao', '') or '' |
| integra = reg.get('integra_do_acordao', '') or '' |
| texto_total = f"{ementa} {decisao} {integra}".strip() |
|
|
| if len(texto_total) < 7000: |
| skips += 1 |
| continue |
|
|
| tarefas.append({ |
| 'id': id_reg, |
| 'chunk_id': chunk_id, |
| 'dados_originais': reg |
| }) |
| except (json.JSONDecodeError, UnicodeDecodeError): |
| continue |
| except Exception as e: |
| logger.error(f"Erro ao ler chunk {chunk_id}: {e}") |
| return [] |
|
|
| logger.info( |
| f"✅ Chunk {chunk_id}: {len(tarefas)} tarefas enfileiradas" |
| + (f" | {skips} ignoradas (texto < 7000 chars)" if skips else "") |
| ) |
| return tarefas |
|
|
|
|
| |
| |
| |
| class ResultCollector: |
| """ |
| Acumula resultados de processamento e gera chunks de saída |
| (chunk_llm_*.tar.gz) com jurisprudências enriquecidas e logs. |
| Thread‑safe. |
| """ |
|
|
| def __init__(self, work_dir: Path): |
| """ |
| Args: |
| work_dir: Diretório de trabalho (onde a pasta Silver será criada). |
| """ |
| self.output_dir = work_dir / "SILVER" |
| self.output_dir.mkdir(exist_ok=True, parents=True) |
| self.lock = Lock() |
| self.buffer_resultados = [] |
| self.buffer_logs = [] |
| |
| self.buffer_raw_md: Dict[str, str] = {} |
| self.stats = {"erros": [], "total_ok": 0} |
|
|
| def adicionar_resultado(self, tarefa: dict, resultado_worker: dict): |
| """ |
| Adiciona um resultado ao buffer. |
| - Se o resultado contém 'debug_log', armazena. |
| - Se status == 'sucesso', adiciona o resultado e incrementa total_ok. |
| - Se resultado_worker contém '_raw_markdown', armazena em buffer_raw_md |
| indexado por id_manifestacao (para ser empacotado em raw/<id>.md). |
| - Caso contrário, registra o ID na lista de erros. |
| """ |
| with self.lock: |
| if 'debug_log' in resultado_worker: |
| self.buffer_logs.append(resultado_worker['debug_log']) |
|
|
| |
| raw_md = resultado_worker.get('_raw_markdown') |
| if raw_md and isinstance(raw_md, str): |
| mid = resultado_worker.get('_id_manifestacao') |
| if mid: |
| self.buffer_raw_md[str(mid)] = raw_md |
|
|
| if resultado_worker.get('status') == 'sucesso': |
| self.buffer_resultados.append(resultado_worker.get('resultado', {})) |
| self.stats['total_ok'] += 1 |
| else: |
| self.stats['erros'].append(tarefa.get('id')) |
|
|
| def finalizar_chunk_parcial(self, chunk_num_str: str): |
| """ |
| Empacota os buffers atuais em um arquivo chunk_llm_*.tar.gz |
| e limpa os buffers. |
| |
| Estrutura do tar: |
| jurisprudencias.jsonl — registros processados pelo LLM |
| logs_execucao.jsonl — logs de debug (se houver) |
| raw/<id_manifestacao>.md — texto original de cada decisão (se disponível) |
| """ |
| with self.lock: |
| if not self.buffer_resultados and not self.buffer_logs: |
| return |
| nome = f"chunk_llm_{chunk_num_str}.tar.gz" |
| path = self.output_dir / nome |
| n_raw = len(self.buffer_raw_md) |
| logger.info( |
| f"📦 Gerando {nome} " |
| f"({len(self.buffer_resultados)} jurisprudências | {n_raw} raw/*.md)..." |
| ) |
| with tarfile.open(path, "w:gz") as tar: |
| |
| if self.buffer_resultados: |
| dados = "\n".join( |
| json.dumps(r, ensure_ascii=False) |
| for r in self.buffer_resultados |
| ) |
| dados_bytes = dados.encode('utf-8') |
| info = tarfile.TarInfo(name="jurisprudencias.jsonl") |
| info.size = len(dados_bytes) |
| tar.addfile(info, io.BytesIO(dados_bytes)) |
|
|
| |
| if self.buffer_logs: |
| logs = "\n".join( |
| json.dumps(l, ensure_ascii=False) |
| for l in self.buffer_logs |
| ) |
| logs_bytes = logs.encode('utf-8') |
| info = tarfile.TarInfo(name="logs_execucao.jsonl") |
| info.size = len(logs_bytes) |
| tar.addfile(info, io.BytesIO(logs_bytes)) |
|
|
| |
| for id_man, md_text in sorted(self.buffer_raw_md.items()): |
| md_bytes = md_text.encode('utf-8') |
| info = tarfile.TarInfo(name=f"raw/{id_man}.md") |
| info.size = len(md_bytes) |
| tar.addfile(info, io.BytesIO(md_bytes)) |
|
|
| self.buffer_resultados.clear() |
| self.buffer_logs.clear() |
| self.buffer_raw_md.clear() |
| logger.info(f"✅ {nome} criado.") |
|
|
| def get_chunk_stats_and_reset(self) -> dict: |
| """Retorna as estatísticas acumuladas do chunk atual e reseta.""" |
| with self.lock: |
| current = self.stats.copy() |
| self.stats = {"erros": [], "total_ok": 0} |
| return current |
|
|
| |