cyberkyne's picture
Upload 22 files
094a5f6 verified
"""pipeline/extractor.py — Claude API extraction + 3-layer deduplication."""
import json, time, hashlib
from typing import Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import anthropic
from loguru import logger
import utils.config as cfg
class AIExtractor:
    """Extract structured trading knowledge from text chunks with Claude.

    ``extract`` turns one document chunk into lists of strategies, formulas
    and systems; ``compile_strategy_code`` asks the model to write Julia
    signal code for a single strategy record. ``tokens_used`` accumulates the
    input + output token counts reported by every successful API call.
    """

    MODEL = "claude-sonnet-4-20250514"
    # Result categories; every result dict has exactly these keys.
    KINDS = ("strategies", "formulas", "systems")
    # Chunks with fewer words than this are skipped without an API call.
    MIN_WORDS = 20
    # Item fields concatenated, in this order, to build its content_hash.
    _HASH_FIELDS = ("description", "plain_text", "entry_system", "name")

    def __init__(self):
        self.client = anthropic.Anthropic(api_key=cfg.ANTHROPIC_API_KEY)
        self.tokens_used = 0

    @classmethod
    def _empty(cls) -> dict:
        """Return a new result dict with an empty list for every category."""
        return {kind: [] for kind in cls.KINDS}

    def extract(self, chunk) -> dict:
        """Extract strategies, formulas and systems from one text chunk.

        ``chunk`` must expose ``word_count``, ``text``, ``source_file``,
        ``page_start`` and ``page_end``. Returns a dict keyed by
        ``"strategies"``, ``"formulas"`` and ``"systems"`` whose values are
        lists of item dicts; all lists are empty when the chunk is too short,
        the API call fails, or the reply cannot be parsed.
        """
        if chunk.word_count < self.MIN_WORDS:
            return self._empty()
        prompt = cfg.EXTRACTION_PROMPT.format(
            source_file=chunk.source_file, page_start=chunk.page_start,
            page_end=chunk.page_end, text=chunk.text)
        raw = self._call(prompt)
        if not raw:
            return self._empty()
        return self._parse(raw, chunk)

    def _call(self, prompt, retries=3):
        """Send ``prompt`` as a single user message and return the reply text.

        Makes up to ``retries`` attempts, sleeping 2s, 4s, ... between them
        (never after the last one). Returns ``""`` when every attempt fails
        or the reply has no content blocks.
        """
        delay = 2.0
        for attempt in range(retries):
            last = attempt == retries - 1
            try:
                resp = self.client.messages.create(
                    model=self.MODEL, max_tokens=4096,
                    messages=[{"role": "user", "content": prompt}])
                self.tokens_used += resp.usage.input_tokens + resp.usage.output_tokens
                return resp.content[0].text if resp.content else ""
            except anthropic.RateLimitError:
                if last:
                    logger.error(f"Rate limit — giving up after {retries} attempts")
                    break
                logger.warning(f"Rate limit — {delay}s")
            except Exception as e:
                logger.error(f"API: {e}")
                if last:
                    break
            time.sleep(delay)
            delay *= 2
        return ""

    @staticmethod
    def _load_json(raw) -> dict:
        """Best-effort decode of the JSON object in a model reply.

        Strips a leading Markdown code fence, and if the text still is not
        valid JSON, retries on the span from the first ``{`` to the last
        ``}``. Returns ``{}`` when nothing decodes or the decoded value is not
        a JSON object.
        """
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and the closing fence.
            text = "\n".join(text.split("\n")[1:]).rstrip("`").strip()
        try:
            data = json.loads(text)
        except (ValueError, RecursionError):
            start, end = text.find("{"), text.rfind("}")
            if start == -1:
                return {}
            try:
                data = json.loads(text[start:end + 1])
            except (ValueError, RecursionError):
                return {}
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _field_text(value) -> str:
        """Render one item field for hashing: ``None`` -> ``""``, else ``str(value)``."""
        return "" if value is None else str(value)

    def _parse(self, raw, chunk):
        """Convert a raw model reply into the three result lists.

        Items that are not dicts or have no truthy ``name`` are dropped, as
        are categories whose value is not a list. Kept items are annotated
        in place with ``source_file``, ``source_pages`` ("start-end") and a
        ``content_hash`` over their description/plain_text/entry_system/name.
        """
        data = self._load_json(raw)
        result = self._empty()
        for kind in self.KINDS:
            items = data.get(kind)
            if not isinstance(items, list):
                continue  # category missing, null, or malformed
            for item in items:
                if not (isinstance(item, dict) and item.get("name")):
                    continue
                item["source_file"] = chunk.source_file
                item["source_pages"] = f"{chunk.page_start}-{chunk.page_end}"
                item["content_hash"] = _hash("".join(
                    self._field_text(item.get(field)) for field in self._HASH_FIELDS))
                result[kind].append(item)
        return result

    @staticmethod
    def _fenced_lines(text):
        """Return only the lines inside ``` fences (fence lines excluded).

        If no line opens a fence (e.g. "```" only appears mid-line), ``text``
        is returned unchanged instead of collapsing to an empty string.
        """
        out, in_block, saw_fence = [], False, False
        for line in text.split("\n"):
            if line.strip().startswith("```"):
                in_block = not in_block
                saw_fence = True
                continue
            if in_block:
                out.append(line)
        return "\n".join(out) if saw_fence else text

    def compile_strategy_code(self, record: dict) -> str:
        """Ask Claude to generate Julia signal code for this strategy.

        Only the fields relevant to signal generation are sent. Code fences
        in the reply are unwrapped. Returns the stripped code, or ``""`` when
        the API call fails.
        """
        compact = {k: record.get(k) for k in
                   ("name", "category", "description", "entry_rules", "exit_rules",
                    "filters", "parameters", "mathematical_basis")}
        prompt = cfg.COMPILER_PROMPT.format(
            strategy_json=json.dumps(compact, indent=2))
        code = self._call(prompt)
        if not code:
            return ""
        if "```" in code:
            code = self._fenced_lines(code)
        return code.strip()
