py-learn-backend / listen.py
Oviya
add listen
38efa1b
# listen.py
from flask import Flask, Blueprint, jsonify, send_file, abort, request, send_from_directory
from flask_cors import CORS
from moviepy.editor import VideoFileClip
from google.cloud import speech
import os
print(f"GOOGLE_APPLICATION_CREDENTIALS: {os.getenv('GOOGLE_APPLICATION_CREDENTIALS')}")
import uuid
import requests
from pydub import AudioSegment
import ffmpeg
import re
import io # for streaming S3 bytes in HF/AWS mode
import json # <-- added for JSON creds parsing
# Optional (only used in AWS mode)
try:
import boto3
from botocore.exceptions import BotoCoreError, ClientError
except Exception:
boto3 = None
BotoCoreError = ClientError = Exception
# ---------- Blueprint ----------
# Blueprint that every route in this module is registered on.
listen_bp = Blueprint(name="listen", import_name=__name__)
# ---------------------- storage mode helpers ----------------------
def _is_aws_video_mode() -> bool:
    """
    Tell whether videos are kept in S3 instead of on the local disk.

    S3 mode is on when any of the following holds:
      * USE_AWS_VIDEO is exactly "1"
      * SPACE_ID is set to a non-empty value (noted in this file as being
        set on Hugging Face Spaces)
      * ENV (default "dev") equals "prod", ignoring case
    Otherwise the module stays on local disk.
    """
    explicitly_enabled = os.getenv("USE_AWS_VIDEO", "0") == "1"
    running_on_space = bool(os.getenv("SPACE_ID"))
    production_env = os.getenv("ENV", "dev").lower() == "prod"
    return explicitly_enabled or running_on_space or production_env
def _s3_clients():
    """
    Create and return a boto3 S3 client.

    The region is read from AWS_DEFAULT_REGION and defaults to "eu-north-1".
    Raises RuntimeError when boto3 could not be imported.
    """
    if boto3 is None:
        raise RuntimeError("boto3 is required in AWS video mode but not available")
    return boto3.client(
        "s3",
        region_name=os.getenv("AWS_DEFAULT_REGION", "eu-north-1"),
    )
def _video_s3_bucket():
    """
    Return the bucket name configured in S3_BUCKET_NAME.

    Raises RuntimeError when the variable is unset or empty.
    """
    bucket_name = os.getenv("S3_BUCKET_NAME")
    if bucket_name:
        return bucket_name
    raise RuntimeError("S3_BUCKET_NAME is not set")
def _video_s3_key(filename: str) -> str:
    """
    Build the S3 object key under which a video is stored.

    The key is "<prefix>/<filename>", where the prefix comes from
    LISTEN_S3_PREFIX (default "listen") with surrounding whitespace and
    slashes removed. If nothing is left of the prefix after cleaning
    (e.g. LISTEN_S3_PREFIX is "" or "/"), the bare filename is returned so
    the key never starts with a "/".
    """
    prefix = os.getenv("LISTEN_S3_PREFIX", "listen").strip().strip("/")
    if not prefix:
        # An empty prefix would otherwise produce "/<filename>", which S3
        # treats as an object inside an empty-named top-level folder.
        return filename
    return f"{prefix}/{filename}"
# ---------- writable working directories ----------
# Base working dir: /tmp on HF/AWS; local stays under ./static (or override via LISTEN_WORKDIR)
_default_workdir = "/tmp/listen" if _is_aws_video_mode() else os.path.abspath("static")
_BASE_WORKDIR = os.environ.get("LISTEN_WORKDIR", _default_workdir)


def _prepare_workdir(subdir):
    """
    Make sure a working sub-directory exists and return its path.

    The preferred location is <_BASE_WORKDIR>/<subdir>. If creating it
    fails for any reason, /tmp/listen/<subdir> is created and returned
    instead (an error from that second attempt propagates).
    """
    preferred = os.path.join(_BASE_WORKDIR, subdir)
    try:
        os.makedirs(preferred, exist_ok=True)
    except Exception:
        # Hard fallback to /tmp when the preferred base cannot be created.
        fallback = os.path.join("/tmp/listen", subdir)
        os.makedirs(fallback, exist_ok=True)
        return fallback
    return preferred


