# mediastorm/cli.py
# NOTE(review): the three lines below are GitHub UI residue from a copy-paste
# (author, commit message, commit hash), preserved here as comments:
#   remdms — fix(cli): remove sync and serve commands referencing
#   non-existent modules (commit b7f7f9a)
import asyncio
import json
from pathlib import Path
import click
from mediastorm.config import CHROMADB_PATH
@click.group()
def cli():
"""MediaStorm RAG — Documentary archive search tool."""
pass
@cli.command()
@click.option("--uid", default=None, help="Ingest a single story by UID.")
@click.option("--cache-dir", default="./data/ingest_cache", help="Cache directory for fetched data.")
def ingest(uid: str | None, cache_dir: str):
    """Fetch and parse MediaStorm data."""
    from mediastorm.ingest.pipeline import ingest_all, ingest_story

    target = Path(cache_dir)

    async def _single(story_uid: str) -> None:
        # Fetch one story and persist it into the cache as pretty-printed JSON.
        click.echo(f"Ingesting story {story_uid}...")
        story = await ingest_story(story_uid)
        target.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(story, ensure_ascii=False, indent=2)
        (target / f"{story_uid}.json").write_text(payload)
        click.echo(f"Done. {len(story.get('videos', []))} videos found.")

    async def _bulk() -> None:
        # Full-archive ingest; the pipeline handles its own caching.
        click.echo("Ingesting all stories...")
        stories = await ingest_all(cache_dir=target)
        click.echo(f"Done. {len(stories)} stories ingested.")

    asyncio.run(_single(uid) if uid else _bulk())
@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache", help="Directory with ingested data.")
@click.option("--enrichment-dir", default="./data/enrichments", help="Cache for enrichment results.")
def enrich(cache_dir: str, enrichment_dir: str):
    """Enrich ingested stories with LLM-extracted metadata.

    Reads every cached story JSON from --cache-dir, assembles its text
    (descriptions, flattened speaker transcript, credits) and passes it to
    the enricher, which caches its output under --enrichment-dir.  Each
    story is best-effort: a failure is reported and the batch continues.
    """
    from mediastorm.enrich.enricher import enrich_story as _enrich

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    enrichment_path.mkdir(parents=True, exist_ok=True)

    if not cache_path.exists():
        click.echo("No ingested data found. Run 'ingest' first.")
        return

    files = list(cache_path.glob("*.json"))
    click.echo(f"Enriching {len(files)} stories...")

    async def _run():
        for i, f in enumerate(files):
            story = json.loads(f.read_text())
            uid = story["uid"]

            # Combine short + long description; .strip() at the call site
            # removes the joining space when either part is empty.
            description = story.get("short_description", "") + " " + story.get("long_description", "")

            # Flatten all speaker turns across every video into one
            # "Speaker: text" transcript (join instead of quadratic +=).
            turns = []
            for video in story.get("videos", []):
                for turn in video.get("speaker_turns", []):
                    speaker = turn.get("speaker", "")
                    text = turn.get("text", "")
                    turns.append(f"{speaker}: {text}" if speaker else f"{text}")
            transcript = "\n".join(turns)

            credits_str = ", ".join(
                f"{c.get('name', '')} ({c.get('role', '')})"
                for c in story.get("credits", [])
                if c.get("name")
            )

            try:
                result = await _enrich(
                    title=story.get("name", ""),
                    description=description.strip(),
                    transcript=transcript.strip(),
                    cache_dir=enrichment_path,
                    story_uid=uid,
                )
                # BUG FIX: the progress line previously ran the story name
                # and the topic count together ("Some StoryN topics");
                # add a ':' separator, matching the FAIL line below.
                click.echo(f" [{i+1}/{len(files)}] {story.get('name', uid)}: {len(result.topics)} topics")
            except Exception as e:
                # Best-effort batch: report and keep going.
                click.echo(f" [{i+1}/{len(files)}] FAIL {story.get('name', uid)}: {e}")

    asyncio.run(_run())
    click.echo("Enrichment complete.")
