| | """ |
| | pipeline.py |
| | Orchestrates fetch -> clean -> summarize in a single pipeline call. |
| | Author: algorembrant |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import os |
| | import sys |
| | from dataclasses import dataclass, field |
| | from typing import Optional |
| |
|
| | from fetcher import TranscriptResult, fetch, extract_video_id |
| | from cleaner import clean |
| | from summarizer import summarize |
| | from config import DEFAULT_MODEL, QUALITY_MODEL |
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class PipelineResult:
    """Artifacts and errors collected while processing one video.

    A stage that fails appends a message to ``errors`` instead of raising,
    so a partially-complete result is still returned to the caller.
    """

    video_id: str
    raw: str = ""        # transcript in the requested output format
    cleaned: str = ""    # paragraph-cleaned transcript (empty if stage skipped/failed)
    summary: str = ""    # AI summary (empty if stage skipped/failed)
    errors: list[str] = field(default_factory=list)  # one message per failed stage

    @property
    def success(self) -> bool:
        """True when every executed stage completed without error."""
        return len(self.errors) == 0
| |
|
| |
|
| | |
| | |
| | |
| |
|
def run(
    url_or_id: str,
    languages: list[str] | None = None,
    do_clean: bool = False,
    do_summarize: bool = False,
    summary_mode: str = "brief",
    model: str = DEFAULT_MODEL,
    quality: bool = False,
    stream: bool = True,
    output_dir: str | None = None,
    transcript_format: str = "text",
    timestamps: bool = False,
) -> PipelineResult:
    """
    Full pipeline for one video: fetch -> clean -> summarize -> save.

    Args:
        url_or_id: YouTube URL or video ID.
        languages: Language preference list.
        do_clean: Run paragraph cleaner.
        do_summarize: Run summarizer.
        summary_mode: One of 'brief', 'detailed', 'bullets', 'outline'.
        model: Anthropic model identifier.
        quality: Use the higher-quality model instead of the default fast one.
        stream: Stream AI tokens to stderr.
        output_dir: Directory to write output files (optional).
        transcript_format: Raw transcript format: 'text', 'json', 'srt', 'vtt'.
        timestamps: Include timestamps in plain-text transcript.

    Returns:
        PipelineResult with all produced artifacts. Stage failures are
        recorded in ``result.errors`` rather than raised, so callers
        (including batch runs) always get a result back.
    """
    chosen_model = QUALITY_MODEL if quality else model
    result = PipelineResult(video_id="")

    # Resolve the video ID first; nothing else can proceed without it.
    try:
        video_id = extract_video_id(url_or_id)
        result.video_id = video_id
    except ValueError as exc:
        result.errors.append(str(exc))
        return result

    print(f"\n[fetch] {video_id}", file=sys.stderr)
    # FIX: fetch() was the only fallible stage not guarded — a network or
    # missing-transcript error crashed the whole pipeline (and aborted batch
    # runs). Record it like the cleaner/summarizer errors instead.
    try:
        transcript: TranscriptResult = fetch(video_id, languages=languages)
    except Exception as exc:
        result.errors.append(f"Fetch error: {exc}")
        return result
    result.raw = transcript.formatted(transcript_format, timestamps=timestamps)
    plain_text = transcript.plain_text

    if do_clean:
        print("\n[clean] Running paragraph cleaner...", file=sys.stderr)
        try:
            result.cleaned = clean(plain_text, model=chosen_model, stream=stream)
        except Exception as exc:
            result.errors.append(f"Cleaner error: {exc}")

    if do_summarize:
        print(f"\n[summarize] Mode: {summary_mode}", file=sys.stderr)
        # Prefer the cleaned text as summarizer input; fall back to the raw
        # plain-text transcript when cleaning was skipped or failed.
        source_text = result.cleaned if result.cleaned else plain_text
        try:
            result.summary = summarize(
                source_text, mode=summary_mode, model=chosen_model, stream=stream
            )
        except Exception as exc:
            result.errors.append(f"Summarizer error: {exc}")

    if output_dir:
        _save(result, output_dir, transcript_format)

    return result
| |
|
| |
|
def _save(result: PipelineResult, output_dir: str, fmt: str) -> None:
    """Write all non-empty artifacts from *result* into *output_dir*."""
    os.makedirs(output_dir, exist_ok=True)
    vid = result.video_id

    # Map the transcript format to a file extension; unknown formats
    # degrade to plain .txt.
    extension = {"text": "txt", "json": "json", "srt": "srt", "vtt": "vtt"}.get(fmt, "txt")

    # (content, filename) pairs for every artifact the pipeline can produce.
    artifacts = [
        (result.raw, f"{vid}_transcript.{extension}"),
        (result.cleaned, f"{vid}_cleaned.txt"),
        (result.summary, f"{vid}_summary.txt"),
    ]

    for content, filename in artifacts:
        if not content:
            continue
        path = os.path.join(output_dir, filename)
        _write(path, content)
        print(f"[saved] {path}", file=sys.stderr)
| |
|
| |
|
| | def _write(path: str, content: str) -> None: |
| | with open(path, "w", encoding="utf-8") as f: |
| | f.write(content) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
def run_batch(
    urls_or_ids: list[str],
    **kwargs,
) -> list[PipelineResult]:
    """
    Run the pipeline for multiple videos sequentially.
    All keyword arguments are forwarded to `run()`.

    Returns a list of PipelineResult, one per video.
    """
    outcomes: list[PipelineResult] = []
    count = len(urls_or_ids)
    for position, target in enumerate(urls_or_ids, start=1):
        # Progress banner on stderr so piped stdout stays clean.
        print(f"\n{'='*60}", file=sys.stderr)
        print(f"[{position}/{count}] Processing: {target}", file=sys.stderr)
        print(f"{'='*60}", file=sys.stderr)
        outcomes.append(run(target, **kwargs))
    return outcomes
| |
|