# Each folder falls back to /tmp/listen independently of the others.
VIDEO_FOLDER = _prepare_workdir("videos")
AUDIO_FOLDER = _prepare_workdir("audio")
TRANSCRIPT_FOLDER = _prepare_workdir("transcripts")
# ---------------- Cohere configuration (migrated to v2 Chat) ----------------
# Endpoint of the Cohere v2 Chat API and the key used to authenticate against it.
# The key is read from the environment; it is an empty string when unset.
COHERE_API_URL = "https://api.cohere.com/v2/chat"
COHERE_API_KEY = os.environ.get("COHERE_API_KEY", "")
# ---------------------------------------------------------------------------
# --- Google Cloud Speech-to-Text client init (prefers HF secret JSON) ---
def _make_speech_client():
    """
    Build the Google Cloud Speech-to-Text client.

    When GOOGLE_APPLICATION_CREDENTIALS_JSON holds service-account JSON, the
    client is created from it. If that variable is missing/empty, or using it
    fails for any reason (the failure is printed), a client is created with
    the library's default credential lookup instead.
    """
    raw_credentials = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
    if not raw_credentials:
        return speech.SpeechClient()
    try:
        return speech.SpeechClient.from_service_account_info(json.loads(raw_credentials))
    except Exception as e:
        print(f"Failed to parse GOOGLE_APPLICATION_CREDENTIALS_JSON: {e}")
    # Inline credentials were unusable: fall through to default ADC.
    return speech.SpeechClient()


# Module-wide client, created once at import time.
speech_client = _make_speech_client()
# -------------------------------------------------------------------------
# ------------- Cohere v2 helper (extract text from chat response) -------------
def _extract_text_v2(resp_json: dict) -> str:
    """
    Pull the generated text out of a Cohere v2 /chat response.

    Expected shape:
        { "message": { "content": [ { "type": "text", "text": "..." }, ... ] } }

    Returns the stripped text of the first block of type "text" whose text is
    non-empty after stripping. Returns "" when no such block exists or when
    any level of the payload is missing, null, or of an unexpected type, so
    a malformed response is reported as "no text" instead of raising.
    """
    if not isinstance(resp_json, dict):
        return ""
    message = resp_json.get("message")
    if not isinstance(message, dict):
        # Covers both a missing key and an explicit JSON null.
        return ""
    content = message.get("content")
    if not isinstance(content, (list, tuple)):
        return ""
    for block in content:
        if not isinstance(block, dict) or block.get("type") != "text":
            continue
        text = block.get("text")
        if isinstance(text, str):
            text = text.strip()
            if text:
                return text
    return ""
# -----------------------------------------------------------------------------
# Convert video to audio
def convert_video_to_audio(video_path, audio_path):
    """
    Extract the audio track of a video into an MP3 file using moviepy.

    Args:
        video_path: path of the video file to read.
        audio_path: path the MP3 is written to.

    Returns:
        audio_path on success, or None when the video cannot be opened, has
        no audio track, or the export fails (the reason is printed).
    """
    clip = None
    try:
        clip = VideoFileClip(video_path)
        if clip.audio is None:
            # Report the real cause instead of an AttributeError on None.
            print("Error converting video to audio: video has no audio track")
            return None
        clip.audio.write_audiofile(audio_path, codec='mp3')
        return audio_path
    except Exception as e:
        print(f"Error converting video to audio: {str(e)}")
        return None
    finally:
        # Always release the clip's reader resources, even on failure.
        if clip is not None:
            try:
                clip.close()
            except Exception as e:
                print(f"Error closing video clip: {str(e)}")
