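"""Command-line interface for the MediaStorm RAG documentary archive search tool."""
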
import asyncio
import json
from pathlib import Path

import click

from mediastorm.config import CHROMADB_PATH


@click.group()
def cli():
    """MediaStorm RAG — Documentary archive search tool."""
    pass
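
# Typical pipeline order (each stage reads the previous stage's on-disk output):
#   1. ingest     - fetch and cache stories (default ./data/ingest_cache)
#   2. enrich     - add LLM-extracted metadata (default ./data/enrichments)
#   3. vectorize  - build the ChromaDB collection and BM25 index
#   4. eval/audit - measure retrieval quality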


@cli.command()
@click.option("--uid", default=None, help="Ingest a single story by UID.")
@click.option("--cache-dir", default="./data/ingest_cache", help="Cache directory for fetched data.")
def ingest(uid: str | None, cache_dir: str):
    """Fetch and parse MediaStorm data."""
    from mediastorm.ingest.pipeline import ingest_all, ingest_story

    cache_path = Path(cache_dir)

    async def _run():
        if uid:
            click.echo(f"Ingesting story {uid}...")
            story = await ingest_story(uid)
            cache_path.mkdir(parents=True, exist_ok=True)
            (cache_path / f"{uid}.json").write_text(
                json.dumps(story, ensure_ascii=False, indent=2)
            )
            click.echo(f"Done. {len(story.get('videos', []))} videos found.")
        else:
            click.echo("Ingesting all stories...")
            stories = await ingest_all(cache_dir=cache_path)
            click.echo(f"Done. {len(stories)} stories ingested.")

    asyncio.run(_run())


@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache", help="Directory with ingested data.")
@click.option("--enrichment-dir", default="./data/enrichments", help="Cache for enrichment results.")
def enrich(cache_dir: str, enrichment_dir: str):
    """Enrich ingested stories with LLM-extracted metadata."""
    from mediastorm.enrich.enricher import enrich_story as _enrich

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    enrichment_path.mkdir(parents=True, exist_ok=True)

    if not cache_path.exists():
        click.echo("No ingested data found. Run 'ingest' first.")
        return

    files = list(cache_path.glob("*.json"))
    click.echo(f"Enriching {len(files)} stories...")

    async def _run():
        for i, f in enumerate(files):
            story = json.loads(f.read_text())
            uid = story["uid"]

            # Combine text for enrichment
            description = story.get("short_description", "") + " " + story.get("long_description", "")
            transcript = ""
            for video in story.get("videos", []):
                for turn in video.get("speaker_turns", []):
                    speaker = turn.get("speaker", "")
                    text = turn.get("text", "")
                    transcript += f"{speaker}: {text}\n" if speaker else f"{text}\n"

            credits_str = ", ".join(
                f"{c.get('name', '')} ({c.get('role', '')})"
                for c in story.get("credits", [])
                if c.get("name")
            )
            try:
                result = await _enrich(
                    title=story.get("name", ""),
                    description=description.strip(),
                    transcript=transcript.strip(),
                    cache_dir=enrichment_path,
                    story_uid=uid,
                    credits=credits_str,
                )
                click.echo(f"  [{i+1}/{len(files)}] {story.get('name', uid)}{len(result.topics)} topics")
            except Exception as e:
                click.echo(f"  [{i+1}/{len(files)}] FAIL {story.get('name', uid)}: {e}")

    asyncio.run(_run())
    click.echo("Enrichment complete.")


@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache")
@click.option("--enrichment-dir", default="./data/enrichments")
@click.option("--db-path", default=None, help="ChromaDB path (default from config).")
def vectorize(cache_dir: str, enrichment_dir: str, db_path: str | None):
    """Embed and index stories into ChromaDB + BM25."""
    from mediastorm.enrich.schemas import StoryEnrichment
    from mediastorm.vectorize.chunker import chunk_story
    from mediastorm.vectorize.embedder import Embedder
    from mediastorm.vectorize.store import VectorStore
    from mediastorm.vectorize.bm25_store import BM25Store
    from mediastorm.config import BM25_INDEX_PATH

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    chromadb_path = Path(db_path) if db_path else CHROMADB_PATH

    if not cache_path.exists():
        click.echo("No ingested data. Run 'ingest' first.")
        return

    embedder = Embedder()
    store = VectorStore(path=chromadb_path)

    files = list(cache_path.glob("*.json"))
    click.echo(f"Vectorizing {len(files)} stories...")

    bm25_docs = []

    for i, f in enumerate(files):
        story = json.loads(f.read_text())
        uid = story["uid"]

        # Load enrichment (handle schema changes gracefully)
        enrichment_file = enrichment_path / f"{uid}.json"
        if enrichment_file.exists():
            try:
                enrichment = StoryEnrichment.model_validate_json(enrichment_file.read_text())
            except Exception:
                enrichment = StoryEnrichment(summary="")
        else:
            enrichment = StoryEnrichment(summary="")

        # Chunk story
        story_doc = chunk_story(story, enrichment)
        story_doc["embedding"] = embedder.embed_texts([story_doc["text"]])[0]
        store.upsert_stories([story_doc])

        # Collect for BM25
        bm25_docs.append({"id": story_doc["id"], "text": story_doc["text"]})

        click.echo(f"  [{i+1}/{len(files)}] {story.get('name', uid)}")

    # Build and save BM25 index
    bm25 = BM25Store(path=BM25_INDEX_PATH)
    bm25.build(bm25_docs)
    bm25.save()

    click.echo(f"Done. Stories: {store.stories_count()}, BM25 docs: {len(bm25_docs)}")


@cli.command()
@click.option("--subset", default=0, help="Limit to N stories (0 = all).")
@click.option("--verbose", is_flag=True, help="Verbose output.")
def audit(subset: int, verbose: bool):
    """Run pipeline audit/benchmark."""
    from audit import run_audit
    asyncio.run(run_audit(subset=subset, verbose=verbose))


@cli.command(name="eval")
@click.option("--verbose", "-v", is_flag=True, help="Show per-query details.")
@click.option("--history", is_flag=True, help="Show history of past runs.")
@click.option("--pipeline", is_flag=True, help="Eval full pipeline (retriever + Gemini filter). Requires GEMINI_API_KEY.")
def eval_cmd(verbose: bool, history: bool, pipeline: bool):
    """Run retrieval evaluation and compare to previous run."""
    from mediastorm.eval.runner import (
        _build_run_data, save_run, load_previous_run, load_all_runs,
    )
    from mediastorm.eval.display import (
        print_scores, print_verbose, print_diff, print_history,
    )

    if history:
        runs = load_all_runs()
        print_history(runs)
        return

    # Load previous run BEFORE running eval (so the new run isn't compared to itself)
    previous = load_previous_run()

    # Run evaluation
    from eval_retrieval import run_eval
    mode = "pipeline (retriever + Gemini)" if pipeline else "retriever only"
    click.echo(f"Running retrieval evaluation ({mode})...")
    eval_result = asyncio.run(run_eval(verbose=False, quiet=True, pipeline=pipeline))

    # Build and save run data
    run_data = _build_run_data(eval_result)
    path = save_run(run_data)
    click.echo(f"Results saved to {path}")

    # Display
    if verbose:
        print_verbose(run_data)
    print_scores(run_data)

    if previous:
        print_diff(run_data, previous)
    else:
        click.echo("\nFirst run — no comparison available.")


if __name__ == "__main__":
    cli()