Spaces:
Running
Running
| # listen.py | |
| from flask import Flask, Blueprint, jsonify, send_file, abort, request, send_from_directory | |
| from flask_cors import CORS | |
| from moviepy.editor import VideoFileClip | |
| from google.cloud import speech | |
| import os | |
| print(f"GOOGLE_APPLICATION_CREDENTIALS: {os.getenv('GOOGLE_APPLICATION_CREDENTIALS')}") | |
| import uuid | |
| import requests | |
| from pydub import AudioSegment | |
| import ffmpeg | |
| import re | |
| import io # for streaming S3 bytes in HF/AWS mode | |
| import json # <-- added for JSON creds parsing | |
| # Optional (only used in AWS mode) | |
| try: | |
| import boto3 | |
| from botocore.exceptions import BotoCoreError, ClientError | |
| except Exception: | |
| boto3 = None | |
| BotoCoreError = ClientError = Exception | |
| # ---------- Blueprint ---------- | |
# Blueprint named "listen"; the standalone entry point at the bottom of this
# file registers it on a Flask app with an empty URL prefix.
# NOTE(review): no @listen_bp.route decorators are visible on the handler
# functions in this file — confirm where the routes are attached.
listen_bp = Blueprint("listen", __name__)
| # ---------------------- storage mode helpers ---------------------- | |
def _is_aws_video_mode() -> bool:
    """
    Decide whether videos are kept in S3 (hosted) or on local disk.

    S3 mode is on when any of these holds:
      * USE_AWS_VIDEO is exactly "1"
      * SPACE_ID is set to a non-empty value (set on Hugging Face Spaces)
      * ENV equals "prod", compared case-insensitively (defaults to "dev")
    """
    forced_by_flag = os.getenv("USE_AWS_VIDEO", "0") == "1"
    running_on_space = bool(os.getenv("SPACE_ID"))
    is_production = os.getenv("ENV", "dev").lower() == "prod"
    return forced_by_flag or running_on_space or is_production
def _s3_clients():
    """
    Build a boto3 S3 client for the configured region.

    The region is read from AWS_DEFAULT_REGION and defaults to "eu-north-1".
    Despite the plural name, a single client object is returned.

    Raises:
        RuntimeError: if boto3 could not be imported.
    """
    if boto3 is None:
        raise RuntimeError("boto3 is required in AWS video mode but not available")
    return boto3.client(
        "s3",
        region_name=os.getenv("AWS_DEFAULT_REGION", "eu-north-1"),
    )
def _video_s3_bucket():
    """
    Return the bucket name configured in S3_BUCKET_NAME.

    Raises:
        RuntimeError: if the variable is unset or empty.
    """
    configured_bucket = os.getenv("S3_BUCKET_NAME")
    if configured_bucket:
        return configured_bucket
    raise RuntimeError("S3_BUCKET_NAME is not set")
def _video_s3_key(filename: str) -> str:
    """
    Build the S3 object key under which a video file name is stored.

    The key is "<prefix>/<filename>". The prefix is read from
    LISTEN_S3_PREFIX (default "listen") with surrounding whitespace and
    slashes removed.

    Args:
        filename: Object name to place under the prefix.

    Returns:
        The object key. When the prefix is empty after normalisation the
        bare file name is returned, so the key never starts with "/".
    """
    prefix = os.getenv("LISTEN_S3_PREFIX", "listen").strip().strip("/")
    if not prefix:
        # An empty prefix used to yield "/<filename>", i.e. a key nested
        # under an empty-named folder rather than at the bucket root.
        return filename
    return f"{prefix}/{filename}"
| # ---------- writable working directories ---------- | |
| # Base working dir: /tmp on HF/AWS; local stays under ./static (or override via LISTEN_WORKDIR) | |
_BASE_WORKDIR = os.getenv(
    "LISTEN_WORKDIR",
    os.path.abspath("static") if not _is_aws_video_mode() else "/tmp/listen",
)

