"""Command-line interface for the MediaStorm RAG documentary-archive tools.

Subcommands (run stages in this order): ingest -> enrich -> vectorize,
plus audit/eval utilities. Heavy project imports are deferred into each
command body so that `--help` and unrelated commands stay fast.
"""

import asyncio
import json
from pathlib import Path

import click

from mediastorm.config import CHROMADB_PATH


@click.group()
def cli():
    """MediaStorm RAG — Documentary archive search tool."""
    pass


@cli.command()
@click.option("--uid", default=None, help="Ingest a single story by UID.")
@click.option("--cache-dir", default="./data/ingest_cache", help="Cache directory for fetched data.")
def ingest(uid: str | None, cache_dir: str):
    """Fetch and parse MediaStorm data."""
    # Deferred import: pipeline pulls in network/parsing deps.
    from mediastorm.ingest.pipeline import ingest_all, ingest_story

    cache_path = Path(cache_dir)

    async def _run():
        if uid:
            # Single-story mode: fetch one UID and cache it as JSON.
            click.echo(f"Ingesting story {uid}...")
            story = await ingest_story(uid)
            cache_path.mkdir(parents=True, exist_ok=True)
            (cache_path / f"{uid}.json").write_text(
                json.dumps(story, ensure_ascii=False, indent=2)
            )
            click.echo(f"Done. {len(story.get('videos', []))} videos found.")
        else:
            # Full crawl; ingest_all handles its own caching.
            click.echo("Ingesting all stories...")
            stories = await ingest_all(cache_dir=cache_path)
            click.echo(f"Done. {len(stories)} stories ingested.")

    asyncio.run(_run())


@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache", help="Directory with ingested data.")
@click.option("--enrichment-dir", default="./data/enrichments", help="Cache for enrichment results.")
def enrich(cache_dir: str, enrichment_dir: str):
    """Enrich ingested stories with LLM-extracted metadata."""
    from mediastorm.enrich.enricher import enrich_story as _enrich

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    enrichment_path.mkdir(parents=True, exist_ok=True)

    if not cache_path.exists():
        click.echo("No ingested data found. Run 'ingest' first.")
        return

    files = list(cache_path.glob("*.json"))
    click.echo(f"Enriching {len(files)} stories...")

    async def _run():
        for i, f in enumerate(files):
            story = json.loads(f.read_text())
            uid = story["uid"]

            # Combine text for enrichment. `or ""` also guards against an
            # explicit null in the JSON (dict.get's default only covers a
            # missing key, not a present None value).
            description = (
                (story.get("short_description") or "")
                + " "
                + (story.get("long_description") or "")
            )

            # Flatten speaker turns into one transcript. Collect lines and
            # join once — repeated `+=` on a str is quadratic for long
            # transcripts.
            turn_lines = []
            for video in story.get("videos", []):
                for turn in video.get("speaker_turns", []):
                    speaker = turn.get("speaker", "")
                    text = turn.get("text", "")
                    turn_lines.append(f"{speaker}: {text}\n" if speaker else f"{text}\n")
            transcript = "".join(turn_lines)

            credits_str = ", ".join(
                f"{c.get('name', '')} ({c.get('role', '')})"
                for c in story.get("credits", [])
                if c.get("name")
            )

            try:
                result = await _enrich(
                    title=story.get("name", ""),
                    description=description.strip(),
                    transcript=transcript.strip(),
                    cache_dir=enrichment_path,
                    story_uid=uid,
                    credits=credits_str,
                )
                click.echo(
                    f" [{i+1}/{len(files)}] {story.get('name', uid)} — "
                    f"{len(result.topics)} topics"
                )
            except Exception as e:
                # Best-effort batch: one failing story must not abort the run.
                click.echo(f" [{i+1}/{len(files)}] FAIL {story.get('name', uid)}: {e}")

    asyncio.run(_run())
    click.echo("Enrichment complete.")


