Spaces:
Running
Running
| import openai | |
| from flask import Flask, jsonify, request, send_from_directory, send_file, Blueprint, current_app, url_for | |
| import os | |
| from flask_cors import CORS | |
| import io # for streaming S3 bytes in HF/AWS mode | |
| # Optional (only used in AWS mode) | |
| try: | |
| import boto3 | |
| from botocore.exceptions import BotoCoreError, ClientError | |
| except Exception: | |
| # Not required for local; will be imported dynamically in AWS mode | |
| boto3 = None | |
| BotoCoreError = ClientError = Exception | |
| app = Flask(__name__) | |
| CORS(app) | |
| # --- Blueprint --- | |
| finding_bp = Blueprint("findingword", __name__) | |
| # Directories for video, audio, and transcripts | |
| VIDEO_FOLDER = 'static/videos' | |
| AUDIO_FOLDER = 'static/audio' # used only in local mode | |
| TRANSCRIPT_FOLDER = 'static/transcripts' | |
| # --- OpenAI key handling (same as vocab builder) --- | |
| _OPENAI_API_KEY_FALLBACK = os.getenv("OPENAI_API_KEY", "") | |
| def _ensure_openai_key(): | |
| """Set openai.api_key from Flask config or env before each API call.""" | |
| api_key = (current_app.config.get("OPENAI_API_KEY") if current_app else None) or _OPENAI_API_KEY_FALLBACK | |
| if api_key: | |
| openai.api_key = api_key | |
| # ---------------------- audio-mode helpers ---------------------- | |
| def _is_aws_mode() -> bool: | |
| """ | |
| Switch to AWS Polly + S3 on Hugging Face / prod. | |
| Local stays on Google TTS + disk. | |
| """ | |
| if os.getenv("USE_AWS_AUDIO", "0") == "1": | |
| return True | |
| if os.getenv("SPACE_ID"): # set on Hugging Face Spaces | |
| return True | |
| if os.getenv("ENV", "dev").lower() == "prod": | |
| return True | |
| return False | |
| def _sanitize_filename(word: str) -> str: | |
| # Keep your current style but ensure safe S3 key/filename | |
| return word.strip().replace(" ", "_").replace(".", "").lower() | |
| # --------------------------------------------------------------------- | |
| def get_vocabulary_word_from_openai(): | |
| prompt = ( | |
| "Pick a simple vocabulary word suitable for children (ages 6–8) " | |
| "and provide its meaning in very easy English. Do not repeat words from previous responses. " | |
| "Format: 'Word: [word]. Meaning: [meaning].'" | |
| ) | |
| try: | |
| _ensure_openai_key() | |
| response = openai.chat.completions.create( | |
| model="gpt-3.5-turbo", | |
| messages=[ | |
| {"role": "system", "content": "You are a helpful assistant."}, | |
| {"role": "user", "content": prompt}, | |
| ] | |
| ) | |
| result = response.choices[0].message.content.strip() | |
| print(f"Full Response: {result}") | |
| if "Word:" in result and "Meaning:" in result: | |
| parts = result.split("Meaning:") | |
| word = parts[0].replace("Word:", "").strip() | |
| word = word.rstrip('.') # avoid trailing dot | |
| meaning = parts[1].strip() | |
| # Generate the sentence | |
| sentence = generate_sentence(word, meaning) | |
| # Generate audio file for the vocabulary word | |
| audio_file_path_or_name = generate_audio(word) # local path or just filename in AWS mode | |
| # URL for frontend remains identical | |
| # audio_url = f"/static/audio/{os.path.basename(audio_file_path_or_name)}" | |
| audio_url = url_for("findingword.serve_audio", | |
| filename=os.path.basename(audio_file_path_or_name)) | |
| return jsonify({ | |
| "word": word, | |
| "meaning": meaning, | |
| "sentence": sentence, | |
| "audio_file_path": audio_url | |
| }) | |
| else: | |
| return jsonify({"response": result, "message": "Meaning not provided in the expected format"}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def generate_sentence(word, meaning): | |
| prompt = f"Create a sentence using the word '{word}' that fully demonstrates its meaning: {meaning}" | |
| _ensure_openai_key() | |
| response = openai.chat.completions.create( | |
| model="gpt-3.5-turbo", | |
| messages=[ | |
| {"role": "system", "content": "You are a helpful assistant."}, | |
| {"role": "user", "content": prompt}, | |
| ] | |
| ) | |
| sentence = response.choices[0].message.content.strip() | |
| return sentence | |
| def generate_audio(word): | |
| """ | |
| Local (default): Google TTS → write MP3 to ./static/audio/<word>.mp3 → return full path. | |
| Hugging Face / AWS mode: Polly → upload to S3 (findingword/<word>.mp3) → return just the filename, | |
| and let /static/audio/<filename> stream from S3 (see route below). | |
| """ | |
| sanitized_word = _sanitize_filename(word) | |
| filename = f"{sanitized_word}.mp3" | |
| if _is_aws_mode(): | |
| # ---- AWS Polly + S3 path (no local write) ---- | |
| if boto3 is None: | |
| raise RuntimeError("boto3 is required in AWS audio mode but not available") | |
| region = os.getenv("AWS_DEFAULT_REGION", "eu-north-1") | |
| bucket = os.getenv("S3_BUCKET_NAME") | |
| if not bucket: | |
| raise RuntimeError("S3_BUCKET_NAME is not set") | |
| polly = boto3.client("polly", region_name=region) | |
| s3 = boto3.client("s3", region_name=region) | |
| try: | |
| resp = polly.synthesize_speech( | |
| Text=word, | |
| OutputFormat="mp3", | |
| VoiceId=os.getenv("POLLY_VOICE_ID", "Joanna"), | |
| Engine=os.getenv("POLLY_ENGINE", "standard"), | |
| LanguageCode="en-US", | |
| ) | |
| stream = resp.get("AudioStream") | |
| if not stream: | |
| raise RuntimeError("Polly returned no AudioStream") | |
| audio_bytes = stream.read() | |
| except (BotoCoreError, ClientError, Exception) as e: | |
| raise RuntimeError(f"Polly TTS failed: {e}") | |
| key = f"findingword/{filename}" | |
| try: | |
| s3.put_object(Bucket=bucket, Key=key, Body=audio_bytes, ContentType="audio/mpeg") | |
| except (BotoCoreError, ClientError, Exception) as e: | |
| raise RuntimeError(f"S3 upload failed: {e}") | |
| # Return only the filename; /static/audio/<filename> will proxy from S3 | |
| return filename | |
| # ---- Local Google TTS path (lazy import; create dir here only) ---- | |
| audio_dir = AUDIO_FOLDER | |
| try: | |
| os.makedirs(audio_dir, exist_ok=True) | |
| except Exception: | |
| # Fallback if CWD is restricted | |
| audio_dir = "/tmp/audio" | |
| os.makedirs(audio_dir, exist_ok=True) | |
| audio_file_path = os.path.join(audio_dir, filename) | |
| if not os.path.exists(audio_file_path): | |
| try: | |
| # Import only in local mode to avoid HF credential errors | |
| from google.cloud import texttospeech | |
| gcp_client = texttospeech.TextToSpeechClient() | |
| except Exception as e: | |
| raise RuntimeError( | |
| "Google TTS is required in local mode but missing. " | |
| "Install google-cloud-texttospeech and set GOOGLE_APPLICATION_CREDENTIALS. " | |
| f"Details: {e}" | |
| ) | |
| synthesis_input = texttospeech.SynthesisInput(text=word) | |
| voice = texttospeech.VoiceSelectionParams( | |
| language_code="en-US", ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL | |
| ) | |
| audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) | |
| response = gcp_client.synthesize_speech( | |
| input=synthesis_input, voice=voice, audio_config=audio_config | |
| ) | |
| with open(audio_file_path, "wb") as out: | |
| out.write(response.audio_content) | |
| print(f"✅ Audio saved: {audio_file_path}") | |
| return audio_file_path | |
| def validate_word(): | |
| try: | |
| data = request.get_json() | |
| print("📥 Received data for validation:", data) | |
| if not data or 'user_input' not in data or 'correct_word' not in data: | |
| return jsonify({"error": "Invalid request, missing fields"}), 400 | |
| user_input = data.get('user_input', '').strip() | |
| correct_word = data.get('correct_word', '').strip() | |
| if user_input.lower() == correct_word.lower(): | |
| return jsonify({"status": "success", "message": "Correct! You typed the word correctly."}) | |
| else: | |
| return jsonify({"status": "failure", "message": f"Incorrect. The correct word was '{correct_word}'."}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def serve_audio(filename): | |
| """ | |
| Local: serve from disk. | |
| AWS mode (HF): fetch the object from S3 and stream it (no local storage). | |
| """ | |
| if _is_aws_mode(): | |
| if boto3 is None: | |
| return jsonify({"error": "boto3 missing in AWS mode"}), 500 | |
| region = os.getenv("AWS_DEFAULT_REGION", "eu-north-1") | |
| bucket = os.getenv("S3_BUCKET_NAME") | |
| if not bucket: | |
| return jsonify({"error": "S3_BUCKET_NAME not set"}), 500 | |
| s3 = boto3.client("s3", region_name=region) | |
| key = f"findingword/{filename}" | |
| try: | |
| obj = s3.get_object(Bucket=bucket, Key=key) | |
| data = obj["Body"].read() | |
| return send_file( | |
| io.BytesIO(data), | |
| mimetype="audio/mpeg", | |
| download_name=filename, | |
| as_attachment=False | |
| ) | |
| except (BotoCoreError, ClientError, Exception) as e: | |
| return jsonify({"error": f"S3 fetch failed: {str(e)}"}), 404 | |
| # Local: serve file from disk as before (with /tmp fallback) | |
| local_path = os.path.join(AUDIO_FOLDER, filename) | |
| if os.path.exists(local_path): | |
| return send_from_directory(AUDIO_FOLDER, filename) | |
| alt_dir = "/tmp/audio" | |
| alt_path = os.path.join(alt_dir, filename) | |
| if os.path.exists(alt_path): | |
| return send_from_directory(alt_dir, filename) | |
| return jsonify({"error": "File not found"}), 404 | |
| # Run the Flask server (local dev): keep URLs unchanged by registering with empty prefix | |
| if __name__ == '__main__': | |
| app.register_blueprint(finding_bp, url_prefix='') # Local: /generate-vocabulary, /validate-word, /static/audio/... | |
| app.run(host='0.0.0.0', port=5005, debug=True) | |