File size: 3,538 Bytes
3b47bbc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import json
import re
from typing import Dict, List, Optional, Tuple

def parse_timestamp(timestamp: str) -> int:
    """Convert a clock-style timestamp string to a total number of seconds.

    Accepts 'MM:SS' (e.g. '00:15' -> 15) and also 'HH:MM:SS'
    (e.g. '01:00:15' -> 3615).

    Args:
        timestamp: Colon-separated integer fields.

    Returns:
        The total number of seconds as a single int.

    Raises:
        ValueError: If a field is not an integer, or if the number of
            fields is not 2 or 3.
    """
    fields = [int(part) for part in timestamp.split(':')]
    if len(fields) not in (2, 3):
        raise ValueError(
            f"expected 'MM:SS' or 'HH:MM:SS', got {timestamp!r}"
        )

    # Each field is worth 60 of the next one, so fold left to right.
    total = 0
    for value in fields:
        total = total * 60 + value
    return total

def extract_time_and_speaker(
    line: str,
) -> Tuple[Optional[Tuple[int, int]], Optional[str]]:
    """Extract the time range and speaker label from one transcript line.

    A recognised line begins (after optional leading whitespace) with
    '[MM:SS - MM:SS] Speaker X:', where X is a single uppercase letter.

    Args:
        line: One line of transcript text.

    Returns:
        ((start_seconds, end_seconds), speaker) for a recognised line,
        otherwise (None, None).
    """
    # re.match only anchors at the first character, so strip indentation
    # first; otherwise an indented header would be silently ignored.
    header = re.match(
        r'\[(\d{2}:\d{2}) - (\d{2}:\d{2})\] (Speaker [A-Z]):',
        line.lstrip(),
    )
    if header is None:
        return None, None

    start_stamp, end_stamp, speaker = header.groups()
    return (parse_timestamp(start_stamp), parse_timestamp(end_stamp)), speaker

def has_overlap(range1: Tuple[int, int], range2: Tuple[int, int]) -> bool:
    """Tell whether two (start, end) ranges share any stretch of time.

    Ranges that merely touch at an endpoint are not treated as overlapping.
    """
    (first_start, first_end), (second_start, second_end) = range1, range2
    first_begins_before_second_ends = first_start < second_end
    second_begins_before_first_ends = second_start < first_end
    return first_begins_before_second_ends and second_begins_before_first_ends

def has_same_speaker_overlap(transcript: str) -> bool:
    """Report whether any speaker has two overlapping time ranges.

    The transcript is scanned line by line. Blank lines and lines from
    which no time range and speaker can be extracted are ignored. Returns
    True at the first overlap found between a line's range and an earlier
    range of the same speaker, and False if none is found.
    """
    # Maps each speaker label to the ranges already seen for that speaker.
    seen_by_speaker = {}

    for raw_line in transcript.split('\n'):
        if not raw_line.strip():
            continue

        span, who = extract_time_and_speaker(raw_line)
        if span is None or who is None:
            continue

        earlier_spans = seen_by_speaker.setdefault(who, [])
        if any(has_overlap(span, prior) for prior in earlier_spans):
            return True
        earlier_spans.append(span)

    return False

def process_file(input_file: str, output_file: str, delete_file: str) -> None:
    """Split a JSON file of entries by same-speaker timestamp overlap.

    Entries whose 'model_output' transcript has overlapping time ranges for
    the same speaker are written to ``delete_file``; the other entries that
    have a 'model_output' are written to ``output_file``. Entries without a
    'model_output' key are written to neither file; they are counted and
    reported in the summary so the totals add up.

    Args:
        input_file: Path of the JSON file to read (a list of entries, or a
            single object, which is treated as a one-entry list).
        output_file: Path that receives the kept entries as JSON.
        delete_file: Path that receives the removed entries as JSON.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # A single top-level object is handled as a list with one entry.
    if isinstance(data, dict):
        data = [data]

    cleaned_data = []
    deleted_data = []
    skipped_count = 0

    for entry in data:
        if 'model_output' not in entry:
            # Nothing to inspect; count it so the summary can report it.
            skipped_count += 1
            continue

        if has_same_speaker_overlap(entry['model_output']):
            deleted_data.append(entry)
            print(f"Removing entry with key: {entry.get('key', 'unknown')}")
        else:
            cleaned_data.append(entry)

    # Save cleaned data
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(cleaned_data, f, ensure_ascii=False, indent=2)

    # Save deleted data
    with open(delete_file, 'w', encoding='utf-8') as f:
        json.dump(deleted_data, f, ensure_ascii=False, indent=2)

    print("\nProcessing Summary:")
    print(f"Processed {len(data)} entries")
    print(f"Removed {len(deleted_data)} entries with same-speaker overlapping timestamps")
    if skipped_count:
        print(f"Skipped {skipped_count} entries without 'model_output'")
    print(f"Remaining entries: {len(cleaned_data)}")

if __name__ == '__main__':
    # Fixed locations for this cleaning pass: the source transcripts, the
    # file for kept entries, and the file for removed entries.
    input_file, output_file, delete_file = (
        'silence_overlaps/transcriptions.json',
        'silence_overlaps/cleaned_transcriptions2.json',
        'silence_overlaps/delete_transcript2.json',
    )

    process_file(input_file, output_file, delete_file)

    # Tell the user where each result file was written.
    print(f"\nCleaned transcriptions have been saved to {output_file}")
    print(f"Deleted entries have been saved to {delete_file}")