# Re-encode MP3 to ensure proper format
def reencode_mp3(input_audio_path, output_audio_path):
    """
    Load an MP3 with pydub and write it back out as MP3 via libmp3lame
    with the "-q:a 0" quality parameter.

    Returns output_audio_path on success, or None if loading or exporting
    fails (the error is printed).
    """
    try:
        AudioSegment.from_mp3(input_audio_path).export(
            output_audio_path,
            format="mp3",
            codec="libmp3lame",
            parameters=["-q:a", "0"],
        )
    except Exception as e:
        print(f"Error re-encoding MP3: {str(e)}")
        return None
    return output_audio_path
# Helper function to convert audio to the proper MP3 encoding if necessary
def convert_audio_to_mp3(input_file_path, output_file_path):
    """
    Transcode an audio file to MP3 (libmp3lame, 128k bitrate) through the
    ffmpeg bindings.

    Returns True when the ffmpeg run completes, False if anything raises
    (the error is printed).
    """
    try:
        source = ffmpeg.input(input_file_path)
        target = source.output(output_file_path, acodec='libmp3lame', audio_bitrate='128k')
        target.run()
    except Exception as e:
        print(f"Error during audio conversion: {e}")
        return False
    return True
# Function to compress audio dynamically
def compress_audio(input_file_path, output_file_path, target_bitrate="128k"):
    """
    Re-export an audio file as MP3 at the requested bitrate using pydub.

    Returns output_file_path. Errors from loading or exporting are not
    caught here and propagate to the caller.
    """
    AudioSegment.from_file(input_file_path).export(
        output_file_path,
        format="mp3",
        bitrate=target_bitrate,
    )
    return output_file_path
# ---------------------------- Routes (Blueprint) ----------------------------
@listen_bp.route('/', methods=['GET'])
def home():
    """Landing route: answer with a fixed plain-text greeting."""
    greeting = "Welcome to the Flask app! The server is running."
    return greeting
@listen_bp.route('/videos', methods=['GET'])
def list_videos():
    """
    List available videos for users to watch.

    No catalogue is maintained in this module, so the response is always an
    empty JSON array with status 200; this keeps the endpoint valid.
    """
    available_videos = []
    return jsonify(available_videos), 200
@listen_bp.route('/videos/<filename>')
def serve_video(filename):
    """
    Serve one video as video/mp4.

    HF/AWS mode: fetch the object from S3, read it fully into memory and
    return the bytes (no redirect). Any failure along the way is logged and
    answered with 404.
    Local mode: serve the file from VIDEO_FOLDER; 404 when it is not a
    regular file there.
    """
    if _is_aws_video_mode():
        try:
            s3 = _s3_clients()
            bucket = _video_s3_bucket()
            key = _video_s3_key(filename)
            obj = s3.get_object(Bucket=bucket, Key=key)
            body = obj["Body"]
            try:
                data = body.read()
            finally:
                # Release the underlying HTTP connection once the bytes are read.
                body.close()
            return send_file(
                io.BytesIO(data),
                mimetype="video/mp4",
                download_name=filename,
                as_attachment=False
            )
        except Exception as e:
            print(f"S3 fetch failed for {filename}: {e}")
            abort(404)
    # Local
    video_path = os.path.join(VIDEO_FOLDER, filename)
    if not os.path.isfile(video_path):
        # isfile (not exists) so that names such as ".." that resolve to a
        # directory are answered with 404 instead of failing inside send_file.
        print(f"Video file not found: {filename}")
        abort(404)
    # send_from_directory joins the name safely and refuses paths that
    # escape the folder. The folder is made absolute so a relative
    # VIDEO_FOLDER keeps resolving against the working directory, matching
    # the existence check above.
    return send_from_directory(os.path.abspath(VIDEO_FOLDER), filename, mimetype='video/mp4')