class Deduplicator:
    """Merge extracted items into a knowledge base using three dedup layers.

    For each item, in order:
      1. exact: if its ``content_hash`` equals that of a stored entry, or of
         a layer previously merged into one, it is skipped and only its
         source file is recorded on that entry;
      2. fuzzy: otherwise, if TF-IDF cosine similarity to the closest stored
         entry of the same kind reaches ``threshold``, it is attached to that
         entry as a layer;
      3. otherwise it is stored as a new canonical entry.
    """

    def __init__(self, threshold=None):
        # ``is None`` rather than ``or`` so an explicit 0.0 is honoured
        # instead of being replaced by the configured default.
        self.threshold = cfg.SIMILARITY_THRESHOLD if threshold is None else threshold
        self._vec = TfidfVectorizer(ngram_range=(1, 2), max_features=5000,
                                    stop_words="english")

    def process(self, extracted, kb):
        """Deduplicate ``extracted`` into ``kb`` in place; return per-kind counts.

        ``extracted`` maps each of "strategies"/"formulas"/"systems" to a list
        of item dicts; ``kb[kind]`` must be a mutable mapping from canonical id
        to stored entry. Returns ``{kind: {"added": n, "merged": n,
        "skipped": n}}``.
        """
        kinds = ("strategies", "formulas", "systems")
        stats = {kind: {"added": 0, "merged": 0, "skipped": 0} for kind in kinds}
        for kind in kinds:
            for item in extracted.get(kind) or []:
                outcome = self._process_one(item, kb[kind], kind)
                stats[kind][outcome] += 1
        return stats

    def _process_one(self, item, store, kind):
        """Place one item into ``store``; return "skipped", "merged" or "added"."""
        h = item.get("content_hash", "")
        # Layer 1: exact duplicates. An empty hash identifies nothing, so it
        # is never treated as a match.
        if h:
            match = self._find_exact(h, store)
            if match is not None:
                self._add_src(item, match)
                return "skipped"
        # Layer 2: near duplicates become layers of the closest entry.
        sid = self._similar(item, store, kind)
        if sid is not None:
            self._merge(item, store[sid])
            return "merged"
        # Layer 3: genuinely new content.
        cid = _cid(item["name"], h, kind)
        item["canonical_id"] = cid
        item["sources"] = [item.get("source_file", "")]
        item["layers"] = []
        store[cid] = item
        return "added"

    @staticmethod
    def _find_exact(h, store):
        """Return the entry whose own hash equals ``h``, else one with a layer hashed ``h``, else None."""
        entries = list(store.values())
        for entry in entries:
            if entry.get("content_hash") == h:
                return entry
        # Content already merged as a layer must not be re-matched by TF-IDF,
        # whose weights shift as the store grows.
        for entry in entries:
            if any(layer.get("content_hash") == h for layer in entry.get("layers", [])):
                return entry
        return None

    def _similar(self, item, store, kind):
        """Return the key of the most similar stored entry, or None.

        The vectorizer is refit on every call over all stored texts plus the
        candidate, so IDF weights reflect the current store. A match requires
        cosine similarity >= ``self.threshold``.
        """
        if not store:
            return None
        keys = list(store)
        texts = [_text(store[key], kind) for key in keys] + [_text(item, kind)]
        try:
            mat = self._vec.fit_transform(texts)
        except ValueError as e:
            # e.g. empty vocabulary: every text is blank or only stop words.
            logger.debug(f"TF-IDF skipped: {e}")
            return None
        sims = cosine_similarity(mat[-1], mat[:-1])[0]
        idx = int(np.argmax(sims))
        return keys[idx] if sims[idx] >= self.threshold else None

    @staticmethod
    def _add_src(item, existing):
        """Append the item's source_file to ``existing["sources"]`` if new and non-empty."""
        s = item.get("source_file", "")
        if s and s not in existing.get("sources", []):
            existing.setdefault("sources", []).append(s)

    @staticmethod
    def _merge(item, existing):
        """Attach ``item`` to ``existing`` as a layer unless that hash is already a layer.

        The item's source file is recorded either way. A layer holds the
        item's source file, hash, and a shallow copy of its fields minus
        ``sources``, ``layers`` and ``canonical_id``.
        """
        Deduplicator._add_src(item, existing)
        layers = existing.setdefault("layers", [])
        h = item.get("content_hash")
        if any(layer.get("content_hash") == h for layer in layers):
            return
        layers.append({"source_file": item.get("source_file"),
                       "content_hash": h,
                       "data": {k: v for k, v in item.items()
                                if k not in ("sources", "layers", "canonical_id")}})
