Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /internet_ingestor.py
| """Internet evidence ingestor for TinyMind Open Pure updates.""" | |
| from __future__ import annotations | |
| from collections import Counter | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from html.parser import HTMLParser | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import re | |
| from typing import Iterable | |
| import httpx | |
# Schema identifier written into every JSONL row and into the manifest.
SCHEMA_VERSION = "tinymind-internet-evidence-v1"

# Lower-case substrings that raise a text's junk score when present.
JUNK_MARKERS = (
    "todo",
    "fixme",
    "???",
    "lorem ipsum",
    "click here",
    "subscribe now",
)
class _TextExtractor(HTMLParser):
    """Collect the readable text of an HTML document.

    Character data inside ``script``, ``style``, ``noscript`` and ``svg``
    elements is discarded. Every other text run is whitespace-normalized and
    appended to ``parts``; ``text()`` joins the runs into a single string.
    """

    # Elements whose character data is never treated as readable text.
    _SKIPPED_TAGS = frozenset({"script", "style", "noscript", "svg"})

    def __init__(self) -> None:
        super().__init__()
        self._skip = 0  # nesting depth of currently open skipped elements
        self.parts: list[str] = []

    def handle_starttag(self, tag: str, attrs):  # noqa: ANN001
        """Enter a skipped element: bump the skip depth."""
        if tag.lower() in self._SKIPPED_TAGS:
            self._skip += 1

    def handle_endtag(self, tag: str):
        """Leave a skipped element; the depth never drops below zero."""
        if self._skip and tag.lower() in self._SKIPPED_TAGS:
            self._skip -= 1

    def handle_data(self, data: str):
        """Store a whitespace-normalized text run unless inside a skipped element."""
        if self._skip:
            return
        text = re.sub(r"\s+", " ", data).strip()
        if text:
            self.parts.append(text)

    def text(self) -> str:
        """Return everything collected so far as one whitespace-normalized string.

        The parser is flushed first: ``feed()`` holds back trailing input that
        could still be an incomplete tag or character reference (for example a
        tail such as ``R&D`` after the last tag), and only ``close()`` releases
        it to ``handle_data``. Without the flush that tail would be lost.
        """
        self.close()
        return re.sub(r"\s+", " ", " ".join(self.parts)).strip()
def extract_text(html_or_text: str) -> str:
    """Return readable text extracted from *html_or_text*.

    The input is run through ``_TextExtractor``. If the extracted text is
    shorter than 10% of the input length, the parser result is discarded and
    the whitespace-collapsed input itself is returned instead.
    """
    extractor = _TextExtractor()
    extractor.feed(html_or_text)
    extracted = extractor.text()
    if len(extracted) < len(html_or_text) * 0.1:
        # Too little survived parsing: fall back to the input as given.
        return re.sub(r"\s+", " ", html_or_text).strip()
    return extracted