@listen_bp.route('/upload-video', methods=['POST'])
def upload_video():
    """
    Accept a video upload sent as the multipart field "video".

    The file is stored under a freshly generated "<uuid4>.mp4" name:
      * HF/AWS mode: the bytes are put into S3 (no local original).
      * Local mode: the file is saved into VIDEO_FOLDER.

    Responses: 200 with the stored filename, 400 when the field is missing
    or its filename is empty, 500 when storing fails.
    """
    print("Received upload request.")
    uploaded = request.files.get('video')
    if uploaded is None:
        print("No video file provided in the request.")
        return jsonify({'error': 'No video file provided'}), 400
    if uploaded.filename == '':
        print("Empty filename detected.")
        return jsonify({'error': 'No selected file'}), 400
    try:
        stored_name = f"{uuid.uuid4()}.mp4"
        if not _is_aws_video_mode():
            # Save locally
            destination = os.path.join(VIDEO_FOLDER, stored_name)
            print(f"Saving video: {stored_name}")
            uploaded.save(destination)
            print(f"Video saved successfully at {destination}")
        else:
            try:
                client = _s3_clients()
                bucket_name = _video_s3_bucket()
                object_key = _video_s3_key(stored_name)
                client.put_object(
                    Bucket=bucket_name,
                    Key=object_key,
                    Body=uploaded.stream.read(),
                    ContentType="video/mp4",
                )
                print(f"Uploaded to S3: s3://{bucket_name}/{object_key}")
            except Exception as e:
                print(f"S3 upload error: {e}")
                return jsonify({'error': 'Failed to upload to S3'}), 500
        return jsonify({'message': 'Video uploaded successfully!', 'filename': stored_name}), 200
    except Exception as e:
        print(f"Error saving video: {str(e)}")
        return jsonify({'error': 'Failed to save video'}), 500
