# Agentic_RAG / scripts / ingest_articles.py
# Oleksii Obolonskyi
# Initial commit
# d10c06c
#!/usr/bin/env python3
import os
import re
import json
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import requests
from bs4 import BeautifulSoup
from readability import Document
# PDF fallback for arXiv / PDFs
from pdfminer.high_level import extract_text as pdfminer_extract_text
# -----------------------------
# Output
# -----------------------------
# Root folder for all generated artefacts; override with the RAG_OUT_DIR environment variable.
OUT_DIR = os.getenv("RAG_OUT_DIR", "data/normalized")
# The chunk file and the manifest both live directly inside OUT_DIR.
OUT_JSONL, OUT_MANIFEST = (
    os.path.join(OUT_DIR, filename)
    for filename in ("chunks_articles.jsonl", "manifest_articles.json")
)
# -----------------------------
# Fetch config
# -----------------------------
# Browser-like User-Agent (desktop Chrome on macOS), assembled from its three parts.
_USER_AGENT = " ".join(
    [
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/121.0.0.0 Safari/537.36",
    ]
)

# Default request headers applied to every HTTP request made by this script.
HEADERS = {
    "User-Agent": _USER_AGENT,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    # Ask intermediaries for a fresh copy (HTTP/1.1 and legacy HTTP/1.0 forms).
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}

# Per-request timeout, in seconds.
TIMEOUT_S = 30
# -----------------------------
# Sources (latest recommendations)
# Sources file (recommended): a non-empty JSON list in the sources file
# replaces the built-in default list below.
# -----------------------------
# Path of the optional JSON sources file; override with RAG_ARTICLE_SOURCES.
SOURCES_FILE = os.environ.get("RAG_ARTICLE_SOURCES", "sources_articles.json")


def load_sources() -> List[Dict]:
    """Load the list of article sources from ``SOURCES_FILE``.

    Returns:
        The parsed list of source dicts, or ``[]`` when the file does not exist.

    Raises:
        ValueError: if the file is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError``), is not a JSON list, or contains an entry that is not
            an object with the ``id``, ``type`` and ``url`` keys that the
            ingestion loop reads.
    """
    # Prefer JSON config so users can add sources without editing code.
    p = Path(SOURCES_FILE)
    if not p.is_file():
        return []
    data = json.loads(p.read_text(encoding="utf-8"))
    if not isinstance(data, list):
        raise ValueError(f"{SOURCES_FILE} must be a JSON list of sources")
    # Fail fast on malformed entries: otherwise the problem only surfaces as a
    # KeyError in the middle of ingestion, after the output file was truncated.
    required_keys = ("id", "type", "url")
    for idx, entry in enumerate(data):
        if not isinstance(entry, dict):
            raise ValueError(
                f"{SOURCES_FILE}: entry #{idx} must be a JSON object, got {type(entry).__name__}"
            )
        missing = [key for key in required_keys if key not in entry]
        if missing:
            raise ValueError(
                f"{SOURCES_FILE}: entry #{idx} is missing required key(s): {', '.join(missing)}"
            )
    return data