# Create each working sub-directory under the base. When creation fails for
# any reason, create and use the same sub-directory under /tmp/listen instead.
_resolved_dirs = {}
for _pname in ("videos", "audio", "transcripts"):
    _p = os.path.join(_BASE_WORKDIR, _pname)
    try:
        os.makedirs(_p, exist_ok=True)
    except Exception:
        _fallback_base = "/tmp/listen"
        _p = os.path.join(_fallback_base, _pname)
        os.makedirs(_p, exist_ok=True)
    _resolved_dirs[_pname] = _p

VIDEO_FOLDER = _resolved_dirs["videos"]
AUDIO_FOLDER = _resolved_dirs["audio"]
TRANSCRIPT_FOLDER = _resolved_dirs["transcripts"]
| # ---------------- Cohere configuration (migrated to v2 Chat) ---------------- | |
# API key sent as the Bearer token on Cohere requests; an empty string when
# the COHERE_API_KEY environment variable is unset.
COHERE_API_KEY = os.getenv("COHERE_API_KEY", "")
# Cohere v2 Chat endpoint that generate_questions() posts to.
COHERE_API_URL = 'https://api.cohere.com/v2/chat'
| # --------------------------------------------------------------------------- | |
| # --- Google Cloud Speech-to-Text client init (prefers HF secret JSON) --- | |
def _make_speech_client():
    """
    Create the Google Cloud Speech-to-Text client.

    When GOOGLE_APPLICATION_CREDENTIALS_JSON holds a service-account JSON
    document, the client is built from it. If that variable is unset, empty,
    or cannot be used, the client is built with default credentials instead.
    """
    raw_credentials = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    if not raw_credentials:
        return speech.SpeechClient()
    try:
        return speech.SpeechClient.from_service_account_info(json.loads(raw_credentials))
    except Exception as e:
        print(f"Failed to parse GOOGLE_APPLICATION_CREDENTIALS_JSON: {e}")
    # Inline credentials were unusable: fall through to default ADC.
    return speech.SpeechClient()


# Module-wide client, created once at import time.
speech_client = _make_speech_client()
| # ------------------------------------------------------------------------- | |
| # ------------- Cohere v2 helper (extract text from chat response) ------------- | |
def _extract_text_v2(resp_json: dict) -> str:
    """
    Pull the generated text out of a Cohere v2 /chat response.

    The expected shape is:
        { "message": { "content": [ { "type": "text", "text": "..." }, ... ] } }

    Args:
        resp_json: Decoded JSON body of the chat response.

    Returns:
        The stripped text of the first block of type "text" whose text is
        non-empty, or "" when no such block exists. Missing, null or
        wrongly typed "message" / "content" / "text" values also yield ""
        instead of raising.
    """
    if not isinstance(resp_json, dict):
        return ""
    message = resp_json.get("message")
    if not isinstance(message, dict):
        # Covers both a missing key and an explicit JSON null.
        return ""
    content = message.get("content")
    if not isinstance(content, (list, tuple)):
        return ""
    for block in content:
        if not isinstance(block, dict) or block.get("type") != "text":
            continue
        text = block.get("text")
        if isinstance(text, str) and text.strip():
            return text.strip()
    return ""
| # ----------------------------------------------------------------------------- | |
| # Convert video to audio | |
def convert_video_to_audio(video_path, audio_path):
    """
    Extract the audio track of a video into an MP3 file using moviepy.

    Args:
        video_path: Path of the video file to read.
        audio_path: Path of the MP3 file to write.

    Returns:
        audio_path on success, or None when the video cannot be opened, has
        no audio track, or the audio cannot be written. Errors are printed,
        never raised.
    """
    video = None
    try:
        video = VideoFileClip(video_path)
        if video.audio is None:
            # Without this guard the failure surfaces as an opaque
            # AttributeError on NoneType.
            print("Error converting video to audio: video has no audio track")
            return None
        video.audio.write_audiofile(audio_path, codec='mp3')
        return audio_path
    except Exception as e:
        print(f"Error converting video to audio: {str(e)}")
        return None
    finally:
        # Release the reader resources held by the clip; previously the clip
        # was never closed.
        if video is not None:
            try:
                video.close()
            except Exception as close_error:
                print(f"Error closing video clip: {close_error}")
