File size: 36,409 Bytes
4b5adfb
003d0a1
 
8813304
 
 
94f9a6d
 
87d01c8
dc249d2
cbed3ed
 
bd357e4
6aee0d2
 
 
dc249d2
94f9a6d
 
 
4b5adfb
 
 
 
347bcfa
8fd886e
 
4b5adfb
 
 
 
 
 
 
dc249d2
6aee0d2
 
 
 
94f9a6d
 
87d01c8
4b5adfb
571ece6
f9920b4
d842b52
003d0a1
8813304
d842b52
003d0a1
 
 
8813304
 
 
 
24aa5de
6aee0d2
8813304
6aee0d2
 
d842b52
87d01c8
 
 
6aee0d2
 
 
 
 
 
 
8813304
6aee0d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
571ece6
3d4b8b5
6aee0d2
0f56eef
cbed3ed
8813304
6aee0d2
 
 
 
8813304
 
 
87d01c8
4b5adfb
dc249d2
 
8813304
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dc249d2
 
 
 
 
 
 
4b5adfb
dc249d2
6aee0d2
8813304
4b5adfb
6aee0d2
4b5adfb
 
 
 
 
6aee0d2
 
571ece6
f9920b4
 
571ece6
 
 
f9920b4
571ece6
 
f9920b4
 
 
 
 
571ece6
 
 
f9920b4
 
 
 
571ece6
 
 
5c13043
 
 
f9920b4
261378a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f9920b4
571ece6
3d4b8b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6aee0d2
 
 
 
 
 
 
 
 
 
 
 
 
 
cbed3ed
 
 
 
8813304
 
cbed3ed
 
 
6aee0d2
 
 
347bcfa
 
 
 
 
 
 
 
 
 
 
 
6aee0d2
347bcfa
 
 
 
 
6aee0d2
347bcfa
 
 
 
6aee0d2
 
8813304
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cbed3ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8fd886e
d842b52
cbed3ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8813304
 
 
 
 
cbed3ed
 
 
 
 
 
 
 
 
 
8813304
 
 
 
 
cbed3ed
 
 
 
 
 
 
 
 
 
 
 
5c13043
 
 
 
 
 
 
cbed3ed
bd357e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8fd886e
d842b52
bd357e4
d842b52
5c13043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d842b52
 
 
24aa5de
d842b52
 
 
 
 
 
 
 
 
 
 
 
 
8813304
d842b52
 
 
 
 
 
 
 
 
 
24aa5de
 
 
 
 
 
 
 
 
 
 
 
 
8813304
24aa5de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8813304
24aa5de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8813304
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5c13043
8813304
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d842b52
5c13043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d842b52
8fd886e
003d0a1
 
 
 
 
 
 
8fd886e
 
 
 
 
 
 
24aa5de
8fd886e
003d0a1
8fd886e
 
 
8813304
 
24aa5de
 
 
 
 
 
 
 
 
 
 
8813304
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24aa5de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8fd886e
003d0a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8813304
003d0a1
 
8813304
003d0a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d842b52
8813304
 
 
 
 
 
d842b52
 
 
 
 
 
003d0a1
 
 
 
 
 
 
 
 
d842b52
 
 
 
003d0a1
d842b52
 
 
 
 
0f56eef
bd357e4
 
 
 
 
 
 
 
 
87d01c8
 
9b94974
87d01c8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
import json
import base64
import binascii
import html
import http.cookiejar
import inspect
import logging
import os
import re
import time
import requests
from curl_cffi import requests as curl_requests
from pathlib import Path
from typing import Callable, List, Tuple

from src.utils.config import settings


logger = logging.getLogger(__name__)

_FAST_FAIL_SSL_MARKERS = (
    "UNEXPECTED_EOF_WHILE_READING",
    "SSLEOFError",
    "EOF occurred in violation of protocol",
    "TLS",
    "TLS connect error",
    "invalid library",
)


def _is_fast_fail_ssl_error(exc: Exception) -> bool:
    error_text = str(exc)
    return any(marker in error_text for marker in _FAST_FAIL_SSL_MARKERS)


class TranscriptProviderError(RuntimeError):
    """Signal that a transcript provider failed to produce usable transcript text."""