# Effective source list: the entries returned by load_sources() when that list
# is non-empty, otherwise the built-in defaults below. Each default entry is a
# dict with "id", "type" ("html" or "pdf"), "publisher" and "url".
SOURCES: List[Dict] = load_sources() or [
    {
        "id": "anthropic_multi_agent_research_system",
        "type": "html",
        "publisher": "Anthropic",
        "url": "https://www.anthropic.com/engineering/multi-agent-research-system",
    },
    {
        "id": "anthropic_agentic_misalignment",
        "type": "html",
        "publisher": "Anthropic",
        "url": "https://www.anthropic.com/research/agentic-misalignment",
    },
    {
        "id": "react_arxiv_2210_03629",
        "type": "pdf",
        "publisher": "arXiv",
        "url": "https://arxiv.org/pdf/2210.03629.pdf",
    },
    {
        "id": "rag_arxiv_2005_11401",
        "type": "pdf",
        "publisher": "arXiv",
        "url": "https://arxiv.org/pdf/2005.11401.pdf",
    },
    {
        "id": "toolformer_arxiv_2302_04761",
        "type": "pdf",
        "publisher": "arXiv",
        "url": "https://arxiv.org/pdf/2302.04761.pdf",
    },
    {
        "id": "tds_single_vs_multi_agent_systems",
        "type": "html",
        "publisher": "Towards Data Science",
        "url": "https://towardsdatascience.com/agentic-ai-single-vs-multi-agent-systems/",
    },
    {
        "id": "tds_langgraph_101_deep_research_agent",
        "type": "html",
        "publisher": "Towards Data Science",
        "url": "https://towardsdatascience.com/langgraph-101-lets-build-a-deep-research-agent/",
    },
    {
        "id": "tds_effective_ai_agents_at_scale",
        "type": "html",
        "publisher": "Towards Data Science",
        "url": "https://towardsdatascience.com/how-to-build-effective-ai-agents-to-process-millions-of-requests/",
    },
    {
        "id": "ai_sdk_mcp_tools",
        "type": "html",
        "publisher": "AI SDK",
        "url": "https://ai-sdk.dev/docs/ai-sdk-core/mcp-tools"
    },
    {
        "id": "byteplus_mcp_topic",
        "type": "html",
        "publisher": "BytePlus",
        # The trailing empty "title=" parameter is stripped by normalize_url()
        # when the HTML fetcher requests this page.
        "url": "https://www.byteplus.com/en/topic/542256?title="
    },
    {
        "id": "merge_mcp_tool_schema",
        "type": "html",
        "publisher": "Merge.dev",
        "url": "https://www.merge.dev/blog/mcp-tool-schema"
    },
    {
        "id": "netfoundry_ai_agent_mcp_decision",
        "type": "html",
        "publisher": "NetFoundry",
        "url": "https://netfoundry.io/ai/how-an-ai-agent-decides-to-call-mcp-tools/"
    },
    {
        "id": "modelcontextprotocol_github",
        "type": "html",
        "publisher": "Model Context Protocol",
        "url": "https://github.com/modelcontextprotocol/modelcontextprotocol"
    },
    {
        "id": "devto_react_vs_plan_execute",
        "type": "html",
        "publisher": "Dev.to",
        "url": "https://dev.to/jamesli/react-vs-plan-and-execute-a-practical-comparison-of-llm-agent-patterns-4gh9"
    },
    {
        "id": "byaiteam_agent_planning_reliability",
        "type": "html",
        "publisher": "By AI Team",
        "url": "https://byaiteam.com/blog/2025/12/09/ai-agent-planning-react-vs-plan-and-execute-for-reliability/"
    },
    {
        "id": "linkedin_build_ai_agent_post",
        "type": "html",
        "publisher": "LinkedIn",
        "url": "https://www.linkedin.com/posts/lewisowain_how-to-build-an-ai-agent-activity-7402339630764941312-_G5h/"
    },
    {
        "id": "scitepress_multiagent_paper_2021",
        "type": "pdf",
        "publisher": "SciTePress",
        "url": "https://www.scitepress.org/Papers/2021/105593/105593.pdf"
    },
    {
        "id": "geeksforgeeks_informed_vs_uninformed_search",
        "type": "html",
        "publisher": "GeeksforGeeks",
        "url": "https://www.geeksforgeeks.org/artificial-intelligence/difference-between-informed-and-uninformed-search-in-ai/"
    },
    {
        "id": "baeldung_informed_vs_uninformed_search",
        "type": "html",
        "publisher": "Baeldung",
        "url": "https://www.baeldung.com/cs/informed-vs-uninformed-search"
    },
    {
        "id": "scaler_informed_vs_uninformed_search",
        "type": "html",
        "publisher": "Scaler",
        "url": "https://www.scaler.com/topics/difference-between-informed-and-uninformed-search/"
    },
    {
        "id": "scipub_agent_search_paper_2021",
        "type": "pdf",
        "publisher": "Science Publications",
        "url": "https://thescipub.com/pdf/jcssp.2021.1147.1156.pdf"
    },
    {
        "id": "ibm_ai_agent_orchestration",
        "type": "html",
        "publisher": "IBM",
        "url": "https://www.ibm.com/think/topics/ai-agent-orchestration"
    },
    {
        "id": "domo_ai_agent_orchestration",
        "type": "html",
        "publisher": "Domo",
        "url": "https://www.domo.com/glossary/ai-agent-orchestration"
    },
    {
        "id": "aimultiple_agentic_frameworks",
        "type": "html",
        "publisher": "AI Multiple",
        "url": "https://research.aimultiple.com/agentic-frameworks/"
    },
    {
        "id": "reddit_multiagent_system_evaluator",
        "type": "html",
        "publisher": "Reddit",
        "url": "https://www.reddit.com/r/PromptSynergy/comments/1np7wxw/multiagent_system_evaluator_with_40point_analysis/"
    },
    {
        "id": "dextra_ai_agent_orchestration",
        "type": "html",
        "publisher": "Dextra Labs",
        "url": "https://dextralabs.com/blog/what-is-ai-agent-orchestration/"
    },
    {
        "id": "kubiya_agent_orchestration_frameworks",
        "type": "html",
        "publisher": "Kubiya",
        "url": "https://www.kubiya.ai/blog/ai-agent-orchestration-frameworks"
    },
    {
        "id": "projectpro_ai_agent_evaluation",
        "type": "html",
        "publisher": "ProjectPro",
        "url": "https://www.projectpro.io/article/ai-agent-evaluation/1178"
    },
    {
        "id": "zyrix_multi_agent_testing_guide_2025",
        "type": "html",
        "publisher": "Zyrix AI",
        "url": "https://zyrix.ai/blogs/multi-agent-ai-testing-guide-2025/"
    }
]
# -----------------------------
# Utilities
# -----------------------------
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
    utc_now = datetime.now(timezone.utc)
    stamp = utc_now.isoformat()
    # isoformat() renders the UTC offset as "+00:00"; swap it for the "Z" designator.
    return stamp.replace("+00:00", "Z")