| # Re-encode MP3 to ensure proper format | |
def reencode_mp3(input_audio_path, output_audio_path):
    """
    Re-encode an MP3 file with pydub using libmp3lame and "-q:a 0".

    Returns:
        output_audio_path on success, or None after printing the error.
    """
    try:
        AudioSegment.from_mp3(input_audio_path).export(
            output_audio_path,
            format="mp3",
            codec="libmp3lame",
            parameters=["-q:a", "0"],
        )
    except Exception as e:
        print(f"Error re-encoding MP3: {str(e)}")
        return None
    return output_audio_path
| # Helper function to convert audio to the proper MP3 encoding if necessary | |
def convert_audio_to_mp3(input_file_path, output_file_path):
    """
    Transcode an audio file to MP3 (libmp3lame, 128k) through ffmpeg.

    Returns:
        True when the ffmpeg run completes, False after printing the error.
    """
    try:
        source = ffmpeg.input(input_file_path)
        job = source.output(output_file_path, acodec='libmp3lame', audio_bitrate='128k')
        job.run()
    except Exception as e:
        print(f"Error during audio conversion: {e}")
        return False
    return True
| # Function to compress audio dynamically | |
def compress_audio(input_file_path, output_file_path, target_bitrate="128k"):
    """
    Export an audio file as MP3 at the requested bitrate using pydub.

    Unlike the other audio helpers in this module, errors are not caught
    here; they propagate to the caller.

    Returns:
        output_file_path.
    """
    AudioSegment.from_file(input_file_path).export(
        output_file_path,
        format="mp3",
        bitrate=target_bitrate,
    )
    return output_file_path
| # ---------------------------- Routes (Blueprint) ---------------------------- | |
def home():
    """Return the plain-text greeting that confirms the server is up."""
    greeting = "Welcome to the Flask app! The server is running."
    return greeting
def list_videos():
    """
    List available videos for users to watch.

    No catalogue is kept in this module, so the response is always an empty
    JSON array with status 200, which keeps the endpoint valid.
    """
    no_videos = []
    return jsonify(no_videos), 200
def serve_video(filename):
    """
    Serve a video by file name.

    Local: serve the file from VIDEO_FOLDER.
    HF/AWS: fetch the object from S3 and return its bytes (no redirect).

    Args:
        filename: Video name as supplied by the client.

    Returns:
        A Flask response with mimetype "video/mp4".

    Raises:
        HTTP 404 (via abort or send_from_directory) when the S3 fetch fails,
        the local file does not exist, or the name points outside
        VIDEO_FOLDER.
    """
    if _is_aws_video_mode():
        try:
            s3 = _s3_clients()
            bucket = _video_s3_bucket()
            key = _video_s3_key(filename)
            obj = s3.get_object(Bucket=bucket, Key=key)
            data = obj["Body"].read()
        except Exception as e:
            # Any failure (missing object, credentials, configuration) is
            # reported to the client as "not found".
            print(f"S3 fetch failed for {filename}: {e}")
            abort(404)
        return send_file(
            io.BytesIO(data),
            mimetype="video/mp4",
            download_name=filename,
            as_attachment=False
        )

    # Local
    video_root = os.path.abspath(VIDEO_FOLDER)
    if not os.path.exists(os.path.join(video_root, filename)):
        print(f"Video file not found: {filename}")
        abort(404)
    # filename comes from the client: send_from_directory joins it safely and
    # answers 404 for names that would escape the video folder ("../...",
    # absolute paths), which a plain os.path.join + send_file does not.
    return send_from_directory(video_root, filename, mimetype='video/mp4')
