| from __future__ import annotations |
|
|
| import os |
| import gc |
| import re |
| import json |
| from dataclasses import dataclass |
| from typing import Dict, List, Optional, Any |
|
|
class MinerBackend:
    """Abstract interface for a mining backend.

    Concrete backends manage model lifecycle (``load``/``reject``) and turn
    a prompt string into a structured mining result (``mine``).
    """

    def load(self) -> None:
        """Acquire backend resources (e.g. load a model). Must be overridden."""
        raise NotImplementedError

    def reject(self) -> None:
        """Release backend resources. Must be overridden."""
        raise NotImplementedError

    def mine(self, prompt: str) -> Dict[str, Any]:
        """Run the miner on ``prompt`` and return a structured result. Must be overridden."""
        raise NotImplementedError
|
|
class NullMinerBackend(MinerBackend):
    """No-op backend: holds no resources and always yields an empty result."""

    def load(self) -> None:
        """Nothing to load."""
        return None

    def reject(self) -> None:
        """Nothing to release."""
        return None

    def mine(self, p):
        """Return an empty mining structure regardless of the prompt."""
        return {"STRUCTURE": {"rules": []}, "DEFINITIONS": []}
|
|
class TransformersMinerBackend(MinerBackend):
    """Mining backend driven by a local HuggingFace causal language model.

    The model is loaded lazily on first use and can be purged from memory
    with ``reject()``. If ``torch``/``transformers`` are not installed, the
    backend stays unloaded and ``mine()`` raises a clear ``RuntimeError``
    instead of an opaque ``AttributeError``.
    """

    def __init__(self, model_id: str, max_new_tokens: int = 512, temperature: float = 0.1):
        self.model_id = model_id
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self._loaded = False
        self._model = None
        self._tokenizer = None
        self._device = "cpu"

    def load(self) -> None:
        """Load tokenizer and model onto the best available device (mps/cuda/cpu)."""
        if self._loaded:
            return
        try:
            import torch
            from transformers import AutoTokenizer, AutoModelForCausalLM
        except ImportError:
            # Optional dependencies missing: stay unloaded; mine() reports it.
            return

        if torch.backends.mps.is_available():
            self._device = "mps"
            dtype = torch.float16
        elif torch.cuda.is_available():
            self._device = "cuda"
            dtype = torch.bfloat16
        else:
            self._device = "cpu"
            dtype = torch.float32

        print(f"🚀 [AXIS LATHE] Loading {self.model_id} on {self._device}...")
        self._tokenizer = AutoTokenizer.from_pretrained(self.model_id)
        if self._tokenizer.pad_token is None:
            # Some models ship without a pad token; reuse EOS so padding works.
            self._tokenizer.pad_token = self._tokenizer.eos_token

        self._model = AutoModelForCausalLM.from_pretrained(
            self.model_id,
            torch_dtype=dtype,
            low_cpu_mem_usage=True
        ).to(self._device)
        self._model.eval()
        self._loaded = True

    def reject(self) -> None:
        """Release model/tokenizer references and free accelerator caches."""
        if not self._loaded:
            return
        print("🗑️ [AXIS LATHE] Purging Memory...")
        import torch
        # Rebind to None instead of `del` so the attributes always exist
        # (a later attribute access can no longer raise AttributeError).
        self._model = None
        self._tokenizer = None
        gc.collect()
        if torch.backends.mps.is_available():
            try:
                from torch import mps
                mps.empty_cache()
            except Exception:
                # Best-effort cache purge; narrowed from a bare `except:`.
                pass
        elif torch.cuda.is_available():
            torch.cuda.empty_cache()
        self._loaded = False

    def mine(self, prompt: str) -> Dict[str, Any]:
        """Generate a completion for ``prompt`` and parse it into rules/definitions.

        Raises:
            RuntimeError: if the model could not be loaded (missing torch/transformers).
        """
        if not self._loaded:
            self.load()
        if not self._loaded:
            # load() failed silently (ImportError path); fail loudly here
            # rather than crashing on a None tokenizer.
            raise RuntimeError(
                "TransformersMinerBackend is unavailable: torch/transformers "
                "could not be imported."
            )
        import torch
        inputs = self._tokenizer(prompt, return_tensors="pt", padding=True).to(self._device)

        with torch.no_grad():
            outputs = self._model.generate(
                **inputs,
                max_new_tokens=self.max_new_tokens,
                temperature=self.temperature,
                do_sample=(self.temperature > 0),
                pad_token_id=self._tokenizer.pad_token_id,
                eos_token_id=self._tokenizer.eos_token_id
            )

        # Decode only the newly generated tokens, not the echoed prompt.
        input_len = inputs['input_ids'].shape[1]
        text = self._tokenizer.decode(outputs[0][input_len:], skip_special_tokens=True)
        return self._parse_output(text)

    def _parse_output(self, text: str) -> Dict[str, Any]:
        """V13.3: Robust Parser with Repair Logic.

        Extracts the first JSON object/array found in the raw model output and
        normalises it into ``{"RAW": ..., "STRUCTURE": {"rules": [...]},
        "DEFINITIONS": [...]}``. When no usable JSON is found, falls back to
        salvaging ``term: definition`` lines from the prose output.
        """
        raw_text = text.strip()
        result = {"RAW": raw_text, "STRUCTURE": {"rules": []}, "DEFINITIONS": []}

        json_match = re.search(r"(\{.*\}|\[.*\])", raw_text, re.DOTALL)
        if json_match:
            try:
                obj = json.loads(json_match.group(0))
                # A bare JSON list is treated as a rule list.
                if isinstance(obj, list):
                    obj = {"rules": obj}

                raw_rules = obj.get("rules") or obj.get("ruleset") or []
                if isinstance(raw_rules, list):
                    for r in raw_rules:
                        # A rule is only usable if it has a consequent.
                        if isinstance(r, dict) and r.get("then"):
                            r.setdefault("if", "always")
                            r.setdefault("requires", [r["if"]])
                            r.setdefault("domain", "general")
                            result["STRUCTURE"]["rules"].append(r)

                raw_defs = obj.get("definitions") or obj.get("defs") or []
                if isinstance(raw_defs, list):
                    for d in raw_defs:
                        if isinstance(d, dict) and (d.get("term") and d.get("definition")):
                            result["DEFINITIONS"].append(d)
            except Exception:
                # Malformed/unexpected JSON shape: drop through to the
                # line-based repair below (narrowed from a bare `except:`).
                pass

        # Repair fallback: salvage `term: definition` lines from prose output.
        if not result["STRUCTURE"]["rules"] and not result["DEFINITIONS"]:
            for line in raw_text.splitlines():
                if ":" in line:
                    term, _, definition = line.partition(":")
                    term, definition = term.strip(), definition.strip()
                    if term and definition:
                        # Skip degenerate entries with an empty side.
                        result["DEFINITIONS"].append({"term": term, "definition": definition})

        return result
