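"""Command-line interface for the MediaStorm RAG documentary archive tool.

Commands are typically run in pipeline order: ingest, enrich, vectorize,
then audit / eval. The enrich and vectorize steps expect the cached output
of a prior ingest run.
"""
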
import asyncio
import json
from pathlib import Path

import click

from mediastorm.config import CHROMADB_PATH


@click.group()
def cli():
    """MediaStorm RAG — Documentary archive search tool."""
    pass


@cli.command()
@click.option("--uid", default=None, help="Ingest a single story by UID.")
@click.option("--cache-dir", default="./data/ingest_cache", help="Cache directory for fetched data.")
def ingest(uid: str | None, cache_dir: str):
    """Fetch and parse MediaStorm data."""
    from mediastorm.ingest.pipeline import ingest_all, ingest_story

    cache_path = Path(cache_dir)

    async def _run():
        if uid:
            click.echo(f"Ingesting story {uid}...")
            story = await ingest_story(uid)
            cache_path.mkdir(parents=True, exist_ok=True)
            (cache_path / f"{uid}.json").write_text(
                json.dumps(story, ensure_ascii=False, indent=2)
            )
            click.echo(f"Done. {len(story.get('videos', []))} videos found.")
        else:
            click.echo("Ingesting all stories...")
            stories = await ingest_all(cache_dir=cache_path)
            click.echo(f"Done. {len(stories)} stories ingested.")

    asyncio.run(_run())


@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache", help="Directory with ingested data.")
@click.option("--enrichment-dir", default="./data/enrichments", help="Cache for enrichment results.")
def enrich(cache_dir: str, enrichment_dir: str):
    """Enrich ingested stories with LLM-extracted metadata."""
    from mediastorm.enrich.enricher import enrich_story as _enrich

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    enrichment_path.mkdir(parents=True, exist_ok=True)

    if not cache_path.exists():
        click.echo("No ingested data found. Run 'ingest' first.")
        return

    files = list(cache_path.glob("*.json"))
    click.echo(f"Enriching {len(files)} stories...")

    async def _run():
        for i, f in enumerate(files):
            story = json.loads(f.read_text())
            uid = story["uid"]

            # Combine text for enrichment
            description = story.get("short_description", "") + " " + story.get("long_description", "")
            transcript = ""
            for video in story.get("videos", []):
                for turn in video.get("speaker_turns", []):
                    speaker = turn.get("speaker", "")
                    text = turn.get("text", "")
                    transcript += f"{speaker}: {text}\n" if speaker else f"{text}\n"
            credits_str = ", ".join(
                f"{c.get('name', '')} ({c.get('role', '')})"
                for c in story.get("credits", [])
                if c.get("name")
            )

            try:
                result = await _enrich(
                    title=story.get("name", ""),
                    description=description.strip(),
                    transcript=transcript.strip(),
                    cache_dir=enrichment_path,
                    story_uid=uid,
                    credits=credits_str,
                )
                click.echo(f" [{i+1}/{len(files)}] {story.get('name', uid)} — {len(result.topics)} topics")
            except Exception as e:
                click.echo(f" [{i+1}/{len(files)}] FAIL {story.get('name', uid)}: {e}")

    asyncio.run(_run())
    click.echo("Enrichment complete.")


@cli.command()
@click.option("--cache-dir", default="./data/ingest_cache")
@click.option("--enrichment-dir", default="./data/enrichments")
@click.option("--db-path", default=None, help="ChromaDB path (default from config).")
def vectorize(cache_dir: str, enrichment_dir: str, db_path: str | None):
    """Embed and index stories into ChromaDB + BM25."""
    from mediastorm.enrich.schemas import StoryEnrichment
    from mediastorm.vectorize.chunker import chunk_story
    from mediastorm.vectorize.embedder import Embedder
    from mediastorm.vectorize.store import VectorStore
    from mediastorm.vectorize.bm25_store import BM25Store
    from mediastorm.config import BM25_INDEX_PATH

    cache_path = Path(cache_dir)
    enrichment_path = Path(enrichment_dir)
    chromadb_path = Path(db_path) if db_path else CHROMADB_PATH

    if not cache_path.exists():
        click.echo("No ingested data. Run 'ingest' first.")
        return

    embedder = Embedder()
    store = VectorStore(path=chromadb_path)

    files = list(cache_path.glob("*.json"))
    click.echo(f"Vectorizing {len(files)} stories...")

    bm25_docs = []
    for i, f in enumerate(files):
        story = json.loads(f.read_text())
        uid = story["uid"]

        # Load enrichment (handle schema changes gracefully)
        enrichment_file = enrichment_path / f"{uid}.json"
        if enrichment_file.exists():
            try:
                enrichment = StoryEnrichment.model_validate_json(enrichment_file.read_text())
            except Exception:
                enrichment = StoryEnrichment(summary="")
        else:
            enrichment = StoryEnrichment(summary="")

        # Chunk story
        story_doc = chunk_story(story, enrichment)
        story_doc["embedding"] = embedder.embed_texts([story_doc["text"]])[0]
        store.upsert_stories([story_doc])

        # Collect for BM25
        bm25_docs.append({"id": story_doc["id"], "text": story_doc["text"]})
        click.echo(f" [{i+1}/{len(files)}] {story.get('name', uid)}")

    # Build and save BM25 index
    bm25 = BM25Store(path=BM25_INDEX_PATH)
    bm25.build(bm25_docs)
    bm25.save()

    click.echo(f"Done. Stories: {store.stories_count()}, BM25 docs: {len(bm25_docs)}")


@cli.command()
@click.option("--subset", default=0, help="Limit to N stories (0 = all).")
@click.option("--verbose", is_flag=True, help="Verbose output.")
def audit(subset: int, verbose: bool):
    """Run pipeline audit/benchmark."""
    from audit import run_audit

    asyncio.run(run_audit(subset=subset, verbose=verbose))


@cli.command(name="eval")
@click.option("--verbose", "-v", is_flag=True, help="Show per-query details.")
@click.option("--history", is_flag=True, help="Show history of past runs.")
@click.option("--pipeline", is_flag=True, help="Eval full pipeline (retriever + Gemini filter). Requires GEMINI_API_KEY.")
def eval_cmd(verbose: bool, history: bool, pipeline: bool):
    """Run retrieval evaluation and compare to previous run."""
    from mediastorm.eval.runner import (
        _build_run_data, save_run, load_previous_run, load_all_runs,
    )
    from mediastorm.eval.display import (
        print_scores, print_verbose, print_diff, print_history,
    )

    if history:
        runs = load_all_runs()
        print_history(runs)
        return

    # Load previous run BEFORE running eval (so the new run isn't compared to itself)
    previous = load_previous_run()

    # Run evaluation
    from eval_retrieval import run_eval

    mode = "pipeline (retriever + Gemini)" if pipeline else "retriever only"
    click.echo(f"Running retrieval evaluation ({mode})...")
    eval_result = asyncio.run(run_eval(verbose=False, quiet=True, pipeline=pipeline))

    # Build and save run data
    run_data = _build_run_data(eval_result)
    path = save_run(run_data)
    click.echo(f"Results saved to {path}")

    # Display
    if verbose:
        print_verbose(run_data)
    print_scores(run_data)
    if previous:
        print_diff(run_data, previous)
    else:
        click.echo("\nFirst run — no comparison available.")


if __name__ == "__main__":
    cli()
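
# Example invocations (illustrative; the script/module name is an assumption,
# adjust to however this CLI is actually installed or invoked):
#   python cli.py ingest --uid <story-uid>
#   python cli.py enrich
#   python cli.py vectorize
#   python cli.py eval --verbose --pipeline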