import json
import math
from dataclasses import asdict
from typing import Dict

import tqdm

from ai.classifier import ToxcitiyClassifier
from core.data_manager import DataManager
from models.schemas import AnalyzedTranscript, ChunkedSegment

CHECKPOINT_PATH = "./data/analyzed_transcripts.json"


def batch_analyze_transcripts(
    data_manager: DataManager,
    classifier: ToxcitiyClassifier,
    chunk_size: int = 60,
    overlap: int = 10,
) -> Dict[str, AnalyzedTranscript]:
    """Analyze every transcript and persist the results.

    Each transcript is split into `chunk_size`-second windows, widened by
    `overlap` seconds on both sides, and each window is scored by the
    toxicity classifier. Results are checkpointed to CHECKPOINT_PATH after
    every video so an interrupted run can resume.
    """
    # Load any existing results so already-processed videos are skipped.
    # The checkpoint stores plain dicts, so rebuild the dataclasses to keep
    # the return type (and the asdict() call below) consistent.
    try:
        with open(CHECKPOINT_PATH, "r", encoding="utf-8") as f:
            existing_data = json.load(f)
        analyzed_transcripts = {
            vid: AnalyzedTranscript(
                video_id=entry["video_id"],
                chunk_count=entry["chunk_count"],
                chunked_segments=[
                    ChunkedSegment(**chunk) for chunk in entry["chunked_segments"]
                ],
                is_toxic=entry["is_toxic"],
            )
            for vid, entry in existing_data.get("analyzed_transcripts", {}).items()
        }
    except FileNotFoundError:
        analyzed_transcripts = {}

    # Process every transcript.
    all_transcripts = data_manager.transcript_data
    print(f"Starting to process {len(all_transcripts)} transcripts...")

    for transcript_data in tqdm.tqdm(all_transcripts):
        video_id = transcript_data.get("video_id")

        # Skip transcripts that were already analyzed.
        if video_id in analyzed_transcripts:
            tqdm.tqdm.write(f"Video {video_id}: already analyzed, skipping")
            continue

        # Fetch the transcript for this video.
        transcript = data_manager.get_transcript_by_video_id(video_id)
        if transcript is None:
            tqdm.tqdm.write(f"Video {video_id}: transcript not found")
            continue

        try:
            # Total video length = latest segment end time.
            total_duration = max(
                segment["start"] + segment["duration"]
                for segment in transcript.transcript_segments
            )

            # Split the video into fixed-size chunks.
            num_chunks = math.ceil(total_duration / chunk_size)
            chunked_segments = []
            max_toxicity = 0.0

            for i in range(num_chunks):
                chunk_start = i * chunk_size
                chunk_end = (i + 1) * chunk_size

                # Widen the chunk window by the overlap margin on both sides.
                overlap_start = max(0, chunk_start - overlap)
                overlap_end = min(total_duration, chunk_end + overlap)

                # Collect every segment that intersects the widened window.
                chunk_text = []
                for segment in transcript.transcript_segments:
                    segment_start = segment["start"]
                    segment_end = segment_start + segment["duration"]
                    if not (segment_end < overlap_start or segment_start > overlap_end):
                        chunk_text.append(segment["text"])

                chunk_transcript = " ".join(chunk_text)

                # Run toxicity inference, but only on non-empty text.
                if chunk_transcript.strip():
                    toxicity_score = classifier.infer(chunk_transcript)
                    max_toxicity = max(max_toxicity, toxicity_score)
                else:
                    toxicity_score = 0.0

                chunked_segments.append(
                    ChunkedSegment(
                        start=overlap_start,
                        end=overlap_end,
                        transcript=chunk_transcript,
                        toxicity_score=float(toxicity_score),
                    )
                )

            # A video is toxic if any chunk exceeds the 0.5 threshold.
            is_toxic = max_toxicity > 0.5

            analyzed_transcripts[video_id] = AnalyzedTranscript(
                video_id=video_id,
                chunk_count=len(chunked_segments),
                chunked_segments=chunked_segments,
                is_toxic=is_toxic,
            )

            # Checkpoint after every video so progress survives interruption.
            with open(CHECKPOINT_PATH, "w", encoding="utf-8") as f:
                json.dump(
                    {
                        "analyzed_transcripts": {
                            vid: asdict(result)
                            for vid, result in analyzed_transcripts.items()
                        }
                    },
                    f,
                    ensure_ascii=False,
                    indent=2,
                )

            tqdm.tqdm.write(
                f"Video {video_id}: analysis complete "
                f"(toxic: {is_toxic}, max score: {max_toxicity:.3f})"
            )

        except Exception as e:
            tqdm.tqdm.write(f"Video {video_id}: error during processing: {e}")
            continue

    return analyzed_transcripts


if __name__ == "__main__":
    # Initialize the data manager and classifier.
    data_manager = DataManager()
    classifier = ToxcitiyClassifier()

    # Run the batch analysis.
    results = batch_analyze_transcripts(data_manager, classifier)

    # Final statistics.
    total_analyzed = len(results)
    total_toxic = sum(1 for transcript in results.values() if transcript.is_toxic)

    print("\nAnalysis summary:")
    print(f"Videos processed: {total_analyzed}")
    print(f"Videos flagged toxic: {total_toxic}")
    if total_analyzed:
        print(f"Toxic ratio: {(total_toxic / total_analyzed) * 100:.1f}%")
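
# ---------------------------------------------------------------------------
# Reference sketch (assumption): the script above implies the following shape
# for the schemas imported from models/schemas.py. asdict() only works on
# dataclass instances, and the constructor calls above fix the field names;
# the exact types are inferred, so treat this as documentation, not as the
# authoritative definitions.
#
# @dataclass
# class ChunkedSegment:
#     start: float           # window start in seconds (overlap included)
#     end: float             # window end in seconds (overlap included)
#     transcript: str        # concatenated text of intersecting segments
#     toxicity_score: float  # classifier output, compared against 0.5
#
# @dataclass
# class AnalyzedTranscript:
#     video_id: str
#     chunk_count: int
#     chunked_segments: List[ChunkedSegment]
#     is_toxic: bool         # True if any chunk scored above 0.5
# ---------------------------------------------------------------------------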