File size: 5,619 Bytes
d2bfe97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
pipeline.py
Orchestrates fetch -> clean -> summarize in a single pipeline call.
Author: algorembrant
"""

from __future__ import annotations

import os
import sys
from dataclasses import dataclass, field
from typing import Optional

from fetcher import TranscriptResult, fetch, extract_video_id
from cleaner import clean
from summarizer import summarize
from config import DEFAULT_MODEL, QUALITY_MODEL


# ---------------------------------------------------------------------------
# Result container
# ---------------------------------------------------------------------------

@dataclass
class PipelineResult:
    """Artifacts and errors produced by one pipeline run for a single video."""

    video_id: str
    raw: str = ""      # transcript in the requested output format
    cleaned: str = ""  # AI-cleaned paragraph text (when cleaning ran)
    summary: str = ""  # AI summary (when summarization ran)
    errors: list[str] = field(default_factory=list)  # one message per failed stage

    @property
    def success(self) -> bool:
        """True when no stage recorded an error."""
        return len(self.errors) == 0


# ---------------------------------------------------------------------------
# Single-video pipeline
# ---------------------------------------------------------------------------

def run(
    url_or_id: str,
    languages: list[str] | None     = None,
    do_clean: bool                  = False,
    do_summarize: bool              = False,
    summary_mode: str               = "brief",
    model: str                      = DEFAULT_MODEL,
    quality: bool                   = False,
    stream: bool                    = True,
    output_dir: str | None          = None,
    transcript_format: str          = "text",
    timestamps: bool                = False,
) -> PipelineResult:
    """
    Full pipeline for one video: fetch -> (optional) clean -> (optional) summarize.

    Args:
        url_or_id:         YouTube URL or video ID.
        languages:         Language preference list.
        do_clean:          Run paragraph cleaner.
        do_summarize:      Run summarizer.
        summary_mode:      One of 'brief', 'detailed', 'bullets', 'outline'.
        model:             Anthropic model identifier.
        quality:           Use the higher-quality model instead of the default fast one.
        stream:            Stream AI tokens to stderr.
        output_dir:        Directory to write output files (optional).
        transcript_format: Raw transcript format: 'text', 'json', 'srt', 'vtt'.
        timestamps:        Include timestamps in plain-text transcript.

    Returns:
        PipelineResult with all produced artifacts.  Failures at any stage
        are recorded in `result.errors` rather than raised, so batch callers
        can keep going (`result.success` reflects an error-free run).
    """
    chosen_model = QUALITY_MODEL if quality else model
    result       = PipelineResult(video_id="")

    # 1. Extract ID -- nothing else can run without a valid video ID.
    try:
        result.video_id = extract_video_id(url_or_id)
    except ValueError as exc:
        result.errors.append(str(exc))
        return result

    # 2. Fetch.
    # FIX: this was the only stage not guarded by try/except -- a fetch
    # failure (network error, no transcript available, ...) crashed the
    # whole pipeline and aborted batch runs instead of being recorded in
    # result.errors like the cleaner/summarizer failures below.
    print(f"\n[fetch] {result.video_id}", file=sys.stderr)
    try:
        transcript: TranscriptResult = fetch(result.video_id, languages=languages)
    except Exception as exc:
        result.errors.append(f"Fetch error: {exc}")
        return result
    result.raw = transcript.formatted(transcript_format, timestamps=timestamps)
    plain_text = transcript.plain_text   # always used as AI input

    # 3. Clean (optional AI pass that reflows the transcript into paragraphs).
    if do_clean:
        print("\n[clean] Running paragraph cleaner...", file=sys.stderr)
        try:
            result.cleaned = clean(plain_text, model=chosen_model, stream=stream)
        except Exception as exc:
            result.errors.append(f"Cleaner error: {exc}")

    # 4. Summarize (optional).  Prefer the cleaned text when the cleaner
    # succeeded; otherwise fall back to the raw plain text.
    if do_summarize:
        print(f"\n[summarize] Mode: {summary_mode}", file=sys.stderr)
        source_text = result.cleaned if result.cleaned else plain_text
        try:
            result.summary = summarize(
                source_text, mode=summary_mode, model=chosen_model, stream=stream
            )
        except Exception as exc:
            result.errors.append(f"Summarizer error: {exc}")

    # 5. Persist artifacts if an output directory was requested.
    if output_dir:
        _save(result, output_dir, transcript_format)

    return result


def _save(result: PipelineResult, output_dir: str, fmt: str) -> None:
    """Persist every non-empty artifact of *result* under *output_dir*.

    Writes up to three files (raw transcript, cleaned text, summary) and
    logs each saved path to stderr.  The raw transcript takes the extension
    matching *fmt*; AI outputs are always plain text.
    """
    os.makedirs(output_dir, exist_ok=True)

    transcript_ext = {"text": "txt", "json": "json", "srt": "srt", "vtt": "vtt"}.get(fmt, "txt")

    # (file suffix, extension, content) per artifact; empty ones are skipped.
    artifacts = [
        ("transcript", transcript_ext, result.raw),
        ("cleaned", "txt", result.cleaned),
        ("summary", "txt", result.summary),
    ]

    for suffix, ext, content in artifacts:
        if not content:
            continue
        path = os.path.join(output_dir, f"{result.video_id}_{suffix}.{ext}")
        _write(path, content)
        print(f"[saved] {path}", file=sys.stderr)


def _write(path: str, content: str) -> None:
    """Write *content* to *path* as UTF-8, replacing any existing file."""
    with open(path, mode="w", encoding="utf-8") as handle:
        handle.write(content)


# ---------------------------------------------------------------------------
# Batch pipeline
# ---------------------------------------------------------------------------

def run_batch(
    urls_or_ids: list[str],
    **kwargs,
) -> list[PipelineResult]:
    """
    Process several videos one after another, forwarding all keyword
    arguments to `run()`.

    Returns:
        One PipelineResult per input, in the same order.
    """
    total = len(urls_or_ids)
    banner = "=" * 60
    results: list[PipelineResult] = []
    for index, target in enumerate(urls_or_ids, 1):
        # Progress header goes to stderr so stdout stays clean for output.
        print(f"\n{banner}", file=sys.stderr)
        print(f"[{index}/{total}] Processing: {target}", file=sys.stderr)
        print(f"{banner}", file=sys.stderr)
        results.append(run(target, **kwargs))
    return results