"""
pipeline.py
Orchestrates fetch -> clean -> summarize in a single pipeline call.
Author: algorembrant
"""
from __future__ import annotations
import os
import sys
from dataclasses import dataclass, field
from typing import Optional
from fetcher import TranscriptResult, fetch, extract_video_id
from cleaner import clean
from summarizer import summarize
from config import DEFAULT_MODEL, QUALITY_MODEL
# ---------------------------------------------------------------------------
# Result container
# ---------------------------------------------------------------------------
@dataclass
class PipelineResult:
    """Container for every artifact a single pipeline run can produce.

    Attributes:
        video_id: The extracted YouTube video ID ("" if extraction failed).
        raw: Transcript in the requested output format.
        cleaned: Paragraph-cleaned transcript (empty unless cleaning ran).
        summary: AI summary (empty unless summarization ran).
        errors: Human-readable messages for every stage that failed.
    """

    video_id: str
    raw: str = ""
    cleaned: str = ""
    summary: str = ""
    errors: list[str] = field(default_factory=list)

    @property
    def success(self) -> bool:
        """True when no stage recorded an error."""
        return len(self.errors) == 0
# ---------------------------------------------------------------------------
# Single-video pipeline
# ---------------------------------------------------------------------------
def run(
    url_or_id: str,
    languages: list[str] | None = None,
    do_clean: bool = False,
    do_summarize: bool = False,
    summary_mode: str = "brief",
    model: str = DEFAULT_MODEL,
    quality: bool = False,
    stream: bool = True,
    output_dir: str | None = None,
    transcript_format: str = "text",
    timestamps: bool = False,
) -> PipelineResult:
    """
    Full pipeline for one video: extract ID -> fetch -> clean -> summarize -> save.

    Args:
        url_or_id: YouTube URL or video ID.
        languages: Language preference list.
        do_clean: Run paragraph cleaner.
        do_summarize: Run summarizer.
        summary_mode: One of 'brief', 'detailed', 'bullets', 'outline'.
        model: Anthropic model identifier.
        quality: Use the higher-quality model instead of the default fast one.
        stream: Stream AI tokens to stderr.
        output_dir: Directory to write output files (optional).
        transcript_format: Raw transcript format: 'text', 'json', 'srt', 'vtt'.
        timestamps: Include timestamps in plain-text transcript.

    Returns:
        PipelineResult with all produced artifacts. Stage failures are
        recorded in ``result.errors`` rather than raised, so callers can
        always inspect a result object.
    """
    chosen_model = QUALITY_MODEL if quality else model
    result = PipelineResult(video_id="")

    # 1. Extract ID -- nothing else can proceed without it.
    try:
        result.video_id = extract_video_id(url_or_id)
    except ValueError as exc:
        result.errors.append(str(exc))
        return result
    video_id = result.video_id

    # 2. Fetch. A failure here is fatal for this video: record the error
    # instead of letting the exception escape (previously an unguarded
    # fetch crashed the whole batch).
    print(f"\n[fetch] {video_id}", file=sys.stderr)
    try:
        transcript: TranscriptResult = fetch(video_id, languages=languages)
    except Exception as exc:
        result.errors.append(f"Fetch error: {exc}")
        return result
    result.raw = transcript.formatted(transcript_format, timestamps=timestamps)
    plain_text = transcript.plain_text  # always used as AI input

    # 3. Clean (optional, best-effort: error recorded, pipeline continues).
    if do_clean:
        print("\n[clean] Running paragraph cleaner...", file=sys.stderr)
        try:
            result.cleaned = clean(plain_text, model=chosen_model, stream=stream)
        except Exception as exc:
            result.errors.append(f"Cleaner error: {exc}")

    # 4. Summarize (optional, best-effort).
    if do_summarize:
        print(f"\n[summarize] Mode: {summary_mode}", file=sys.stderr)
        # Prefer cleaned text if available; fall back to the raw plain text.
        source_text = result.cleaned if result.cleaned else plain_text
        try:
            result.summary = summarize(
                source_text, mode=summary_mode, model=chosen_model, stream=stream
            )
        except Exception as exc:
            result.errors.append(f"Summarizer error: {exc}")

    # 5. Save to disk (only when an output directory was requested).
    if output_dir:
        _save(result, output_dir, transcript_format)
    return result
def _save(result: PipelineResult, output_dir: str, fmt: str) -> None:
    """Write all non-empty artifacts of *result* into *output_dir*."""
    os.makedirs(output_dir, exist_ok=True)
    vid = result.video_id
    # Map the transcript format to a file extension; unknown formats fall
    # back to plain ".txt".
    ext = {"text": "txt", "json": "json", "srt": "srt", "vtt": "vtt"}.get(fmt, "txt")

    # (content, filename) pairs -- only non-empty content gets written.
    artifacts = [
        (result.raw, f"{vid}_transcript.{ext}"),
        (result.cleaned, f"{vid}_cleaned.txt"),
        (result.summary, f"{vid}_summary.txt"),
    ]

    written: list[str] = []
    for content, filename in artifacts:
        if content:
            path = os.path.join(output_dir, filename)
            _write(path, content)
            written.append(path)

    for path in written:
        print(f"[saved] {path}", file=sys.stderr)
def _write(path: str, content: str) -> None:
with open(path, "w", encoding="utf-8") as f:
f.write(content)
# ---------------------------------------------------------------------------
# Batch pipeline
# ---------------------------------------------------------------------------
def run_batch(
    urls_or_ids: list[str],
    **kwargs,
) -> list[PipelineResult]:
    """
    Process several videos one after another.

    Every keyword argument is forwarded untouched to `run()`.

    Returns:
        One PipelineResult per input, in the same order.
    """
    total = len(urls_or_ids)
    results: list[PipelineResult] = []
    banner = "=" * 60
    for index, target in enumerate(urls_or_ids, 1):
        # Progress banner on stderr so piped stdout stays clean.
        print(f"\n{banner}", file=sys.stderr)
        print(f"[{index}/{total}] Processing: {target}", file=sys.stderr)
        print(banner, file=sys.stderr)
        results.append(run(target, **kwargs))
    return results
|