Spaces:
Runtime error
Runtime error
| import os | |
| import base64 | |
| import tempfile | |
| import requests | |
| from datetime import datetime | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from openai import AzureOpenAI # official OpenAI SDK, works with Azure endpoints | |
| import json | |
| import subprocess | |
| import Youtubetranscription_summarizer | |
| import re | |
| # --- LLM call (Azure OpenAI with API key) ----------------------------------- | |
| def summarize_input(audio_b64: str = None, text_input: str = None, sys_prompt: str = None, user_prompt: str = None, Starttime: datetime = None) -> str: | |
| """ | |
| Calls Azure OpenAI Chat Completions with audio input (base64 mp3) or text input, or both. | |
| """ | |
| load_dotenv() | |
| endpoint = os.getenv("AC_OPENAI_ENDPOINT") | |
| api_key = os.getenv("AC_OPENAI_API_KEY") | |
| deployment = os.getenv("AC_MODEL_DEPLOYMENT") | |
| api_version = os.getenv("AC_OPENAI_API_VERSION") | |
| if not endpoint or not api_key or not deployment: | |
| return "Server misconfiguration: required env vars missing." | |
| # Reset json_text for logging | |
| json_text = "" | |
| try: | |
| client = AzureOpenAI( | |
| api_key=api_key, | |
| api_version=api_version, | |
| azure_endpoint=endpoint, | |
| ) | |
| system_message = sys_prompt.strip() if sys_prompt else ( | |
| "You are an AI assistant with a charter to clearly analyze the customer enquiry." | |
| ) | |
| user_text = user_prompt.strip() if user_prompt else ( | |
| "Summarize the provided content." if audio_b64 or text_input else "No input provided." | |
| ) | |
| content = [{"type": "text", "text": user_text}] | |
| if audio_b64: | |
| content.append({ | |
| "type": "input_audio", | |
| "input_audio": {"data": audio_b64, "format": "mp3"}, | |
| }) | |
| if text_input is not None: | |
| # Debugging: Print the type and value of text_input | |
| #print(f"Debug: text_input type={type(text_input)}, value={text_input}") | |
| if isinstance(text_input, str): | |
| try: | |
| # Try to parse the string as JSON to see if it's a list or dict | |
| parsed = json.loads(text_input) | |
| if isinstance(parsed, (list, dict)): | |
| # If it's a list or dict, convert back to JSON string | |
| content.append({"type": "text", "text": json.dumps(parsed)}) | |
| else: | |
| # If it's a string but not a JSON list/dict, use it as-is | |
| content.append({"type": "text", "text": text_input}) | |
| except json.JSONDecodeError: | |
| # If it's not valid JSON, treat it as a regular string | |
| content.append({"type": "text", "text": text_input}) | |
| elif isinstance(text_input, (list, dict)): | |
| try: | |
| # Convert list or dict to JSON-formatted string | |
| json_text = json.dumps(text_input) | |
| content.append({"type": "text", "text": json_text}) | |
| except (TypeError, ValueError): | |
| return "Error: text_input (list or dict) could not be converted to JSON." | |
| else: | |
| return f"Error: text_input must be a string, list, or dict, got {type(text_input)}." | |
| response = client.chat.completions.create( | |
| model=deployment, | |
| messages=[ | |
| {"role": "system", "content": system_message}, | |
| {"role": "user", "content": content}, | |
| ], | |
| ) | |
| Enddate = datetime.now() | |
| Callduration = Enddate - Starttime[0] | |
| print(f"Azure API call with a duration of {Callduration}: prompt_length={len(user_prompt or '')}, " | |
| f"audio_size={len(audio_b64 or '')}, text_input_size={len(json_text or '')}") | |
| return response.choices[0].message.content | |
| except Exception as ex: | |
| return print(f"Error from Azure OpenAI: {ex}") | |
| #----Retrieve meta data from metadata.json file------------------------------ | |
| def retrieve_file_path(file_name): | |
| path = os.path.dirname(os.path.abspath(__file__)) | |
| file_path = os.path.join(path, file_name) | |
| if os.path.isfile(file_path): | |
| return file_path | |
| elif not os.path.exists(file_path): | |
| print(f"'{file_path}' does not exist.") | |
| return None | |
| return None | |
| def retrieve_json_record(file_path, record_id): | |
| with open(file_path, 'r') as file: | |
| data = json.load(file) | |
| if isinstance(data, list): | |
| for record in data: | |
| if record.get('metadata', {}).get('id') == record_id: | |
| return record | |
| elif isinstance(data, dict): | |
| if data.get('metadata', {}).get('id') == record_id: | |
| return data | |
| return None | |
| # --- I/O helpers ------------------------------------------------------------ | |
| def encode_audio_from_path(path: str) -> str: | |
| with open(path, "rb") as f: | |
| return base64.b64encode(f.read()).decode("utf-8") | |
| def download_to_temp_mp3(url: str) -> str: | |
| r = requests.get(url, stream=True, timeout=30) | |
| r.raise_for_status() | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp: | |
| for chunk in r.iter_content(chunk_size=8192): | |
| if chunk: | |
| tmp.write(chunk) | |
| return tmp.name | |
| def process_audio(upload_path, record_path, url, sys_prompt, user_prompt): | |
| tmp_to_cleanup = [] | |
| audio_b64 = None | |
| text_input = None | |
| domaincheck = None | |
| try: | |
| # Capture start time for logging | |
| Starttime = datetime.now(), | |
| print(f"Azure API call starts at {datetime.now()}"), | |
| audio_path = None | |
| if upload_path: | |
| audio_path = upload_path | |
| elif record_path: | |
| audio_path = record_path | |
| elif url and url.strip(): | |
| # Check dns resolution of the url domain | |
| domain = Youtubetranscription_summarizer.extract_domain(url) | |
| if domain: | |
| domaincheck = Youtubetranscription_summarizer.nslookup(domain) # Check DNS resolution of the domain | |
| else: | |
| return "Invalid URL format." | |
| if domaincheck: | |
| # Check if the url is a youtube link | |
| CheckURL = re.search(r"Youtube", url, re.IGNORECASE) | |
| if CheckURL: | |
| # Get the transcription from youtube | |
| text_input = Youtubetranscription_summarizer.main(url.strip()) # Youtube files are transcribed and summarized | |
| tmp_to_cleanup.append(text_input) | |
| else: | |
| audio_path = download_to_temp_mp3(url.strip()) | |
| tmp_to_cleanup.append(audio_path) | |
| else: | |
| return f"DNS lookup failed for {domain}" | |
| if not audio_path and text_input is None: | |
| return "Please provide content via upload, recording, or URL." | |
| # If we have an audio file, encode it | |
| if audio_path: | |
| audio_b64 = encode_audio_from_path(audio_path) | |
| return summarize_input(audio_b64, text_input, sys_prompt, user_prompt, Starttime) | |
| except Exception as e: | |
| return print(f"Error processing audio at {datetime.now()}: prompt_length={len(user_prompt)}, audio_path={audio_path}: {str(e)}") | |
| finally: | |
| for p in tmp_to_cleanup: | |
| try: | |
| if os.path.exists(p): | |
| os.remove(p) | |
| except Exception: | |
| pass | |
| # --- UI --------------------------------------------------------------------- | |
| with gr.Blocks(title="Audio Summarizer") as demo: | |
| gr.Markdown("# Audio File Summarizer (Azure OpenAI)") | |
| gr.Markdown("Upload an mp3(**YouTube is the new feature add**), record audio, or paste a URL, use the default user prompt and system prompt and click 'Summarize'.") | |
| gr.Markdown("Users are encouraged to modify the user and system prompts to suit their needs.") | |
| with gr.Row(): | |
| with gr.Column(): | |
| upload_audio = gr.Audio(sources=["upload"], type="filepath", label="Upload mp3") | |
| with gr.Column(): | |
| record_audio = gr.Audio(sources=["microphone"], type="filepath", label="Record Audio") | |
| with gr.Column(): | |
| url_input = gr.Textbox(label="YouTube or standard mp3 URL", placeholder="https://example.com/audio.mp3") | |
| ### Get system and user prompts from metadata.json file | |
| file_name = 'metadata.json' | |
| record_id = '1' | |
| file_path = retrieve_file_path(file_name) | |
| jsonrecord = retrieve_json_record(file_path, record_id) | |
| if jsonrecord: | |
| print(json.dumps(jsonrecord, indent=2)) | |
| else: | |
| print("Record not found.") | |
| sysprompt_default = jsonrecord['metadata']['content']['system_prompt']['content'] | |
| userprompt_default = jsonrecord['metadata']['content']['user_prompt']['content'] | |
| with gr.Row(): | |
| userprompt_input = gr.Textbox( | |
| label="User Prompt", | |
| #value="Summarize the audio content", | |
| value=userprompt_default, | |
| placeholder="e.g., Extract key points and action items", | |
| ) | |
| sysprompt_input = gr.Textbox( | |
| label="System Prompt", | |
| #value="You are an AI assistant with a charter to clearly analyze the customer enquiry.", | |
| value=sysprompt_default, | |
| ) | |
| submit_btn = gr.Button("Summarize") | |
| output = gr.Textbox(label="Summary", lines=12) | |
| # Capture inputs for logging | |
| if upload_audio: | |
| upload_audio.change( | |
| fn=lambda x: print(f"Upload audio selected: {x}"), | |
| inputs=[upload_audio], | |
| outputs=[], | |
| # Reset other inputs to avoid confusion | |
| ) | |
| if record_audio: | |
| record_audio.change( | |
| fn=lambda x: print(f"Record audio selected: {x}"), | |
| inputs=[record_audio], | |
| outputs=[], | |
| ) | |
| if url_input: | |
| url_input.change( | |
| fn=lambda x: print(f"URL input changed: {x}"), | |
| inputs=[url_input], | |
| outputs=[], | |
| ) | |
| submit_btn.click( | |
| fn=process_audio, | |
| inputs=[upload_audio, record_audio, url_input, sysprompt_input, userprompt_input], | |
| outputs=output, | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |