File size: 7,236 Bytes
5327928
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import os
import uuid
import logging
import json
import shutil
from pathlib import Path
import tempfile
import gradio as gr
from process_interview import process_interview
from typing import Tuple, Optional, List, Dict
from concurrent.futures import ThreadPoolExecutor  # Import ThreadPoolExecutor for parallel processing

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Silence noisy NeMo loggers — presumably pulled in by process_interview's
# speech pipeline; keeps the console readable during transcription. TODO confirm.
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
logging.getLogger("nemo").setLevel(logging.ERROR)

# Configuration
OUTPUT_DIR = "./processed_audio"  # all generated reports/artifacts land here
os.makedirs(OUTPUT_DIR, exist_ok=True)  # created eagerly at import time

# Constants
VALID_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.flac')  # accepted audio containers
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB per-file upload cap


def check_health() -> str:
    """Check system health, similar to FastAPI /health endpoint.

    Verifies that every required output directory exists.

    Returns:
        "System is healthy" when all checks pass, otherwise a
        "System is unhealthy: ..." message. Never raises.
    """
    # Direct check instead of raising and immediately catching a broad
    # Exception — same observable strings, no control-flow-by-exception.
    for directory in [OUTPUT_DIR]:
        if not os.path.exists(directory):
            message = f"Directory {directory} does not exist"
            logger.error(f"Health check failed: {message}")
            return f"System is unhealthy: {message}"
    return "System is healthy"


# A helper function to process a single audio file
def process_single_audio(file_path_or_url: str) -> Dict:
    """Validate and analyze a single audio file.

    Args:
        file_path_or_url: Local filesystem path to the audio file. Gradio
            downloads URL inputs to a temp file before invoking the handler,
            so this is always a local path by the time it arrives here.

    Returns:
        On success, a dict with keys "summary", "json_data", "pdf_path" and
        "original_input". On any failure, a dict with a single "error" key —
        this function never raises.
    """
    try:
        if not file_path_or_url:
            return {"error": "No audio provided for processing."}

        temp_audio_path = Path(file_path_or_url)

        # Cheap validation (format, size) before the expensive analysis step.
        file_ext = temp_audio_path.suffix.lower()
        if file_ext not in VALID_EXTENSIONS:
            return {"error": f"Invalid file format: {file_ext}. Supported formats: {', '.join(VALID_EXTENSIONS)}"}

        file_size = temp_audio_path.stat().st_size
        if file_size > MAX_FILE_SIZE:
            return {
                "error": f"File too large: {file_size / (1024 * 1024):.2f}MB. Max size: {MAX_FILE_SIZE // (1024 * 1024)}MB"}

        logger.info(f"Processing audio from: {temp_audio_path}")
        result = process_interview(str(temp_audio_path))

        # process_interview is expected to return paths to its generated PDF
        # report and JSON analysis; treat anything else as a failure.
        if not result or 'pdf_path' not in result or 'json_path' not in result:
            return {"error": "Processing failed - invalid result format."}

        pdf_path = Path(result['pdf_path'])
        json_path = Path(result['json_path'])

        if not pdf_path.exists() or not json_path.exists():
            return {"error": "Processing failed - output files not found."}

        # Fix: read the analysis JSON with an explicit encoding rather than
        # the platform default, which can mis-decode on non-UTF-8 locales.
        with json_path.open('r', encoding='utf-8') as f:
            analysis_data = json.load(f)

        voice_analysis = analysis_data.get('voice_analysis', {})
        summary = (
            f"Speakers: {', '.join(analysis_data['speakers'])}\n"
            f"Interview Duration: {analysis_data['text_analysis']['total_duration']:.2f} seconds\n"
            f"Confidence Level: {voice_analysis.get('interpretation', {}).get('confidence_level', 'Unknown')}\n"
            f"Anxiety Level: {voice_analysis.get('interpretation', {}).get('anxiety_level', 'Unknown')}"
        )

        json_data = json.dumps(analysis_data, indent=2)

        return {
            "summary": summary,
            "json_data": json_data,
            "pdf_path": str(pdf_path),
            "original_input": file_path_or_url  # lets callers map results back to inputs
        }

    except Exception as e:
        logger.error(f"Error processing single audio: {str(e)}", exc_info=True)
        return {"error": f"Error during processing: {str(e)}"}