def clean_ws(s: str) -> str:
    """Normalise line endings and collapse excess whitespace.

    CRLF and lone CR become LF, runs of three or more newlines shrink to a
    single blank line, runs of two or more spaces/tabs shrink to one space,
    and the result is stripped at both ends.
    """
    unified = re.sub(r"\r\n?", "\n", s)
    squeezed = re.sub(r"\n{3,}", "\n\n", unified)
    return re.sub(r"[ \t]{2,}", " ", squeezed).strip()
# Lower-case English function words that are ignored when deriving tags.
STOPWORDS = set(
    (
        "a an and are as at be but by can do does for from how i if in is it of on or "
        "that the their then there these this to was were what when where which who why with you your"
    ).split()
)
def chunk_text(text: str, size: int = 1200, overlap: int = 150) -> List[str]:
    """Split ``text`` into fixed-size character windows that overlap.

    Args:
        text: Text to split; leading/trailing whitespace is stripped first.
        size: Maximum number of characters per chunk. Must be positive.
        overlap: Number of characters shared between consecutive chunks.
            Must satisfy ``0 <= overlap < size``.

    Returns:
        The chunks in order, or ``[]`` when ``text`` is empty after stripping.

    Raises:
        ValueError: if ``size`` or ``overlap`` would prevent the window from
            advancing (previously such arguments made the loop spin forever).
    """
    if size <= 0:
        raise ValueError(f"size must be positive, got {size}")
    if not 0 <= overlap < size:
        raise ValueError(f"overlap must satisfy 0 <= overlap < size, got overlap={overlap}, size={size}")
    text = text.strip()
    if not text:
        return []
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(start + size, n)
        chunks.append(text[start:end])
        if end == n:
            break
        # overlap < size guarantees this is strictly greater than the old start.
        start = end - overlap
    return chunks
