Spaces:
Build error
Build error
| # ./backend/app/youtube_parser.py | |
| import os | |
| import re | |
| import contextlib | |
| import asyncio | |
| import json # json 모듈 추가 | |
| import shutil | |
| import sys # sys 모듈 추가 | |
| from urllib.parse import urlparse, parse_qs | |
| from loguru import logger | |
| from yt_dlp import YoutubeDL | |
| from proxy_manager import proxy_manager | |
| from dotenv import load_dotenv | |
| # 환경 변수 로드 (코드의 가장 위에 위치) | |
| load_dotenv() | |
| # --- Loguru 설정 시작 --- | |
| # 기본 핸들러(콘솔 출력) 제거 | |
| logger.remove() | |
| # 콘솔에 INFO 레벨 이상 로그 출력 | |
| logger.add(sys.stderr, level="INFO") | |
| # --- Loguru 설정 끝 --- | |
| def validate_youtube_url(url): | |
| ydl_opts = { | |
| 'quiet': True | |
| } | |
| try: | |
| with YoutubeDL(ydl_opts) as ydl: | |
| ydl.extract_info(url, download=False) | |
| return True | |
| except: | |
| return False | |
| async def get_youtube_video_id(url: str) -> str | None: | |
| """ | |
| 유튜브 URL에서 비디오 ID를 추출합니다. | |
| """ | |
| parsed_url = urlparse(url) | |
| if parsed_url.hostname and any(domain in parsed_url.hostname for domain in ['www.youtube.com', 'm.youtube.com', 'youtube.com']): | |
| query_params = parse_qs(parsed_url.query) | |
| if 'v' in query_params: | |
| return query_params['v'][0] | |
| elif parsed_url.hostname == 'youtu.be': | |
| video_id = parsed_url.path.strip('/') | |
| if len(video_id) == 11: | |
| return video_id | |
| logger.warning(f"알 수 없는 형식의 YouTube URL: {url}") | |
| return None | |
| def clean_caption_text(text: str) -> str: | |
| cleaned_text = re.sub(r'<[^>]+>', '', text) | |
| cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip() | |
| return cleaned_text | |
| async def get_transcript_with_timestamps(video_id: str) -> list[dict] | None: | |
| logger.info(f"비디오 ID '{video_id}'에 대한 자막 가져오기 시도.") | |
| processed_chunks = [] | |
| ydl_opts = { | |
| 'writesubtitles': True, | |
| 'writeautomaticsub': True, | |
| 'subtitleslangs': ['ko', 'en'], | |
| 'skip_download': True, | |
| 'outtmpl': '%(id)s.%(language)s.%(ext)s', | |
| 'quiet': False, | |
| 'no_warnings': False, | |
| 'extractor_args': { | |
| 'youtube': {'skip': ['dash']} | |
| } | |
| } | |
| logger.info("yt-dlp에 프록시가 적용되지 않았습니다.") | |
| temp_dir = f"./temp_captions_{video_id}" # 각 요청별 고유 디렉토리 생성 | |
| os.makedirs(temp_dir, exist_ok=True) | |
| original_cwd = os.getcwd() | |
| try: | |
| with contextlib.chdir(temp_dir): | |
| ydl_opts['outtmpl'] = '%(id)s.%(ext)s' | |
| with YoutubeDL(ydl_opts) as ydl: | |
| info_dict = await asyncio.to_thread(ydl.extract_info, video_id, download=True) | |
| caption_file_path = None | |
| if 'requested_subtitles' in info_dict and info_dict['requested_subtitles']: | |
| for lang_code in ydl_opts['subtitleslangs']: | |
| if lang_code in info_dict['requested_subtitles']: | |
| sub_info = info_dict['requested_subtitles'][lang_code] | |
| if 'filepath' in sub_info and sub_info['filepath']: | |
| caption_file_path = sub_info['filepath'] | |
| logger.info(f"yt-dlp가 '{lang_code}' 자막 파일을 info_dict에서 찾았습니다: {caption_file_path}") | |
| break | |
| if not caption_file_path: | |
| downloaded_files = [f for f in os.listdir('.') if f.startswith(video_id) and any(ext in f for ext in ['vtt', 'srt', 'json'])] | |
| for ext in ['vtt', 'srt', 'json']: | |
| ko_file = next((f for f in downloaded_files if f.endswith(f'.ko.{ext}')), None) | |
| if ko_file: | |
| caption_file_path = os.path.join(os.getcwd(), ko_file) | |
| break | |
| if not caption_file_path: | |
| for ext in ['vtt', 'srt', 'json']: | |
| any_file = next((f for f in downloaded_files if f.endswith(f'.{ext}')), None) | |
| if any_file: | |
| caption_file_path = os.path.join(os.getcwd(), any_file) | |
| break | |
| if caption_file_path and os.path.exists(caption_file_path): | |
| if caption_file_path.endswith('.vtt'): | |
| with open(caption_file_path, 'r', encoding='utf-8') as f: | |
| vtt_content = f.read() | |
| segments = vtt_content.split('\n\n') | |
| for segment in segments: | |
| if '-->' in segment: | |
| lines = segment.split('\n') | |
| time_str = lines[0].strip() | |
| text_content = ' '.join(lines[1:]).strip() | |
| text_content = clean_caption_text(text_content) | |
| if not text_content: continue | |
| try: | |
| start_time_parts = time_str.split(' --> ')[0].split(':') | |
| if len(start_time_parts) == 3: | |
| hours = int(start_time_parts[0]) | |
| minutes = int(start_time_parts[1]) | |
| seconds = int(float(start_time_parts[2].split('.')[0])) | |
| else: # MM:SS.ms | |
| hours = 0 | |
| minutes = int(start_time_parts[0]) | |
| seconds = int(float(start_time_parts[1].split('.')[0])) | |
| total_seconds = hours * 3600 + minutes * 60 + seconds | |
| timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" if hours > 0 else f"{minutes:02d}:{seconds:02d}" | |
| processed_chunks.append({ | |
| "text": text_content, | |
| "timestamp": timestamp_str, | |
| "start_seconds": total_seconds # Added | |
| }) | |
| except Exception as e: | |
| logger.warning(f"VTT 시간 파싱 오류: {time_str} - {e}") | |
| logger.info(f"VTT 자막 {len(processed_chunks)}개 청크 처리 완료.") | |
| elif caption_file_path.endswith('.json'): | |
| with open(caption_file_path, 'r', encoding='utf-8') as f: | |
| json_content = json.load(f) | |
| for entry in json_content: | |
| if 'start' in entry and 'text' in entry: | |
| total_seconds = int(entry['start']) | |
| hours = total_seconds // 3600 | |
| minutes = (total_seconds % 3600) // 60 | |
| seconds = total_seconds % 60 | |
| text = entry['text'] | |
| text = clean_caption_text(text) | |
| timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" if hours > 0 else f"{minutes:02d}:{seconds:02d}" | |
| processed_chunks.append({ | |
| "text": text, | |
| "timestamp": timestamp_str, | |
| "start_seconds": total_seconds # Added | |
| }) | |
| logger.info(f"JSON 자막 {len(processed_chunks)}개 청크 처리 완료.") | |
| elif caption_file_path.endswith('.srt'): | |
| with open(caption_file_path, 'r', encoding='utf-8') as f: | |
| srt_content = f.read() | |
| blocks = srt_content.strip().split('\n\n') | |
| for block in blocks: | |
| lines = block.split('\n') | |
| if len(lines) >= 3 and '-->' in lines[1]: | |
| time_str = lines[1].strip() | |
| text_content = ' '.join(lines[2:]).strip() | |
| text_content = clean_caption_text(text_content) | |
| try: | |
| start_time_parts = time_str.split(' --> ')[0].split(':') | |
| seconds_ms = float(start_time_parts[-1].replace(',', '.')) | |
| seconds = int(seconds_ms) | |
| minutes = int(start_time_parts[-2]) | |
| hours = int(start_time_parts[0]) if len(start_time_parts) == 3 else 0 | |
| total_seconds = hours * 3600 + minutes * 60 + seconds | |
| timestamp_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" if hours > 0 else f"{minutes:02d}:{seconds:02d}" | |
| processed_chunks.append({ | |
| "text": text_content, | |
| "timestamp": timestamp_str, | |
| "start_seconds": total_seconds # Added | |
| }) | |
| except Exception as e: | |
| logger.warning(f"SRT 시간 파싱 오류: {time_str} - {e}") | |
| logger.info(f"SRT 자막 {len(processed_chunks)}개 청크 처리 완료.") | |
| else: | |
| logger.warning(f"지원하지 않는 자막 파일 형식입니다: {caption_file_path}") | |
| else: | |
| logger.warning(f"비디오 ID '{video_id}'에 대한 yt-dlp 자막 파일을 찾을 수 없습니다. 최종 시도 경로: {caption_file_path}") | |
| except Exception as e: | |
| logger.error(f"자막 추출 중 오류: {e}") | |
| return [] | |
| finally: | |
| os.chdir(original_cwd) | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir) # 임시 디렉토리와 내용물 모두 삭제 | |
| logger.info(f"임시 자막 디렉토리 '{temp_dir}' 정리 완료.") | |
| return processed_chunks | |
| def remove_duplicate_captions(chunks: list[dict]) -> list[dict]: | |
| if not chunks: | |
| return [] | |
| # 첫 번째 청크는 항상 포함 | |
| deduplicated_chunks = [chunks[0]] | |
| for i in range(1, len(chunks)): | |
| # 이전 청크의 텍스트와 현재 청크의 텍스트를 가져옴 | |
| # .strip()으로 양쪽 공백을 제거하여 비교 정확도 향상 | |
| prev_text = deduplicated_chunks[-1]["text"].strip() | |
| current_text = chunks[i]["text"].strip() | |
| # 현재 텍스트가 이전 텍스트로 시작하고, 길이가 더 긴 경우 (점진적 구성) | |
| # 예: prev="안녕하세요", current="안녕하세요 제 이름은" | |
| if current_text.startswith(prev_text) and len(current_text) > len(prev_text): | |
| # 이전 항목을 현재의 더 완전한 문장으로 교체 | |
| deduplicated_chunks[-1] = chunks[i] | |
| # 현재 텍스트와 이전 텍스트가 완전히 다른 내용일 경우에만 새로 추가 | |
| # (완전히 똑같은 중복도 이 조건에서 걸러짐) | |
| elif prev_text != current_text: | |
| deduplicated_chunks.append(chunks[i]) | |
| logger.info(f"중복 제거 후 최종 청크 수: {len(deduplicated_chunks)}") | |
| return deduplicated_chunks | |
| def merge_incomplete_sentences(chunks: list[dict]) -> list[dict]: | |
| if not chunks: | |
| return [] | |
| merged_chunks = [] | |
| current_merged_chunk = None | |
| for i, chunk in enumerate(chunks): | |
| text = chunk["text"].strip() | |
| if not text: | |
| continue | |
| if current_merged_chunk is None: | |
| current_merged_chunk = chunk.copy() | |
| else: | |
| # 이전 청크가 문장 종결 부호로 끝나는지 확인 | |
| prev_text_ends_with_punctuation = current_merged_chunk["text"].strip().endswith(('.', '?', '!', '...')) | |
| # 시간 간격 확인 (예: 0.5초 미만) | |
| time_gap_small = True | |
| if "start_seconds" in current_merged_chunk and "start_seconds" in chunk: | |
| if chunk["start_seconds"] - current_merged_chunk["start_seconds"] > 0.5: # 0.5초 이상 차이나면 병합하지 않음 | |
| time_gap_small = False | |
| # 이전 청크가 문장 종결 부호로 끝나지 않았고, 시간 간격이 작을 때만 병합 | |
| if not prev_text_ends_with_punctuation and time_gap_small: | |
| current_merged_chunk["text"] += " " + text | |
| # 병합된 청크의 시간은 첫 청크의 시간을 유지 | |
| else: | |
| # 병합하지 않는 경우, 현재까지 병합된 청크를 추가하고 새 청크 시작 | |
| merged_chunks.append(current_merged_chunk) | |
| current_merged_chunk = chunk.copy() | |
| # 마지막으로 남아있는 병합된 청크 추가 | |
| if current_merged_chunk is not None: | |
| merged_chunks.append(current_merged_chunk) | |
| logger.info(f"스마트 병합 후 청크 수: {len(merged_chunks)}") | |
| return merged_chunks | |
| async def process_youtube_video_data(video_url: str) -> list[dict] | None: | |
| video_id = await get_youtube_video_id(video_url) | |
| if not video_id: | |
| logger.error(f"유효하지 않은 YouTube URL: {video_url}") | |
| return None | |
| processed_chunks = await get_transcript_with_timestamps(video_id) | |
| if processed_chunks: | |
| # 1. 중복 제거 | |
| deduplicated_chunks = remove_duplicate_captions(processed_chunks) | |
| # 2. 불완전한 문장 병합 | |
| final_chunks = merge_incomplete_sentences(deduplicated_chunks) | |
| return final_chunks | |
| else: | |
| return processed_chunks | |