# Main function to handle multiple audio files/URLs
def analyze_multiple_audios(file_paths_or_urls: List[str]) -> Tuple[str, str, List[str]]:
    """
    Analyze multiple interview audio files/URLs in parallel.

    Args:
        file_paths_or_urls: Local paths (Gradio downloads URL inputs before
            invoking this handler).

    Returns:
        Tuple of (combined human-readable summary, JSON array string of all
        per-file results, list of generated PDF report paths).
    """
    if not file_paths_or_urls:
        return "No audio files/URLs provided.", "[]", []

    all_summaries: List[str] = []
    all_results: List[Dict] = []  # parsed per-file payloads (or error records)
    all_pdf_paths: List[str] = []

    # Use ThreadPoolExecutor for parallel processing.
    # Adjust max_workers based on available resources and expected load.
    # Results are collected in submission order so the combined summary
    # matches the order the user supplied the files.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [(executor.submit(process_single_audio, item), item)
                   for item in file_paths_or_urls]

        for future, item in futures:
            try:
                result = future.result()
                if "error" in result:
                    all_summaries.append(f"Error processing {item}: {result['error']}")
                    # Record the error so it is visible in the JSON output too.
                    all_results.append({"input": item, "error": result['error']})
                else:
                    all_summaries.append(f"Analysis for {os.path.basename(item)}:\n{result['summary']}")
                    # json_data is a JSON string produced upstream; parse it so
                    # the combined output can be serialized in one pass below.
                    all_results.append(json.loads(result['json_data']))
                    all_pdf_paths.append(result['pdf_path'])
            except Exception as exc:
                logger.error(f"Item {item} generated an unexpected exception: {exc}", exc_info=True)
                all_summaries.append(f"Error processing {item}: An unexpected error occurred.")
                all_results.append({"input": item, "error": str(exc)})

    combined_summary = "\n\n---\n\n".join(all_summaries)
    # Fix: serialize the list as a whole instead of string-splicing JSON
    # fragments — this guarantees the result is always a valid JSON array.
    combined_json_list = json.dumps(all_results, indent=2)
    return combined_summary, combined_json_list, all_pdf_paths


# Gradio interface — declarative UI wiring; built at import time so `demo`
# exists for the __main__ launch below.
with gr.Blocks(title="Interview Analysis System", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎤 Interview Audio Analysis System
    Provide multiple audio file URLs or upload multiple audio files to analyze speaker performance.  
    Supported formats: WAV, MP3, M4A, FLAC (max 100MB per file).
    """)

    with gr.Row():
        # Left column: status + inputs.
        with gr.Column():
            # Health is evaluated once at build time, not per-request.
            health_status = gr.Textbox(label="System Status", value=check_health(), interactive=False)
            audio_inputs = gr.File(
                label="Provide Audio URLs or Upload Files (Multiple allowed)",
                type="filepath",
                file_count="multiple"  # Allow multiple files/URLs
            )
            submit_btn = gr.Button("Start Analysis", variant="primary")

        # Right column: combined outputs for all processed files.
        with gr.Column():
            output_summary = gr.Textbox(label="Combined Analysis Summary", interactive=False,
                                        lines=10)  # Adjusted lines
            output_json = gr.Textbox(label="Detailed Analysis (JSON Array)", interactive=False, lines=20)
            pdf_outputs = gr.File(label="Download All Reports", type="filepath", file_count="multiple")

    # Outputs map 1:1 onto analyze_multiple_audios' (summary, json, pdfs) tuple.
    submit_btn.click(
        fn=analyze_multiple_audios,
        inputs=audio_inputs,
        outputs=[output_summary, output_json, pdf_outputs]
    )

# Run the interface
if __name__ == "__main__":
    # Bind to all interfaces so the app is reachable from outside a
    # container/VM; 7860 is Gradio's conventional port.
    demo.launch(server_port=7860, server_name="0.0.0.0")