class YouTubeDownloader:
    def __init__(self):
        """Read provider credentials and YouTube access settings from the environment.

        Every environment value is stripped of surrounding whitespace and
        falls back to an empty string (or the default given below) when the
        variable is unset. The proxy URL is taken from ``PROXY_URL`` and,
        when that is empty, from ``YOUTUBE_PROXY``.
        """
        def read_env(name: str, default: str = "") -> str:
            return os.environ.get(name, default).strip()

        # Third-party API credentials.
        self._assemblyai_key = read_env("ASSEMBLYAI_API_KEY")
        self._supadata_key = read_env("SUPADATA_API_KEY")
        self._rapidapi_key = read_env("RAPIDAPI_KEY")
        self._rapidapi_host = read_env("RAPIDAPI_HOST", "youtube-transcriptor.p.rapidapi.com")

        # YouTube cookie sources.
        self._youtube_cookies = read_env("YOUTUBE_COOKIES")
        self._youtube_cookies_b64 = read_env("YOUTUBE_COOKIES_B64")
        self._youtube_cookies_path = read_env("YOUTUBE_COOKIES_PATH")
        self._youtube_cookies_file = read_env("YOUTUBE_COOKIES_FILE")

        # YouTube PO token settings.
        self._youtube_po_token = read_env("YOUTUBE_PO_TOKEN")
        self._youtube_po_token_client = read_env("YOUTUBE_PO_TOKEN_CLIENT", "web")
        self._youtube_po_token_context = read_env("YOUTUBE_PO_TOKEN_CONTEXT", "gvs")

        self._proxy_url = read_env("PROXY_URL") or read_env("YOUTUBE_PROXY")

        self._invidious_instances = self._load_invidious_instances()
        self._strategy = settings.youtube_transcript_strategy
        self._configure_proxy_environment()

        if self._strategy == "cookies_required":
            logger.info("Transcript strategy 'cookies_required' enabled.")

    def get_transcript(self, url: str) -> str:
        """Return the first non-empty transcript produced by the provider plan.

        Providers from ``_build_provider_plan`` are tried in order. Any
        exception, or an empty result, is recorded as that provider's failure
        and the next provider is tried.

        Args:
            url: Video URL; its ID is obtained via ``_extract_video_id``.

        Returns:
            The transcript text of the first provider that succeeds.

        Raises:
            RuntimeError: if every provider fails; the message lists each
                provider's failure separated by `` | ``.
        """
        video_id = self._extract_video_id(url)
        logger.info("Transcript pipeline for video ID %s using strategy=%s", video_id, self._strategy)

        plan = self._build_provider_plan()
        total = len(plan)
        errors: List[str] = []

        for position, (label, fetch) in enumerate(plan, start=1):
            try:
                logger.info("Trying transcript strategy: %s", label)
                text = fetch(video_id)
                if not text:
                    # An empty result is treated exactly like a provider error.
                    raise TranscriptProviderError(f"{label} returned empty transcript text.")
                return text
            except Exception as exc:
                errors.append(f"{label}: {exc}")
                if position == total:
                    logger.error("All transcript providers failed for %s.", video_id)
                else:
                    logger.info("%s transcript provider unavailable, trying next fallback.", label)

        summary = " | ".join(errors)
        raise RuntimeError(
            f"All transcript strategies exhausted for {video_id}. "
            f"Failures: {summary}"
        )

    def _build_provider_plan(self) -> List[Tuple[str, Callable[[str], str]]]:
        return [
            ("RapidAPI", self._get_transcript_via_rapidapi),
            ("RapidAPI-v2", self._get_transcript_via_rapidapi_v2),
            ("Supadata", self._get_transcript_via_supadata),
            ("YouTube Transcript API", self._get_transcript_via_youtube),
            ("yt-dlp", self._get_transcript_via_ytdlp),
            ("pytubefix captions", self._get_transcript_via_pytubefix),
        ]

    def _get_transcript_via_youtube(self, video_id: str) -> str:
        """Fetch a transcript through the ``youtube_transcript_api`` package.

        The package's API is probed at runtime and the first available call
        style is used:

        1. the class-level ``YouTubeTranscriptApi.get_transcript``;
        2. ``fetch`` on an instance built around a prepared requests session;
        3. ``YouTubeTranscriptApi.list_transcripts``, picking a manually
           created track, then a generated one, then the first track listed.

        Up to three attempts are made, sleeping 1.5s and then 3s between
        them. An error matching ``_is_fast_fail_ssl_error`` stops the
        retries immediately.

        Args:
            video_id: YouTube video identifier.

        Returns:
            The transcript entries joined into a single string.

        Raises:
            TranscriptProviderError: if the package cannot be imported or
                every attempt fails; the last error is chained as the cause.
        """
        last_error: Exception | None = None
        languages = ["en", "ar", "en-US"]
        cookie_file = self._resolve_cookie_file()
        proxies = self._requests_proxies()
        max_attempts = 3

        try:
            # Imported once, outside the retry loop: a broken or missing
            # package is not transient, so retrying it with back-off sleeps
            # would only delay the next provider.
            from youtube_transcript_api import YouTubeTranscriptApi
        except Exception as exc:
            raise TranscriptProviderError(f"YouTube Transcript API failed: {exc}") from exc

        for attempt in range(max_attempts):
            try:
                if hasattr(YouTubeTranscriptApi, "get_transcript"):
                    kwargs = {"languages": languages}
                    if cookie_file:
                        kwargs["cookies"] = str(cookie_file)
                    if proxies:
                        kwargs["proxies"] = proxies

                    data = YouTubeTranscriptApi.get_transcript(video_id, **kwargs)
                    logger.info("YouTube Transcript API get_transcript succeeded on attempt %s", attempt + 1)
                    return self._join_transcript_entries(data)

                session = self._build_requests_session(cookie_file)
                api = YouTubeTranscriptApi(http_client=session)
                if hasattr(api, "fetch"):
                    data = api.fetch(video_id, languages=languages)
                    logger.info("YouTube Transcript API fetch succeeded on attempt %s", attempt + 1)
                    return self._join_transcript_entries(data)

                list_kwargs = {}
                if cookie_file:
                    list_kwargs["cookies"] = str(cookie_file)
                if proxies:
                    list_kwargs["proxies"] = proxies

                transcript_list = YouTubeTranscriptApi.list_transcripts(video_id, **list_kwargs)
                # Preference order: manual track, generated track, then
                # whatever track is listed first.
                try:
                    transcript = transcript_list.find_manually_created_transcript(languages)
                except Exception:
                    try:
                        transcript = transcript_list.find_generated_transcript(languages)
                    except Exception:
                        transcript = next(iter(transcript_list))

                entries = transcript.fetch()
                logger.info("YouTube Transcript API succeeded on attempt %s", attempt + 1)
                return self._join_transcript_entries(entries)
            except Exception as exc:
                last_error = exc
                if _is_fast_fail_ssl_error(exc):
                    break
                if attempt < max_attempts - 1:
                    # Linear back-off: 1.5s after the first failure, 3s after the second.
                    time.sleep(1.5 * (attempt + 1))

        raise TranscriptProviderError(f"YouTube Transcript API failed: {last_error}") from last_error

    def _get_transcript_via_rapidapi(self, video_id: str) -> str:
        """Fetch a transcript from the RapidAPI host configured on the instance.

        Sends ``GET https://<host>/transcript`` with ``video_id`` and
        ``lang=en``. The first element of a list response is inspected:
        its ``transcriptionAsText`` string is preferred, otherwise the
        ``subtitle`` values of its ``transcription`` segments are joined
        with spaces.

        Args:
            video_id: YouTube video identifier.

        Returns:
            The stripped transcript text.

        Raises:
            TranscriptProviderError: if no API key is configured, the request
                fails, or the response holds no usable transcript.
            ValueError: if the response is a dict carrying an ``error`` key.
        """
        # Check configuration before importing httpx so an unconfigured
        # provider reports the clearer message.
        if not self._rapidapi_key:
            raise TranscriptProviderError("RapidAPI key not configured.")

        import httpx

        url = f"https://{self._rapidapi_host}/transcript"
        headers = {
            "x-rapidapi-key": self._rapidapi_key,
            "x-rapidapi-host": self._rapidapi_host,
        }
        params = {
            "video_id": video_id,
            "lang": "en",
        }

        try:
            with httpx.Client(timeout=30) as client:
                response = client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()
        except Exception as exc:
            raise TranscriptProviderError(f"RapidAPI request failed: {exc}") from exc

        if isinstance(data, dict) and "error" in data:
            raise ValueError(f"RapidAPI: {data['error']}")

        item = data[0] if isinstance(data, list) and data else None
        if isinstance(item, dict):
            # 1. Prefer the pre-joined text. Only a real string is accepted;
            #    any other type falls through to the segment list instead of
            #    crashing on ``.strip()``.
            full_text = item.get("transcriptionAsText")
            if isinstance(full_text, str) and full_text.strip():
                text = full_text.strip()
                logger.info("RapidAPI transcript (full text) fetched successfully (%d chars).", len(text))
                return text

            # 2. Fall back to joining the per-segment subtitles. A missing or
            #    null subtitle contributes an empty string, never "None".
            transcription_list = item.get("transcription")
            if isinstance(transcription_list, list):
                text = " ".join(
                    "" if seg.get("subtitle") is None else str(seg.get("subtitle"))
                    for seg in transcription_list
                    if isinstance(seg, dict)
                ).strip()
                if text:
                    logger.info("RapidAPI transcript (segments) fetched successfully (%d chars).", len(text))
                    return text

        # 3. Nothing usable: log the raw response for diagnosis and fail.
        logger.warning("RapidAPI raw response: %s", data)
        raise TranscriptProviderError("RapidAPI response did not contain usable transcript content.")

    def _get_transcript_via_rapidapi_v2(self, video_id: str) -> str:
        """Fetch a transcript from the RapidAPI "transcribe" endpoint.

        Posts the watch URL as JSON and reads the response dict. A non-empty
        string under ``transcript``, ``content``, ``text`` or ``result`` is
        returned first; otherwise the ``text`` values of a list under
        ``segments``, ``transcription`` or ``words`` are joined with spaces.

        Args:
            video_id: YouTube video identifier.

        Returns:
            The stripped transcript text.

        Raises:
            TranscriptProviderError: if no API key is configured, the request
                fails, or the response holds no usable transcript.
            ValueError: if the response is a dict carrying an ``error`` key.
        """
        # Check configuration before importing httpx so an unconfigured
        # provider reports the clearer message.
        if not self._rapidapi_key:
            raise TranscriptProviderError("RapidAPI key not configured.")

        import httpx

        url = "https://youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com/transcribe"
        headers = {
            "x-rapidapi-key": self._rapidapi_key,
            "x-rapidapi-host": "youtube-transcripts-transcribe-youtube-video-to-text.p.rapidapi.com",
            "Content-Type": "application/json",
        }
        payload = {
            "url": f"https://www.youtube.com/watch?v={video_id}"
        }

        try:
            with httpx.Client(timeout=60) as client:
                response = client.post(url, headers=headers, json=payload)
                response.raise_for_status()
                data = response.json()
        except Exception as exc:
            raise TranscriptProviderError(f"RapidAPI-v2 request failed: {exc}") from exc

        # Log a bounded preview of the raw response for debugging.
        logger.info(
            "[RapidAPI-v2 DEBUG] status=%s preview=%s",
            response.status_code,
            str(data)[:300],
        )

        if isinstance(data, dict) and "error" in data:
            raise ValueError(f"RapidAPI-v2: {data['error']}")

        # Response shapes handled:
        #   {"transcript": "full text..."} / {"content": "..."} / {"text": "..."}
        #   {"segments": [{"text": "...", "start": 0.0}, ...]}
        if isinstance(data, dict):
            for key in ("transcript", "content", "text", "result"):
                value = data.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip()

            # Fallback: join a segment list if present. Null or non-string
            # ``text`` values are coerced so the join cannot raise TypeError.
            for key in ("segments", "transcription", "words"):
                segments = data.get(key)
                if not isinstance(segments, list):
                    continue
                joined = " ".join(
                    "" if seg.get("text") is None else str(seg.get("text"))
                    for seg in segments
                    if isinstance(seg, dict)
                ).strip()
                if joined:
                    return joined

        raise TranscriptProviderError("RapidAPI-v2 response did not contain usable transcript content.")

    def _get_transcript_via_supadata(self, video_id: str) -> str:
        """Fetch a transcript from the Supadata YouTube transcript endpoint.

        The watch URL and ``text=true`` are sent as query parameters. The
        response's ``content`` is used when it is a non-empty string or a
        list of segment dicts; otherwise the ``segments`` list is joined.

        Args:
            video_id: YouTube video identifier.

        Returns:
            The stripped transcript text.

        Raises:
            TranscriptProviderError: if no API key is configured, the request
                fails, or the response holds no usable transcript.
        """
        if not self._supadata_key:
            raise TranscriptProviderError("Supadata API key not configured.")

        def join_segments(segments) -> str:
            # Null or non-string ``text`` values are coerced so the join
            # cannot raise TypeError.
            return " ".join(
                "" if s.get("text") is None else str(s.get("text"))
                for s in segments
                if isinstance(s, dict)
            ).strip()

        clean_url = f"https://www.youtube.com/watch?v={video_id}"
        headers = {
            "x-api-key": self._supadata_key,
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
        }
        try:
            resp = curl_requests.get(
                "https://api.supadata.ai/v1/youtube/transcript",
                # Passed as params so the nested watch URL (which itself
                # contains "?" and "=") is percent-encoded correctly.
                params={"url": clean_url, "text": "true"},
                headers=headers,
                impersonate="chrome124",
                timeout=30,
                proxies=self._requests_proxies() or None,
            )
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            raise TranscriptProviderError(f"Supadata request failed: {exc}") from exc

        if not isinstance(data, dict):
            raise TranscriptProviderError("Supadata response did not include usable transcript content.")

        # "content" may be plain text or a structured list of segments.
        text = ""
        content_val = data.get("content")
        if isinstance(content_val, str) and content_val.strip():
            text = content_val.strip()
        elif isinstance(content_val, list):
            text = join_segments(content_val)

        # Fall back to the "segments" key if content yielded nothing.
        if not text:
            segments = data.get("segments")
            if segments and isinstance(segments, list):
                text = join_segments(segments)

        if not text:
            raise TranscriptProviderError("Supadata response did not include usable transcript content.")

        logger.info("Supadata transcript fetched successfully (%d chars).", len(text))
        return text

    def _get_transcript_via_pytubefix(self, video_id: str) -> str:
        """Fetch caption text for ``video_id`` using pytubefix.

        Raises:
            TranscriptProviderError: if pytubefix cannot be imported, exposes
                no captions, no caption track is selected, the selected track
                is empty, or any other error occurs while fetching.
        """
        watch_url = f"https://www.youtube.com/watch?v={video_id}"

        try:
            from pytubefix import YouTube
        except Exception as import_error:
            raise TranscriptProviderError(f"pytubefix import failed: {import_error}") from import_error

        try:
            video = YouTube(watch_url, **self._pytubefix_init_kwargs(YouTube))

            tracks = getattr(video, "captions", None)
            if not tracks:
                raise TranscriptProviderError("pytubefix returned no captions.")

            chosen_track = self._select_pytubefix_caption(tracks)
            if chosen_track is None:
                raise TranscriptProviderError("pytubefix found no preferred caption track.")

            transcript = self._caption_to_text(chosen_track)
            if not transcript:
                raise TranscriptProviderError("pytubefix caption track was empty.")

            logger.info("pytubefix captions fetched successfully (%d chars).", len(transcript))
            return transcript
        except TranscriptProviderError:
            # Already a provider-level error; pass it through unwrapped.
            raise
        except Exception as failure:
            raise TranscriptProviderError(f"pytubefix captions failed: {failure}") from failure

    def _get_transcript_via_ytdlp(self, video_id: str) -> str:
        """
        Fetch subtitles for ``video_id`` via yt-dlp metadata extraction.

        No media is downloaded. For each language in ``en``, ``en-US``,
        ``ar`` the manual subtitles are tried before the automatic captions;
        the first track that yields non-empty text wins.

        Raises:
            TranscriptProviderError: if extraction fails or no track yields
                any text.
        """
        import yt_dlp

        url = f"https://www.youtube.com/watch?v={video_id}"

        # Quiet run that only gathers metadata and subtitle listings.
        ydl_opts = {
            'skip_download': True,
            'writesubtitles': True,
            'writeautomaticsubs': True,
            'subtitleslangs': ['en', 'ar', 'en-US'],
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }
        self._apply_youtube_network_options(ydl_opts)
        self._apply_cookie_options(ydl_opts)

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False) or {}

                subtitles = info.get('subtitles') or {}
                auto_subs = info.get('automatic_captions') or {}

                # Preferred languages in order; manual subs before auto.
                for lang in ['en', 'en-US', 'ar']:
                    for source in (subtitles, auto_subs):
                        formats = source.get(lang)
                        if not formats:
                            continue
                        text = self._ytdlp_subtitle_text(formats)
                        # An empty track is skipped so later tracks still
                        # get a chance instead of returning "".
                        if text:
                            return text

                raise TranscriptProviderError("No usable subtitle formats found via yt-dlp.")
        except TranscriptProviderError:
            # Already a provider-level error; do not wrap it a second time.
            raise
        except Exception as exc:
            raise TranscriptProviderError(f"yt-dlp failed: {exc}") from exc
        finally:
            # Remove the cookie file only when its path looks temporary.
            cookiefile = ydl_opts.get("cookiefile")
            if cookiefile and os.path.exists(cookiefile) and ("tmp" in str(cookiefile).lower() or "temp" in str(cookiefile).lower()):
                try:
                    os.remove(cookiefile)
                except OSError:
                    pass

    def _ytdlp_subtitle_text(self, formats) -> str:
        """Download one subtitle track (json3 preferred, then vtt) and return its text."""
        def url_for(ext: str):
            return next(
                (f.get('url') for f in formats
                 if isinstance(f, dict) and f.get('ext') == ext and f.get('url')),
                None,
            )

        json3_url = url_for('json3')
        if json3_url:
            resp = curl_requests.get(
                json3_url,
                impersonate="chrome124",
                timeout=30,
                proxies=self._requests_proxies() or None,
            )
            resp.raise_for_status()
            data = resp.json()
            text = " ".join(
                seg.get('utf8', '')
                for event in data.get('events', [])
                for seg in (event.get('segs') or [])
            ).strip()
            if text:
                return text

        vtt_url = url_for('vtt')
        if vtt_url:
            resp = curl_requests.get(
                vtt_url,
                impersonate="chrome124",
                timeout=30,
                proxies=self._requests_proxies() or None,
            )
            # Without this an HTTP error page would be parsed as captions.
            resp.raise_for_status()
            return self._ytdlp_vtt_to_text(resp.text)

        return ""

    @staticmethod
    def _ytdlp_vtt_to_text(vtt_text: str) -> str:
        """Reduce a WebVTT document to its cue text joined by single spaces."""
        lines = []
        seen_first_cue = False
        for raw_line in vtt_text.splitlines():
            line = raw_line.strip()
            if '-->' in line:
                seen_first_cue = True
                continue
            # Everything before the first timing line is file header
            # ("WEBVTT", "Kind: ...", "Language: ..."), not caption text.
            if not seen_first_cue or not line or line.isdigit():
                continue
            clean = re.sub(r'<[^>]+>', '', line).strip()
            if clean:
                lines.append(clean)
        return " ".join(lines).strip()

    def download_audio(self, url: str, output_stem: str) -> Path:
        """
        Download the best available audio stream for Whisper deep-scan fallback.

        The providers from ``_build_audio_download_plan`` are tried in order
        until one succeeds. The file is written under ``settings.temp_dir``
        using a sanitised form of ``output_stem``.

        Args:
            url: Video URL to extract audio from.
            output_stem: Base file name; characters outside ``A-Za-z0-9_-``
                are replaced with underscores.

        Returns:
            Path of the non-empty audio file: the ``.mp3`` when present,
            otherwise the first other complete file with the same stem.

        Raises:
            RuntimeError: if every provider fails, or none produced a file.
        """
        # yt-dlp is deliberately not imported here: the yt-dlp provider
        # imports it itself, so a missing package is recorded as that
        # provider's failure and the remaining providers still run.
        settings.temp_dir.mkdir(parents=True, exist_ok=True)
        safe_stem = re.sub(r"[^A-Za-z0-9_-]+", "_", output_stem).strip("_") or "audio"
        output_template = str(settings.temp_dir / f"{safe_stem}.%(ext)s")
        expected_audio_path = settings.temp_dir / f"{safe_stem}.mp3"

        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": output_template,
            "quiet": True,
            "no_warnings": True,
            "noplaylist": True,
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": "128",
                }
            ],
        }
        self._apply_youtube_network_options(ydl_opts)
        self._apply_cookie_options(ydl_opts)

        failures: List[str] = []
        try:
            for provider_name, provider in self._build_audio_download_plan(ydl_opts):
                try:
                    provider(url, safe_stem)
                    break
                except Exception as exc:
                    failures.append(f"{provider_name}: {exc}")
                    logger.warning("%s audio extraction failed: %s", provider_name, exc)
            else:
                # Loop finished without a break: every provider failed.
                auth_hint = ""
                if self._looks_like_youtube_auth_block(failures) and not self._has_youtube_auth():
                    auth_hint = (
                        " YouTube authentication is required for this video/Space. "
                        "Set YOUTUBE_COOKIES_B64 (recommended) or YOUTUBE_COOKIES, "
                        "and optionally YOUTUBE_PO_TOKEN."
                    )
                raise RuntimeError(f"Audio extraction failed.{auth_hint} {' | '.join(failures)}")

            if expected_audio_path.exists() and expected_audio_path.stat().st_size > 0:
                logger.info("Audio extracted for deep scan: %s", expected_audio_path)
                return expected_audio_path

            # Fallback providers may write another extension. Skip yt-dlp's
            # in-progress artifacts so a truncated download from a failed
            # provider is never returned as the result.
            incomplete_suffixes = {".part", ".ytdl"}
            for candidate in sorted(settings.temp_dir.glob(f"{safe_stem}.*")):
                if candidate.suffix.lower() in incomplete_suffixes:
                    continue
                if candidate.is_file() and candidate.stat().st_size > 0:
                    logger.info("Audio extracted for deep scan: %s", candidate)
                    return candidate

            raise RuntimeError("Audio extraction completed but no audio file was produced.")
        finally:
            # Remove the cookie file only when its path looks temporary.
            cookiefile = ydl_opts.get("cookiefile")
            if cookiefile and os.path.exists(cookiefile) and ("tmp" in str(cookiefile).lower() or "temp" in str(cookiefile).lower()):
                try:
                    os.remove(cookiefile)
                except OSError:
                    pass
    def _build_audio_download_plan(self, ydl_opts: dict) -> List[Tuple[str, Callable[[str, str], None]]]:
        return [
            ("yt-dlp", lambda url, _safe_stem: self._download_audio_via_ytdlp(url, ydl_opts)),
            ("Invidious proxy", self._download_audio_via_invidious),
            ("pytubefix", self._download_audio_via_pytubefix),
        ]

    def _download_audio_via_ytdlp(self, url: str, ydl_opts: dict) -> None:
        """Run yt-dlp configured with ``ydl_opts`` and download ``url``."""
        from yt_dlp import YoutubeDL

        with YoutubeDL(ydl_opts) as downloader:
            downloader.extract_info(url, download=True)

    def _download_audio_via_pytubefix(self, url: str, safe_stem: str) -> None:
        """Download the highest-``abr`` audio-only stream with pytubefix.

        The file is saved in ``settings.temp_dir`` as
        ``<safe_stem>.<subtype>``, with ``mp4`` used when the stream reports
        no subtype.

        Raises:
            RuntimeError: on any failure, including when no audio stream is
                returned.
        """
        from pytubefix import YouTube

        try:
            video = YouTube(url, **self._pytubefix_init_kwargs(YouTube))
            audio_streams = video.streams.filter(only_audio=True)
            best = audio_streams.order_by("abr").desc().first()
            if best is None:
                raise RuntimeError("No audio stream returned by pytubefix.")

            target_dir = str(settings.temp_dir)
            extension = best.subtype or 'mp4'
            best.download(output_path=target_dir, filename=f"{safe_stem}.{extension}")
        except Exception as failure:
            raise RuntimeError(f"pytubefix failed: {failure}") from failure

    def _download_audio_via_invidious(self, url: str, safe_stem: str) -> None:
        """Download audio through the first configured Invidious instance that works.

        For each instance the video metadata is requested, the first entry
        returned by ``_extract_invidious_audio_formats`` is selected, and
        the audio is streamed into ``settings.temp_dir``. An instance-local
        ``latest_version`` URL is used when the format has an ``itag``;
        otherwise the format's own ``url`` is used.

        Raises:
            RuntimeError: if the video ID cannot be extracted or every
                instance fails.
        """
        video_id = self._extract_video_id(url)
        if video_id == "unknown" or not video_id:
            raise RuntimeError("Could not extract video ID for Invidious fallback.")

        errors: List[str] = []
        for raw_instance in self._invidious_instances:
            base_url = raw_instance.rstrip("/")
            try:
                metadata_response = requests.get(
                    f"{base_url}/api/v1/videos/{video_id}",
                    headers=self._browser_headers(),
                    proxies=self._requests_proxies() or None,
                    timeout=20,
                )
                metadata_response.raise_for_status()
                candidates = self._extract_invidious_audio_formats(metadata_response.json())
                if not candidates:
                    raise RuntimeError("No audio formats in Invidious response.")

                best = candidates[0]
                itag = best.get("itag")
                if itag:
                    download_url = f"{base_url}/latest_version?id={video_id}&itag={itag}&local=true"
                else:
                    download_url = best.get("url", "")
                if not download_url:
                    raise RuntimeError("No downloadable audio URL in Invidious response.")

                suffix = self._extension_from_mime(best.get("type", "audio/webm"))
                target = settings.temp_dir / f"{safe_stem}.{suffix}"
                self._stream_download(download_url, target)
                logger.info("Invidious audio extracted via %s: %s", base_url, target)
                return
            except Exception as exc:
                # Record the failure and move on to the next instance.
                errors.append(f"{base_url}: {exc}")
                logger.warning("Invidious instance failed for audio extraction: %s", exc)

        raise RuntimeError("All Invidious instances failed. " + " | ".join(errors))

    def _extract_invidious_audio_formats(self, data: dict) -> List[dict]:
        formats = data.get("adaptiveFormats") or data.get("formatStreams") or []
        audio_formats = [
            item
            for item in formats
            if isinstance(item, dict)
            and str(item.get("type", "")).startswith("audio/")
            and (item.get("itag") or item.get("url"))
        ]
        return sorted(
            audio_formats,
            key=lambda item: int(item.get("bitrate") or item.get("bitrateBps") or 0),
            reverse=True,
        )

    def _stream_download(self, url: str, output_path: Path) -> None:
        """Stream ``url`` into ``output_path`` in 1 MiB chunks.

        A response whose Content-Type is HTML or JSON is rejected. If the
        transfer fails after writing has started, or the resulting file is
        empty, the partial file is removed so a later scan of the directory
        cannot mistake it for a finished download.

        Args:
            url: Direct URL of the audio resource.
            output_path: Destination file; it is overwritten.

        Raises:
            RuntimeError: if the response is HTML/JSON or the file is empty.
            Exception: request or I/O errors are re-raised unchanged.
        """
        started_writing = False
        try:
            with requests.get(
                url,
                headers=self._browser_headers(),
                proxies=self._requests_proxies() or None,
                stream=True,
                timeout=60,
            ) as resp:
                resp.raise_for_status()
                content_type = resp.headers.get("Content-Type", "").lower()
                if "text/html" in content_type or "application/json" in content_type:
                    raise RuntimeError(f"Unexpected audio response type: {content_type}")

                started_writing = True
                with output_path.open("wb") as audio_file:
                    for chunk in resp.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            audio_file.write(chunk)

            if not output_path.exists() or output_path.stat().st_size == 0:
                raise RuntimeError("Downloaded audio file is empty.")
        except Exception:
            # Only delete what this call created; a file that existed before
            # a failed request is left alone.
            if started_writing:
                try:
                    output_path.unlink()
                except OSError:
                    # Best-effort cleanup; the original error matters more.
                    pass
            raise

    def _extension_from_mime(self, mime_type: str) -> str:
        if "mp4" in mime_type or "m4a" in mime_type:
            return "m4a"
        if "mpeg" in mime_type or "mp3" in mime_type:
            return "mp3"
        if "ogg" in mime_type:
            return "ogg"
        return "webm"

    def _join_transcript_entries(self, entries) -> str:
        texts = []
        for entry in entries:
            if isinstance(entry, dict):
                text = entry.get("text", "")
            else:
                text = getattr(entry, "text", "")
            if text:
                texts.append(str(text))
        return " ".join(texts).strip()

    def _build_requests_session(self, cookie_file: Path | None = None) -> requests.Session:
        """Create a ``requests`` session preloaded with headers, proxies and cookies.

        Args:
            cookie_file: Optional cookie file in Mozilla/Netscape format. A file
                that cannot be loaded is logged and otherwise ignored.

        Returns:
            The configured session.
        """
        http_session = requests.Session()
        http_session.headers.update(self._browser_headers())

        proxy_map = self._requests_proxies()
        if proxy_map:
            http_session.proxies.update(proxy_map)

        if not cookie_file:
            return http_session

        try:
            jar = http.cookiejar.MozillaCookieJar(str(cookie_file))
            # Keep session-only and expired cookies as well.
            jar.load(ignore_discard=True, ignore_expires=True)
            http_session.cookies.update(jar)
        except Exception as exc:
            logger.warning("Could not load YouTube cookies from %s: %s", cookie_file, exc)
        return http_session

    def _pytubefix_init_kwargs(self, youtube_cls) -> dict:
        kwargs = {}
        try:
            params = inspect.signature(youtube_cls).parameters
        except (TypeError, ValueError):
            params = {}

        if "use_oauth" in params:
            kwargs["use_oauth"] = False
        if "allow_oauth_cache" in params:
            kwargs["allow_oauth_cache"] = True
        if self._proxy_url:
            if "proxies" in params:
                kwargs["proxies"] = self._requests_proxies()
            elif "proxy" in params:
                kwargs["proxy"] = self._proxy_url

        return kwargs

    def _select_pytubefix_caption(self, captions):
        preferred_codes = ["en", "a.en", "en-US", "a.en-US", "ar", "a.ar"]

        for code in preferred_codes:
            try:
                return captions[code]
            except Exception:
                pass

            getter = getattr(captions, "get_by_language_code", None)
            if callable(getter):
                try:
                    caption = getter(code)
                    if caption is not None:
                        return caption
                except Exception:
                    pass

        try:
            for caption in captions:
                if not isinstance(caption, str):
                    return caption
                try:
                    return captions[caption]
                except Exception:
                    pass
        except Exception:
            return None

        return None

    def _caption_to_text(self, caption) -> str:
        srt_method = getattr(caption, "generate_srt_captions", None)
        if callable(srt_method):
            return self._strip_srt(srt_method())

        for attr_name in ("xml_captions", "xml_caption", "caption_xml"):
            value = getattr(caption, attr_name, None)
            if value:
                return self._strip_markup(str(value))

        json_value = getattr(caption, "json_captions", None)
        if json_value:
            try:
                data = json.loads(json_value) if isinstance(json_value, str) else json_value
                return self._join_caption_json(data)
            except Exception:
                pass

        return self._strip_markup(str(caption))

    def _strip_srt(self, srt_text: str) -> str:
        lines = []
        for line in srt_text.splitlines():
            stripped = line.strip()
            if not stripped or stripped.isdigit() or "-->" in stripped:
                continue
            lines.append(stripped)
        return " ".join(lines).strip()

    def _strip_markup(self, value: str) -> str:
        no_tags = re.sub(r"<[^>]+>", " ", value)
        return re.sub(r"\s+", " ", html.unescape(no_tags)).strip()

    def _join_caption_json(self, data) -> str:
        texts = []
        for event in data.get("events", []) if isinstance(data, dict) else []:
            for segment in event.get("segs", []) or []:
                text = segment.get("utf8", "")
                if text:
                    texts.append(text)
        return " ".join(texts).strip()

    def _apply_cookie_options(self, ydl_opts: dict) -> None:
        cookie_b64 = os.getenv("YOUTUBE_COOKIES_B64")
        if cookie_b64:
            import tempfile, base64
            try:
                cookie_bytes = base64.b64decode(cookie_b64)
                with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
                    f.write(cookie_bytes)
                    cookie_path = f.name
                ydl_opts["cookiefile"] = cookie_path
            except Exception as exc:
                logger.warning("Failed to decode YOUTUBE_COOKIES_B64: %s", exc)
        else:
            cookie_file = self._resolve_cookie_file()
            if cookie_file:
                ydl_opts["cookiefile"] = str(cookie_file)

    def _apply_youtube_network_options(self, ydl_opts: dict) -> None:
        youtube_args = {
            "player_client": ["android", "web_safari", "tv"],
        }
        po_tokens = self._build_po_token_args()
        if po_tokens:
            youtube_args["po_token"] = po_tokens

        ydl_opts.update(
            {
                "source_address": "0.0.0.0",
                "socket_timeout": 30,
                "retries": 5,
                "fragment_retries": 5,
                "geo_bypass": True,
                "http_headers": self._browser_headers(),
                "extractor_args": {
                    "youtube": youtube_args,
                },
            }
        )
        if self._proxy_url:
            ydl_opts["proxy"] = self._proxy_url

    def _browser_headers(self) -> dict:
        return {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept-Language": "en-US,en;q=0.9",
        }

    def _requests_proxies(self) -> dict:
        if not self._proxy_url:
            return {}
        return {
            "http": self._proxy_url,
            "https": self._proxy_url,
        }

    def _configure_proxy_environment(self) -> None:
        if not self._proxy_url:
            return

        for key in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
            os.environ.setdefault(key, self._proxy_url)

    def _load_invidious_instances(self) -> List[str]:
        configured = os.environ.get("INVIDIOUS_INSTANCES", "").strip()
        if configured:
            return [
                item.strip().rstrip("/")
                for item in configured.split(",")
                if item.strip()
            ]

        return [
            "https://yewtu.be",
            "https://inv.nadeko.net",
            "https://invidious.privacyredirect.com",
            "https://vid.puffyan.us",
        ]

    def _build_po_token_args(self) -> List[str]:
        if not self._youtube_po_token:
            return []

        raw_tokens = [
            token.strip()
            for token in re.split(r"[\n,]+", self._youtube_po_token)
            if token.strip()
        ]
        if not raw_tokens:
            return []

        po_tokens = []
        for token in raw_tokens:
            if "+" in token:
                po_tokens.append(token)
            else:
                client = self._youtube_po_token_client or "web"
                context = self._youtube_po_token_context or "gvs"
                po_tokens.append(f"{client}.{context}+{token}")
        return po_tokens

    def _has_youtube_auth(self) -> bool:
        return bool(
            self._youtube_cookies
            or self._youtube_cookies_b64
            or self._youtube_cookies_path
            or self._youtube_cookies_file
            or self._youtube_po_token
            or self._proxy_url
        )

    def _looks_like_youtube_auth_block(self, failures: List[str]) -> bool:
        combined = " ".join(failures).lower()
        return any(
            marker in combined
            for marker in (
                "sign in to confirm",
                "detected as a bot",
                "po_token",
                "bot",
                "forbidden",
                "403",
            )
        )

    def _resolve_cookie_file(self) -> Path | None:
        if self._youtube_cookies_path:
            cookie_path = Path(self._youtube_cookies_path)
            if cookie_path.exists():
                return cookie_path
            logger.warning("YOUTUBE_COOKIES_PATH is set but does not exist: %s", cookie_path)

        if self._youtube_cookies_file:
            cookie_path = Path(self._youtube_cookies_file)
            if cookie_path.exists():
                return cookie_path
            logger.warning("YOUTUBE_COOKIES_FILE is set but does not exist: %s", cookie_path)

        cookie_text = self._youtube_cookies
        if not cookie_text and self._youtube_cookies_b64:
            try:
                cookie_text = base64.b64decode(self._youtube_cookies_b64).decode("utf-8")
            except (binascii.Error, UnicodeDecodeError) as exc:
                logger.warning("YOUTUBE_COOKIES_B64 could not be decoded: %s", exc)
                return None

        if not cookie_text:
            return None

        settings.temp_dir.mkdir(parents=True, exist_ok=True)
        cookie_path = settings.temp_dir / "youtube_cookies.txt"
        cookie_text = cookie_text.replace("\\n", "\n")
        if not cookie_text.endswith("\n"):
            cookie_text += "\n"
        cookie_path.write_text(cookie_text, encoding="utf-8")
        return cookie_path

    def cleanup(self, path=None):
        """Delete a temporary audio file, ignoring anything that is not a file.

        Failures are logged as warnings and never raised.

        Args:
            path: Path of the file to remove; ``None`` is a no-op.
        """
        if path is None:
            return

        try:
            target = Path(path)
            # is_file() is already False for paths that do not exist.
            if target.is_file():
                target.unlink()
        except Exception as exc:
            logger.warning("Failed to clean up temporary audio file %s: %s", path, exc)

    def _extract_video_id(self, url: str) -> str:
        match = re.search(r"(?:v=|youtu\.be/|shorts/|embed/)([A-Za-z0-9_-]{11})", str(url))
        return match.group(1) if match else "unknown"