# Hugging Face file-view header (page residue, not part of the module):
# uploader: bbkdevops · download · raw · size: 11 kB
"""Multi-source research mesh for current TinyMind knowledge.
The mesh expands beyond a single web-search loop while staying auditable:
public web search, direct URLs/RSS-like pages, GitHub public metadata, and
Hugging Face public metadata. Social platforms are represented by policy gates
and must use official APIs or user-provided exports; the mesh does not bypass
terms, auth walls, or private content.
"""
from __future__ import annotations
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
import re
from typing import Any
from urllib.parse import quote_plus
import httpx
from data.external_research import ExternalResearcher, _TextExtractor, _terms
def _sha256(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text``.

    The text is encoded as UTF-8 with ``errors="replace"``, so characters that
    cannot be encoded (for example lone surrogates) are substituted rather
    than raising ``UnicodeEncodeError``.
    """
    encoded = text.encode("utf-8", errors="replace")
    hasher = hashlib.sha256()
    hasher.update(encoded)
    return hasher.hexdigest()
@dataclass(frozen=True)
class PlatformPolicy:
    """Immutable access-policy record describing how one platform may be used."""

    # Platform identifier, e.g. "github" or "reddit".
    platform: str
    # Access mechanism the platform is reached through, e.g. "official_public_api".
    connector: str
    # Default gate state, e.g. "enabled" or "blocked_without_credentials".
    default_status: str
    # Human-readable statement of what is and is not permitted.
    rule: str
# Policy table, in the order it is serialized into reports: connectors that
# are enabled, then ones disabled by default, then ones blocked without
# credentials.
DEFAULT_PLATFORM_POLICIES = [
    PlatformPolicy(
        platform="public_web",
        connector="duckduckgo_html",
        default_status="enabled",
        rule="Fetch public pages, hash/cache evidence, and cite sources.",
    ),
    PlatformPolicy(
        platform="direct_url",
        connector="http_fetch",
        default_status="enabled",
        rule="Fetch user-approved public URLs and local/public docs.",
    ),
    PlatformPolicy(
        platform="github",
        connector="official_public_api",
        default_status="enabled",
        rule="Use GitHub public API/raw public files; respect rate limits.",
    ),
    PlatformPolicy(
        platform="huggingface",
        connector="official_public_api",
        default_status="enabled",
        rule="Use Hugging Face public API/dataset metadata; use token only from env/cache when needed.",
    ),
    PlatformPolicy(
        platform="rss_sitemap",
        connector="http_fetch",
        default_status="enabled",
        rule="Use public RSS, Atom, sitemap, and documentation pages when supplied or discovered.",
    ),
    PlatformPolicy(
        platform="reddit",
        connector="official_or_public_json",
        default_status="disabled_by_default",
        rule="Enable only for public posts/API-compliant access; no private or auth-wall scraping.",
    ),
    PlatformPolicy(
        platform="youtube",
        connector="official_api_or_rss",
        default_status="disabled_by_default",
        rule="Use official API/RSS metadata; no video bypass/downloading by default.",
    ),
    PlatformPolicy(
        platform="x_twitter",
        connector="official_api_required",
        default_status="blocked_without_credentials",
        rule="Requires official API/user authorization.",
    ),
    PlatformPolicy(
        platform="facebook",
        connector="official_api_required",
        default_status="blocked_without_credentials",
        rule="Requires official API/user authorization.",
    ),
    PlatformPolicy(
        platform="instagram",
        connector="official_api_required",
        default_status="blocked_without_credentials",
        rule="Requires official API/user authorization.",
    ),
    PlatformPolicy(
        platform="tiktok",
        connector="official_api_required",
        default_status="blocked_without_credentials",
        rule="Requires official API/user authorization.",
    ),
    PlatformPolicy(
        platform="linkedin",
        connector="official_api_required",
        default_status="blocked_without_credentials",
        rule="Requires official API/user authorization.",
    ),
]
def _text_from_response(resp: httpx.Response) -> str:
    """Return the textual content of an HTTP response.

    HTML responses are passed through ``_TextExtractor`` and its ``text``
    attribute is returned; any other body is returned with runs of whitespace
    collapsed to single spaces and the ends stripped.

    A response counts as HTML when its ``content-type`` header mentions
    ``text/html`` or when ``<html`` appears in the first 500 characters of the
    body. Both checks ignore case: media types are case-insensitive, so a
    header such as ``Text/HTML`` must not fall through to the plain-text path.

    Args:
        resp: Response whose ``headers`` and decoded ``text`` are inspected.

    Returns:
        The extracted or whitespace-normalized text.
    """
    body = resp.text
    content_type = resp.headers.get("content-type", "").lower()
    if "text/html" in content_type or "<html" in body[:500].lower():
        parser = _TextExtractor()
        parser.feed(body)
        return parser.text
    return re.sub(r"\s+", " ", body).strip()
def _score_source(query: str, text: str) -> tuple[list[str], float]:
    """Measure how many of the query's terms also occur in ``text``.

    Both strings are tokenized with ``_terms`` (imported helper; presumably
    returns a set of normalized tokens — confirm against its definition).

    Returns:
        A ``(matched, score)`` pair: ``matched`` is the sorted list of terms
        present in both inputs, and ``score`` is ``len(matched)`` divided by
        the number of query terms. The divisor is floored at 1, so a query
        with no terms scores ``0.0`` instead of dividing by zero.
    """
    query_terms = _terms(query)
    shared = query_terms & _terms(text)
    matched_terms = sorted(shared)
    divisor = max(len(query_terms), 1)
    return matched_terms, len(matched_terms) / divisor
def _source(url: str, title: str, text: str, query: str, connector: str) -> dict[str, Any] | None:
    """Build a source record for ``text``, or ``None`` when it is unusable.

    The text is whitespace-normalized and cut to 16000 characters. ``None`` is
    returned when nothing remains after normalization, or when the text shares
    no terms with ``query`` (score of zero or less).

    Args:
        url: Location the text came from.
        title: Display title; falls back to the first 90 characters of the
            normalized text when empty.
        text: Raw text to normalize, score and hash.
        query: Research query the text is scored against.
        connector: Name of the connector that produced the text.

    Returns:
        A dict with ``url``, ``title``, ``text``, ``sha256``, ``matched_terms``,
        ``score`` and ``connector`` keys, or ``None``.
    """
    normalized = re.sub(r"\s+", " ", text).strip()[:16000]
    if not normalized:
        return None

    matched_terms, relevance = _score_source(query, normalized)
    if relevance <= 0:
        return None

    return dict(
        url=url,
        title=title or normalized[:90],
        text=normalized,
        sha256=_sha256(normalized),
        matched_terms=matched_terms,
        score=relevance,
        connector=connector,
    )
class MultiSourceResearcher:
    """ExternalResearcher-compatible multi-source connector mesh.

    ``research`` queries, in order: public web search (delegated to
    ``ExternalResearcher``), the configured direct URLs, GitHub repository
    search, and Hugging Face dataset and model search. Every connector appends
    status rows to a shared report list; connector failures are recorded there
    instead of being raised, so one failing source does not abort the run.

    The instance can be used as a context manager. An HTTP client created by
    the constructor is closed on exit or by ``close()``; a client supplied by
    the caller is left open.
    """

    def __init__(
        self,
        client: httpx.Client | None = None,
        *,
        timeout_s: float = 15.0,
        direct_urls: list[str] | None = None,
        include_disabled_social: bool = False,
    ):
        """Configure the mesh.

        Args:
            client: HTTP client to reuse. When ``None`` a client is created
                with ``timeout_s``, redirect following and a TinyMind
                User-Agent header.
            timeout_s: Timeout for the internally created client; ignored when
                ``client`` is supplied.
            direct_urls: URLs fetched by the direct-URL connector.
            include_disabled_social: Stored on the instance; no connector in
                this class reads it.
        """
        # Remember who created the client so close() never shuts down a
        # client the caller still owns.
        self._owns_client = client is None
        if client is None:
            client = httpx.Client(
                timeout=timeout_s,
                follow_redirects=True,
                headers={"User-Agent": "TinyMindSourceMesh/1.0 source-grounded research"},
            )
        self.client = client
        self.direct_urls = direct_urls or []
        self.include_disabled_social = include_disabled_social

    def close(self) -> None:
        """Close the HTTP client if this instance created it."""
        # getattr keeps this safe for instances built without __init__.
        if getattr(self, "_owns_client", False):
            self.client.close()
            self._owns_client = False

    def __enter__(self) -> MultiSourceResearcher:
        """Return ``self`` so the mesh can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type: object, exc: object, tb: object) -> None:
        """Release the internally created client, if any."""
        self.close()

    @staticmethod
    def _score_of(row: dict[str, Any]) -> float:
        """Return ``row["score"]`` as a float, treating a missing or ``None`` score as 0."""
        return float(row.get("score") or 0)

    @staticmethod
    def _hf_item_url(connector: str, item_id: str) -> str:
        """Return the Hugging Face page URL for a dataset or model id."""
        # Dataset pages live under /datasets/; model pages sit at the site root.
        prefix = "datasets/" if connector == "hf_datasets" else ""
        return f"https://huggingface.co/{prefix}{item_id}"

    def research(self, query: str, out_dir: str | Path, max_results: int = 8) -> dict[str, Any]:
        """Run every connector, merge the results and write a JSON report.

        Sources are de-duplicated by content hash (falling back to URL), the
        higher-scoring duplicate wins, and at most ``2 * max_results`` sources
        are kept, ordered by descending score.

        Args:
            query: Research query passed to each connector.
            out_dir: Directory for ``source_mesh_report.json`` (created if
                missing); web-search output goes to its ``web_search``
                subdirectory.
            max_results: Base result limit. Negative values are treated as 0.

        Returns:
            The report dict that was written, including ``report_path``.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        # A negative limit would turn the slices below into "drop from the end".
        limit = max(max_results, 0)
        connector_reports: list[dict[str, Any]] = []
        sources: list[dict[str, Any]] = []
        sources.extend(self._web_search(query, out, max_results=max(2, min(limit, 6)), reports=connector_reports))
        sources.extend(self._direct_urls(query, max_results=limit, reports=connector_reports))
        sources.extend(self._github(query, max_results=max(1, min(limit, 4)), reports=connector_reports))
        sources.extend(self._huggingface(query, max_results=max(1, min(limit, 4)), reports=connector_reports))

        deduped: dict[str, dict[str, Any]] = {}
        for item in sources:
            key = str(item.get("sha256") or item.get("url"))
            current = deduped.get(key)
            if current is None or self._score_of(item) > self._score_of(current):
                deduped[key] = item
        ranked = sorted(deduped.values(), key=self._score_of, reverse=True)[: limit * 2]

        policy_rows = [asdict(policy) for policy in DEFAULT_PLATFORM_POLICIES]
        report = {
            "schema_version": "tinymind-source-mesh-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "query": query,
            "policy": "multi_source_fetch_hash_verify_no_auth_bypass",
            "connectors": connector_reports,
            "platform_policy": policy_rows,
            "sources": ranked,
            "source_count": len(ranked),
            "claim_gate": {
                "multi_source_mesh_ready": True,
                "social_media_full_access_claim_allowed": False,
                "private_or_auth_wall_scraping_allowed": False,
                "reason": "Social sources require official APIs, user export, or public endpoints; the mesh records blocked sources instead of bypassing them.",
            },
        }
        path = out / "source_mesh_report.json"
        report["report_path"] = str(path)
        path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
        return report

    def _web_search(self, query: str, out: Path, *, max_results: int, reports: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Delegate to ``ExternalResearcher`` and tag its sources as ``public_web``.

        Any exception is recorded in ``reports`` and an empty list is returned.
        """
        # Broad catch is deliberate: a failing connector must not abort the mesh.
        try:
            report = ExternalResearcher(client=self.client).research(query, out / "web_search", max_results=max_results)
            reports.append({"connector": "public_web", "status": "ok", "source_count": report.get("source_count", 0)})
            rows = []
            for source in report.get("sources", []):
                item = dict(source)
                item["connector"] = "public_web"
                rows.append(item)
            return rows
        except Exception as exc:
            reports.append({"connector": "public_web", "status": "error", "error": str(exc)[:400]})
            return []

    def _direct_urls(self, query: str, *, max_results: int, reports: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Fetch up to ``max_results`` configured URLs and keep those matching ``query``.

        Each failing URL gets its own error row in ``reports``; a summary row
        with status ``ok`` and the number of kept sources is always appended
        afterwards.
        """
        rows: list[dict[str, Any]] = []
        # Clamp so a negative limit fetches nothing instead of "all but the last N".
        for url in self.direct_urls[: max(max_results, 0)]:
            try:
                resp = self.client.get(url)
                resp.raise_for_status()
                text = _text_from_response(resp)
                item = _source(url, text[:90], text, query, "direct_url")
                if item:
                    rows.append(item)
            except Exception as exc:
                reports.append({"connector": "direct_url", "url": url, "status": "error", "error": str(exc)[:300]})
        reports.append({"connector": "direct_url", "status": "ok", "source_count": len(rows)})
        return rows

    def _github(self, query: str, *, max_results: int, reports: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Search public GitHub repositories and score their metadata against ``query``.

        The scored text is the repository's full name, description, topics and
        language. Any exception is recorded in ``reports`` and an empty list is
        returned.
        """
        try:
            url = f"https://api.github.com/search/repositories?q={quote_plus(query)}&per_page={max_results}"
            resp = self.client.get(url)
            resp.raise_for_status()
            payload = resp.json()
            rows = []
            for repo in payload.get("items", [])[:max_results]:
                full_name = str(repo.get("full_name") or repo.get("name") or "")
                text = " ".join(
                    [
                        full_name,
                        str(repo.get("description") or ""),
                        " ".join(str(tag) for tag in repo.get("topics", []) or []),
                        str(repo.get("language") or ""),
                    ]
                )
                item = _source(str(repo.get("html_url") or ""), full_name, text, query, "github_public_api")
                if item:
                    rows.append(item)
            reports.append({"connector": "github_public_api", "status": "ok", "source_count": len(rows)})
            return rows
        except Exception as exc:
            reports.append({"connector": "github_public_api", "status": "error", "error": str(exc)[:400]})
            return []

    def _huggingface(self, query: str, *, max_results: int, reports: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Search Hugging Face datasets and models and score their metadata.

        The two endpoints are queried independently and each appends its own
        report row whose ``source_count`` covers only that endpoint. Payload
        entries that are not dicts or carry no id are skipped. The scored text
        is the item id, description, tags and pipeline tag.
        """
        rows: list[dict[str, Any]] = []
        endpoints = [
            ("hf_datasets", f"https://huggingface.co/api/datasets?search={quote_plus(query)}&limit={max_results}"),
            ("hf_models", f"https://huggingface.co/api/models?search={quote_plus(query)}&limit={max_results}"),
        ]
        for connector, url in endpoints:
            # Rows present before this endpoint ran, so its count excludes them.
            already_collected = len(rows)
            try:
                resp = self.client.get(url)
                resp.raise_for_status()
                payload = resp.json()
                for item_payload in payload[:max_results] if isinstance(payload, list) else []:
                    if not isinstance(item_payload, dict):
                        continue
                    item_id = str(item_payload.get("id") or item_payload.get("modelId") or "")
                    if not item_id:
                        # Without an id there is no page to cite.
                        continue
                    text = " ".join(
                        [
                            item_id,
                            str(item_payload.get("description") or ""),
                            " ".join(str(tag) for tag in item_payload.get("tags", []) or []),
                            str(item_payload.get("pipeline_tag") or ""),
                        ]
                    )
                    item = _source(self._hf_item_url(connector, item_id), item_id, text, query, connector)
                    if item:
                        rows.append(item)
                reports.append({"connector": connector, "status": "ok", "source_count": len(rows) - already_collected})
            except Exception as exc:
                reports.append({"connector": connector, "status": "error", "error": str(exc)[:400]})
        return rows

# Xet Storage Details (Hugging Face page residue, not part of the module):
# Size: 11 kB
# Xet hash: 006d32517388f156608e9086ae473a2df8555d26fe997c71d73c14d5fb0b3247
# Xet efficiently stores files, intelligently splitting them into unique
# chunks and accelerating uploads and downloads. More info.