Spaces:
Sleeping
Sleeping
| """pipeline/extractor.py — Claude API extraction + 3-layer deduplication.""" | |
| import json, time, hashlib | |
| from typing import Optional | |
| import numpy as np | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import anthropic | |
| from loguru import logger | |
| import utils.config as cfg | |
class AIExtractor:
    """Extract strategies, formulas and systems from document chunks via the
    Claude API, and compile strategy records into Julia signal code.
    """

    MODEL = "claude-sonnet-4-20250514"
    # Record kinds requested from the model, in result order.
    _KINDS = ("strategies", "formulas", "systems")
    # Fields concatenated (in this order, no separator) into content_hash.
    # Keep order/format fixed so new hashes stay comparable with hashes of
    # entries already held by the deduplication store.
    _HASH_FIELDS = ("description", "plain_text", "entry_system", "name")

    def __init__(self):
        self.client = anthropic.Anthropic(api_key=cfg.ANTHROPIC_API_KEY)
        # Cumulative input + output tokens over all successful API calls.
        self.tokens_used = 0

    @classmethod
    def _empty(cls):
        """Return a new empty result (fresh lists, safe for callers to mutate)."""
        return {kind: [] for kind in cls._KINDS}

    def extract(self, chunk) -> dict:
        """Extract structured records from one text chunk.

        ``chunk`` must expose ``word_count``, ``text``, ``source_file``,
        ``page_start`` and ``page_end``. Chunks under 20 words are skipped
        without calling the API.

        Returns a dict mapping "strategies", "formulas" and "systems" to lists
        of record dicts; all lists are empty when nothing was extracted.
        """
        if chunk.word_count < 20:
            return self._empty()
        prompt = cfg.EXTRACTION_PROMPT.format(
            source_file=chunk.source_file, page_start=chunk.page_start,
            page_end=chunk.page_end, text=chunk.text)
        raw = self._call(prompt)
        if not raw:
            return self._empty()
        return self._parse(raw, chunk)

    def _call(self, prompt, retries=3):
        """Send ``prompt`` as one user message; return the reply text or "".

        Makes up to ``retries`` attempts, sleeping 2s, 4s, 8s, ... between
        them (but not after the last one). Rate-limit and other exceptions are
        logged, not raised; "" is returned once every attempt has failed.
        Only text content blocks contribute to the returned string.
        """
        delay = 2.0
        for attempt in range(retries):
            final = attempt == retries - 1
            try:
                resp = self.client.messages.create(
                    model=self.MODEL, max_tokens=4096,
                    messages=[{"role": "user", "content": prompt}])
                self.tokens_used += resp.usage.input_tokens + resp.usage.output_tokens
                if resp.stop_reason == "max_tokens":
                    # Truncated JSON will most likely fail to parse downstream.
                    logger.warning("Response hit max_tokens — output truncated")
                # Non-text blocks (e.g. tool use) have no ``text``; skip them
                # instead of raising and re-billing the call via a retry.
                return "".join(getattr(block, "text", "") for block in (resp.content or ()))
            except anthropic.RateLimitError:
                if final:
                    logger.error(f"Rate limit — giving up after {retries} attempts")
                    break
                logger.warning(f"Rate limit — {delay}s")
            except Exception as e:
                logger.error(f"API: {e}")
                if final:
                    break
            time.sleep(delay)
            delay *= 2
        return ""

    @staticmethod
    def _load_json(raw):
        """Decode a JSON reply, tolerating a Markdown fence or surrounding prose.

        Tries the (fence-stripped) text first, then the outermost ``{...}``
        span. Returns the decoded value, or None if nothing decodes.
        """
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (e.g. ```json) and the closing fence.
            text = "\n".join(text.split("\n")[1:]).rstrip("`").strip()
        try:
            return json.loads(text)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            pass
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            return None
        try:
            return json.loads(text[start:end + 1])
        except ValueError:
            return None

    def _parse(self, raw, chunk):
        """Turn the model's reply into validated, source-annotated records.

        Non-object JSON, non-list kind values, non-dict items and items
        without a truthy "name" are dropped. Each kept item is stamped with
        ``source_file``, ``source_pages`` ("start-end") and ``content_hash``.
        """
        result = self._empty()
        data = self._load_json(raw)
        if not isinstance(data, dict):
            return result
        pages = f"{chunk.page_start}-{chunk.page_end}"
        for kind in self._KINDS:
            items = data.get(kind)
            if not isinstance(items, list):  # missing, null or malformed
                continue
            for item in items:
                if not (isinstance(item, dict) and item.get("name")):
                    continue
                item.update({"source_file": chunk.source_file, "source_pages": pages})
                # null/missing fields count as ""; identical to plain string
                # concatenation for well-formed (string) fields.
                item["content_hash"] = _hash("".join(
                    str(item.get(field) or "") for field in self._HASH_FIELDS))
                result[kind].append(item)
        return result

    def compile_strategy_code(self, record: dict) -> str:
        """Ask Claude to generate Julia signal code for this strategy.

        Only a fixed subset of ``record`` fields is sent (missing ones as
        null). If the reply contains Markdown code fences, only the lines
        inside fences are kept (all fenced blocks, concatenated). Returns ""
        when the API call fails.
        """
        keys = ("name", "category", "description", "entry_rules", "exit_rules",
                "filters", "parameters", "mathematical_basis")
        compact = {k: record.get(k) for k in keys}
        prompt = cfg.COMPILER_PROMPT.format(
            strategy_json=json.dumps(compact, indent=2))
        code = self._call(prompt)
        if not code:
            return ""
        if "```" in code:
            kept, inside = [], False
            for line in code.split("\n"):
                if line.strip().startswith("```"):
                    inside = not inside  # fence lines toggle and are never kept
                    continue
                if inside:
                    kept.append(line)
            code = "\n".join(kept)
        return code.strip()
class Deduplicator:
    """Merge extracted records into a knowledge base using 3-layer dedup.

    1. Exact: an item whose ``content_hash`` equals that of a stored entry,
       or of a variant already merged into one, is skipped (its source is
       still recorded on the entry).
    2. Fuzzy: an item whose TF-IDF cosine similarity to the closest stored
       entry is >= ``threshold`` is attached to it as a variant ("layer").
    3. New: anything else becomes a new canonical entry.
    """

    _KINDS = ("strategies", "formulas", "systems")

    def __init__(self, threshold=None):
        # Compare against None so an explicit 0.0 is honoured instead of being
        # silently replaced by the configured default.
        self.threshold = cfg.SIMILARITY_THRESHOLD if threshold is None else threshold
        self._vec = TfidfVectorizer(ngram_range=(1, 2), max_features=5000,
                                    stop_words="english")

    def process(self, extracted, kb):
        """Dedupe every extracted record into ``kb`` (mutated in place).

        ``extracted`` maps kind -> list of records. ``kb`` maps kind -> dict of
        canonical_id -> record and needs a store for every kind that has
        items. Returns ``{kind: {"added": n, "merged": n, "skipped": n}}``.
        """
        stats = {kind: {"added": 0, "merged": 0, "skipped": 0} for kind in self._KINDS}
        for kind in self._KINDS:
            for item in extracted.get(kind) or []:
                stats[kind][self._process_one(item, kb[kind], kind)] += 1
        return stats

    def _process_one(self, item, store, kind):
        """Place one item into ``store``; return "skipped", "merged" or "added"."""
        h = item.get("content_hash", "")
        # Layer 1: exact hash match against canonical entries AND their merged
        # layers, so a re-seen variant cannot slip through as a new entry when
        # the refit TF-IDF similarity happens to drift below the threshold.
        for entry in store.values():
            if entry.get("content_hash") == h or any(
                    layer.get("content_hash") == h
                    for layer in entry.get("layers") or []):
                self._add_src(item, entry)
                return "skipped"
        # Layer 2: near-duplicate of an existing entry.
        sid = self._similar(item, store, kind)
        if sid is not None:
            self._merge(item, store[sid])
            return "merged"
        # Layer 3: new canonical entry.
        cid = _cid(item["name"], h, kind)
        item["canonical_id"] = cid
        item["sources"] = [item.get("source_file", "")]
        item["layers"] = []
        store[cid] = item
        return "added"

    def _similar(self, item, store, kind):
        """Return the id of the most similar stored entry at/above threshold, else None.

        The vectorizer is refit on all stored texts plus ``item`` on every
        call, so cost grows with ``len(store)``. Failures are logged and
        treated as "no match" (best effort).
        """
        if not store:
            return None
        ids = list(store)
        texts = [_text(store[i], kind) for i in ids] + [_text(item, kind)]
        try:
            mat = self._vec.fit_transform(texts)
            sims = cosine_similarity(mat[-1], mat[:-1])[0]
            idx = int(np.argmax(sims))
            if sims[idx] >= self.threshold:
                return ids[idx]
        except ValueError as e:
            # e.g. "empty vocabulary" when every text is only stop words.
            logger.debug(f"Similarity check skipped: {e}")
        except Exception as e:
            logger.warning(f"Similarity check failed: {e}")
        return None

    @staticmethod
    def _add_src(item, existing):
        """Append ``item``'s non-empty source_file to ``existing["sources"]`` if new."""
        s = item.get("source_file", "")
        if s and s not in existing.get("sources", []):
            existing.setdefault("sources", []).append(s)

    @staticmethod
    def _merge(item, existing):
        """Attach ``item`` to ``existing`` as a variant layer.

        Records the item's source, then appends a layer with the item's data
        (minus bookkeeping keys) unless a layer with the same content_hash
        already exists.
        """
        Deduplicator._add_src(item, existing)
        layers = existing.setdefault("layers", [])
        h = item.get("content_hash")
        if any(layer.get("content_hash") == h for layer in layers):
            return
        layers.append({
            "source_file": item.get("source_file"),
            "content_hash": h,
            "data": {k: v for k, v in item.items()
                     if k not in ("sources", "layers", "canonical_id")},
        })
def _hash(text):
    """Return a 16-hex-digit SHA-256 fingerprint of ``text``.

    The text is lower-cased and every whitespace run collapsed to a single
    space (leading/trailing whitespace dropped) before hashing, so strings
    differing only in those respects share a fingerprint.
    """
    words = text.lower().split()
    canonical = " ".join(words).encode()
    return hashlib.sha256(canonical).hexdigest()[:16]
def _cid(name, h, kind):
    """Derive a 12-hex-digit canonical id from ``kind``, ``name`` and hash ``h``.

    MD5 serves here only to produce a short, deterministic identifier.
    """
    key = "{}_{}_{}".format(kind, name, h)
    return hashlib.md5(key.encode()).hexdigest()[:12]
def _field_text(value):
    """Render one record field as plain text for TF-IDF.

    None becomes ""; lists/tuples are space-joined element by element;
    anything else (normally a string) goes through ``str``.
    """
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        return " ".join(_field_text(v) for v in value)
    return str(value)


def _text(item, kind):
    """Build the text that represents ``item`` in TF-IDF similarity checks.

    strategies: name, description, entry_rules; formulas: name, plain_text,
    purpose; any other kind (systems): name, entry_system, exit_system.
    Fields may be missing, None, strings or lists. A plain-string
    ``entry_rules`` is used as-is rather than being split into characters.
    """
    if kind == "strategies":
        fields = ("name", "description", "entry_rules")
    elif kind == "formulas":
        fields = ("name", "plain_text", "purpose")
    else:
        fields = ("name", "entry_system", "exit_system")
    return " ".join(_field_text(item.get(field, "")) for field in fields)