import os
import subprocess
import time
import json
import argparse

import torch
import matplotlib.pyplot as plt

from docx import Document
from docx.shared import RGBColor, Pt
from docx.enum.text import WD_ALIGN_PARAGRAPH
from langdetect import detect

# Import Hugging Face components
from transformers import pipeline
from pyannote.audio import Pipeline
from datasets import Dataset
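# Third-party dependencies this script assumes (install names; versions may vary):
#   pip install torch transformers pyannote.audio datasets python-docx langdetect \
#       matplotlib spacy language-tool-python
# ffmpeg must also be available on the PATH for audio extraction.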
# Constants
SPACY_MODELS = {
    'es': 'es_core_news_sm',  # Spanish
    'en': 'en_core_web_sm',   # English
    'fr': 'fr_core_news_sm',  # French
    'it': 'it_core_news_sm',  # Italian
    'de': 'de_core_news_sm',  # German
    'pt': 'pt_core_news_sm',  # Portuguese
    'nl': 'nl_core_news_sm',  # Dutch
    'ca': 'ca_core_news_sm',  # Catalan
}
# Function to load Spacy model based on language
def load_spacy_model(language):
    import spacy
    from spacy.cli import download as spacy_download

    model_name = SPACY_MODELS.get(language, 'es_core_news_sm')
    try:
        print(f"Attempting to load Spacy model for language: {language} ({model_name})...")
        nlp = spacy.load(model_name)
        return nlp
    except OSError:
        print(f"Model {model_name} not found. Installing...")
        spacy_download(model_name)
        nlp = spacy.load(model_name)
        return nlp
    except Exception as e:
        print(f"Could not load Spacy model for language {language}: {str(e)}")
        print("Trying to load default English model...")
        try:
            spacy_download('en_core_web_sm')
            return spacy.load('en_core_web_sm')
        except Exception as e2:
            print(f"Could not load English model either: {str(e2)}")
            print("Using a minimal model...")
            return spacy.blank('en')
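# NOTE: load_spacy_model and SPACY_MODELS are not invoked by main() below; they
# appear to be kept as helpers for optional downstream NLP steps. Illustrative
# usage (assuming the relevant spaCy model can be downloaded):
#   nlp = load_spacy_model('en')
#   doc = nlp("A short sentence to tokenize.")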
# Function to extract audio from a video
def extract_audio(video_path, audio_path):
    try:
        # Pass the arguments as a list (no shell=True) so paths containing
        # spaces or quotes cannot break the command
        command = [
            "ffmpeg", "-i", video_path,
            "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
            audio_path, "-y",
        ]
        subprocess.run(command, check=True)
        print(f"Audio extracted and saved to: {audio_path}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error extracting audio: {e}")
        return False
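# 16 kHz mono PCM matches what both Whisper and pyannote.audio expect as input,
# so the extracted WAV can be fed to transcription and diarization unchanged.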
# Function to detect language of the audio
def detect_language(transcribed_text):
    try:
        language = detect(transcribed_text)
        print(f"Detected language: {language}")
        return language
    except Exception as e:
        print(f"Error detecting language: {e}")
        return "es"  # Spanish by default
# Function to perform speaker diarization with pyannote.audio
def diarize_speakers(audio_path, huggingface_token=None):
    try:
        print("Initializing speaker diarization...")
        # If a Hugging Face token is provided, use it
        if huggingface_token:
            diarization_pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=huggingface_token
            )
        else:
            # Try to load without a token (only works if the model license
            # has already been accepted and credentials are cached)
            try:
                diarization_pipeline = Pipeline.from_pretrained(
                    "pyannote/speaker-diarization-3.1",
                    use_auth_token=False
                )
            except Exception as e:
                print(f"Error loading diarization model without token: {e}")
                print("It's recommended to create a Hugging Face account, accept the model license, and provide a token.")
                return {}
        print("Running diarization...")
        diarization = diarization_pipeline(audio_path)
        # Store speaker information and turns
        speakers = {}
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            if speaker not in speakers:
                speakers[speaker] = []
            speakers[speaker].append({
                'start': turn.start,
                'end': turn.end
            })
        # Rename speakers to be more user-friendly
        renamed_speakers = {}
        for i, (speaker, turns) in enumerate(speakers.items(), 1):
            renamed_speakers[f"Speaker {i}"] = turns
        print(f"Diarization completed. {len(renamed_speakers)} speakers identified.")
        return renamed_speakers
    except Exception as e:
        print(f"Error in speaker diarization: {e}")
        print("Continuing without diarization...")
        return {}
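# pyannote/speaker-diarization-3.1 is a gated model: you must accept its terms
# on its Hugging Face model page (and those of its underlying segmentation
# model) before the pipeline can be downloaded, even with a valid token.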
# Function to transcribe audio with Whisper and get timestamps
def transcribe_audio_with_timing(audio_path, model_name="openai/whisper-base", language=None):
    try:
        print(f"Loading Whisper model ({model_name})...")
        # Use Transformers pipeline for transcription
        transcription_pipeline = pipeline(
            "automatic-speech-recognition",
            model=model_name,
            chunk_length_s=30,
            device=0 if torch.cuda.is_available() else -1,
            return_timestamps="word"
        )
        print("Transcribing audio with timestamps...")
        # If a language is provided, pass it through generate_kwargs (the ASR
        # pipeline does not accept language= directly); otherwise let Whisper
        # detect it
        if language:
            result = transcription_pipeline(
                audio_path, generate_kwargs={"language": language}
            )
        else:
            result = transcription_pipeline(audio_path)
        # Process the result to match the expected format
        transcribed_text = result.get("text", "")
        # Create segments from chunks with timestamps
        segments = []
        chunk_words = result.get("chunks", [])
        # Group words into sentences/segments
        current_segment = {
            "start": 0,
            "end": 0,
            "text": "",
            "words": []
        }
        for word_data in chunk_words:
            word = word_data.get("text", "")
            start_time, end_time = word_data.get("timestamp", (0, 0))
            if end_time is None:  # the last word can carry an open-ended timestamp
                end_time = start_time
            # Initialize first segment
            if not current_segment["text"]:
                current_segment["start"] = start_time
            current_segment["text"] += " " + word
            current_segment["words"].append(word_data)
            current_segment["end"] = end_time
            # Start a new segment at sentence end
            if word.endswith((".", "!", "?")):
                segments.append(current_segment)
                current_segment = {
                    "start": end_time,
                    "end": end_time,
                    "text": "",
                    "words": []
                }
        # Add the last segment if not empty
        if current_segment["text"]:
            segments.append(current_segment)
        # The pipeline output does not include the detected language, so this
        # usually stays "unknown" and main() falls back to langdetect
        detected_language = result.get("language", "unknown")
        print(f"Transcription completed in language: {detected_language}")
        return transcribed_text, segments, detected_language
    except Exception as e:
        print(f"Error in transcription: {e}")
        return "", [], "unknown"
# Function to assign speakers to transcribed segments
def assign_speakers_to_segments(segments, speakers):
    if not speakers:
        # If no speaker information, assign "Unknown Speaker" to all segments
        for segment in segments:
            segment['speaker'] = "Unknown Speaker"
        return segments
    for segment in segments:
        start_time = segment['start']
        end_time = segment['end']
        # Find the speaker with the most overlap for this segment
        best_speaker = None
        max_overlap = 0
        for speaker, turns in speakers.items():
            for turn in turns:
                turn_start = turn['start']
                turn_end = turn['end']
                # Calculate overlap time
                overlap_start = max(start_time, turn_start)
                overlap_end = min(end_time, turn_end)
                overlap = max(0, overlap_end - overlap_start)
                if overlap > max_overlap:
                    max_overlap = overlap
                    best_speaker = speaker
        # Assign the best speaker found or "Unknown" if no match
        segment['speaker'] = best_speaker if best_speaker else "Unknown Speaker"
    return segments
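# Overlap heuristic, worked through: for a segment spanning 10.0-14.0 s and a
# diarization turn spanning 12.0-20.0 s, the overlap is
# min(14.0, 20.0) - max(10.0, 12.0) = 2.0 s; the speaker with the largest
# single-turn overlap wins the segment.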
# Function to extract speaker information (how much each one speaks)
def analyze_speaker_stats(segments):
    speaker_stats = {}
    total_duration = 0
    for segment in segments:
        speaker = segment.get('speaker', 'Unknown Speaker')
        duration = segment['end'] - segment['start']
        total_duration += duration
        if speaker not in speaker_stats:
            speaker_stats[speaker] = {
                'total_time': 0,
                'word_count': 0,
                'segments': 0
            }
        speaker_stats[speaker]['total_time'] += duration
        speaker_stats[speaker]['word_count'] += len(segment['text'].split())
        speaker_stats[speaker]['segments'] += 1
    # Calculate percentages (guard against an empty transcription)
    for speaker in speaker_stats:
        speaker_stats[speaker]['percentage'] = (
            (speaker_stats[speaker]['total_time'] / total_duration) * 100
            if total_duration > 0 else 0
        )
    return speaker_stats, total_duration
# Function to generate speaker analysis charts
def generate_speaker_analysis_charts(speaker_stats, output_path):
    try:
        # Prepare per-speaker lists for plotting
        speakers = list(speaker_stats.keys())
        percentages = [speaker_stats[speaker]['percentage'] for speaker in speakers]
        word_counts = [speaker_stats[speaker]['word_count'] for speaker in speakers]
        # Create figure with two subplots
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        # Chart 1: Speaking time by speaker (pie)
        ax1.pie(percentages, labels=speakers, autopct='%1.1f%%', startangle=90)
        ax1.set_title('Speaking Time Distribution')
        # Chart 2: Number of words by speaker (bars)
        ax2.bar(speakers, word_counts)
        ax2.set_title('Word Count by Speaker')
        ax2.set_ylabel('Word Count')
        ax2.tick_params(axis='x', rotation=45)
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close(fig)  # release the figure so repeated runs don't accumulate memory
        print(f"Analysis charts saved to: {output_path}")
        return True
    except Exception as e:
        print(f"Error generating analysis charts: {e}")
        return False
# Function to choose organization mode: chronological or by speakers
def organize_segments(segments, mode="chronological"):
    if mode == "by_speaker":
        # Organize by speakers
        speakers_content = {}
        for segment in segments:
            speaker = segment.get('speaker', 'Unknown Speaker')
            if speaker not in speakers_content:
                speakers_content[speaker] = []
            speakers_content[speaker].append(segment)
        # Sort segments by time within each speaker
        for speaker in speakers_content:
            speakers_content[speaker].sort(key=lambda x: x['start'])
        return speakers_content
    else:
        # Organize chronologically (already sorted by time)
        return segments
# Function to divide text into paragraphs based on organization mode
def process_segments_for_document(segments, mode="chronological"):
    if mode == "by_speaker":
        # Organize by speakers
        speakers_content = organize_segments(segments, "by_speaker")
        paragraphs = []
        for speaker, speaker_segments in speakers_content.items():
            speaker_text = ""
            for segment in speaker_segments:
                speaker_text += segment['text'] + " "
            paragraphs.append({
                'speaker': speaker,
                'text': speaker_text
            })
        return paragraphs
    else:
        # Organize chronologically
        chronological_paragraphs = []
        current_paragraph = []
        current_speaker = None
        current_timestamp = None
        for segment in segments:
            speaker = segment.get('speaker', 'Unknown Speaker')
            text = segment['text']
            start_time = segment['start']
            # Format time as HH:MM:SS
            time_str = format_timestamp(start_time)
            # If speaker changes, start a new paragraph
            if current_speaker and current_speaker != speaker and current_paragraph:
                chronological_paragraphs.append({
                    'speaker': current_speaker,
                    'text': ' '.join(current_paragraph),
                    'timestamp': current_timestamp
                })
                current_paragraph = []
            # Update current speaker; record the timestamp only when a new
            # paragraph starts, so each paragraph is stamped with its first
            # segment rather than its last
            current_speaker = speaker
            if not current_paragraph:
                current_timestamp = time_str
            current_paragraph.append(text)
        # Add the last paragraph if there's content
        if current_paragraph:
            chronological_paragraphs.append({
                'speaker': current_speaker,
                'text': ' '.join(current_paragraph),
                'timestamp': current_timestamp
            })
        return chronological_paragraphs
# Function to format time in HH:MM:SS format
def format_timestamp(seconds):
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    return f"{int(h):02d}:{int(m):02d}:{int(s):02d}"
# Function to improve text style and grammar before saving
def correct_text(text, language="es"):
    try:
        import language_tool_python

        language_code = language[:2].lower()  # Get only the 2-letter language code
        supported_languages = ["es", "en", "fr", "de", "pt", "nl"]
        if language_code not in supported_languages:
            print(f"Grammar correction not available for language {language_code}, using Spanish by default.")
            language_code = "es"
        tool = language_tool_python.LanguageTool(language_code)
        matches = tool.check(text)
        corrected_text = language_tool_python.utils.correct(text, matches)
        return corrected_text
    except Exception as e:
        print(f"Error correcting text: {e}")
        return text  # Return original text if there's an error
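# NOTE: correct_text is not wired into main() below; it could be applied to
# each paragraph before building the Word document, e.g. (illustrative):
#   paragraph['text'] = correct_text(paragraph['text'], detected_language)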
# Function to create Word document with organized transcription
def create_word_document(paragraphs, output_path, include_timestamps=True, stats=None, chart_path=None):
    try:
        doc = Document()
        # Configure document style
        style = doc.styles['Normal']
        style.font.name = 'Arial'
        style.font.size = Pt(11)
        # Main title
        title = doc.add_heading('Transcription with Speaker Identification', 0)
        title.alignment = WD_ALIGN_PARAGRAPH.CENTER
        # Add statistics information if available
        if stats:
            doc.add_heading('Participation Summary', level=1)
            stats_table = doc.add_table(rows=1, cols=5)
            stats_table.style = 'Table Grid'
            # Table headers
            hdr_cells = stats_table.rows[0].cells
            hdr_cells[0].text = 'Speaker'
            hdr_cells[1].text = 'Time (s)'
            hdr_cells[2].text = 'Percentage (%)'
            hdr_cells[3].text = 'Words'
            hdr_cells[4].text = 'Interventions'
            # Add data for each speaker
            for speaker, data in stats.items():
                row_cells = stats_table.add_row().cells
                row_cells[0].text = speaker
                row_cells[1].text = f"{data['total_time']:.2f}"
                row_cells[2].text = f"{data['percentage']:.2f}"
                row_cells[3].text = f"{data['word_count']}"
                row_cells[4].text = f"{data['segments']}"
            doc.add_paragraph()
        # Add chart if available
        if chart_path and os.path.exists(chart_path):
            doc.add_heading('Graphical Analysis', level=1)
            doc.add_picture(chart_path, width=Pt(450))  # 450 pt = 6.25 inches
            doc.add_paragraph()
        # Transcription title
        doc.add_heading('Complete Transcription', level=1)
        # Add paragraphs to document
        for paragraph in paragraphs:
            speaker = paragraph['speaker']
            text = paragraph['text']
            # Create paragraph with appropriate formatting
            p = doc.add_paragraph()
            # Add timestamp if available and option is enabled
            if include_timestamps and 'timestamp' in paragraph:
                timestamp_run = p.add_run(f"[{paragraph['timestamp']}] ")
                timestamp_run.bold = True
                timestamp_run.font.color.rgb = RGBColor(128, 128, 128)
            # Add speaker
            speaker_run = p.add_run(f"{speaker}: ")
            speaker_run.bold = True
            # Text color according to speaker for easier reading
            if "Speaker 1" in speaker:
                speaker_run.font.color.rgb = RGBColor(0, 0, 200)  # Blue
            elif "Speaker 2" in speaker:
                speaker_run.font.color.rgb = RGBColor(200, 0, 0)  # Red
            elif "Speaker 3" in speaker:
                speaker_run.font.color.rgb = RGBColor(0, 150, 0)  # Green
            elif "Speaker 4" in speaker:
                speaker_run.font.color.rgb = RGBColor(128, 0, 128)  # Purple
            # Add paragraph text
            p.add_run(text)
            # Add separator for better readability
            doc.add_paragraph()
        # Save document
        doc.save(output_path)
        print(f"Word document saved to: {output_path}")
        return True
    except Exception as e:
        print(f"Error creating Word document: {str(e)}")
        return False
# Function to save results as JSON for later processing
def save_json_results(segments, output_path):
    try:
        # Convert segments to serializable format
        serializable_segments = []
        for segment in segments:
            serializable_segment = {
                'start': segment['start'],
                'end': segment['end'],
                'text': segment['text'],
                'speaker': segment.get('speaker', 'Unknown Speaker')
            }
            serializable_segments.append(serializable_segment)
        # Save to JSON file
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(serializable_segments, f, ensure_ascii=False, indent=2)
        print(f"Results saved in JSON format: {output_path}")
        return True
    except Exception as e:
        print(f"Error saving results to JSON: {e}")
        return False
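# Resulting JSON shape (illustrative values):
# [
#   {"start": 0.0, "end": 4.2, "text": " Hello everyone.", "speaker": "Speaker 1"},
#   ...
# ]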
# Function to save results to Hugging Face Dataset
def save_to_huggingface_dataset(segments, output_path=None, push_to_hub=False, repo_id=None, token=None):
    try:
        # Prepare data for Dataset format
        data = {
            "segment_id": [],
            "start_time": [],
            "end_time": [],
            "speaker": [],
            "text": []
        }
        for i, segment in enumerate(segments):
            data["segment_id"].append(i)
            data["start_time"].append(segment["start"])
            data["end_time"].append(segment["end"])
            data["speaker"].append(segment.get("speaker", "Unknown Speaker"))
            data["text"].append(segment["text"])
        # Create Dataset
        dataset = Dataset.from_dict(data)
        # Save locally if path provided
        if output_path:
            dataset.save_to_disk(output_path)
            print(f"Dataset saved locally to: {output_path}")
        # Push to Hugging Face Hub if requested
        if push_to_hub and repo_id:
            dataset.push_to_hub(repo_id, token=token)
            print(f"Dataset pushed to Hugging Face Hub: {repo_id}")
        return dataset
    except Exception as e:
        print(f"Error saving to Hugging Face dataset: {e}")
        return None
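# The saved dataset can be reloaded later with (illustrative path):
#   from datasets import load_from_disk
#   ds = load_from_disk("./output/myfile_20240101_120000_dataset")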
# Main function
def main():
    parser = argparse.ArgumentParser(description="Audio transcription with speaker diarization using Hugging Face models")
    parser.add_argument("--video", type=str, help="Path to video file")
    parser.add_argument("--audio", type=str, help="Path to audio file (if already extracted)")
    parser.add_argument("--output_dir", type=str, default="./output", help="Directory to save output files")
    parser.add_argument("--model", type=str, default="openai/whisper-base",
                        help="Whisper model to use: openai/whisper-tiny, openai/whisper-base, openai/whisper-small, openai/whisper-medium, openai/whisper-large")
    parser.add_argument("--language", type=str, help="Language code (e.g., 'es' for Spanish)")
    parser.add_argument("--hf_token", type=str, help="Hugging Face API token for speaker diarization")
    parser.add_argument("--organization", type=str, default="chronological",
                        choices=["chronological", "by_speaker"], help="Transcription organization mode")
    parser.add_argument("--push_to_hub", action="store_true", help="Push results to Hugging Face Hub")
    parser.add_argument("--repo_id", type=str, help="Hugging Face repository ID for pushing dataset")
    args = parser.parse_args()

    # Create output directory if it doesn't exist
    os.makedirs(args.output_dir, exist_ok=True)
    # Timestamp for output files
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    try:
        print("=== TRANSCRIPTION WITH SPEAKER DETECTION ===")
        # Check input file
        if args.audio:
            audio_path = args.audio
            base_filename = os.path.splitext(os.path.basename(audio_path))[0]
        elif args.video:
            video_path = args.video
            base_filename = os.path.splitext(os.path.basename(video_path))[0]
            audio_path = os.path.join(args.output_dir, f"{base_filename}_{timestamp}.wav")
            # Extract audio from video
            if not extract_audio(video_path, audio_path):
                print("Could not extract audio. Process canceled.")
                return
        else:
            print("Error: You must provide either a video file or an audio file.")
            return
        # Output file paths
        word_output_path = os.path.join(args.output_dir, f"{base_filename}_{timestamp}_transcription.docx")
        json_output_path = os.path.join(args.output_dir, f"{base_filename}_{timestamp}_data.json")
        chart_output_path = os.path.join(args.output_dir, f"{base_filename}_{timestamp}_analysis.png")
        dataset_output_path = os.path.join(args.output_dir, f"{base_filename}_{timestamp}_dataset")
        print(f"\nProcessing audio: {audio_path}")
        start_time = time.time()
        # Transcribe with Whisper
        print(f"\nStarting transcription with Whisper model {args.model}...")
        transcribed_text, segments, detected_language = transcribe_audio_with_timing(
            audio_path,
            model_name=args.model,
            language=args.language
        )
        if not transcribed_text:
            print("Transcription failed. Process canceled.")
            return
        print(f"Transcription completed: {transcribed_text[:100]}...\n")
        # If no language was specified, fall back to text-based detection
        if not args.language:
            detected_language = detect_language(transcribed_text) if detected_language == "unknown" else detected_language
        else:
            detected_language = args.language
        # Speaker diarization
        print("Starting speaker detection...")
        speakers = diarize_speakers(audio_path, args.hf_token)
        # Assign speakers to segments
        segments_with_speakers = assign_speakers_to_segments(segments, speakers)
        # Analyze speaker statistics
        speaker_stats, total_duration = analyze_speaker_stats(segments_with_speakers)
        print("\n=== PARTICIPATION STATISTICS ===")
        for speaker, stats in speaker_stats.items():
            print(f"{speaker}: {stats['percentage']:.2f}% of time, {stats['word_count']} words, {stats['segments']} interventions")
        # Generate analysis charts
        generate_speaker_analysis_charts(speaker_stats, chart_output_path)
        # Process segments according to selected organization mode
        paragraphs = process_segments_for_document(segments_with_speakers, args.organization)
        # Save results as JSON
        save_json_results(segments_with_speakers, json_output_path)
        # Create Word document with transcription
        create_word_document(
            paragraphs,
            word_output_path,
            include_timestamps=True,
            stats=speaker_stats,
            chart_path=chart_output_path
        )
        # Save to Hugging Face Dataset when requested (the original check
        # against os.path.exists(dataset_output_path) could never be true,
        # since the timestamped path is only created at this point)
        if args.push_to_hub:
            save_to_huggingface_dataset(
                segments_with_speakers,
                output_path=dataset_output_path,
                push_to_hub=args.push_to_hub,
                repo_id=args.repo_id,
                token=args.hf_token
            )
        # Total processing time
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"\nTotal processing time: {elapsed_time:.2f} seconds")
        print("\nProcess completed successfully!")
    except Exception as e:
        print(f"Unexpected error during the process: {str(e)}")


# Run the script
if __name__ == "__main__":
    main()
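# Example invocations (script name, file names, and token are illustrative):
#   python transcribe_diarize.py --video meeting.mp4 --model openai/whisper-small \
#       --language es --hf_token hf_xxxxx --organization by_speaker
#   python transcribe_diarize.py --audio interview.wav --output_dir ./output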