def extract_tags(text: str, title: Optional[str], max_tags: int = 8) -> List[str]:
    """Derive up to ``max_tags`` tags from a chunk of text and its title.

    Tags are the most frequent non-stopword tokens (lower-cased, ties broken
    alphabetically) followed by capitalised phrases of one to three words in
    order of first appearance, de-duplicated and truncated to ``max_tags``.
    """
    content = " ".join(part for part in (title, text) if part)

    # Frequency table of lower-cased tokens (3+ chars, starting with a letter).
    counts = {}
    for word in re.findall(r"[A-Za-z][A-Za-z0-9_]{2,}", content):
        key = word.lower()
        if key in STOPWORDS:
            continue
        counts[key] = counts.get(key, 0) + 1
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    keywords = [word for word, _count in ranked[:max_tags]]

    # Capitalised word runs, kept in first-seen order.
    entities = []
    for match in re.finditer(r"\b[A-Z][a-zA-Z]+\b(?:\s+[A-Z][a-zA-Z]+\b){0,2}", content):
        candidate = match.group(0).strip()
        if candidate.lower() in STOPWORDS or candidate in entities:
            continue
        entities.append(candidate)
        if len(entities) >= max_tags:
            break

    # Keywords first, then entities; dict.fromkeys drops repeats while keeping order.
    merged = dict.fromkeys(tag for tag in keywords + entities if tag)
    return list(merged)[:max_tags]
def normalize_url(url: str) -> str:
    """Strip a trailing, empty ``title=`` query parameter from ``url``.

    Only a parameter literally named ``title`` is removed, i.e. the URL must end
    with ``?title=`` or ``&title=``. A bare ``endswith("title=")`` check would
    also truncate unrelated parameters such as ``?subtitle=``.

    Returns:
        The URL without the empty parameter and without the dangling ``?``/``&``
        that preceded it; any other URL is returned unchanged.
    """
    if url.endswith(("?title=", "&title=")):
        return url[: -len("title=")].rstrip("?&")
    return url
def extract_visible_text(html: str) -> str:
    """Return the text of ``html`` after dropping script/style/chrome elements.

    The listed elements are removed together with their contents, the remaining
    text nodes are joined with newlines, and whitespace is normalised by
    ``clean_ws``.
    """
    non_content_tags = ["script", "style", "noscript", "svg", "header", "footer", "nav", "aside"]
    soup = BeautifulSoup(html, "html.parser")
    for node in soup.find_all(non_content_tags):
        node.decompose()
    return clean_ws(soup.get_text("\n"))
def safe_get(session: requests.Session, url: str) -> requests.Response:
    """GET ``url`` with up to three attempts and a growing pause between them.

    Only transport-level failures (``requests.RequestException``: connection
    errors, timeouts, ...) are retried. HTTP error statuses are not exceptions
    here; the response is returned as-is for the caller to inspect.

    Args:
        session: Session whose headers/cookies are used for the request.
        url: Address to fetch; redirects are followed.

    Returns:
        The response of the first attempt that did not raise.

    Raises:
        requests.RequestException: the error of the last attempt when all fail.
    """
    # basic retry for transient blocks
    attempts = 3
    last_exc: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return session.get(url, timeout=TIMEOUT_S, allow_redirects=True)
        except requests.RequestException as e:
            last_exc = e
            # Back off before the next try, but do not stall after the final one.
            if attempt < attempts - 1:
                time.sleep(1.25 * (attempt + 1))
    raise last_exc