@listen_bp.route('/generate-questions-dynamicvideo', methods=['POST'])
def generate_questions():
    """
    Generate multiple-choice questions from a previously uploaded video.

    Expects a JSON object with a "filename" key. The video is resolved
    (downloaded from S3 in HF/AWS mode, read from VIDEO_FOLDER otherwise),
    its audio is extracted to MP3, transcribed with Google Cloud
    Speech-to-Text, and the transcript is sent to the Cohere v2 Chat API.
    The reply text is parsed with parse_questions.

    Responses:
        200 {"questions": [...]} on success
        400 when the body is not a JSON object, the filename is missing, or
            the filename contains a directory component
        404 when the video cannot be found / downloaded
        500 for conversion, transcription, or generation failures

    Temporary files created while handling the request (the extracted audio
    and, in HF/AWS mode, the downloaded video copy) are removed afterwards.
    """
    audio_path = None             # extracted MP3, always temporary
    downloaded_video_path = None  # local copy of an S3 object, temporary
    try:
        # silent=True: a non-JSON body yields None instead of raising, so it
        # is answered with 400 below rather than the generic 500 handler.
        data = request.get_json(silent=True)
        video_filename = data.get('filename') if isinstance(data, dict) else None
        if not video_filename:
            print("Error: No filename provided in request.")
            return jsonify({"error": "Filename is required"}), 400
        # The name is joined into a filesystem path that is read from and, in
        # HF/AWS mode, written to: refuse anything with a directory part.
        if (
            not isinstance(video_filename, str)
            or os.path.basename(video_filename) != video_filename
            or video_filename in (".", "..")
        ):
            print(f"Error: Rejected unsafe filename {video_filename!r}.")
            return jsonify({"error": "Invalid filename"}), 400

        # Resolve a local readable path for processing
        if _is_aws_video_mode():
            # Unique local name so concurrent requests for the same video do
            # not overwrite or delete each other's working copy.
            video_path = os.path.join(VIDEO_FOLDER, f"{uuid.uuid4()}_{video_filename}")
            try:
                s3 = _s3_clients()
                bucket = _video_s3_bucket()
                key = _video_s3_key(video_filename)
                obj = s3.get_object(Bucket=bucket, Key=key)
                data_bytes = obj["Body"].read()
                downloaded_video_path = video_path
                with open(video_path, "wb") as f:
                    f.write(data_bytes)
            except Exception as e:
                print(f"S3 download error for {video_filename}: {e}")
                return jsonify({"error": "Video file not found"}), 404
        else:
            video_path = os.path.join(VIDEO_FOLDER, video_filename)
            if not os.path.isfile(video_path):
                print(f"Error: Video file {video_filename} not found at {video_path}")
                return jsonify({"error": "Video file not found"}), 404
        print(f"Processing video: {video_filename}")

        # Convert video to audio
        audio_filename = f"{uuid.uuid4()}.mp3"
        audio_path = os.path.join(AUDIO_FOLDER, audio_filename)
        if not convert_video_to_audio(video_path, audio_path):
            print("Error: Video to audio conversion failed.")
            return jsonify({"error": "Failed to convert video to audio"}), 500

        # Transcribe audio using Google Cloud Speech-to-Text
        with open(audio_path, 'rb') as audio_file:
            audio_content = audio_file.read()
        audio = speech.RecognitionAudio(content=audio_content)
        # NOTE(review): sample_rate_hertz is pinned to 16000 while the MP3 is
        # written by convert_video_to_audio without an explicit sample rate -
        # confirm the two agree.
        config = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.MP3,
            sample_rate_hertz=16000,
            language_code="en-US",
        )
        response = speech_client.recognize(config=config, audio=audio)
        # Skip results without alternatives instead of failing on [0].
        transcripts = [
            result.alternatives[0].transcript
            for result in response.results
            if result.alternatives
        ]
        if not transcripts:
            print("Error: No transcription results found.")
            return jsonify({"error": "No transcription results found"}), 500
        transcription_text = " ".join(transcripts)
        print(f"Transcription successful: {transcription_text[:200]}...")  # Print first 200 chars

        # ---------------- Cohere v2 Chat call ----------------
        headers = {
            "Authorization": f"Bearer {COHERE_API_KEY}",
            "Content-Type": "application/json"
        }
        prompt_text = (
            "Generate exactly three multiple-choice questions based on this text:\n"
            f"{transcription_text}\n\n"
            "Rules:\n"
            "- Each question starts with a number and a period (e.g., 1.)\n"
            "- Each question has exactly four options labeled A., B., C., and D.\n"
            "- After the options, add a line 'Correct answer: <A|B|C|D>'\n"
            "- Output plain text only."
        )
        cohere_payload = {
            "model": "command-r-08-2024",
            "messages": [
                {"role": "user", "content": prompt_text}
            ],
            "max_tokens": 300,
            "temperature": 0.9
        }
        cohere_response = requests.post(
            COHERE_API_URL,
            json=cohere_payload,
            headers=headers,
            timeout=60
        )
        if cohere_response.status_code != 200:
            print(f"Error: Cohere API response failed: {cohere_response.text}")
            return jsonify({"error": "Failed to generate questions"}), 500
        raw_text = _extract_text_v2(cohere_response.json())
        if not raw_text:
            print("Error: No questions text returned by Cohere Chat API.")
            return jsonify({"error": "No questions generated"}), 500

        # Parse the raw text into structured questions
        structured_questions = parse_questions(raw_text)
        return jsonify({"questions": structured_questions}), 200
    except Exception as e:
        print(f"Critical Error: {e}")
        return jsonify({"error": "An error occurred while generating questions"}), 500
    finally:
        # Neither file is referenced in the response, so both can go.
        _remove_quietly(audio_path)
        _remove_quietly(downloaded_video_path)


