""" Layer 2 — Anotação multi-annotator com famílias divergentes (Patch B). Para cada par (q_bad, q_good) da Layer 1, três anotadores independentes de arquiteturas distintas avaliam se a reformulação é genuinamente melhor. Anotadores suportados: - Claude Haiku (Anthropic) — ANTHROPIC_API_KEY [obrigatório] - Llama 3.3 70B (Meta/Groq) — GROQ_API_KEY [recomendado] - GPT-4o-mini (OpenAI) — OPENAI_API_KEY [opcional] Saída: data/pairs/pairs_layer2.jsonl Mantém apenas pares onde todos os anotadores ativos concordam na direção (q_good > q_bad) E variância de magnitude < VAR_THRESHOLD. Pares com divergência ficam em pairs_layer2_divergent.jsonl para auditoria. """ from __future__ import annotations import json import os import re import statistics import time from dataclasses import dataclass from dataclasses import field from pathlib import Path from typing import Protocol from dotenv import load_dotenv from tqdm import tqdm load_dotenv(override=True) # Pares com variância de magnitude acima deste threshold vão para divergentes VAR_THRESHOLD = 0.04 # stdev < 0.2 entre anotadores MIN_ANNOTATORS = 2 # mínimo para considerar um par validado # --------------------------------------------------------------------------- # Estruturas de dados # --------------------------------------------------------------------------- @dataclass class Annotation: model: str direction: bool # True = q_good é genuinamente melhor magnitude: float # 0-1: quanto melhor reasoning: str error: str = "" @dataclass class AnnotatedPair: # Campos originais da Layer 1 q_bad: str q_good: str context: str domain: str confidence: float source: str source_id: str year: int # Anotações da Layer 2 annotations: list[Annotation] = field(default_factory=list) @property def valid_annotations(self) -> list[Annotation]: return [a for a in self.annotations if not a.error] @property def agreement_direction(self) -> bool: va = self.valid_annotations return len(va) >= MIN_ANNOTATORS and all(a.direction for a in va) @property def magnitude_variance(self) -> float: va = self.valid_annotations mags = [a.magnitude for a in va] if len(mags) < 2: return 0.0 return statistics.variance(mags) @property def magnitude_mean(self) -> float: va = self.valid_annotations if not va: return 0.0 return statistics.mean(a.magnitude for a in va) @property def passes_layer2(self) -> bool: return ( self.agreement_direction and self.magnitude_variance <= VAR_THRESHOLD and len(self.valid_annotations) >= MIN_ANNOTATORS ) def to_dict(self) -> dict: return { "q_bad": self.q_bad, "q_good": self.q_good, "context": self.context, "domain": self.domain, "confidence": self.confidence, "source": self.source, "source_id": self.source_id, "year": self.year, "annotations": [ { "model": a.model, "direction": a.direction, "magnitude": a.magnitude, "reasoning": a.reasoning, **({"error": a.error} if a.error else {}), } for a in self.annotations ], "n_annotators": len(self.valid_annotations), "agreement_direction": self.agreement_direction, "magnitude_mean": round(self.magnitude_mean, 4), "magnitude_variance": round(self.magnitude_variance, 4), "passes_layer2": self.passes_layer2, } # --------------------------------------------------------------------------- # Prompt compartilhado # --------------------------------------------------------------------------- _SYSTEM = """\ You are an expert in philosophy of science and research methodology. Your task: evaluate whether a question reformulation is genuinely epistemically productive. A reformulation is genuinely better (not just superficially different) when: 1. The new question has clearer methodology — there exist known tools, experiments, or formal frameworks to make progress on it. 2. Answering the new question makes meaningful progress toward answering the original question (inferential connection). 3. The new question is not merely a paraphrase, synonym substitution, or vocabulary upgrade of the original. Respond ONLY with valid JSON. No markdown, no explanation outside the JSON. { "direction": , "magnitude": , "reasoning": "" } """ _USER_TEMPLATE = """\ Context (from source text): {context} Original question (q_bad): {q_bad} Reformulated question (q_good): {q_good} Domain: {domain} """ def _parse_annotation(raw: str, model: str) -> Annotation: raw = re.sub(r"```[a-z]*\n?", "", raw).strip() try: data = json.loads(raw) return Annotation( model=model, direction=bool(data["direction"]), magnitude=float(max(0.0, min(1.0, data["magnitude"]))), reasoning=str(data.get("reasoning", "")), ) except Exception as e: return Annotation(model=model, direction=False, magnitude=0.0, reasoning="", error=str(e)) # --------------------------------------------------------------------------- # Anotadores # --------------------------------------------------------------------------- class Annotator(Protocol): name: str def annotate(self, pair: dict) -> Annotation: ... class ClaudeAnnotator: name = "claude-haiku-4-5-20251001" def __init__(self) -> None: import anthropic self._client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"]) def annotate(self, pair: dict) -> Annotation: prompt = _USER_TEMPLATE.format(**pair) try: msg = self._client.messages.create( model=self.name, max_tokens=256, system=_SYSTEM, messages=[{"role": "user", "content": prompt}], ) return _parse_annotation(msg.content[0].text, self.name) except Exception as e: return Annotation( model=self.name, direction=False, magnitude=0.0, reasoning="", error=str(e) ) class GroqAnnotator: name = "llama-3.3-70b-versatile" def __init__(self) -> None: from groq import Groq self._client = Groq(api_key=os.environ["GROQ_API_KEY"]) def annotate(self, pair: dict) -> Annotation: prompt = _USER_TEMPLATE.format(**pair) try: resp = self._client.chat.completions.create( model=self.name, max_tokens=256, messages=[ {"role": "system", "content": _SYSTEM}, {"role": "user", "content": prompt}, ], temperature=0.1, ) return _parse_annotation(resp.choices[0].message.content, self.name) except Exception as e: return Annotation( model=self.name, direction=False, magnitude=0.0, reasoning="", error=str(e) ) class OpenAIAnnotator: name = "gpt-4o-mini" def __init__(self) -> None: from openai import OpenAI self._client = OpenAI(api_key=os.environ["OPENAI_API_KEY"]) def annotate(self, pair: dict) -> Annotation: prompt = _USER_TEMPLATE.format(**pair) try: resp = self._client.chat.completions.create( model=self.name, max_tokens=256, messages=[ {"role": "system", "content": _SYSTEM}, {"role": "user", "content": prompt}, ], temperature=0.1, response_format={"type": "json_object"}, ) return _parse_annotation(resp.choices[0].message.content, self.name) except Exception as e: return Annotation( model=self.name, direction=False, magnitude=0.0, reasoning="", error=str(e) ) class GeminiAnnotator: name = "gemini-2.0-flash" def __init__(self) -> None: from google import genai self._client = genai.Client(api_key=os.environ["GEMINI_API_KEY"]) def annotate(self, pair: dict) -> Annotation: from google.genai import types prompt = _USER_TEMPLATE.format(**pair) for attempt in range(4): try: resp = self._client.models.generate_content( model=self.name, contents=prompt, config=types.GenerateContentConfig( system_instruction=_SYSTEM, temperature=0.1, max_output_tokens=256, ), ) return _parse_annotation(resp.text, self.name) except Exception as e: if "429" in str(e) and attempt < 3: wait = 15 * (attempt + 1) time.sleep(wait) continue return Annotation( model=self.name, direction=False, magnitude=0.0, reasoning="", error=str(e) ) return Annotation( model=self.name, direction=False, magnitude=0.0, reasoning="", error="max retries exceeded", ) def _build_annotators() -> list: annotators = [] if os.getenv("ANTHROPIC_API_KEY"): annotators.append(ClaudeAnnotator()) print(" [OK] Claude Haiku (Anthropic)") else: print(" [--] Claude — ANTHROPIC_API_KEY nao definida") if os.getenv("GROQ_API_KEY"): annotators.append(GroqAnnotator()) print(" [OK] Llama 3.3 70B (Groq)") else: print(" [--] Llama/Groq — GROQ_API_KEY nao definida (recomendado: console.groq.com)") if os.getenv("GEMINI_API_KEY"): annotators.append(GeminiAnnotator()) print(" [OK] Gemini 1.5 Flash (Google)") else: print(" [--] Gemini — GEMINI_API_KEY nao definida (gratuito: aistudio.google.com)") if os.getenv("OPENAI_API_KEY"): annotators.append(OpenAIAnnotator()) print(" [OK] GPT-4o-mini (OpenAI)") else: print(" [--] GPT — OPENAI_API_KEY nao definida (opcional)") return annotators # --------------------------------------------------------------------------- # Pipeline principal # --------------------------------------------------------------------------- def annotate_pairs( input_path: Path, output_path: Path, divergent_path: Path, delay: float = 0.3, ) -> tuple[list[dict], list[dict]]: pairs_raw = [ json.loads(l) for l in input_path.read_text(encoding="utf-8").splitlines() if l.strip() ] print("\n=== Layer 2 — Multi-annotator ===") print(f"Pares de entrada: {len(pairs_raw)}") print("\nAnotadores ativos:") annotators = _build_annotators() if len(annotators) < MIN_ANNOTATORS: raise RuntimeError( f"Minimo de {MIN_ANNOTATORS} anotadores necessarios. " f"Configure GROQ_API_KEY (gratuito em console.groq.com)." ) # Retoma de onde parou done_ids: set[str] = set() agreed: list[dict] = [] divergent: list[dict] = [] for path, bucket in [(output_path, agreed), (divergent_path, divergent)]: if path.exists(): for line in path.read_text(encoding="utf-8").splitlines(): if line.strip(): rec = json.loads(line) done_ids.add(rec.get("source_id", "")) bucket.append(rec) if done_ids: print( f"\nRetomando: {len(done_ids)} pares ja anotados " f"({len(agreed)} concordantes, {len(divergent)} divergentes)" ) pending = [p for p in pairs_raw if p.get("source_id", "") not in done_ids] print(f"Pares pendentes: {len(pending)}\n") out_f = output_path.open("a", encoding="utf-8") div_f = divergent_path.open("a", encoding="utf-8") try: for raw in tqdm(pending, desc="Anotando pares"): ap = AnnotatedPair( q_bad=raw["q_bad"], q_good=raw["q_good"], context=raw.get("context", ""), domain=raw.get("domain", ""), confidence=raw.get("confidence", 0.0), source=raw.get("source", ""), source_id=raw.get("source_id", ""), year=raw.get("year", 0), ) for ann in annotators: ap.annotations.append(ann.annotate(raw)) time.sleep(delay) record = ap.to_dict() if ap.passes_layer2: out_f.write(json.dumps(record, ensure_ascii=False) + "\n") out_f.flush() agreed.append(record) else: div_f.write(json.dumps(record, ensure_ascii=False) + "\n") div_f.flush() divergent.append(record) finally: out_f.close() div_f.close() return agreed, divergent def print_summary(agreed: list[dict], divergent: list[dict]) -> None: total = len(agreed) + len(divergent) print(f"\n{'='*55}") print(f" Total anotados: {total}") print(f" Concordantes: {len(agreed)} ({100*len(agreed)//max(total,1)}%)") print(f" Divergentes: {len(divergent)} ({100*len(divergent)//max(total,1)}%)") if agreed: mags = [p["magnitude_mean"] for p in agreed] print(f" Magnitude media: {statistics.mean(mags):.3f}") from collections import Counter sources = Counter(p["source"] for p in agreed) print("\n Por fonte (concordantes):") for s, n in sources.most_common(): print(f" {s}: {n}") # Motivos de divergência if divergent: no_dir = sum(1 for p in divergent if not p["agreement_direction"]) hi_var = sum( 1 for p in divergent if p["agreement_direction"] and p["magnitude_variance"] > VAR_THRESHOLD ) print("\n Motivo divergencia:") print(f" Direcao discordante: {no_dir}") print(f" Variancia alta: {hi_var}") print(f"{'='*55}") if __name__ == "__main__": output_path = Path("data/pairs/pairs_layer2.jsonl") divergent_path = Path("data/pairs/pairs_layer2_divergent.jsonl") output_path.parent.mkdir(parents=True, exist_ok=True) # Com Gemini (15 RPM free tier): delay de 5s entre pares garante < 12 RPM has_gemini = bool(os.getenv("GEMINI_API_KEY")) agreed, divergent = annotate_pairs( input_path=Path("data/pairs/pairs_layer1.jsonl"), output_path=output_path, divergent_path=divergent_path, delay=5.0 if has_gemini else 0.3, ) print_summary(agreed, divergent) print("\n=== Amostra concordantes ===") for p in agreed[:3]: models = [a["model"].split("-")[0] for a in p["annotations"]] mags = [a["magnitude"] for a in p["annotations"]] print(f"\n [{p['source']}] [{p['domain'][:40]}]") print(f" BAD: {p['q_bad'][:80]}") print(f" GOOD: {p['q_good'][:80]}") print(f" Anotadores: {list(zip(models, [round(m,2) for m in mags]))}")