def upload_video():
    """
    Store the file sent in the "video" form field under a fresh UUID name.

    Local: the file is saved into VIDEO_FOLDER.
    HF/AWS: the bytes are put into S3 (no local original).

    Returns:
        (JSON, status): 200 with the generated filename on success; 400 when
        the field is missing or has an empty filename; 500 when saving or
        uploading fails.
    """
    print("Received upload request.")

    if 'video' not in request.files:
        print("No video file provided in the request.")
        return jsonify({'error': 'No video file provided'}), 400

    uploaded = request.files['video']
    if uploaded.filename == '':
        print("Empty filename detected.")
        return jsonify({'error': 'No selected file'}), 400

    try:
        filename = f"{uuid.uuid4()}.mp4"

        if not _is_aws_video_mode():
            # Save locally
            video_path = os.path.join(VIDEO_FOLDER, filename)
            print(f"Saving video: {filename}")
            uploaded.save(video_path)
            print(f"Video saved successfully at {video_path}")
        else:
            try:
                s3 = _s3_clients()
                bucket = _video_s3_bucket()
                key = _video_s3_key(filename)
                payload = uploaded.stream.read()
                s3.put_object(
                    Bucket=bucket,
                    Key=key,
                    Body=payload,
                    ContentType="video/mp4"
                )
                print(f"Uploaded to S3: s3://{bucket}/{key}")
            except Exception as e:
                print(f"S3 upload error: {e}")
                return jsonify({'error': 'Failed to upload to S3'}), 500

        success_body = {'message': 'Video uploaded successfully!', 'filename': filename}
        return jsonify(success_body), 200
    except Exception as e:
        print(f"Error saving video: {str(e)}")
        return jsonify({'error': 'Failed to save video'}), 500
def _resolve_video_path(video_filename):
    """Return the absolute path of video_filename inside VIDEO_FOLDER, or None if it would fall outside it."""
    root = os.path.abspath(VIDEO_FOLDER)
    candidate = os.path.abspath(os.path.join(root, video_filename))
    # os.path.join(root, "") appends the separator, so a sibling directory
    # such as "<root>-other" does not pass the prefix test.
    if candidate.startswith(os.path.join(root, "")):
        return candidate
    return None


def _remove_quietly(path):
    """Delete a per-request temporary file; a failure is printed, never raised."""
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
    except OSError as e:
        print(f"Could not remove temporary file {path}: {e}")