@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache")
@click.option("--enrichment-dir", default="./data/enrichments")
@click.option("--db-path", default=None, help="ChromaDB path (default from config).")
def vectorize(cache_dir: str, enrichment_dir: str, db_path: str | None):
    """Embed and index stories into ChromaDB + BM25."""
    from mediastorm.enrich.schemas import StoryEnrichment
    from mediastorm.vectorize.chunker import chunk_story
    from mediastorm.vectorize.embedder import Embedder
    from mediastorm.vectorize.store import VectorStore
    from mediastorm.vectorize.bm25_store import BM25Store
    from mediastorm.config import BM25_INDEX_PATH

    source_dir = Path(cache_dir)
    meta_dir = Path(enrichment_dir)
    db_dir = Path(db_path) if db_path else CHROMADB_PATH

    if not source_dir.exists():
        click.echo("No ingested data. Run 'ingest' first.")
        return

    def _load_enrichment(uid: str) -> StoryEnrichment:
        # Tolerate missing or schema-incompatible enrichment files by
        # falling back to an empty enrichment.
        blob = meta_dir / f"{uid}.json"
        if blob.exists():
            try:
                return StoryEnrichment.model_validate_json(blob.read_text())
            except Exception:
                pass
        return StoryEnrichment(summary="")

    embedder = Embedder()
    store = VectorStore(path=db_dir)
    files = list(source_dir.glob("*.json"))
    click.echo(f"Vectorizing {len(files)} stories...")

    bm25_docs = []
    for i, f in enumerate(files):
        story = json.loads(f.read_text())
        uid = story["uid"]

        # Chunk, embed, and upsert the story document.
        story_doc = chunk_story(story, _load_enrichment(uid))
        story_doc["embedding"] = embedder.embed_texts([story_doc["text"]])[0]
        store.upsert_stories([story_doc])

        # Same text feeds the lexical index.
        bm25_docs.append({"id": story_doc["id"], "text": story_doc["text"]})
        click.echo(f" [{i+1}/{len(files)}] {story.get('name', uid)}")

    # Build and persist the BM25 index over all collected documents.
    bm25 = BM25Store(path=BM25_INDEX_PATH)
    bm25.build(bm25_docs)
    bm25.save()
    click.echo(f"Done. Stories: {store.stories_count()}, BM25 docs: {len(bm25_docs)}")
@cli.command()
@click.option("--subset", default=0, help="Limit to N stories (0 = all).")
@click.option("--verbose", is_flag=True, help="Verbose output.")
def audit(subset: int, verbose: bool):
"""Run pipeline audit/benchmark."""
from audit import run_audit
asyncio.run(run_audit(subset=subset, verbose=verbose))
@cli.command(name="eval")
@click.option("--verbose", "-v", is_flag=True, help="Show per-query details.")
@click.option("--history", is_flag=True, help="Show history of past runs.")
@click.option("--pipeline", is_flag=True, help="Eval full pipeline (retriever + Gemini filter). Requires GEMINI_API_KEY.")
def eval_cmd(verbose: bool, history: bool, pipeline: bool):
    """Run retrieval evaluation and compare to previous run."""
    from mediastorm.eval.runner import (
        _build_run_data, save_run, load_previous_run, load_all_runs,
    )
    from mediastorm.eval.display import (
        print_scores, print_verbose, print_diff, print_history,
    )

    # --history is a read-only view of past runs; nothing is evaluated.
    if history:
        print_history(load_all_runs())
        return

    # Snapshot the latest saved run *before* evaluating, so the fresh run
    # is compared to its predecessor rather than to itself.
    baseline = load_previous_run()

    from eval_retrieval import run_eval

    label = "pipeline (retriever + Gemini)" if pipeline else "retriever only"
    click.echo(f"Running retrieval evaluation ({label})...")
    outcome = asyncio.run(run_eval(verbose=False, quiet=True, pipeline=pipeline))

    # Persist the run so future invocations can diff against it.
    record = _build_run_data(outcome)
    saved_to = save_run(record)
    click.echo(f"Results saved to {saved_to}")

    if verbose:
        print_verbose(record)
    print_scores(record)

    if baseline:
        print_diff(record, baseline)
    else:
        click.echo("\nFirst run — no comparison available.")
if __name__ == "__main__":
cli()