# src/common_utils.py — shared helpers: robust CSV reading, Twitter/X URL
# parsing, VTT transcript parsing, and video asset preparation.
import os
import re
import csv
import logging
import datetime
import subprocess
import hashlib
from pathlib import Path
import yt_dlp
import transcription
# Module-level logger; handlers/level are expected to be configured by the
# application entry point, not here.
logger = logging.getLogger(__name__)
def robust_read_csv(file_path: Path):
    """Yield each row of *file_path* as a dict, tolerating messy input.

    Undecodable bytes are replaced and NUL characters stripped before the
    CSV parser sees them. A missing file or a read/parse error yields
    nothing; errors are logged rather than raised.
    """
    if not file_path.exists():
        return
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as handle:
            # Strip NULs lazily, line by line, so huge files stream cleanly.
            sanitized = (raw.replace('\0', '') for raw in handle)
            for record in csv.DictReader(sanitized):
                if record:
                    yield record
    except Exception as e:
        logger.error(f"Error reading CSV {file_path}: {e}")
        return
def extract_tweet_id(url: str) -> str | None:
if not url: return None
match = re.search(r"(?:twitter|x)\.com/[^/]+/status/(\d+)", url)
if match: return match.group(1)
return None
def extract_twitter_username(url: str) -> str | None:
if not url: return None
match = re.search(r"(?:twitter|x)\.com/([^/]+)/status/\d+", url)
if match: return match.group(1).lower()
return None
def normalize_link(link: str) -> str:
    """Canonicalize *link* for comparison: drop the query string, surrounding
    whitespace, trailing slash, scheme, and a leading ``www.``.

    Fix: the original used ``str.replace``, which removed ``http://``,
    ``https://`` and ``www.`` wherever they appeared in the string — so a
    path like ``/a/www.archive`` was silently mangled. ``str.removeprefix``
    only strips a genuine leading occurrence.
    """
    if not link:
        return ""
    # Query string first, then whitespace and the trailing slash, matching
    # the original normalization order.
    link = link.split('?')[0].strip().rstrip('/')
    link = link.removeprefix('https://').removeprefix('http://')
    return link.removeprefix('www.')
def parse_vtt(file_path: str) -> str:
    """Parses a .vtt subtitle file and returns the clean text content."""
    try:
        if not os.path.exists(file_path):
            return "Transcript file not found."
        with open(file_path, 'r', encoding='utf-8') as handle:
            raw_lines = handle.readlines()
        collected = []
        for raw in raw_lines:
            stripped = raw.strip()
            # Skip blanks, the WEBVTT header, cue timing lines, and numeric
            # cue identifiers.
            if not stripped or stripped.startswith('WEBVTT') or '-->' in stripped or stripped.isdigit():
                continue
            # Drop inline markup tags like <v Speaker> or <c>.
            cleaned = re.sub(r'<[^>]+>', '', stripped)
            # Consecutive duplicate cue lines are collapsed to one.
            if cleaned and (not collected or cleaned != collected[-1]):
                collected.append(cleaned)
        return "\n".join(collected) if collected else "No speech found in transcript."
    except Exception as e:
        logger.error(f"Error parsing VTT file {file_path}: {e}")
        return f"Error reading transcript: {e}"
async def prepare_video_assets(link: str, output_id: str) -> dict:
    """Fetch the video, caption, and transcript assets for *link*.

    Side effects: may create ``data/videos/<output_id>.mp4`` (yt-dlp
    download), ``.wav`` (ffmpeg audio extraction), and ``.vtt`` (transcript).

    Returns a dict with keys ``"video"`` and ``"transcript"`` (file paths or
    None) and ``"caption"`` (description/title text, possibly empty).
    """
    video_dir = Path("data/videos")
    if not video_dir.exists():
        video_dir.mkdir(parents=True, exist_ok=True)
    video_path = video_dir / f"{output_id}.mp4"
    audio_path = video_dir / f"{output_id}.wav"
    transcript_path = video_dir / f"{output_id}.vtt"
    caption = ""
    video_downloaded = False
    # yt-dlp options: prefer separate mp4 video + m4a audio streams merged
    # into mp4, falling back to a single mp4 format.
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
        'outtmpl': str(video_path),
        'quiet': True, 'ignoreerrors': True, 'no_warnings': True, 'skip_download': False
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Probe metadata first (download=False) so that text-only posts
            # never trigger a media download.
            info = ydl.extract_info(link, download=False)
            if info:
                caption = info.get('description', '') or info.get('title', '')
                formats = info.get('formats',[])
                if not formats and not info.get('url'):
                    logger.info(f"No video formats found for {link}. Treating as text-only.")
                else:
                    # Skip re-downloading if a previous run already saved it.
                    if not video_path.exists(): ydl.download([link])
    except Exception as e:
        # Best-effort: a failed download still returns whatever caption we
        # managed to extract, with None asset paths.
        logger.error(f"Download error for {link}: {e}")
    if video_path.exists() and video_path.stat().st_size > 0:
        video_downloaded = True
        if not audio_path.exists():
            # Extract mono 16 kHz 16-bit PCM audio; output suppressed.
            subprocess.run(["ffmpeg", "-y", "-i", str(video_path), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", str(audio_path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if audio_path.exists() and not transcript_path.exists():
            transcription.load_model()
            # NOTE(review): return value is discarded — presumably
            # generate_transcript writes <output_id>.vtt next to the wav as a
            # side effect; confirm against the transcription module.
            transcription.generate_transcript(str(audio_path))
    return {
        "video": str(video_path) if video_downloaded else None,
        "transcript": str(transcript_path) if video_downloaded and transcript_path.exists() else None,
        "caption": caption
    }