# -----------------------------
# Metadata extraction (best effort)
# -----------------------------
def extract_meta_from_html(html: str, url: str) -> Tuple[str, Optional[str], Optional[str]]:
    """
    Returns: (title, author, publication_date_iso)
    Best-effort using meta tags commonly found in blogs/news sites.

    Title: ``og:title`` / ``twitter:title`` when present, else ``<title>``,
    else ``""``. Author and date are ``None`` when no matching meta tag exists.
    A date that already starts with ``YYYY-MM-DD`` is returned unchanged; any
    other value is parsed with python-dateutil and rendered as UTC ISO-8601,
    falling back to the raw string when parsing is not possible.

    ``url`` is accepted for interface compatibility and is not used.
    """
    soup = BeautifulSoup(html, "html.parser")
    title = ""
    if soup.title and soup.title.get_text(strip=True):
        title = soup.title.get_text(strip=True)

    # Common meta tags
    def meta(name: str) -> Optional[str]:
        # Sites use either <meta name=...> or <meta property=...>; try both in that order.
        for attr in ("name", "property"):
            tag = soup.find("meta", attrs={attr: name})
            if tag and tag.get("content"):
                return tag["content"].strip()
        return None

    title2 = meta("og:title") or meta("twitter:title")
    if title2:
        title = title2
    author = meta("author") or meta("article:author") or meta("og:article:author")
    pub = meta("article:published_time") or meta("og:article:published_time") or meta("pubdate") or meta("date")

    # Normalize date to ISO if possible (keep as-is if parsing fails)
    pub_iso = None
    if pub:
        # Many sites already provide ISO; keep it if it looks like ISO
        if re.match(r"^\d{4}-\d{2}-\d{2}", pub):
            pub_iso = pub
        else:
            # Try minimal parsing like "Jan 10, 2025"
            try:
                from dateutil import parser as dtparser  # python-dateutil in requirements
                parsed = dtparser.parse(pub)
                # A value without an offset parses to a naive datetime, and
                # astimezone() would then assume the machine's local zone, so the
                # stored date would depend on where the script runs. Treat it as UTC.
                if parsed.utcoffset() is None:
                    parsed = parsed.replace(tzinfo=timezone.utc)
                pub_iso = parsed.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
            except Exception:
                # Deliberate best effort: missing dateutil or an unparseable value
                # must not fail ingestion, so keep the raw string.
                pub_iso = pub
    return title.strip(), (author.strip() if author else None), (pub_iso.strip() if pub_iso else None)
# -----------------------------
# HTML extraction
# -----------------------------
def extract_main_text_readability(html: str) -> Tuple[str, str]:
    """Extract the article title and main body text using readability.

    The readability summary is reduced to its headings, paragraphs and list
    items, one per line, and whitespace is normalised by ``clean_ws``.

    Returns:
        ``(title, text)``; either may be an empty string.
    """
    doc = Document(html)
    title = doc.short_title() or ""
    summary_html = doc.summary(html_partial=True)
    soup = BeautifulSoup(summary_html, "html.parser")
    block_tags = ["h1", "h2", "h3", "p", "li"]
    parts = []
    for el in soup.find_all(block_tags):
        # get_text() on an element already includes its descendants, so a matched
        # element nested inside another matched one (e.g. <li><p>...</p></li> or
        # the items of a nested list) would otherwise be emitted twice.
        if el.find_parent(block_tags) is not None:
            continue
        t = el.get_text(" ", strip=True)
        if t:
            parts.append(t)
    text = "\n".join(parts)
    return title.strip(), clean_ws(text)
