# src/pipelines/security.py
"""Security paper pipeline.
Fetches security papers from arXiv (cs.CR + adjacent categories),
finds code URLs, and writes to the database.
"""
import logging
import re
import time
from datetime import datetime, timedelta, timezone
import arxiv
import requests
from src.config import (
ADJACENT_CATEGORIES,
GITHUB_TOKEN,
GITHUB_URL_RE,
SECURITY_EXCLUDE_RE,
SECURITY_KEYWORDS,
SECURITY_LLM_RE,
)
from src.db import create_run, finish_run, insert_papers
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# arXiv fetching
# ---------------------------------------------------------------------------
def _results_in_window(client, query: str, max_results: int, start: datetime, end: datetime):
    """Yield arXiv results for *query* published within [start, end].

    Results arrive newest-first (SubmittedDate descending), so iteration
    stops at the first paper older than *start* instead of exhausting the
    feed; papers newer than *end* are skipped.
    """
    search = arxiv.Search(
        query=query,
        max_results=max_results,
        sort_by=arxiv.SortCriterion.SubmittedDate,
        sort_order=arxiv.SortOrder.Descending,
    )
    for result in client.results(search):
        # NOTE(review): this overwrites any existing tzinfo with UTC —
        # presumably the feed timestamps are UTC; confirm against the client.
        pub = result.published.replace(tzinfo=timezone.utc)
        if pub < start:
            break  # everything past this point is older still
        if pub <= end:
            yield result


def fetch_arxiv_papers(start: datetime, end: datetime, max_papers: int) -> list[dict]:
    """Fetch papers from arXiv: all cs.CR + security-filtered adjacent categories.

    Args:
        start: Inclusive lower bound on publication time (UTC-aware).
        end: Inclusive upper bound on publication time (UTC-aware).
        max_papers: Result cap for the cs.CR query; split evenly across
            the adjacent categories.

    Returns:
        De-duplicated paper dicts (see ``_result_to_dict``), with excluded
        topics (blockchain/surveys/off-topic) filtered out and each paper
        tagged with a boolean ``llm_adjacent`` key.
    """
    client = arxiv.Client(page_size=500, delay_seconds=3.0, num_retries=3)
    papers: dict[str, dict] = {}

    # Primary: every cs.CR paper in the window, no keyword filter.
    log.info("Fetching cs.CR papers ...")
    for result in _results_in_window(client, "cat:cs.CR", max_papers, start, end):
        paper = _result_to_dict(result)
        papers[paper["entry_id"]] = paper
    log.info("cs.CR: %d papers", len(papers))

    # Adjacent categories: keep only papers matching a security keyword.
    # Guard against an empty category list (avoids ZeroDivisionError).
    per_cat = max_papers // len(ADJACENT_CATEGORIES) if ADJACENT_CATEGORIES else 0
    for cat in ADJACENT_CATEGORIES:
        count = 0
        for result in _results_in_window(client, f"cat:{cat}", per_cat, start, end):
            text = f"{result.title} {result.summary}"
            if SECURITY_KEYWORDS.search(text):
                paper = _result_to_dict(result)
                if paper["entry_id"] not in papers:
                    papers[paper["entry_id"]] = paper
                count += 1
        log.info(" %s: %d security-relevant papers", cat, count)

    # Pre-filter: remove excluded topics (blockchain, surveys, etc.)
    before = len(papers)
    papers = {
        eid: p for eid, p in papers.items()
        if not SECURITY_EXCLUDE_RE.search(f"{p['title']} {p['abstract']}")
    }
    excluded = before - len(papers)
    if excluded:
        log.info("Excluded %d papers (blockchain/survey/off-topic)", excluded)

    # Tag LLM-adjacent papers so the scoring prompt can apply hard caps.
    for p in papers.values():
        text = f"{p['title']} {p['abstract']}"
        p["llm_adjacent"] = bool(SECURITY_LLM_RE.search(text))
    llm_count = sum(1 for p in papers.values() if p["llm_adjacent"])
    if llm_count:
        log.info("Tagged %d papers as LLM-adjacent", llm_count)

    all_papers = list(papers.values())
    log.info("Total unique papers: %d", len(all_papers))
    return all_papers
def _result_to_dict(result: arxiv.Result) -> dict:
    """Flatten an ``arxiv.Result`` into the plain dict shape used downstream.

    Newlines in title/abstract/comment are collapsed to spaces, authors are
    capped at 10, and the version suffix is stripped off the arXiv id.
    GitHub/HF fields are initialized empty and filled in by later steps.
    """
    # "http://arxiv.org/abs/2401.01234v2" -> "2401.01234v2" -> "2401.01234"
    versioned_id = result.entry_id.split("/abs/")[-1]
    base_id = re.sub(r"v\d+$", "", versioned_id)

    def flat(text: str) -> str:
        return text.replace("\n", " ").strip()

    return {
        "arxiv_id": base_id,
        "entry_id": result.entry_id,
        "title": flat(result.title),
        "authors": [author.name for author in result.authors[:10]],
        "abstract": flat(result.summary),
        "published": result.published.isoformat(),
        "categories": list(result.categories),
        "pdf_url": result.pdf_url,
        "arxiv_url": result.entry_id,
        "comment": flat(result.comment or ""),
        "source": "arxiv",
        "github_repo": "",
        "github_stars": None,
        "hf_upvotes": 0,
        "hf_models": [],
        "hf_datasets": [],
        "hf_spaces": [],
    }
