| import requests
|
| import base64
|
| from datetime import datetime
|
| import json
|
| from dotenv import load_dotenv
|
| import os
|
| load_dotenv()
|
|
|
|
|
| def transcribe_audio_hamsa(audio, language, history):
|
| """
|
| Transcribe audio using Hamsa API
|
|
|
| Args:
|
| audio: Audio file path or audio data
|
| language: Selected language from dropdown
|
| history: Previous transcription history
|
| api_key: Hamsa API key
|
|
|
| Returns:
|
| tuple: (updated_history, transcribed_text)
|
| """
|
| api_key = os.getenv("HAMS_API_KEY")
|
| if not api_key:
|
| raise ValueError("HAMS_API_KEY not set in environment variables")
|
|
|
| if audio is None:
|
| return history, ""
|
|
|
|
|
| language_codes = {
|
| "English": "en",
|
| "Arabic": "ar",
|
| "Arabic (Egypt)": "ar",
|
| "Arabic (UAE)": "ar",
|
| "Arabic (Lebanon)": "ar",
|
| "Arabic (Saudi Arabia)": "ar",
|
| "Arabic (Kuwait)": "ar",
|
| "Arabic (Qatar)": "ar",
|
| "Arabic (Jordan)": "ar",
|
| "Auto-detect": "auto"
|
| }
|
|
|
| try:
|
|
|
| if isinstance(audio, str):
|
| with open(audio, 'rb') as audio_file:
|
| audio_bytes = audio_file.read()
|
| else:
|
| audio_bytes = audio
|
|
|
|
|
| audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')
|
|
|
|
|
| selected_language = language_codes.get(language, "ar")
|
|
|
|
|
| url = "https://api.tryhamsa.com/v1/realtime/stt"
|
| payload = {
|
| "audioList": [],
|
| "audioBase64": audio_base64,
|
| "language": selected_language,
|
| "isEosEnabled": False,
|
| "eosThreshold": 0.3
|
| }
|
|
|
| headers = {
|
| "Authorization": api_key,
|
| "Content-Type": "application/json"
|
| }
|
|
|
|
|
| response = requests.post(url, json=payload, headers=headers)
|
| response.raise_for_status()
|
|
|
|
|
| result = response.json()
|
| text = result.get("text", "")
|
|
|
|
|
| if language == "Auto-detect" and text:
|
|
|
| text = f"[Auto-detected] {text}"
|
|
|
|
|
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
| new_entry = f"[{timestamp}] [{language}] {text}"
|
|
|
|
|
| if history:
|
| updated_history = history + "\n" + new_entry
|
| else:
|
| updated_history = new_entry
|
|
|
| return updated_history, text
|
|
|
| except requests.exceptions.RequestException as e:
|
| error_msg = f"API request failed: {e}"
|
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
| new_entry = f"[{timestamp}] [{language}] ERROR: {error_msg}"
|
|
|
| if history:
|
| updated_history = history + "\n" + new_entry
|
| else:
|
| updated_history = new_entry
|
|
|
| return updated_history, error_msg
|
|
|
| except json.JSONDecodeError as e:
|
| error_msg = f"Failed to parse API response: {e}"
|
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
| new_entry = f"[{timestamp}] [{language}] ERROR: {error_msg}"
|
|
|
| if history:
|
| updated_history = history + "\n" + new_entry
|
| else:
|
| updated_history = new_entry
|
|
|
| return updated_history, error_msg
|
|
|
| except Exception as e:
|
| error_msg = f"Unexpected error: {e}"
|
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
| new_entry = f"[{timestamp}] [{language}] ERROR: {error_msg}"
|
|
|
| if history:
|
| updated_history = history + "\n" + new_entry
|
| else:
|
| updated_history = new_entry
|
|
|
| return updated_history, error_msg
|
|
|
| def clear_history():
|
| """Clear the transcription history"""
|
| return "", ""
|
|
|
|
|
|
|
|
|
|
|
| |