def _hash(text):
    """Return a 16-hex-char SHA-256 fingerprint of *text*.

    Case is ignored and whitespace is normalised first (runs collapse to a
    single space, leading/trailing whitespace is dropped), so reformatted
    copies of the same text share a fingerprint.
    """
    normalized = " ".join(text.lower().split())
    digest = hashlib.sha256(normalized.encode())
    return digest.hexdigest()[:16]
def _cid(name, h, kind):
    """Derive a 12-hex-char canonical id from an item's kind, name and content hash."""
    seed = f"{kind}_{name}_{h}".encode()
    full_digest = hashlib.md5(seed).hexdigest()
    return full_digest[:12]
def _text(item, kind):
    """Return the text compared by TF-IDF similarity for an item of ``kind``.

    strategies: name, description and entry rules; formulas: name,
    plain_text and purpose; any other kind (systems): name, entry_system and
    exit_system. Parts are joined with single spaces. Missing or ``None``
    fields contribute ``""`` (not the word "None") and other non-string
    values are stringified. ``entry_rules`` may be a list/tuple, whose
    elements are space-joined, or a single value used as-is.
    """
    def field(key):
        value = item.get(key)
        return "" if value is None else str(value)

    if kind == "strategies":
        rules = item.get("entry_rules")
        if rules is None:
            rules_text = ""
        elif isinstance(rules, (list, tuple)):
            rules_text = " ".join("" if r is None else str(r) for r in rules)
        else:
            # A bare string would otherwise be joined character by character.
            rules_text = str(rules)
        return f"{field('name')} {field('description')} {rules_text}"
    if kind == "formulas":
        return f"{field('name')} {field('plain_text')} {field('purpose')}"
    return f"{field('name')} {field('entry_system')} {field('exit_system')}"