File size: 5,451 Bytes
3f4ebee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python3
"""
Project-based literature mining CLI.

Examples:
  python scripts/run_literature_mining.py --query "PEDOT:PSS thermoelectric" --limit 5
  python scripts/run_literature_mining.py --project-id proj_xxx --query "P3HT conductivity" --save-mode files
"""
from __future__ import annotations

import argparse
import csv
import json
from pathlib import Path
from typing import Any, Dict, List

from dotenv import load_dotenv

from src.literature_service import (
    DataPointRepo,
    LiteraturePipeline,
    ProjectRepo,
    QueryIntentService,
    QuerySessionRepo,
    get_database,
)

# Runs at module import time (after the imports above), using python-dotenv's
# default lookup, so values from a local .env file are available in the
# process environment before main() is invoked.
load_dotenv()


def resolve_project_id(project_id: str | None, projects: ProjectRepo) -> str:
    """Return the ID of the project this run should target.

    Resolution order:
      1. A truthy ``project_id`` is looked up via ``projects.get_project`` and
         returned unchanged when the lookup yields a truthy result.
      2. Otherwise the ``"id"`` of the first entry from
         ``projects.list_projects()`` is used.
      3. If no project is listed, a default project is created and its
         ``"id"`` is returned.

    Raises:
        ValueError: if ``project_id`` was given but the lookup found nothing.
    """
    if not project_id:
        # No explicit target: reuse the first listed project, or bootstrap one.
        candidates = projects.list_projects()
        if not candidates:
            fresh = projects.create_project(
                name="Default Literature Project",
                description="Auto-created by run_literature_mining.py",
            )
            return fresh["id"]
        return candidates[0]["id"]

    if projects.get_project(project_id):
        return project_id
    raise ValueError(f"Project not found: {project_id}")


def export_points_to_files(project_id: str, points: List[Dict[str, Any]], out_dir: Path) -> None:
    """Write ``points`` to ``out_dir`` as both JSON Lines and CSV.

    Two files are (re)written, replacing any previous content:

    * ``validated_points.jsonl`` - one JSON object per row, non-ASCII
      characters kept as-is.
    * ``validated_points.csv`` - a header row followed by one line per row.

    Args:
        project_id: Accepted for interface compatibility; not used by this
            function.
        points: Rows to export, each a mapping of column name to value.
        out_dir: Destination directory; created (with parents) if missing.

    The CSV header is the union of keys over all rows, in first-seen order,
    so rows with differing key sets no longer make ``csv.DictWriter`` raise
    ``ValueError``. Keys missing from a row are written as empty cells. When
    ``points`` is empty, a header-only CSV ``point_id,project_id`` is written.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    jsonl_path = out_dir / "validated_points.jsonl"
    with jsonl_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in points)

    csv_path = out_dir / "validated_points.csv"
    if points:
        # dict.fromkeys de-duplicates while preserving first-seen order, so
        # for homogeneous rows the header equals the first row's keys.
        fieldnames = list(dict.fromkeys(key for row in points for key in row))
        # newline="" lets the csv module control line endings itself.
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(points)
    else:
        # No rows to derive columns from: emit a fixed placeholder header.
        csv_path.write_text("point_id,project_id\n", encoding="utf-8")

    print(f"Exported {len(points)} rows to:")
    print(f"  - {jsonl_path}")
    print(f"  - {csv_path}")


def main() -> None:
    """Command-line entry point for project-based literature mining.

    Steps, in order:
      1. Parse the CLI options.
      2. Open ``data/app.db`` and build the repositories, the query-intent
         service and the pipeline on top of it.
      3. Resolve the target project and print a run banner.
      4. Analyze and store the query, printing any suggestions returned.
      5. Run either the three pipeline stages individually (``--no-save``)
         or the full pipeline, and print the outcome.
      6. With ``--save-mode files``, export the project's points to
         ``data/literature/runs``.

    ``--manual-upload-dir`` is parsed but not read here, and
    ``--model-provider`` is forwarded only on the full-pipeline path.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--project-id", {"default": None, "help": "Target project ID"}),
        ("--query", {"default": "PEDOT:PSS thermoelectric conductivity", "help": "Search query"}),
        ("--limit", {"type": int, "default": 5, "help": "Max papers per source"}),
        ("--strategy", {"choices": ["simple", "paperqa"], "default": "simple", "help": "Extraction strategy"}),
        ("--model-provider", {"default": "openai_compatible", "help": "Model provider name"}),
        ("--model-name", {"default": "gpt-oss:latest", "help": "Model name"}),
        ("--save-mode", {"choices": ["sqlite", "files"], "default": "sqlite", "help": "Result sink mode"}),
        ("--no-save", {"action": "store_true", "help": "Do not persist result to sqlite"}),
        (
            "--manual-upload-dir",
            {"default": "data/literature/manual_uploads", "help": "Reserved for batch manual upload"},
        ),
    )
    cli = argparse.ArgumentParser(description="Project-based Literature Mining CLI")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    db_path = "data/app.db"
    rule = "=" * 64

    database = get_database(db_path)
    project_repo = ProjectRepo(database)
    point_repo = DataPointRepo(database)
    session_repo = QuerySessionRepo(database)
    intent_service = QueryIntentService(session_repo)
    pipeline = LiteraturePipeline(db_path=db_path)

    target_project_id = resolve_project_id(opts.project_id, project_repo)
    project = project_repo.get_project(target_project_id)
    print(rule)
    print("Project-Based Literature Mining")
    print(f"Project: {project['name']} ({target_project_id})")
    print(f"Query: {opts.query}")
    print(f"Limit per source: {opts.limit}")
    print(f"Strategy: {opts.strategy}")
    print(rule)

    session = intent_service.analyze_and_store(target_project_id, opts.query)
    # A missing or empty suggestions payload is treated as an empty list.
    hints = json.loads(session.get("suggestions_json") or "[]")
    if hints:
        print("Query suggestions:")
        for hint in hints:
            print(f"  - {hint}")
    if session.get("clarification_required"):
        print("Note: query marked as pending_clarification. Continuing by CLI override.")

    if not opts.no_save:
        outcome = pipeline.run_full_pipeline(
            project_id=target_project_id,
            query=opts.query,
            limit=opts.limit,
            strategy=opts.strategy,
            model_provider=opts.model_provider,
            model_name=opts.model_name,
            use_full_text=True,
        )
        print(f"Pipeline status: {outcome.get('status')}")
        if outcome.get("status") == "completed":
            print(json.dumps(outcome.get("stats", {}), indent=2))
        else:
            print(f"Error: {outcome.get('error')}")
    else:
        # Drive the stages one by one, passing run_id=None to extraction.
        found = pipeline.run_discovery(target_project_id, opts.query, opts.limit)
        fetched = pipeline.run_retrieval(target_project_id, found)
        extraction_stats = pipeline.run_extraction(
            target_project_id,
            run_id=None,
            paper_rows=fetched,
            strategy=opts.strategy,
            model_name=opts.model_name,
            use_full_text=True,
        )
        print(f"Extraction complete without DB run record: {extraction_stats}")

    # Points are listed regardless of save mode; only "files" exports them.
    project_points = point_repo.list_points(target_project_id)
    if opts.save_mode == "files":
        export_points_to_files(target_project_id, project_points, Path("data/literature/runs"))

    print(rule)
    print("Done.")
    print(rule)


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()