# ---------------------------------------------------------------------------
# Code URL finding
# ---------------------------------------------------------------------------
def extract_github_urls(paper: dict) -> list[str]:
    """Extract GitHub URLs from a paper's abstract and comment.

    Returns URLs de-duplicated in order of first appearance. Using
    ``dict.fromkeys`` instead of ``set`` keeps the result deterministic,
    so callers that take ``urls[0]`` get the first URL mentioned rather
    than an arbitrary set-ordered one.
    """
    text = f"{paper['abstract']} {paper.get('comment', '')}"
    return list(dict.fromkeys(GITHUB_URL_RE.findall(text)))
def search_github_for_paper(title: str, token: str | None) -> str | None:
    """Search the GitHub repository API for a repo matching *title*.

    Args:
        title: Paper title; punctuation is replaced with spaces and only
            the first 8 words form the search query.
        token: Optional GitHub API token. When present, the search-specific
            rate limit is checked first and the search is skipped if fewer
            than 5 requests remain.

    Returns:
        HTML URL of the best (most recently updated) match, or ``None`` on
        no match, rate-limit exhaustion, non-OK response, or network error.
    """
    headers = {"Accept": "application/vnd.github.v3+json"}
    if token:
        headers["Authorization"] = f"token {token}"
        # Check remaining search quota before spending a request.
        try:
            resp = requests.get("https://api.github.com/rate_limit", headers=headers, timeout=10)
            if resp.ok:
                remaining = resp.json().get("resources", {}).get("search", {}).get("remaining", 0)
                if remaining < 5:
                    return None
        except requests.RequestException:
            pass  # best-effort check: fall through and try the search anyway

    # Build a short keyword query from the title.
    clean = re.sub(r"[^\w\s]", " ", title)
    words = clean.split()[:8]
    query = " ".join(words)
    try:
        resp = requests.get(
            "https://api.github.com/search/repositories",
            params={"q": query, "sort": "updated", "per_page": 3},
            headers=headers,
            timeout=10,
        )
        if not resp.ok:
            return None
        items = resp.json().get("items", [])
        if items:
            return items[0]["html_url"]
    except requests.RequestException:
        pass  # network failure -> treat as "no repo found"
    return None
def find_code_urls(papers: list[dict]) -> dict[str, str | None]:
    """Find code/repo URLs for each paper.

    Prefers a URL mentioned in the paper itself; only falls back to a
    GitHub search when none was found. Maps entry_id -> URL (or None).
    """
    token = GITHUB_TOKEN or None
    results: dict[str, str | None] = {}
    for entry in papers:
        mentioned = extract_github_urls(entry)
        if mentioned:
            results[entry["entry_id"]] = mentioned[0]
            continue
        results[entry["entry_id"]] = search_github_for_paper(entry["title"], token)
        if not token:
            # Unauthenticated GitHub search is tightly rate-limited; pace ourselves.
            time.sleep(2)
    return results
# ---------------------------------------------------------------------------
# Pipeline entry point
# ---------------------------------------------------------------------------
def run_security_pipeline(
    start: datetime | None = None,
    end: datetime | None = None,
    max_papers: int = 300,
) -> int:
    """Run the full security pipeline. Returns the run ID.

    Args:
        start: Window start; defaults to 7 days before *end*. Naive values
            are assumed UTC.
        end: Window end; defaults to now. A naive value is extended to the
            end of that day (23:59:59 UTC).
        max_papers: Cap passed through to the arXiv fetch.

    Raises:
        Re-raises any pipeline exception after marking the run failed.
    """
    # Default window: the last 7 days ending now (UTC).
    if end is None:
        end = datetime.now(timezone.utc)
    if start is None:
        start = end - timedelta(days=7)
    # Normalize naive datetimes to UTC.
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc, hour=23, minute=59, second=59)

    run_id = create_run("security", start.date().isoformat(), end.date().isoformat())
    log.info("Run %d: %s to %s", run_id, start.date(), end.date())
    try:
        # Step 1: Fetch papers
        papers = fetch_arxiv_papers(start, end, max_papers)
        if not papers:
            log.info("No papers found")
            finish_run(run_id, 0)
            return run_id

        # Step 2: Find code URLs
        log.info("Searching for code repositories ...")
        code_urls = find_code_urls(papers)
        with_code = sum(1 for v in code_urls.values() if v)
        log.info("Found code for %d/%d papers", with_code, len(papers))
        # Attach code URLs to papers as github_repo
        for paper in papers:
            url = code_urls.get(paper["entry_id"])
            if url:
                paper["github_repo"] = url

        # Step 3: Insert into DB
        insert_papers(papers, run_id, "security")
        finish_run(run_id, len(papers))
        log.info("Done — %d papers inserted", len(papers))
        return run_id
    except Exception:
        # Log first so the traceback is recorded even if finish_run itself
        # fails (e.g. the DB is the thing that broke).
        log.exception("Pipeline failed")
        finish_run(run_id, 0, status="failed")
        raise