def generate_questions():
    """
    Generate three multiple-choice questions from a video's speech.

    Pipeline: locate the video (local disk, or downloaded from S3 in HF/AWS
    mode) -> extract MP3 audio -> transcribe with Google Cloud
    Speech-to-Text -> ask the Cohere v2 Chat API for questions -> parse them.

    Request body (JSON): {"filename": "<video name>"}

    Returns:
        (JSON, status):
          200 {"questions": [...]} on success;
          400 when no usable filename is supplied;
          404 when the video cannot be found or fetched;
          500 when conversion, transcription or question generation fails.
    """
    # Files created only for this request; always removed in `finally`.
    temp_paths = []
    try:
        # silent=True: a missing or malformed JSON body becomes None here and
        # is answered with the 400 below instead of being caught as a 500.
        data = request.get_json(silent=True)
        video_filename = data.get('filename') if isinstance(data, dict) else None
        if not video_filename or not isinstance(video_filename, str):
            print("Error: No filename provided in request.")
            return jsonify({"error": "Filename is required"}), 400

        # Resolve a local readable path for processing
        if _is_aws_video_mode():
            # Download to a uniquely named working file so concurrent requests
            # for the same video cannot overwrite each other's copy, and so
            # the client-supplied name never becomes part of a local path.
            extension = os.path.splitext(video_filename)[1]
            video_path = os.path.join(VIDEO_FOLDER, f"{uuid.uuid4()}{extension}")
            temp_paths.append(video_path)
            try:
                s3 = _s3_clients()
                bucket = _video_s3_bucket()
                key = _video_s3_key(video_filename)
                obj = s3.get_object(Bucket=bucket, Key=key)
                with open(video_path, "wb") as f:
                    f.write(obj["Body"].read())
            except Exception as e:
                print(f"S3 download error for {video_filename}: {e}")
                return jsonify({"error": "Video file not found"}), 404
        else:
            # The filename is client input: refuse names that point outside
            # the video folder ("../...", absolute paths).
            video_path = _resolve_video_path(video_filename)
            if video_path is None or not os.path.exists(video_path):
                print(f"Error: Video file {video_filename} not found at {video_path}")
                return jsonify({"error": "Video file not found"}), 404

        print(f"Processing video: {video_filename}")

        # Convert video to audio
        audio_path = os.path.join(AUDIO_FOLDER, f"{uuid.uuid4()}.mp3")
        temp_paths.append(audio_path)
        if not convert_video_to_audio(video_path, audio_path):
            print("Error: Video to audio conversion failed.")
            return jsonify({"error": "Failed to convert video to audio"}), 500

        # Transcribe audio using Google Cloud Speech-to-Text
        with open(audio_path, 'rb') as audio_file:
            audio_content = audio_file.read()
        audio = speech.RecognitionAudio(content=audio_content)
        # NOTE(review): the sample rate is fixed at 16000 while the MP3 is
        # written with moviepy's defaults — confirm the two agree.
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.MP3,
            sample_rate_hertz=16000,
            language_code="en-US",
        )
        response = speech_client.recognize(config=config, audio=audio)
        # Skip results that carry no alternatives instead of raising IndexError.
        transcripts = [
            result.alternatives[0].transcript
            for result in response.results
            if result.alternatives
        ]
        if not transcripts:
            print("Error: No transcription results found.")
            return jsonify({"error": "No transcription results found"}), 500
        transcription_text = " ".join(transcripts)
        print(f"Transcription successful: {transcription_text[:200]}...")  # Print first 200 chars

        # ---------------- Cohere v2 Chat call ----------------
        headers = {
            "Authorization": f"Bearer {COHERE_API_KEY}",
            "Content-Type": "application/json"
        }
        prompt_text = (
            "Generate exactly three multiple-choice questions based on this text:\n"
            f"{transcription_text}\n\n"
            "Rules:\n"
            "- Each question starts with a number and a period (e.g., 1.)\n"
            "- Each question has exactly four options labeled A., B., C., and D.\n"
            "- After the options, add a line 'Correct answer: <A|B|C|D>'\n"
            "- Output plain text only."
        )
        cohere_payload = {
            "model": "command-r-08-2024",
            "messages": [
                {"role": "user", "content": prompt_text}
            ],
            "max_tokens": 300,
            "temperature": 0.9
        }
        cohere_response = requests.post(
            COHERE_API_URL,
            json=cohere_payload,
            headers=headers,
            timeout=60
        )
        if cohere_response.status_code != 200:
            print(f"Error: Cohere API response failed: {cohere_response.text}")
            return jsonify({"error": "Failed to generate questions"}), 500
        raw_text = _extract_text_v2(cohere_response.json())
        if not raw_text:
            print("Error: No questions text returned by Cohere Chat API.")
            return jsonify({"error": "No questions generated"}), 500

        # Parse the plain-text answer into structured questions
        structured_questions = parse_questions(raw_text)
        return jsonify({"questions": structured_questions}), 200
    except Exception as e:
        print(f"Critical Error: {e}")
        return jsonify({"error": "An error occurred while generating questions"}), 500
    finally:
        # The extracted MP3 (and, in HF/AWS mode, the downloaded video) are
        # single-use; without this they accumulate in the working directory.
        for temp_path in temp_paths:
            _remove_quietly(temp_path)