|
|
@dataclass
class OnDemandMiner:
    """Thin orchestration layer: builds mining prompts and delegates to a backend.

    Attributes:
        backend: any object implementing the MinerBackend interface
            (``load``/``reject``/``mine``).
    """

    backend: "MinerBackend"

    def mine_formal_rules(self, entities: List[str], domain: str) -> Dict[str, Any]:
        """Mine formal IF-THEN rules for ``entities`` within ``domain``."""
        prompt = (
            "【AXIS_MINING_COMMAND】\n"
            f"TARGET: {', '.join(entities)}\n"
            f"DOMAIN: {domain}\n\n"
            "TASK: Output formal IF-THEN rules in JSON.\n"
            "SCHEMA: { 'rules': [{ 'if': 'condition', 'then': 'valid_formula', 'evidence': 2 }] }\n"
            "RULES: JSON ONLY. NO PROSE. NO SOLUTIONS."
        )
        return self.backend.mine(prompt)

    def mine_knowledge(self, entities: List[str]) -> Dict[str, Any]:
        """Mine term definitions for ``entities``."""
        prompt = (
            "【AXIS_KNOWLEDGE_MINING】\n"
            f"ENTITIES: {', '.join(entities)}\n\n"
            "TASK: Output definitions in JSON.\n"
            "SCHEMA: { 'definitions': [{ 'term': '...', 'definition': '...' }] }\n"
            "RULES: JSON ONLY. NO PROSE."
        )
        return self.backend.mine(prompt)

    def mine_specification(self, symbols: List[str], block_name: str) -> Dict[str, Any]:
        """Mine an operation specification for ``symbols`` under topic ``block_name``."""
        prompt = f"【AXIS_SPEC】\nTopic: {block_name}\nSymbols: {','.join(symbols)}\nTask: Output JSON with 'ops' list."
        return self.backend.mine(prompt)

    def mine_schema_fill(self, entity: str, schema_def: Dict[str, str]) -> Dict[str, Any]:
        """Mine factual data to fill a specific schema for an entity.

        Returns the parsed JSON object, or ``{}`` when the schema is empty,
        no JSON could be extracted, or mining failed.
        """
        if not schema_def:
            # Previously `schema_keys[0]` raised an uncaught IndexError here.
            return {}

        schema_keys = list(schema_def.keys())
        prompt = f"""
SYSTEM INSTRUCTION:
You are a database filler. Output STRICT JSON only.
No explanations. No markdown.

TASK:
Fill the following schema for the entity: "{entity}".
If information is missing, use null.

SCHEMA KEYS:
{json.dumps(schema_keys)}

OUTPUT FORMAT:
{{
"{schema_keys[0]}": "...",
...
}}

JSON ONLY:
"""
        try:
            # BUG FIX: the backend interface has no `generate()`; the old call
            # always raised AttributeError and returned {}. Use `mine()` and
            # read the raw text it captured (stored under "RAW").
            response = self.backend.mine(prompt)
            raw_text = response.get("RAW", "") if isinstance(response, dict) else str(response)

            match = re.search(r"\{.*\}", raw_text, re.DOTALL)
            if match:
                return json.loads(match.group())
            return {}
        except Exception as e:
            print(f"Mining Schema Fill Failed: {e}")
            return {}

    def reject(self) -> None:
        """Release the backend's resources."""
        self.backend.reject()