| import os | |
| import re | |
| import csv | |
| import logging | |
| import datetime | |
| import subprocess | |
| import hashlib | |
| from pathlib import Path | |
| import yt_dlp | |
| import transcription | |
| logger = logging.getLogger(__name__) | |
def robust_read_csv(file_path: Path):
    """Lazily yield the rows of a CSV file as dictionaries.

    The file is decoded as UTF-8 with undecodable bytes replaced, and NUL
    characters are removed from each line before it reaches the ``csv``
    parser. This is a generator: nothing is read until it is iterated, and
    the file stays open until the generator is exhausted or closed.

    Args:
        file_path: Location of the CSV file. The first record is used as
            the header row by ``csv.DictReader``.

    Yields:
        One ``dict`` per non-empty data row, keyed by the header names.
        Nothing is yielded when the file does not exist. If reading or
        parsing fails, the error is logged and iteration simply stops.
    """
    if not file_path.exists():
        return
    try:
        # newline='' is what the csv module requires: without it, universal
        # newline translation rewrites "\r\n" inside quoted fields.
        with open(file_path, 'r', encoding='utf-8', errors='replace', newline='') as handle:
            nul_free_lines = (raw_line.replace('\0', '') for raw_line in handle)
            for row in csv.DictReader(nul_free_lines):
                if row:
                    yield row
    except Exception as e:
        # Deliberately best-effort: a damaged file ends iteration rather
        # than propagating into the consumer.
        logger.error(f"Error reading CSV {file_path}: {e}")
        return
| def extract_tweet_id(url: str) -> str | None: | |
| if not url: return None | |
| match = re.search(r"(?:twitter|x)\.com/[^/]+/status/(\d+)", url) | |
| if match: return match.group(1) | |
| return None | |
| def extract_twitter_username(url: str) -> str | None: | |
| if not url: return None | |
| match = re.search(r"(?:twitter|x)\.com/([^/]+)/status/\d+", url) | |
| if match: return match.group(1).lower() | |
| return None | |
def normalize_link(link: str) -> str:
    """Reduce a URL to a scheme-less, query-less form.

    Everything from the first ``?`` onwards is dropped, surrounding
    whitespace and trailing slashes are trimmed, and then every occurrence
    of ``http://``, ``https://`` and ``www.`` is removed, wherever it
    appears in the remaining text.

    Args:
        link: The URL to normalise. Empty or ``None`` values are tolerated.

    Returns:
        The normalised string, or ``""`` when ``link`` is falsy.
    """
    if not link:
        return ""
    without_query = link.split('?')[0]
    normalized = without_query.strip().rstrip('/')
    # Same removal order as a chained .replace() sequence.
    for fragment in ('http://', 'https://', 'www.'):
        normalized = normalized.replace(fragment, '')
    return normalized
def parse_vtt(file_path: str) -> str:
    """Parse a .vtt subtitle file and return its clean text content.

    Blank lines, the ``WEBVTT`` header, cue timing lines (anything
    containing ``-->``) and purely numeric lines are skipped. Markup tags
    such as ``<c>`` are stripped from the remaining lines, and a line that
    repeats the previously kept line is dropped.

    Args:
        file_path: Path of the subtitle file.

    Returns:
        The kept lines joined with newlines. A human-readable message is
        returned instead when the file is missing, contains no text, or
        cannot be read; this function does not raise.
    """
    try:
        if not os.path.exists(file_path):
            return "Transcript file not found."
        text_lines = []
        # utf-8-sig drops an optional leading BOM; read as plain utf-8 the
        # BOM stays glued to "WEBVTT", so the header would not be recognised
        # and would leak into the transcript.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('WEBVTT') or '-->' in line or line.isdigit():
                    continue
                clean_line = re.sub(r'<[^>]+>', '', line)
                # Only consecutive repeats are collapsed.
                if clean_line and (not text_lines or clean_line != text_lines[-1]):
                    text_lines.append(clean_line)
        return "\n".join(text_lines) if text_lines else "No speech found in transcript."
    except Exception as e:
        # Broad on purpose: callers always get a string back.
        logger.error(f"Error parsing VTT file {file_path}: {e}")
        return f"Error reading transcript: {e}"
def _extract_audio(video_path: Path, audio_path: Path) -> None:
    """Write a 16 kHz mono 16-bit PCM WAV of *video_path* to *audio_path* via ffmpeg."""
    command = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-i", str(video_path),
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        str(audio_path),
    ]
    try:
        result = subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
        )
    except OSError as e:
        # Typically the ffmpeg binary is not installed or not on PATH.
        logger.error(f"Could not run ffmpeg for {video_path}: {e}")
        return
    if result.returncode != 0:
        logger.error(
            f"ffmpeg failed for {video_path} (exit code {result.returncode}): "
            f"{result.stderr.strip()}"
        )


async def prepare_video_assets(link: str, output_id: str) -> dict:
    """Download a video and derive its audio track and transcript.

    Files live under ``data/videos`` as ``<output_id>.mp4``, ``.wav`` and
    ``.vtt``. Each step is skipped when its output file already exists, so
    repeated calls with the same ``output_id`` reuse earlier results.

    NOTE(review): although declared ``async``, every step here (yt-dlp,
    ffmpeg, transcription) is a blocking call; nothing is awaited.

    Args:
        link: URL handed to yt-dlp.
        output_id: Base file name (without extension) for all outputs.

    Returns:
        A dict with three keys:
        ``"video"`` - path of the mp4 as a string, or ``None`` when no
        non-empty video file is present;
        ``"transcript"`` - path of the vtt as a string, or ``None`` when
        there is no video or no transcript file;
        ``"caption"`` - the description reported by yt-dlp, falling back
        to the title, or ``""`` when neither is available.
    """
    video_dir = Path("data/videos")
    video_dir.mkdir(parents=True, exist_ok=True)

    video_path = video_dir / f"{output_id}.mp4"
    audio_path = video_dir / f"{output_id}.wav"
    transcript_path = video_dir / f"{output_id}.vtt"

    caption = ""
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
        'outtmpl': str(video_path),
        'quiet': True, 'ignoreerrors': True, 'no_warnings': True, 'skip_download': False
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Probe metadata first so text-only posts never start a download.
            info = ydl.extract_info(link, download=False)
            if info:
                # Either field may be present but None; keep caption a str.
                caption = info.get('description', '') or info.get('title', '') or ""
                formats = info.get('formats', [])
                if not formats and not info.get('url'):
                    logger.info(f"No video formats found for {link}. Treating as text-only.")
                elif not video_path.exists():
                    ydl.download([link])
    except Exception as e:
        # Best-effort: a failed download still returns whatever was gathered.
        logger.error(f"Download error for {link}: {e}")

    video_downloaded = video_path.exists() and video_path.stat().st_size > 0
    if video_downloaded:
        if not audio_path.exists():
            _extract_audio(video_path, audio_path)
        if audio_path.exists() and not transcript_path.exists():
            transcription.load_model()
            # Presumably writes <output_id>.vtt next to the audio file;
            # confirm against the transcription module.
            transcription.generate_transcript(str(audio_path))

    return {
        "video": str(video_path) if video_downloaded else None,
        "transcript": str(transcript_path) if video_downloaded and transcript_path.exists() else None,
        "caption": caption
    }