# Optional option label ("A.", "a)", "A ") followed by the option text.
_OPTION_LINE_RE = re.compile(r"^(?:[a-dA-D][\).]?\s)?(.+)$")
# A stand-alone letter A-D at the start of the text after "Correct answer:",
# allowing leading punctuation such as "<", "(" or "**".
_ANSWER_LETTER_RE = re.compile(r"^\W*([A-Da-d])(?![A-Za-z])")


def parse_questions(response_text):
    """
    Parse the model's plain-text answer into structured questions.

    The text is split into blocks on blank lines ("\\n\\n"). In each block the
    first line is the question (a leading "1. " style number is removed), a
    line starting with "Correct answer:" names the right option by letter,
    and every other non-empty line is taken as an option (a leading
    "A." / "a)" style label is removed).

    Args:
        response_text: Raw text returned by the chat model.

    Returns:
        A list of dicts with keys "question", "options" (list of str) and
        "answer" (the text of the correct option, or "" when the letter is
        missing or does not map to an option). Blocks with fewer than two
        lines or with no options are skipped.
    """
    questions = []

    for block in response_text.split("\n\n"):
        print("\nProcessing Block:", block)  # Debug: Log each question block
        lines = block.strip().split("\n")
        print("Split Lines:", lines)  # Debug: Log split lines of the block

        # A block needs a question line plus at least one more line
        if len(lines) < 2:
            print("Skipping Invalid Block")  # Debug: Log invalid blocks
            continue

        # Extract the question text, dropping the "<number>. " prefix if present
        question_line = lines[0]
        _, separator, remainder = question_line.partition(". ")
        question_text = remainder if separator else question_line
        print("Question Text:", question_text)  # Debug: Log extracted question text

        # Extract the options and find the correct answer
        options = []
        correct_answer_letter = None
        for line in lines[1:]:
            line = line.strip()
            if line.lower().startswith("correct answer:"):
                # Models often write "B.", "<B>" or "B) text" here; take just
                # the letter so the index lookup below cannot raise on a
                # multi-character value.
                answer_match = _ANSWER_LETTER_RE.match(line.split(":", 1)[1])
                correct_answer_letter = answer_match.group(1).upper() if answer_match else None
                continue
            option_match = _OPTION_LINE_RE.match(line)
            if option_match:
                options.append(option_match.group(1).strip())
        print("Extracted Options:", options)  # Debug: Log extracted options
        print("Correct Answer Letter:", correct_answer_letter)  # Debug: Log the correct answer letter

        # Map the correct answer letter to the option text
        correct_answer_text = ""
        if correct_answer_letter:
            option_index = ord(correct_answer_letter) - ord('A')  # 'A'→0, 'B'→1, etc.
            if 0 <= option_index < len(options):
                correct_answer_text = options[option_index]
        print("Mapped Correct Answer Text:", correct_answer_text)  # Debug: Log mapped answer

        # Keep only blocks that produced a question and at least one option
        if question_text and options:
            questions.append({
                "question": question_text,
                "options": options,
                "answer": correct_answer_text  # Use the full answer text
            })

    print("\nFinal Questions:", questions)  # Debug: Log final parsed questions
    return questions
| # ---------- Standalone (local testing) ---------- | |
if __name__ == '__main__':
    # Standalone entry point for local testing: build an app around the
    # blueprint and serve it on port 5012.
    app = Flask(__name__)
    CORS(app)
    app.config["COHERE_API_KEY"] = os.getenv("COHERE_API_KEY", COHERE_API_KEY)
    app.register_blueprint(listen_bp, url_prefix='')
    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary Python. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (or "true"/"yes") to enable it.
    debug_enabled = os.getenv("FLASK_DEBUG", "0").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=5012, debug=debug_enabled)