POLYMER-PROPERTY / scripts /run_literature_mining.py
sobinalosious92's picture
Upload 119 files
3f4ebee verified
#!/usr/bin/env python3
"""
Project-based literature mining CLI.
Examples:
python scripts/run_literature_mining.py --query "PEDOT:PSS thermoelectric" --limit 5
python scripts/run_literature_mining.py --project-id proj_xxx --query "P3HT conductivity" --save-mode files
"""
from __future__ import annotations
import argparse
import csv
import json
from pathlib import Path
from typing import Any, Dict, List
from dotenv import load_dotenv
from src.literature_service import (
DataPointRepo,
LiteraturePipeline,
ProjectRepo,
QueryIntentService,
QuerySessionRepo,
get_database,
)
# Load variables from a .env file (python-dotenv) at import time, so they are
# in the environment before any of the services below are constructed.
load_dotenv()
def resolve_project_id(project_id: str | None, projects: ProjectRepo) -> str:
    """Decide which project this CLI run should target.

    Resolution order:
      1. An explicit ``project_id`` is looked up via ``projects.get_project``
         and returned unchanged when the lookup yields a truthy result.
      2. With no ``project_id``, the ``"id"`` of the first entry returned by
         ``projects.list_projects()`` is used.
      3. If there are no projects, one named "Default Literature Project"
         is created and its ``"id"`` is returned.

    Raises:
        ValueError: an explicit ``project_id`` was given but not found.
    """
    if not project_id:
        known_projects = projects.list_projects()
        if known_projects:
            return known_projects[0]["id"]
        # Nothing exists yet: bootstrap a default project so the run can proceed.
        fallback = projects.create_project(
            name="Default Literature Project",
            description="Auto-created by run_literature_mining.py",
        )
        return fallback["id"]

    if projects.get_project(project_id):
        return project_id
    raise ValueError(f"Project not found: {project_id}")