def fetch_html_article(session: requests.Session, url: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Download an HTML article and extract its title, metadata and body text.

    Text extraction falls back in stages: readability output, then all ``<p>``
    elements (if readability gave < 500 chars), then all visible text (if still
    < 300 chars). Fewer than 200 characters overall counts as a failure.

    Returns:
        ``(title, author, publication_date, text)`` on success. On failure
        ``(None, None, None, reason)``, where ``reason`` is a human-readable
        message; callers detect failure by ``title is None``.
    """
    url = normalize_url(url)
    try:
        r = safe_get(session, url)
    except requests.RequestException as e:
        # Report network failures through the same error tuple as HTTP errors
        # so that one unreachable site does not abort the caller.
        return None, None, None, f"Request failed ({type(e).__name__}): {url}"
    if r.status_code == 403:
        return None, None, None, f"403 Forbidden (site blocked requests): {url}"
    if r.status_code >= 400:
        return None, None, None, f"HTTP {r.status_code}: {url}"

    # Without a charset in Content-Type, requests decodes text/* responses as
    # ISO-8859-1, which garbles UTF-8 pages; use the detected encoding instead.
    if "charset" not in r.headers.get("Content-Type", "").lower():
        r.encoding = r.apparent_encoding
    html = r.text

    meta_title, author, pub_date = extract_meta_from_html(html, url)
    title, text = extract_main_text_readability(html)
    # Prefer readability title but fall back to meta
    final_title = title or meta_title or url

    # Fallback if readability is too thin
    if not text or len(text) < 500:
        soup = BeautifulSoup(html, "html.parser")
        raw = clean_ws("\n".join(p.get_text(" ", strip=True) for p in soup.find_all("p")))
        if len(raw) > len(text or ""):
            text = raw
    if not text or len(text) < 300:
        raw = extract_visible_text(html)
        if len(raw) > len(text or ""):
            text = raw
    if not text or len(text) < 200:
        return None, None, None, f"Could not extract sufficient text from: {url}"
    return final_title, author, pub_date, text
# -----------------------------
# PDF extraction (arXiv etc.)
# -----------------------------
def fetch_pdf_text(session: requests.Session, url: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Download a PDF and extract its text with pdfminer.

    The payload is written to a uniquely named temporary file under
    ``OUT_DIR/_tmp`` for extraction, and that file is always removed afterwards.

    Returns:
        ``(title, author, publication_date, text)`` on success. ``title`` is
        the fixed placeholder ``"arXiv paper"`` for every PDF, and author/date
        are always ``None``. On failure ``(None, None, None, reason)``; callers
        detect failure by ``title is None``.
    """
    import tempfile  # local import: only this function needs it

    try:
        r = safe_get(session, url)
    except requests.RequestException as e:
        # Report network failures through the same error tuple as HTTP errors.
        return None, None, None, f"Request failed ({type(e).__name__}): {url}"
    if r.status_code >= 400:
        return None, None, None, f"HTTP {r.status_code}: {url}"
    # A 200 response can still be an HTML error or consent page. The PDF
    # signature is expected near the start of a real PDF file.
    if b"%PDF" not in r.content[:1024]:
        return None, None, None, f"Response is not a PDF: {url}"

    # Save temp pdf (mkstemp gives a collision-free name, unlike a timestamp)
    tmp_dir = os.path.join(OUT_DIR, "_tmp")
    os.makedirs(tmp_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(prefix="tmp_", suffix=".pdf", dir=tmp_dir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(r.content)
        # Extract text
        try:
            text = pdfminer_extract_text(tmp_path) or ""
        except Exception as e:
            # pdfminer raises a variety of parser-specific errors on damaged or
            # encrypted files; report them instead of crashing the run.
            return None, None, None, f"PDF text extraction failed ({type(e).__name__}): {url}"
    finally:
        # remove tmp (also when writing or extraction failed)
        try:
            os.remove(tmp_path)
        except OSError:
            pass

    text = clean_ws(text)
    if not text or len(text) < 800:
        return None, None, None, f"PDF text extraction too small for: {url}"

    # No metadata is parsed from the PDF itself: a placeholder title is used and
    # author/date stay empty. You can enrich these later via the arXiv API.
    title = "arXiv paper"
    author = None
    pub_date = None
    return title, author, pub_date, text
# -----------------------------
# Main ingestion
# -----------------------------
def main() -> None:
    """Fetch every configured source, chunk its text and write the output files.

    Writes three files into ``OUT_DIR``: the JSONL chunk file (``OUT_JSONL``),
    the document manifest (``OUT_MANIFEST``) and ``articles_ingest_report.json``,
    which lists every skipped source with a reason. A source that is
    incomplete, of unknown type, or whose fetch/extraction fails or raises is
    recorded as skipped; it never aborts the run, so the manifest and report
    are always written.
    """
    os.makedirs(OUT_DIR, exist_ok=True)
    written = 0
    skipped = []
    manifest_docs = []
    # Both the HTTP session and the output file are closed on exit, even on error.
    with requests.Session() as session, open(OUT_JSONL, "w", encoding="utf-8") as out:
        session.headers.update(HEADERS)
        for src in SOURCES:
            src_id = src.get("id")
            url = src.get("url")
            src_type = src.get("type")
            if not src_id or not url:
                skipped.append({"id": src_id, "url": url, "reason": "Source entry is missing 'id' or 'url'"})
                continue
            doc_id = f"article::{src_id}"
            publisher = src.get("publisher")

            if src_type == "html":
                fetcher = fetch_html_article
            elif src_type == "pdf":
                fetcher = fetch_pdf_text
            else:
                skipped.append({"id": src_id, "url": url, "reason": f"Unknown type: {src_type}"})
                continue

            try:
                title, author, pub_date, text_or_err = fetcher(session, url)
            except Exception as e:
                # Per-source boundary: one unreachable or malformed document must
                # not lose the rest of the run; the failure goes into the report.
                skipped.append({"id": src_id, "url": url, "reason": f"{type(e).__name__}: {e}"})
                continue

            # The fetchers signal failure with title=None and the reason in the last slot.
            if title is None:
                skipped.append({"id": src_id, "url": url, "reason": text_or_err})
                continue
            text = text_or_err
            chunks = chunk_text(text, size=1200, overlap=150)
            if not chunks:
                skipped.append({"id": src_id, "url": url, "reason": "No chunks produced"})
                continue

            breadcrumbs = f"Article: {title}"
            for i, chunk in enumerate(chunks, 1):
                tags = extract_tags(chunk, title)
                rec = {
                    "chunk_id": f"{doc_id}::{i:06d}",
                    "doc_id": doc_id,
                    "doc_title": title,
                    "title": title,
                    "doc_type": "article",
                    "publisher": publisher,
                    "author": author,
                    "publication_date": pub_date,
                    "source_url": url,
                    "section_title": None,
                    "page_start": None,
                    "page_end": None,
                    "source_type": "article",
                    "date": pub_date,
                    "url": url,
                    "priority": 1,
                    "tags": tags,
                    "breadcrumbs": breadcrumbs,
                    "chunk_type": "section",
                    "text": f"Breadcrumbs: {breadcrumbs}\n{chunk}",
                }
                out.write(json.dumps(rec, ensure_ascii=False) + "\n")
                written += 1

            manifest_docs.append(
                {
                    "id": doc_id,
                    "title": title,
                    "format": "pdf" if src_type == "pdf" else "html",
                    "filename": url,
                    "blocks": len(chunks),
                    "source_type": "article",
                    "url": url,
                    "publisher": publisher,
                    "author": author,
                    "publication_date": pub_date,
                    "date": pub_date,
                }
            )
            print(f"[OK] {src_id}: {len(chunks)} chunks")

    manifest = {
        "generated_at": now_iso(),
        "documents": manifest_docs,
    }
    with open(OUT_MANIFEST, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2, ensure_ascii=False)

    # Write a small ingestion report
    report_path = os.path.join(OUT_DIR, "articles_ingest_report.json")
    report = {
        "generated_at": now_iso(),
        "out_jsonl": OUT_JSONL,
        "out_manifest": OUT_MANIFEST,
        "total_chunks_written": written,
        "sources_total": len(SOURCES),
        "sources_skipped": skipped,
        "notes": [
            "Towards Data Science links may return 403 and are skipped to keep the pipeline reproducible.",
            "arXiv PDFs are ingested via pdfminer; title/author/date may be enriched later.",
        ],
    }
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"[DONE] Wrote {written} chunks to {OUT_JSONL}")
    if skipped:
        print(f"[WARN] Skipped {len(skipped)} sources. See {report_path}.")


if __name__ == "__main__":
    main()