Spaces:
Sleeping
Sleeping
File size: 13,738 Bytes
4051511 d33fc74 4051511 d33fc74 4051511 d33fc74 4051511 d33fc74 4051511 d33fc74 4051511 d33fc74 4051511 d33fc74 4051511 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 |
import json
import re
from typing import Dict, List, Any, Optional
from datetime import timedelta
import logging
# Configure root logging at INFO level as a side effect of importing this
# module (logging.basicConfig does nothing if the root logger already has
# handlers configured).
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the code below.
logger = logging.getLogger(__name__)
class SubtitleFormatter:
    """Format transcription results into various subtitle formats.

    A transcription ``result`` is a dictionary that may carry:

    - ``'text'``: the full transcript string,
    - ``'language'``: a language tag,
    - ``'chunks'``: a list of dicts, each with a ``'text'`` string and a
      ``'timestamp'`` pair ``(start, end)`` in seconds (either value, or the
      whole pair, may be ``None``).

    ``speaker_labels``, where accepted, maps a chunk's position in
    ``result['chunks']`` to a speaker name.
    """

    # Settings for grouping words into subtitle segments
    MAX_WORDS_PER_SEGMENT = 12
    MAX_DURATION_SECONDS = 7.0
    # Declared for configuration, but not enforced anywhere in this class.
    MIN_DURATION_SECONDS = 1.0

    # A word ending with one of these characters closes the current segment.
    # Compiled once here instead of on every grouping call.
    _SENTENCE_END = re.compile(r'[.!?;:]$')

    # Duration assumed for a word whose end timestamp is missing.
    _DEFAULT_WORD_SECONDS = 0.5

    @staticmethod
    def _group_chunks(chunks: List[Dict]) -> List[Dict]:
        """
        Group word-level chunks into segments, remembering chunk positions.

        Args:
            chunks: List of word-level chunks with timestamps

        Returns:
            List of dicts with 'timestamp' (start, end), 'text' and
            'indices' (positions in ``chunks`` of the words in the segment)
        """
        grouped: List[Dict] = []
        words: List[str] = []
        indices: List[int] = []
        seg_start = 0.0
        seg_end = 0.0
        last_end = 0.0  # end of the previous word; fallback for a missing start

        def close_segment() -> None:
            # Join the words and collapse any run of whitespace to one space.
            text = re.sub(r'\s+', ' ', ' '.join(words)).strip()
            grouped.append({
                'timestamp': (seg_start, seg_end),
                'text': text,
                'indices': list(indices),
            })
            words.clear()
            indices.clear()

        for idx, chunk in enumerate(chunks or []):
            text = (chunk.get('text') or '').strip()
            if not text:
                continue

            # The 'timestamp' key may be absent or explicitly None.
            ts = chunk.get('timestamp') or (None, None)
            # A missing start continues from the previous word's end (0.0 for
            # the first word) so a segment never gets a negative duration.
            word_start = ts[0] if ts[0] is not None else last_end
            word_end = (
                ts[1] if ts[1] is not None
                else word_start + SubtitleFormatter._DEFAULT_WORD_SECONDS
            )
            last_end = word_end

            if not words:
                seg_start = word_start
            words.append(text)
            indices.append(idx)
            seg_end = word_end

            # The limits are checked after the word is added, so the word
            # that reaches a limit is the last word of its segment.
            if (
                SubtitleFormatter._SENTENCE_END.search(text)
                or len(words) >= SubtitleFormatter.MAX_WORDS_PER_SEGMENT
                or seg_end - seg_start >= SubtitleFormatter.MAX_DURATION_SECONDS
            ):
                close_segment()

        # Don't forget the last segment if there are remaining words
        if words:
            close_segment()
        return grouped

    @staticmethod
    def group_words_into_segments(chunks: List[Dict]) -> List[Dict]:
        """
        Group word-level chunks into proper subtitle segments.

        Groups by:
        - Sentence-ending punctuation (. ! ? ; :)
        - Maximum words per segment
        - Maximum duration per segment

        Chunks whose text is empty after stripping are skipped. A missing end
        time defaults to start + 0.5 s; a missing start time defaults to the
        end of the previous word (0.0 for the first word).

        Args:
            chunks: List of word-level chunks with timestamps

        Returns:
            List of grouped segments, each a dict with 'timestamp'
            (start, end) and 'text'
        """
        return [
            {'timestamp': seg['timestamp'], 'text': seg['text']}
            for seg in SubtitleFormatter._group_chunks(chunks)
        ]

    @staticmethod
    def _format_timestamp(seconds: Optional[float], separator: str) -> str:
        """
        Render seconds as HH:MM:SS<separator>mmm.

        The value is rounded to whole milliseconds first and then split with
        integer arithmetic, so the seconds and milliseconds fields always
        agree (e.g. 1.001 -> ...01,001 and 1.9999999 -> ...02,000).

        Args:
            seconds: Time in seconds; None or a negative value is treated as 0
            separator: Character placed between seconds and milliseconds

        Returns:
            Formatted timestamp string
        """
        if seconds is None or seconds < 0:
            seconds = 0.0
        total_ms = int(round(seconds * 1000))
        hours, remainder = divmod(total_ms, 3600000)
        minutes, remainder = divmod(remainder, 60000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}{separator}{millis:03d}"

    @staticmethod
    def format_timestamp_srt(seconds: float) -> str:
        """
        Convert seconds to SRT timestamp format (HH:MM:SS,mmm)

        Args:
            seconds: Time in seconds (None or negative is rendered as zero)

        Returns:
            Formatted timestamp string
        """
        return SubtitleFormatter._format_timestamp(seconds, ',')

    @staticmethod
    def format_timestamp_vtt(seconds: float) -> str:
        """
        Convert seconds to VTT timestamp format (HH:MM:SS.mmm)

        Args:
            seconds: Time in seconds (None or negative is rendered as zero)

        Returns:
            Formatted timestamp string
        """
        return SubtitleFormatter._format_timestamp(seconds, '.')

    @staticmethod
    def _most_common(speakers: List[Any]) -> Optional[Any]:
        """Return the most frequent item; ties go to the one seen first."""
        counts: Dict[Any, int] = {}
        for speaker in speakers:
            counts[speaker] = counts.get(speaker, 0) + 1
        if not counts:
            return None
        # dicts keep insertion order and max() returns the first maximum,
        # which makes the tie-break deterministic.
        return max(counts, key=counts.get)

    @staticmethod
    def _iter_cues(result: Dict[str, Any], speaker_labels: Optional[Dict] = None):
        """
        Yield (start, end, text, speaker) for every grouped subtitle segment.

        The speaker is the most common label among the chunks that actually
        make up the segment (None when no labels are given or none match).
        """
        chunks = result.get('chunks') or []
        for seg in SubtitleFormatter._group_chunks(chunks):
            start_time, end_time = seg['timestamp']
            speaker = None
            if speaker_labels:
                speaker = SubtitleFormatter._most_common([
                    speaker_labels[i] for i in seg['indices'] if i in speaker_labels
                ])
            yield start_time, end_time, seg['text'], speaker

    @staticmethod
    def to_srt(result: Dict[str, Any], speaker_labels: Optional[Dict] = None) -> str:
        """
        Convert transcription result to SRT format

        Args:
            result: Transcription result dictionary
            speaker_labels: Optional speaker diarization labels

        Returns:
            SRT formatted string
        """
        srt_content = []
        cues = SubtitleFormatter._iter_cues(result, speaker_labels)
        for number, (start_time, end_time, text, speaker) in enumerate(cues, 1):
            if speaker:
                text = f"[{speaker}]: {text}"
            # Format: index, timestamp, text
            srt_content.append(f"{number}")
            srt_content.append(
                f"{SubtitleFormatter.format_timestamp_srt(start_time)} --> "
                f"{SubtitleFormatter.format_timestamp_srt(end_time)}"
            )
            srt_content.append(text)
            srt_content.append("")  # Blank line between entries
        return "\n".join(srt_content)

    @staticmethod
    def _get_speaker_for_segment(
        speaker_labels: Dict,
        chunks: List[Dict],
        start_time: float,
        end_time: float
    ) -> Optional[str]:
        """
        Get the most common speaker for a time segment.

        A chunk counts when its start time lies within
        [start_time, end_time] (inclusive). Ties are resolved in favour of
        the speaker encountered first. Kept for callers that only have a
        time window; to_srt/to_vtt attribute speakers by chunk membership.

        Args:
            speaker_labels: Mapping of chunk index to speaker name
            chunks: List of word-level chunks with timestamps
            start_time: Window start in seconds
            end_time: Window end in seconds

        Returns:
            Speaker name, or None if no labelled chunk starts in the window
        """
        speakers = []
        for idx, chunk in enumerate(chunks):
            ts = chunk.get('timestamp') or (None, None)
            if ts[0] is None or not (start_time <= ts[0] <= end_time):
                continue
            if idx in speaker_labels:
                speakers.append(speaker_labels[idx])
        return SubtitleFormatter._most_common(speakers)

    @staticmethod
    def to_vtt(result: Dict[str, Any], speaker_labels: Optional[Dict] = None) -> str:
        """
        Convert transcription result to VTT (WebVTT) format

        Args:
            result: Transcription result dictionary
            speaker_labels: Optional speaker diarization labels

        Returns:
            VTT formatted string
        """
        vtt_content = ["WEBVTT", ""]
        for start_time, end_time, text, speaker in SubtitleFormatter._iter_cues(
            result, speaker_labels
        ):
            if speaker:
                text = f"<v {speaker}>{text}</v>"
            # Format: timestamp, text
            vtt_content.append(
                f"{SubtitleFormatter.format_timestamp_vtt(start_time)} --> "
                f"{SubtitleFormatter.format_timestamp_vtt(end_time)}"
            )
            vtt_content.append(text)
            vtt_content.append("")  # Blank line between entries
        return "\n".join(vtt_content)

    @staticmethod
    def to_txt(result: Dict[str, Any], speaker_labels: Optional[Dict] = None) -> str:
        """
        Convert transcription result to plain text format

        Without speaker labels the result's 'text' is returned as is. With
        labels, consecutive chunks by the same speaker are merged into one
        "[speaker]: ..." paragraph; unlabelled chunks use 'UNKNOWN'.

        Args:
            result: Transcription result dictionary
            speaker_labels: Optional speaker diarization labels

        Returns:
            Plain text string
        """
        if not speaker_labels:
            # Simple format without speakers
            return result.get('text', '')

        txt_lines = []
        current_speaker = None
        current_text: List[str] = []
        for idx, chunk in enumerate(result.get('chunks') or []):
            text = (chunk.get('text') or '').strip()
            if not text:
                continue
            speaker = speaker_labels.get(idx, 'UNKNOWN')
            if speaker != current_speaker:
                # New speaker: write out what the previous speaker said
                if current_text:
                    txt_lines.append(f"[{current_speaker}]: {' '.join(current_text)}")
                    current_text = []
                current_speaker = speaker
            current_text.append(text)

        # Add last speaker's text
        if current_text:
            txt_lines.append(f"[{current_speaker}]: {' '.join(current_text)}")
        return "\n\n".join(txt_lines)

    @staticmethod
    def to_json(result: Dict[str, Any], speaker_labels: Optional[Dict] = None) -> str:
        """
        Convert transcription result to JSON format with word-level timestamps

        Each non-empty chunk becomes one entry in 'segments' with its
        original chunk index, start, end and text (plus 'speaker' when a
        label exists for that index). Missing timestamps are emitted as null.

        Args:
            result: Transcription result dictionary
            speaker_labels: Optional speaker diarization labels

        Returns:
            JSON formatted string
        """
        output = {
            'text': result.get('text', ''),
            'language': result.get('language', 'unknown'),
            'segments': []
        }
        for idx, chunk in enumerate(result.get('chunks') or []):
            text = (chunk.get('text') or '').strip()
            if not text:
                continue
            # The 'timestamp' key may be absent or explicitly None.
            timestamp = chunk.get('timestamp') or (None, None)
            segment = {
                'index': idx,
                'start': timestamp[0],
                'end': timestamp[1],
                'text': text,
            }
            # Add speaker label if available
            if speaker_labels and idx in speaker_labels:
                segment['speaker'] = speaker_labels[idx]
            output['segments'].append(segment)
        return json.dumps(output, indent=2, ensure_ascii=False)

    @staticmethod
    def save_to_file(content: str, file_path: str) -> None:
        """
        Save formatted content to file (UTF-8, overwriting any existing file)

        Args:
            content: Formatted subtitle content
            file_path: Path to save file

        Raises:
            Exception: If the file cannot be written; the original error is
                attached as the cause
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            logger.info("Saved output to: %s", file_path)
        except Exception as e:
            logger.error("Failed to save file: %s", e)
            raise Exception(f"Failed to save file: {str(e)}") from e

    @staticmethod
    def generate_all_formats(
        result: Dict[str, Any],
        output_prefix: str,
        speaker_labels: Optional[Dict] = None
    ) -> Dict[str, str]:
        """
        Generate all output formats and save to files

        Writes ``<output_prefix>.srt``, ``.vtt``, ``.txt`` and ``.json`` in
        that order; a failure while saving one format aborts the rest.

        Args:
            result: Transcription result dictionary
            output_prefix: Prefix for output filenames
            speaker_labels: Optional speaker diarization labels

        Returns:
            Dictionary mapping format to file path
        """
        renderers = (
            ('srt', SubtitleFormatter.to_srt),
            ('vtt', SubtitleFormatter.to_vtt),
            ('txt', SubtitleFormatter.to_txt),
            ('json', SubtitleFormatter.to_json),
        )
        outputs = {}
        for extension, render in renderers:
            path = f"{output_prefix}.{extension}"
            SubtitleFormatter.save_to_file(render(result, speaker_labels), path)
            outputs[extension] = path
        logger.info("Generated all formats: %s", list(outputs.keys()))
        return outputs
|