"""Build a JSONL knowledge base for RAG from GitHub repos, local files, and URLs."""

from __future__ import annotations

import json, os, re, time, logging
from pathlib import Path
from typing import Dict, List, Iterable, Tuple, Optional

import requests
import yaml

log = logging.getLogger(__name__)


# YAML front matter ("---\n ... \n---") at the top of a Markdown document.
_MD_FRONTMATTER = re.compile(r"^---\s*\n.*?\n---\s*\n", re.DOTALL)


def normalize_text(text: str) -> str:
    """Trim lines, drop noise-only lines, and collapse runs of blank lines."""
    lines = [ln.strip() for ln in text.splitlines()]
    cleaned = []
    for ln in lines:
        if not ln:
            # Keep an empty entry so paragraph boundaries survive for chunk_text().
            cleaned.append("")
            continue
        # Skip decoration/separator lines with fewer than three alphanumerics.
        if sum(ch.isalnum() for ch in ln) < 3:
            continue
        cleaned.append(ln)
    s = "\n".join(cleaned)
    s = re.sub(r"\n{3,}", "\n\n", s)
    return s.strip()


def md_to_text(md: str) -> str:
    """Reduce Markdown to plain text for indexing."""
    md = _MD_FRONTMATTER.sub("", md)
    md = re.sub(r"```.*?```", "", md, flags=re.DOTALL)              # fenced code blocks
    md = re.sub(r"!\[[^\]]*\]\([^)]+\)", "", md)                    # images
    md = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", md)                # links -> their text
    md = re.sub(r"^\s{0,3}#{1,6}\s*", "", md, flags=re.MULTILINE)   # heading markers
    md = md.replace("`", "")                                        # inline code ticks
    md = re.sub(r"^\s*[-*+]\s+", "• ", md, flags=re.MULTILINE)      # bullet markers
    md = re.sub(r"^\s*>\s?", "", md, flags=re.MULTILINE)            # blockquote markers
    return normalize_text(md)
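

# For illustration only (hypothetical input, not from any real document):
#   md_to_text("# Title\n\nSee [docs](https://example.com).")
#   -> "Title\n\nSee docs."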


def chunk_text(text: str, max_chars: int = 800, overlap: int = 120) -> List[str]:
    """Split text into chunks of at most max_chars, preferring paragraph boundaries."""
    paras = [p.strip() for p in text.split("\n\n") if p.strip()]
    out: List[str] = []
    buf = ""
    for p in paras:
        if len(p) > max_chars:
            # Oversized paragraph: slice into windows that share `overlap` chars.
            i = 0
            while i < len(p):
                j = min(i + max_chars, len(p))
                out.append(p[i:j])
                if j == len(p):
                    break  # done; stepping back by `overlap` would duplicate the tail
                i = j - overlap if j - overlap > i else j
            continue
        # Pack consecutive short paragraphs into one chunk while they fit.
        if len(buf) + 2 + len(p) <= max_chars:
            buf = (buf + "\n\n" + p) if buf else p
        else:
            if buf:
                out.append(buf)
            buf = p
    if buf:
        out.append(buf)
    return out
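

# Worked example with the defaults (max_chars=800, overlap=120): a single
# 1,000-char paragraph yields two windows, chars [0:800] and [680:1000],
# which share 120 characters of context.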


def write_jsonl(records: Iterable[Dict], out_path: Path) -> None:
    """Write one JSON object per line, creating parent directories as needed."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
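

# Records produced below have the shape {"text": <chunk>, "source": <origin id>},
# e.g. write_jsonl([{"text": "...", "source": "github:owner/repo/docs/a.md"}],
# Path("data/kb.jsonl")); the path and ids here are illustrative.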


def gh_session() -> requests.Session:
    """Create a requests session with GitHub API headers; uses GITHUB_TOKEN if set."""
    s = requests.Session()
    s.headers.update({
        "Accept": "application/vnd.github+json",
        "User-Agent": "matrix-ai-rag-builder/1.0",
    })
    tok = os.getenv("GITHUB_TOKEN")
    if tok:
        s.headers["Authorization"] = f"Bearer {tok}"
    return s
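

# An authenticated session gets a much higher API rate limit. The token is
# assumed to be supplied by the operator's environment, e.g. (illustrative value):
#   export GITHUB_TOKEN=ghp_yourtokenhere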


def gh_get_json(url: str, sess: requests.Session, max_retries: int = 3) -> Dict | List:
    """GET a GitHub API URL as JSON, backing off exponentially on rate limits."""
    backoff = 1.0
    for _ in range(max_retries):
        r = sess.get(url, timeout=25)
        if r.status_code == 403 and "rate limit" in r.text.lower():
            log.warning("GitHub rate-limited; sleeping %.1fs", backoff)
            time.sleep(backoff)
            backoff = min(backoff * 2, 30)
            continue
        r.raise_for_status()
        return r.json()
    # Retries exhausted: surface the last (rate-limited) response as an error.
    r.raise_for_status()
    return {}


def gh_list_org_repos(org: str, sess: requests.Session) -> List[Dict]:
    """List all repositories of an org, following pagination (100 per page)."""
    repos: List[Dict] = []
    page = 1
    while True:
        url = f"https://api.github.com/orgs/{org}/repos?per_page=100&page={page}"
        js = gh_get_json(url, sess)
        if not js:
            break
        repos.extend(js)
        if len(js) < 100:
            break  # a short page means this was the last one
        page += 1
    return repos


def gh_list_tree(owner: str, repo: str, branch: str, sess: requests.Session) -> List[Dict]:
    """Return the recursive git tree for a branch ([] on unexpected responses)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
    js = gh_get_json(url, sess)
    return js.get("tree", []) if isinstance(js, dict) else []


def gh_fetch_raw(owner: str, repo: str, branch: str, path: str, sess: requests.Session) -> Optional[str]:
    """Fetch a file from raw.githubusercontent.com; fall back from main to master."""
    raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{path}"
    r = sess.get(raw_url, timeout=25)
    if r.status_code == 404 and branch == "main":
        # Older repositories may still use "master" as the default branch.
        raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/master/{path}"
        r = sess.get(raw_url, timeout=25)
    return r.text if r.status_code == 200 else None


def ingest_github_repo(owner: str, name: str, branch: str, docs_paths: List[str],
                       include_readme: bool,
                       exts: Tuple[str, ...] = (".md", ".mdx", ".txt")) -> List[Tuple[str, str]]:
    """Collect (source_id, plain_text) pairs from a repo's README and docs folders."""
    sess = gh_session()
    out: List[Tuple[str, str]] = []

    if include_readme:
        for candidate in ("README.md", "readme.md", "README.MD"):
            t = gh_fetch_raw(owner, name, branch, candidate, sess)
            if t:
                out.append((f"github:{owner}/{name}/{candidate}", md_to_text(t)))
                break

    tree = gh_list_tree(owner, name, branch, sess)
    if not tree:
        return out

    wanted_dirs = [p.strip("/").lower() for p in docs_paths]
    for entry in tree:
        if entry.get("type") != "blob":
            continue
        path = entry.get("path", "")
        lower = path.lower()
        if not lower.endswith(exts):
            continue
        if any(lower.startswith(d + "/") for d in wanted_dirs):
            t = gh_fetch_raw(owner, name, branch, path, sess)
            if not t:
                continue
            txt = md_to_text(t) if lower.endswith((".md", ".mdx")) else normalize_text(t)
            if txt:
                out.append((f"github:{owner}/{name}/{path}", txt))
    return out
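

# Source ids embed the repo path, e.g. ("github:owner/repo/docs/setup.md", "...");
# owner/repo above are placeholders, not a real repository.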


def ingest_github_sources(cfg: Dict) -> List[Tuple[str, str]]:
    """Ingest explicitly listed repos, then every repo of each configured org."""
    out: List[Tuple[str, str]] = []
    gh = cfg.get("github") or {}
    sess = gh_session()  # used for org listing; repo ingestion opens its own session

    for repo in (gh.get("repos") or []):
        owner = repo["owner"]
        name = repo["name"]
        branch = repo.get("branch", "main")
        docs_paths = repo.get("docs_paths", ["docs"])
        include_readme = bool(repo.get("include_readme", True))
        out.extend(ingest_github_repo(owner, name, branch, docs_paths, include_readme))

    for org in (gh.get("orgs") or []):
        try:
            repos = gh_list_org_repos(org, sess)
        except Exception as e:
            log.warning("Failed to list org %s: %s", org, e)
            continue
        for r in repos:
            owner = r["owner"]["login"]
            name = r["name"]
            default_branch = r.get("default_branch", "main")
            # Org-wide ingestion uses the conventional docs/ folder plus the README.
            out.extend(ingest_github_repo(owner, name, default_branch, ["docs"], include_readme=True))
    return out


def ingest_local_sources(cfg: Dict) -> List[Tuple[str, str]]:
    """Read Markdown/text from configured local files and directories."""
    out: List[Tuple[str, str]] = []
    local = cfg.get("local") or {}
    paths = local.get("paths") or []
    glob_pat = local.get("glob", "**/*.md")

    def read_one(f: Path) -> None:
        try:
            raw = f.read_text(encoding="utf-8", errors="ignore")
            txt = md_to_text(raw) if f.suffix.lower() in {".md", ".mdx"} else normalize_text(raw)
            if txt:
                out.append((str(f), txt))
        except Exception as e:
            log.warning("Failed reading %s: %s", f, e)

    for p in paths:
        fp = Path(p)
        if fp.is_file():
            read_one(fp)
        elif fp.is_dir():
            for f in fp.rglob(glob_pat):
                read_one(f)
    return out
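

# A minimal sketch of the expected configs/rag_sources.yaml, inferred from the
# keys read above and in build_kb_from_config (all values are illustrative):
#
#   github:
#     repos:
#       - owner: some-org
#         name: some-repo
#         branch: main
#         docs_paths: [docs]
#         include_readme: true
#     orgs: [another-org]
#   local:
#     paths: [./notes]
#     glob: "**/*.md"
#   urls:
#     - https://example.com/plain-text-page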


def build_kb_from_config(config_path: str = "configs/rag_sources.yaml",
                         out_jsonl: str = "data/kb.jsonl",
                         max_chars: int = 800,
                         overlap: int = 120,
                         minlen: int = 200,
                         dedupe: bool = True) -> int:
    """Ingest all configured sources, chunk them, and write kb.jsonl; returns chunk count."""
    cfg: Dict = {}
    p = Path(config_path)
    if p.exists():
        cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
    else:
        log.warning("rag_sources.yaml not found at %s (using defaults)", p)

    records: List[Dict] = []

    def add_doc(src: str, text: str) -> None:
        for chunk in chunk_text(text, max_chars, overlap):
            if len(chunk) >= minlen:
                records.append({"text": chunk, "source": src})

    try:
        for src, text in ingest_github_sources(cfg):
            add_doc(src, text)
    except Exception as e:
        log.warning("GitHub ingest failed: %s", e)

    try:
        for src, text in ingest_local_sources(cfg):
            add_doc(src, text)
    except Exception as e:
        log.warning("Local ingest failed: %s", e)

    # URLs are fetched as-is; HTML tags are not stripped, so plain-text or
    # Markdown-like endpoints work best here.
    for url in (cfg.get("urls") or []):
        try:
            r = requests.get(url, timeout=25)
            r.raise_for_status()
            add_doc(url, normalize_text(r.text))
        except Exception as e:
            log.warning("URL ingest failed for %s: %s", url, e)

    if dedupe:
        # Exact-text dedupe; keying on the text itself avoids hash() collisions.
        seen = set()
        deduped: List[Dict] = []
        for rec in records:
            if rec["text"] in seen:
                continue
            seen.add(rec["text"])
            deduped.append(rec)
        records = deduped

    if not records:
        log.warning("No KB records produced.")
        return 0

    out_path = Path(out_jsonl)
    write_jsonl(records, out_path)
    log.info("Wrote %d chunks to %s", len(records), out_path)
    return len(records)


def ensure_kb(out_jsonl: str = "data/kb.jsonl",
              config_path: str = "configs/rag_sources.yaml",
              skip_if_exists: bool = True) -> bool:
    """Return True if a usable KB exists or was built.

    If skip_if_exists and a non-empty kb.jsonl is already present, do nothing.
    Otherwise build from the config and return True when any chunks were written.
    """
    out = Path(out_jsonl)
    if skip_if_exists and out.exists() and out.stat().st_size > 0:
        log.info("KB already present at %s (skipping build)", out)
        return True
    n = build_kb_from_config(config_path=config_path, out_jsonl=out_jsonl)
    return n > 0
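

# A minimal sketch of running the builder directly; the logging setup and the
# exit-code convention are assumptions, not requirements of this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
    ok = ensure_kb()
    raise SystemExit(0 if ok else 1)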