def _sha256(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def _junk_score(text: str) -> float:
    """Score how junk-like *text* is, from 0.0 (clean) up to 1.0.

    Penalties are additive and the total is capped at 1.0:

    * 0.4 when the text is shorter than 120 characters;
    * 0.5 when any ``JUNK_MARKERS`` entry occurs as a substring of the
      lower-cased text;
    * 0.3 when fewer than 25% of the word tokens are distinct.
    """
    lowered = text.lower()
    score = 0.0
    if len(text) < 120:
        score += 0.4
    if any(marker in lowered for marker in JUNK_MARKERS):
        score += 0.5
    # Tokenize once and reuse the list for both the total and distinct counts.
    tokens = re.findall(r"[\wก-๙]+", lowered, flags=re.UNICODE)
    if tokens and len(set(tokens)) / len(tokens) < 0.25:
        score += 0.3
    return min(score, 1.0)
@dataclass
class InternetRecord:
    """One evidence row produced from a fetched URL.

    Must be a dataclass: instances are built with keyword arguments and their
    ``__dict__`` is serialized to JSON, so the annotated names below have to
    become real instance attributes. ``slots=True`` is deliberately not used
    because it would remove ``__dict__``.
    """

    # --- required fields -------------------------------------------------
    domain: str  # caller-supplied grouping label
    lang: str  # language code for the text
    question: str  # prompt side of the row
    answer: str  # excerpt of the extracted text
    claim: str  # statement the row asserts
    evidence: str  # provenance string backing the claim
    verification: str  # instructions for re-checking the row
    source_url: str  # URL the content was fetched from
    fetched_at: str  # fetch timestamp as a string
    content_sha256: str  # SHA-256 hex digest of the content

    # --- defaults --------------------------------------------------------
    source: str = "internet_evidence_ingestor"
    license: str = "source-url-required"
    # NOTE(review): how the two fixed scores below were chosen is not visible here.
    quality_score: float = 0.97
    rarity_score: float = 0.85
    junk_score: float = 0.0
    openness_label: str = "open_pure_knowledge"
class InternetEvidenceIngestor:
    """Fetch URL evidence and emit Open Pure CEV JSONL rows.

    The ingestor can be used as a context manager; on exit (or on an explicit
    ``close()``) the HTTP client is closed if this instance created it.
    """

    def __init__(self, client: httpx.Client | None = None, timeout_s: float = 20.0):
        """Create an ingestor.

        Args:
            client: HTTP client to use. When omitted, a client following
                redirects is created with ``timeout_s`` as its timeout.
            timeout_s: Timeout for the internally created client; ignored when
                ``client`` is supplied.
        """
        # Only a client created here is ours to close; a supplied one is not.
        self._owns_client = not client
        self.client = client or httpx.Client(timeout=timeout_s, follow_redirects=True)

    def close(self) -> None:
        """Close the HTTP client if this instance created it."""
        if getattr(self, "_owns_client", False):
            self.client.close()

    def __enter__(self) -> InternetEvidenceIngestor:
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()

    def fetch(self, url: str) -> tuple[str, str]:
        """Download *url* and return ``(extracted_text, raw_sha256)``.

        The hash is taken over the decoded response text, not the raw bytes.

        Raises:
            Whatever the client raises for transport errors, and the error
            raised by ``raise_for_status()`` for error status codes.
        """
        response = self.client.get(url, headers={"User-Agent": "TinyMind-OpenPure-Ingestor/1.0"})
        response.raise_for_status()
        text = extract_text(response.text)
        return text, _sha256(response.text)

    def _record(self, url: str, text: str, raw_hash: str, domain: str) -> InternetRecord:
        """Build the evidence record for one fetched page.

        The answer is the first 1200 characters of *text*. The language is
        ``"th"`` when more than 20 characters fall in U+0E00..U+0E7F, otherwise
        ``"en"``.
        """
        text_hash = _sha256(text)  # used in both the evidence string and the record
        thai_chars = sum(1 for ch in text if "\u0e00" <= ch <= "\u0e7f")
        return InternetRecord(
            domain=domain,
            lang="th" if thai_chars > 20 else "en",
            question=f"What verified knowledge was extracted from {url}?",
            answer=text[:1200].strip(),
            claim="Fetched source contains updateable evidence for TinyMind knowledge extraction.",
            evidence=f"url={url}; raw_sha256={raw_hash}; text_sha256={text_hash}",
            verification="Re-fetch URL, recompute raw/text SHA-256, compare with manifest, then review CEV fields.",
            source_url=url,
            fetched_at=datetime.now(timezone.utc).isoformat(),
            content_sha256=text_hash,
            junk_score=_junk_score(text),
        )

    def ingest_urls(self, urls: Iterable[str], out_dir: str | Path, domain: str = "internet_update") -> dict:
        """Fetch every URL, write accepted rows as JSONL, and write a manifest.

        A URL is blocked, and listed in the manifest instead of the JSONL file,
        when fetching or record building raises, or when its junk score
        exceeds 0.30.

        Args:
            urls: URLs to ingest. Any iterable is accepted, including one-shot
                iterators and generators.
            out_dir: Output directory; created if missing.
            domain: Label stored on every record.

        Returns:
            The manifest dict, which is also written to
            ``internet_evidence_manifest.json`` in *out_dir*.
        """
        # Materialize once: the URLs are iterated here and listed again in the
        # manifest, and a one-shot iterable would be empty the second time.
        url_list = list(urls)
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)

        records: list[InternetRecord] = []
        blocked: list[dict] = []
        for url in url_list:
            try:
                text, raw_hash = self.fetch(url)
                record = self._record(url, text, raw_hash, domain)
            except Exception as exc:  # noqa: BLE001 - best effort: a failure blocks only this URL
                blocked.append({"url": url, "reason": type(exc).__name__, "error": str(exc)})
                continue
            if record.junk_score <= 0.30:
                records.append(record)
            else:
                blocked.append({"url": url, "reason": "junk_score", "junk_score": record.junk_score})

        jsonl_path = out / "internet_evidence.jsonl"
        with jsonl_path.open("w", encoding="utf-8", newline="\n") as f:
            for record in records:
                row = dict(vars(record))
                # The id is derived from the row before id/schema_version are added.
                row["id"] = _sha256(json.dumps(row, ensure_ascii=False, sort_keys=True))[:24]
                row["schema_version"] = SCHEMA_VERSION
                f.write(json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n")

        manifest = {
            "schema_version": SCHEMA_VERSION,
            "jsonl_path": str(jsonl_path),
            "records_written": len(records),
            "blocked_records": len(blocked),
            "blocked": blocked,
            "domain_counts": dict(Counter(record.domain for record in records)),
            "source_urls": url_list,
            "sha256": _sha256(jsonl_path.read_text(encoding="utf-8")),
            "purity_policy": [
                "junk_only_filtering",
                "source_url_required",
                "content_hash_required",
                "claim_evidence_verification_required",
                "timestamped_fetch",
            ],
        }
        manifest_path = out / "internet_evidence_manifest.json"
        manifest["manifest_path"] = str(manifest_path)
        manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
        return manifest
Xet Storage Details
- Size: 6.15 kB
- Xet hash: 561e89e7e7db182f6e7c1d00d0d9646e63cb544b5f50e573d7220687a5f00867
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.