def _remove_quietly(path):
    """Delete a temporary file if it exists; log instead of raising on failure."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Warning: could not remove temporary file {path}: {e}")
def parse_questions(response_text):
    """
    Parse plain-text multiple-choice questions into structured dicts.

    The text is split into blocks on blank lines. In each block the first
    line is the question (a leading number such as "1." / "Q1)" is removed),
    the following lines are options (a leading "A." / "a)" / "A " label is
    removed), and a line starting with "Correct answer:" names the letter of
    the right option. Blocks with fewer than two lines are skipped.

    Args:
        response_text: raw model output; None or "" yields an empty list.

    Returns:
        A list of {"question": str, "options": [str, ...], "answer": str}.
        "answer" is the text of the option the answer letter points at, or
        "" when no usable letter was given or it is out of range.
    """
    # Normalise Windows line endings so block/line splitting works on them.
    normalized = (response_text or "").replace("\r\n", "\n")
    questions = []
    for block in normalized.split("\n\n"):
        print("\nProcessing Block:", block)  # Debug: Log each question block
        lines = block.strip().split("\n")
        print("Split Lines:", lines)  # Debug: Log split lines of the block
        # A question needs at least its own line plus one more line
        if len(lines) < 2:
            print("Skipping Invalid Block")  # Debug: Log invalid blocks
            continue

        # Strip only a genuine numeric prefix; splitting on the first ". "
        # would cut unnumbered questions such as "What is the U.S. capital?".
        question_line = lines[0]
        prefix_match = _QUESTION_PREFIX_RE.match(question_line)
        question_text = (prefix_match.group(1) if prefix_match else question_line).strip()
        print("Question Text:", question_text)  # Debug: Log extracted question text

        options = []
        correct_answer_letter = None
        for line in lines[1:]:
            line = line.strip()
            # Tolerate markdown emphasis around the marker ("**Correct answer:** B")
            if line.lstrip("*_ ").lower().startswith("correct answer:"):
                correct_answer_letter = _answer_letter(line)
                continue
            # Handle A., B., C., D. and also a) / A) formats
            match = _OPTION_RE.match(line)
            if match:
                options.append(match.group(1).strip())
        print("Extracted Options:", options)  # Debug: Log extracted options
        print("Correct Answer Letter:", correct_answer_letter)  # Debug: Log the correct answer letter

        # Map the answer letter to the option text ('A' -> 0, 'B' -> 1, ...)
        correct_answer_text = ""
        if correct_answer_letter:
            option_index = ord(correct_answer_letter.upper()) - ord('A')
            if 0 <= option_index < len(options):
                correct_answer_text = options[option_index]
        print("Mapped Correct Answer Text:", correct_answer_text)  # Debug: Log mapped answer

        if question_text and options:
            questions.append({
                "question": question_text,
                "options": options,
                "answer": correct_answer_text  # Use the full answer text
            })
    print("\nFinal Questions:", questions)  # Debug: Log final parsed questions
    return questions


# Optional "Q"/"Question", a number, then ".", ")" or ":" before the question text.
_QUESTION_PREFIX_RE = re.compile(r"^\W*(?:Q(?:uestion)?\s*)?\d+\s*[.):]\s*(.+)$", re.IGNORECASE)
# Optional option label (a-d, optional "." or ")", one whitespace) before the option text.
_OPTION_RE = re.compile(r"^(?:[a-dA-D][\).]?\s)?(.+)$")
# A single standalone letter, possibly wrapped in punctuation: "B", "B.", "(B) Jupiter".
_ANSWER_LETTER_RE = re.compile(r"^\W*([A-Za-z])\b")


def _answer_letter(line):
    """Return the answer letter named on a "Correct answer:" line, or None."""
    after_first_colon = line.partition(":")[2]
    after_last_colon = line.rpartition(":")[2]
    # Try the text right after the marker first ("B: Paris"); fall back to the
    # last colon-separated part ("Option: B").
    for candidate in (after_first_colon, after_last_colon):
        found = _ANSWER_LETTER_RE.match(candidate)
        if found:
            return found.group(1)
    return None
# ---------- Standalone (local testing) ----------
if __name__ == '__main__':
    # Standalone entry point for local testing: wrap the blueprint in a
    # throwaway Flask app and serve it on port 5012.
    app = Flask(__name__)
    CORS(app)
    app.config["COHERE_API_KEY"] = os.getenv("COHERE_API_KEY", COHERE_API_KEY)
    app.register_blueprint(listen_bp, url_prefix='')
    # The server listens on every interface, and the Werkzeug debugger lets
    # anyone who can reach it execute code. Debug mode is therefore opt-in
    # (FLASK_DEBUG=1) instead of always on.
    debug_enabled = os.getenv("FLASK_DEBUG", "0").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=5012, debug=debug_enabled)