Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Project-based literature mining CLI.

Examples:
    python scripts/run_literature_mining.py --query "PEDOT:PSS thermoelectric" --limit 5
    python scripts/run_literature_mining.py --project-id proj_xxx --query "P3HT conductivity" --save-mode files
"""
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| import json | |
| from pathlib import Path | |
| from typing import Any, Dict, List | |
| from dotenv import load_dotenv | |
| from src.literature_service import ( | |
| DataPointRepo, | |
| LiteraturePipeline, | |
| ProjectRepo, | |
| QueryIntentService, | |
| QuerySessionRepo, | |
| get_database, | |
| ) | |
| load_dotenv() | |
def resolve_project_id(project_id: str | None, projects: ProjectRepo) -> str:
    """Return the ID of the project a run should be attached to.

    Args:
        project_id: Explicitly requested project ID, or a falsy value
            (``None`` / empty string) to fall back to a default.
        projects: Repository used to look up, list, and create projects.

    Returns:
        ``project_id`` itself when it is given and the repository knows it;
        otherwise the ``"id"`` of the first entry returned by
        ``projects.list_projects()``; otherwise the ``"id"`` of a newly
        created "Default Literature Project".

    Raises:
        ValueError: If ``project_id`` is given but ``projects.get_project``
            returns a falsy result for it.
    """
    if not project_id:
        # No explicit target: reuse the first listed project when there is
        # one, and only create a default project when the list is empty.
        known = projects.list_projects()
        if known:
            return known[0]["id"]
        fresh = projects.create_project(
            name="Default Literature Project",
            description="Auto-created by run_literature_mining.py",
        )
        return fresh["id"]

    # An explicit ID must resolve to an existing project.
    if projects.get_project(project_id):
        return project_id
    raise ValueError(f"Project not found: {project_id}")
def export_points_to_files(project_id: str, points: List[Dict[str, Any]], out_dir: Path) -> None:
    """Write *points* to ``validated_points.jsonl`` and ``validated_points.csv``.

    Both files are created inside *out_dir* (which is created, including
    parents, when missing) and overwrite any previous export. The paths are
    printed to stdout afterwards.

    Args:
        project_id: Accepted for interface compatibility; not used when
            writing the files.
        points: Row dictionaries to export. Rows may have different key
            sets; the CSV header is the union of all keys in first-seen
            order, and a row lacking a column gets an empty cell.
        out_dir: Destination directory.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # JSONL: one JSON document per line, non-ASCII characters kept verbatim.
    jsonl_path = out_dir / "validated_points.jsonl"
    with jsonl_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in points)

    csv_path = out_dir / "validated_points.csv"
    if points:
        # Taking the header from the first row only makes DictWriter raise
        # ValueError as soon as a later row carries an extra key. Collect the
        # union instead; dict.fromkeys keeps first-seen order and de-duplicates.
        fieldnames = list(dict.fromkeys(key for row in points for key in row))
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(points)
    else:
        # No rows to infer columns from: emit a minimal fixed header.
        csv_path.write_text("point_id,project_id\n", encoding="utf-8")

    print(f"Exported {len(points)} rows to:")
    print(f" - {jsonl_path}")
    print(f" - {csv_path}")
def main() -> None:
    """Run the literature-mining CLI for a single project.

    Steps, in order:

    1. Parse command-line options.
    2. Open ``data/app.db`` and build the repositories, the query-intent
       service, and the pipeline on top of it.
    3. Resolve the target project via ``resolve_project_id``; an unknown
       ``--project-id`` is reported as a usage error (exit status 2).
    4. Analyze and store the query, printing any suggestions it returns.
    5. With ``--no-save``: run discovery, retrieval, and extraction as
       separate steps with ``run_id=None``. Otherwise run the full pipeline
       and print its status plus either the error or the stats.
    6. With ``--save-mode files``: export the project's points to
       ``data/literature/runs``.
    """
    parser = argparse.ArgumentParser(description="Project-based Literature Mining CLI")
    parser.add_argument("--project-id", default=None, help="Target project ID")
    parser.add_argument("--query", default="PEDOT:PSS thermoelectric conductivity", help="Search query")
    parser.add_argument("--limit", type=int, default=5, help="Max papers per source")
    parser.add_argument("--strategy", choices=["simple", "paperqa"], default="simple", help="Extraction strategy")
    parser.add_argument("--model-provider", default="openai_compatible", help="Model provider name")
    parser.add_argument("--model-name", default="gpt-oss:latest", help="Model name")
    parser.add_argument("--save-mode", choices=["sqlite", "files"], default="sqlite", help="Result sink mode")
    parser.add_argument("--no-save", action="store_true", help="Do not persist result to sqlite")
    # Parsed but not read anywhere in this function (help text: reserved).
    parser.add_argument("--manual-upload-dir", default="data/literature/manual_uploads", help="Reserved for batch manual upload")
    args = parser.parse_args()

    # Single source for the database path so the repositories and the
    # pipeline cannot drift apart.
    db_path = "data/app.db"
    db = get_database(db_path)
    project_repo = ProjectRepo(db)
    point_repo = DataPointRepo(db)
    query_repo = QuerySessionRepo(db)
    query_intent = QueryIntentService(query_repo)
    pipeline = LiteraturePipeline(db_path=db_path)

    try:
        target_project_id = resolve_project_id(args.project_id, project_repo)
    except ValueError as exc:
        # An unknown --project-id is a user mistake, not a crash: let argparse
        # print the usage line plus the message and exit with status 2.
        parser.error(str(exc))
    project = project_repo.get_project(target_project_id)

    print("=" * 64)
    print("Project-Based Literature Mining")
    print(f"Project: {project['name']} ({target_project_id})")
    print(f"Query: {args.query}")
    print(f"Limit per source: {args.limit}")
    print(f"Strategy: {args.strategy}")
    print("=" * 64)

    query_session = query_intent.analyze_and_store(target_project_id, args.query)
    # A missing or empty suggestions_json is treated as "no suggestions".
    suggestions = json.loads(query_session.get("suggestions_json") or "[]")
    if suggestions:
        print("Query suggestions:")
        for s in suggestions:
            print(f" - {s}")
    if query_session.get("clarification_required"):
        # The CLI does not stop for clarification; it only notes the flag.
        print("Note: query marked as pending_clarification. Continuing by CLI override.")

    if args.no_save:
        # Step-by-step path with no run record (run_id=None). Note that
        # --model-provider is not forwarded on this path.
        discovered = pipeline.run_discovery(target_project_id, args.query, args.limit)
        retrieved = pipeline.run_retrieval(target_project_id, discovered)
        stats = pipeline.run_extraction(
            target_project_id,
            run_id=None,
            paper_rows=retrieved,
            strategy=args.strategy,
            model_name=args.model_name,
            use_full_text=True,
        )
        print(f"Extraction complete without DB run record: {stats}")
    else:
        result = pipeline.run_full_pipeline(
            project_id=target_project_id,
            query=args.query,
            limit=args.limit,
            strategy=args.strategy,
            model_provider=args.model_provider,
            model_name=args.model_name,
            use_full_text=True,
        )
        print(f"Pipeline status: {result.get('status')}")
        if result.get("status") != "completed":
            print(f"Error: {result.get('error')}")
        else:
            print(json.dumps(result.get("stats", {}), indent=2))

    points = point_repo.list_points(target_project_id)
    if args.save_mode == "files":
        run_dir = Path("data/literature/runs")
        export_points_to_files(target_project_id, points, run_dir)

    print("=" * 64)
    print("Done.")
    print("=" * 64)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()