#!/usr/bin/env python3
"""
Project-based literature mining CLI.

Examples:
  python scripts/run_literature_mining.py --query "PEDOT:PSS thermoelectric" --limit 5
  python scripts/run_literature_mining.py --project-id proj_xxx --query "P3HT conductivity" --save-mode files
"""

from __future__ import annotations

import argparse
import csv
import json
from pathlib import Path
from typing import Any, Dict, List

from dotenv import load_dotenv

from src.literature_service import (
    DataPointRepo,
    LiteraturePipeline,
    ProjectRepo,
    QueryIntentService,
    QuerySessionRepo,
    get_database,
)

load_dotenv()


def resolve_project_id(project_id: str | None, projects: ProjectRepo) -> str:
    """Return the ID of the project the run should target.

    Resolution order:
      1. ``project_id`` when given (it must exist in ``projects``).
      2. The first entry returned by ``projects.list_projects()``.
      3. A newly created "Default Literature Project".

    Args:
        project_id: Explicit project ID from the command line, or ``None``.
        projects: Repository used to look up, list and create projects.

    Returns:
        The resolved project ID.

    Raises:
        ValueError: If ``project_id`` is given but no such project is found.
    """
    if project_id:
        project = projects.get_project(project_id)
        if not project:
            raise ValueError(f"Project not found: {project_id}")
        return project_id

    existing = projects.list_projects()
    if existing:
        return existing[0]["id"]

    created = projects.create_project(
        name="Default Literature Project",
        description="Auto-created by run_literature_mining.py",
    )
    return created["id"]


def export_points_to_files(project_id: str, points: List[Dict[str, Any]], out_dir: Path) -> None:
    """Write ``points`` to ``validated_points.jsonl`` and ``validated_points.csv``.

    Both files are created inside ``out_dir`` (which is created if missing) and
    overwrite any existing files of the same name.

    Args:
        project_id: Currently unused by this function; kept so existing
            callers keep working.
        points: Rows to export, one dict per data point.
        out_dir: Destination directory.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    jsonl_path = out_dir / "validated_points.jsonl"
    with jsonl_path.open("w", encoding="utf-8") as f:
        for row in points:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    csv_path = out_dir / "validated_points.csv"
    if points:
        # Build the header from the union of keys over *all* rows, in
        # first-seen order. Using only the first row's keys makes DictWriter
        # raise ValueError as soon as a later row carries an extra key.
        # Rows lacking a column are written with an empty cell (DictWriter's
        # default ``restval``).
        fieldnames = list(dict.fromkeys(key for row in points for key in row))
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(points)
    else:
        # No rows: still emit a header-only CSV so the file always exists.
        csv_path.write_text("point_id,project_id\n", encoding="utf-8")

    print(f"Exported {len(points)} rows to:")
    print(f" - {jsonl_path}")
    print(f" - {csv_path}")


def main() -> None:
    """Parse CLI arguments and run the literature-mining workflow.

    Steps: resolve the target project, analyze and store the query, run the
    pipeline (stage by stage when ``--no-save`` is given, otherwise through
    ``run_full_pipeline``), then optionally export the project's points to
    files when ``--save-mode files`` is selected.
    """
    parser = argparse.ArgumentParser(description="Project-based Literature Mining CLI")
    parser.add_argument("--project-id", default=None, help="Target project ID")
    parser.add_argument(
        "--query",
        default="PEDOT:PSS thermoelectric conductivity",
        help="Search query",
    )
    parser.add_argument("--limit", type=int, default=5, help="Max papers per source")
    parser.add_argument(
        "--strategy",
        choices=["simple", "paperqa"],
        default="simple",
        help="Extraction strategy",
    )
    parser.add_argument("--model-provider", default="openai_compatible", help="Model provider name")
    parser.add_argument("--model-name", default="gpt-oss:latest", help="Model name")
    parser.add_argument(
        "--save-mode",
        choices=["sqlite", "files"],
        default="sqlite",
        help="Result sink mode",
    )
    parser.add_argument("--no-save", action="store_true", help="Do not persist result to sqlite")
    parser.add_argument(
        "--manual-upload-dir",
        default="data/literature/manual_uploads",
        help="Reserved for batch manual upload",
    )
    args = parser.parse_args()

    # Single source for the database location shared by the repos and pipeline.
    db_path = "data/app.db"
    db = get_database(db_path)
    project_repo = ProjectRepo(db)
    point_repo = DataPointRepo(db)
    query_repo = QuerySessionRepo(db)
    query_intent = QueryIntentService(query_repo)
    pipeline = LiteraturePipeline(db_path=db_path)

    target_project_id = resolve_project_id(args.project_id, project_repo)
    project = project_repo.get_project(target_project_id)

    print("=" * 64)
    print("Project-Based Literature Mining")
    print(f"Project: {project['name']} ({target_project_id})")
    print(f"Query: {args.query}")
    print(f"Limit per source: {args.limit}")
    print(f"Strategy: {args.strategy}")
    print("=" * 64)

    query_session = query_intent.analyze_and_store(target_project_id, args.query)
    # A missing or empty "suggestions_json" is treated as an empty list.
    suggestions = json.loads(query_session.get("suggestions_json") or "[]")
    if suggestions:
        print("Query suggestions:")
        for s in suggestions:
            print(f" - {s}")
    if query_session.get("clarification_required"):
        # The CLI does not stop for clarification; it only reports the flag.
        print("Note: query marked as pending_clarification. Continuing by CLI override.")

    if args.no_save:
        # Drive the stages individually, passing run_id=None to extraction.
        discovered = pipeline.run_discovery(target_project_id, args.query, args.limit)
        retrieved = pipeline.run_retrieval(target_project_id, discovered)
        stats = pipeline.run_extraction(
            target_project_id,
            run_id=None,
            paper_rows=retrieved,
            strategy=args.strategy,
            model_name=args.model_name,
            use_full_text=True,
        )
        print(f"Extraction complete without DB run record: {stats}")
    else:
        result = pipeline.run_full_pipeline(
            project_id=target_project_id,
            query=args.query,
            limit=args.limit,
            strategy=args.strategy,
            model_provider=args.model_provider,
            model_name=args.model_name,
            use_full_text=True,
        )
        print(f"Pipeline status: {result.get('status')}")
        if result.get("status") != "completed":
            print(f"Error: {result.get('error')}")
        else:
            print(json.dumps(result.get("stats", {}), indent=2))

    # Points are listed after either branch; they are exported only in "files" mode.
    points = point_repo.list_points(target_project_id)
    if args.save_mode == "files":
        run_dir = Path("data/literature/runs")
        export_points_to_files(target_project_id, points, run_dir)

    print("=" * 64)
    print("Done.")
    print("=" * 64)


if __name__ == "__main__":
    main()