def export_points_to_files(project_id: str, points: List[Dict[str, Any]], out_dir: Path) -> None:
    """Write data points to ``out_dir`` as both JSONL and CSV.

    Creates ``out_dir`` (including parents) if needed, then overwrites
    ``validated_points.jsonl`` (one JSON object per line) and
    ``validated_points.csv``, and prints the two paths.

    The CSV header is the union of the keys of all rows, in first-seen
    order. Rows that lack a column get an empty cell for it. For rows that
    all share the same keys this equals the first row's keys.

    Args:
        project_id: Not used by this function; kept so existing callers
            keep working.
        points: Rows to export, one dict per data point.
        out_dir: Destination directory.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    jsonl_path = out_dir / "validated_points.jsonl"
    with jsonl_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in points)

    csv_path = out_dir / "validated_points.csv"
    if points:
        # Taking the header from the first row alone makes DictWriter raise
        # ValueError as soon as a later row carries an extra key. dict.fromkeys
        # de-duplicates while preserving first-seen order.
        fieldnames = list(dict.fromkeys(key for row in points for key in row))
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(points)
    else:
        # No rows to derive columns from: emit a minimal fixed header.
        csv_path.write_text("point_id,project_id\n", encoding="utf-8")

    print(f"Exported {len(points)} rows to:")
    print(f" - {jsonl_path}")
    print(f" - {csv_path}")
def main() -> None:
    """Run project-based literature mining from the command line.

    Steps:
      1. Parse CLI arguments.
      2. Open the database and build the repositories, the query-intent
         service and the pipeline.
      3. Resolve the target project (see ``resolve_project_id``).
      4. Analyze and store the query, printing any suggestions.
      5. Run either the staged discovery/retrieval/extraction path
         (``--no-save``) or the full pipeline.
      6. With ``--save-mode files``, export the project's points to
         ``data/literature/runs``.

    Raises:
        SystemExit: with status 1 when the full pipeline reports a status
            other than "completed", so shell and CI callers can detect the
            failure. argparse also raises it for invalid arguments.
        ValueError: when ``--project-id`` names a project that does not exist.
    """
    parser = argparse.ArgumentParser(description="Project-based Literature Mining CLI")
    parser.add_argument("--project-id", default=None, help="Target project ID")
    parser.add_argument("--query", default="PEDOT:PSS thermoelectric conductivity", help="Search query")
    parser.add_argument("--limit", type=int, default=5, help="Max papers per source")
    parser.add_argument("--strategy", choices=["simple", "paperqa"], default="simple", help="Extraction strategy")
    parser.add_argument("--model-provider", default="openai_compatible", help="Model provider name")
    parser.add_argument("--model-name", default="gpt-oss:latest", help="Model name")
    parser.add_argument("--save-mode", choices=["sqlite", "files"], default="sqlite", help="Result sink mode")
    parser.add_argument("--no-save", action="store_true", help="Do not persist result to sqlite")
    # Parsed but not read anywhere in this function (reserved, per its help text).
    parser.add_argument("--manual-upload-dir", default="data/literature/manual_uploads", help="Reserved for batch manual upload")
    args = parser.parse_args()

    # Single source for the path so the repos and the pipeline cannot drift apart.
    db_path = "data/app.db"
    db = get_database(db_path)
    project_repo = ProjectRepo(db)
    point_repo = DataPointRepo(db)
    query_repo = QuerySessionRepo(db)
    query_intent = QueryIntentService(query_repo)
    pipeline = LiteraturePipeline(db_path=db_path)

    target_project_id = resolve_project_id(args.project_id, project_repo)
    project = project_repo.get_project(target_project_id)

    banner = "=" * 64
    print(banner)
    print("Project-Based Literature Mining")
    print(f"Project: {project['name']} ({target_project_id})")
    print(f"Query: {args.query}")
    print(f"Limit per source: {args.limit}")
    print(f"Strategy: {args.strategy}")
    print(banner)

    query_session = query_intent.analyze_and_store(target_project_id, args.query)
    suggestions = json.loads(query_session.get("suggestions_json") or "[]")
    if suggestions:
        print("Query suggestions:")
        for s in suggestions:
            print(f" - {s}")
    if query_session.get("clarification_required"):
        # The CLI has no interactive clarification step, so it proceeds anyway.
        print("Note: query marked as pending_clarification. Continuing by CLI override.")

    pipeline_failed = False
    if args.no_save:
        # Staged path with run_id=None. --model-provider is not forwarded here.
        discovered = pipeline.run_discovery(target_project_id, args.query, args.limit)
        retrieved = pipeline.run_retrieval(target_project_id, discovered)
        stats = pipeline.run_extraction(
            target_project_id,
            run_id=None,
            paper_rows=retrieved,
            strategy=args.strategy,
            model_name=args.model_name,
            use_full_text=True,
        )
        print(f"Extraction complete without DB run record: {stats}")
    else:
        result = pipeline.run_full_pipeline(
            project_id=target_project_id,
            query=args.query,
            limit=args.limit,
            strategy=args.strategy,
            model_provider=args.model_provider,
            model_name=args.model_name,
            use_full_text=True,
        )
        status = result.get("status")
        print(f"Pipeline status: {status}")
        if status != "completed":
            # Remember the failure; the export and banner below still run.
            pipeline_failed = True
            print(f"Error: {result.get('error')}")
        else:
            print(json.dumps(result.get("stats", {}), indent=2))

    # Points are listed for the project regardless of which branch ran.
    points = point_repo.list_points(target_project_id)
    if args.save_mode == "files":
        run_dir = Path("data/literature/runs")
        export_points_to_files(target_project_id, points, run_dir)

    print(banner)
    print("Done.")
    print(banner)

    if pipeline_failed:
        # Previously the process exited 0 even after printing "Error: ...".
        raise SystemExit(1)
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()