@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache")
@click.option("--enrichment-dir", default="./data/enrichments")
@click.option("--db-path", default=None, help="ChromaDB path (default from config).")
def vectorize(cache_dir: str, enrichment_dir: str, db_path: str | None):
    """Embed and index stories into ChromaDB + BM25."""
    from mediastorm.enrich.schemas import StoryEnrichment
    from mediastorm.vectorize.chunker import chunk_story
    from mediastorm.vectorize.embedder import Embedder
    from mediastorm.vectorize.store import VectorStore
    from mediastorm.vectorize.bm25_store import BM25Store
    from mediastorm.config import BM25_INDEX_PATH

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    chromadb_path = Path(db_path) if db_path else CHROMADB_PATH

    if not cache_path.exists():
        click.echo("No ingested data. Run 'ingest' first.")
        return

    embedder = Embedder()
    store = VectorStore(path=chromadb_path)

    files = list(cache_path.glob("*.json"))
    click.echo(f"Vectorizing {len(files)} stories...")

    bm25_docs = []
    for i, f in enumerate(files):
        story = json.loads(f.read_text())
        uid = story["uid"]

        # Load enrichment; tolerate missing or schema-incompatible files by
        # falling back to an empty enrichment rather than crashing the index
        # build.
        enrichment_file = enrichment_path / f"{uid}.json"
        if enrichment_file.exists():
            try:
                enrichment = StoryEnrichment.model_validate_json(
                    enrichment_file.read_text()
                )
            except Exception:
                enrichment = StoryEnrichment(summary="")
        else:
            enrichment = StoryEnrichment(summary="")

        # Chunk, embed, and upsert one story document.
        story_doc = chunk_story(story, enrichment)
        story_doc["embedding"] = embedder.embed_texts([story_doc["text"]])[0]
        store.upsert_stories([story_doc])

        # Collect the same text for the lexical (BM25) index.
        bm25_docs.append({"id": story_doc["id"], "text": story_doc["text"]})
        click.echo(f" [{i+1}/{len(files)}] {story.get('name', uid)}")

    # Build and save BM25 index over all collected documents.
    bm25 = BM25Store(path=BM25_INDEX_PATH)
    bm25.build(bm25_docs)
    bm25.save()

    click.echo(f"Done. Stories: {store.stories_count()}, BM25 docs: {len(bm25_docs)}")


@cli.command()
@click.option("--subset", default=0, help="Limit to N stories (0 = all).")
@click.option("--verbose", is_flag=True, help="Verbose output.")
def audit(subset: int, verbose: bool):
    """Run pipeline audit/benchmark."""
    from audit import run_audit

    asyncio.run(run_audit(subset=subset, verbose=verbose))


@cli.command(name="eval")
@click.option("--verbose", "-v", is_flag=True, help="Show per-query details.")
@click.option("--history", is_flag=True, help="Show history of past runs.")
@click.option(
    "--pipeline",
    is_flag=True,
    help="Eval full pipeline (retriever + Gemini filter). Requires GEMINI_API_KEY.",
)
def eval_cmd(verbose: bool, history: bool, pipeline: bool):
    """Run retrieval evaluation and compare to previous run."""
    from mediastorm.eval.runner import (
        _build_run_data,
        save_run,
        load_previous_run,
        load_all_runs,
    )
    from mediastorm.eval.display import (
        print_scores,
        print_verbose,
        print_diff,
        print_history,
    )

    if history:
        runs = load_all_runs()
        print_history(runs)
        return

    # Load previous run BEFORE running eval (so the new run isn't compared
    # to itself).
    previous = load_previous_run()

    # Run evaluation.
    from eval_retrieval import run_eval

    mode = "pipeline (retriever + Gemini)" if pipeline else "retriever only"
    click.echo(f"Running retrieval evaluation ({mode})...")
    eval_result = asyncio.run(run_eval(verbose=False, quiet=True, pipeline=pipeline))

    # Build and save run data.
    run_data = _build_run_data(eval_result)
    path = save_run(run_data)
    click.echo(f"Results saved to {path}")

    # Display: per-query detail first (if requested), then summary and diff.
    if verbose:
        print_verbose(run_data)
    print_scores(run_data)
    if previous:
        print_diff(run_data, previous)
    else:
        click.echo("\nFirst run — no comparison available.")


if __name__ == "__main__":
    cli()