# Author: Varsha Dewangan
# Initial clean commit for project deployment
# commit: ee1d4aa
import os
import base64
import json
import numpy as np
import cv2 # For image manipulation and plotting segmentation results
import pickle # To load vocabulary (for captioning internal loading)
import logging
from io import BytesIO
import sys
# import jsonify # This was the missing import causing issues!
import face_recognition # For facial recognition tasks
import torch
from werkzeug.utils import secure_filename
from flask import Flask, render_template, request, redirect, url_for, session, flash, g, jsonify # Corrected import: added jsonify
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
from PIL import Image # Used for both image processing and face_recognition
# Add the 'src' directory to Python's path so we can import from it.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'src')))
# IMPORTANT: Explicitly import the CLASSES directly into the __main__ scope.
# This ensures their definitions are available to torch.load when it attempts
# to deserialize pickled objects (like COCOVocabulary or ImageCaptioningModel
# instances) that might have been saved with a __main__ module reference.
from src.data_preprocessing import COCOVocabulary
from src.model import ImageCaptioningModel
# Now import the necessary functions and modules from your project
from src.inference_api import generate_caption_for_image # Your existing captioning function
from src.utils import get_logger # Your existing logger utility
# Import YOLO for segmentation - adapted from your file.py
# ultralytics is optional: if it is missing, segmentation is disabled but the
# rest of the app (auth + captioning) keeps working.
try:
    from ultralytics import YOLO
    # Logger initialization moved here to ensure it's after all necessary imports
    logger = get_logger(__name__)
except ImportError:
    # Still create the logger so later module-level logging works.
    logger = get_logger(__name__)
    logger.error("ultralytics library not found. Please install it: pip install ultralytics")
    YOLO = None  # Set to None if import fails; checked before model loading below
# --- Flask App Setup ---
app = Flask(__name__)
# --- Configuration ---
# Strong secret key for session management (IMPORTANT: Change this in production!)
# NOTE(review): os.urandom(24) generates a fresh key on every process restart,
# which invalidates all existing sessions — confirm this is acceptable, or load
# the key from the environment/config for stable sessions.
app.config['SECRET_KEY'] = os.urandom(24)
# SQLite database for users
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Define the folder to store uploaded images temporarily within the static directory
UPLOAD_FOLDER = os.path.join('static', 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Ensure the upload folder exists (created relative to the app root so url_for
# 'static' links resolve correctly).
os.makedirs(os.path.join(app.root_path, UPLOAD_FOLDER), exist_ok=True)
logger.info(f"Upload folder '{UPLOAD_FOLDER}' ensured at {os.path.join(app.root_path, UPLOAD_FOLDER)}")
# --- Database Initialization ---
db = SQLAlchemy(app)
# Suppress some logging for cleaner output, but keep our custom prints
logging.getLogger("werkzeug").setLevel(logging.ERROR)
# --- Database Model (from auth_app.py) ---
class User(db.Model):
    """Application user supporting both password and face-based authentication."""
    id = db.Column(db.Integer, primary_key=True)
    # Unique login identifier; validated against an email regex in the routes.
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Werkzeug hash produced by generate_password_hash(); never store plaintext.
    password_hash = db.Column(db.String(256), nullable=False)
    # Store face encodings as JSON string of a list of floats (numpy arrays are not directly JSON serializable)
    face_encodings_json = db.Column(db.Text, nullable=True)

    def __repr__(self):
        return f'<User {self.email}>'
# Create database tables if they don't exist
with app.app_context():
    db.create_all()
    print("Database tables created/checked.")  # Kept print from your auth_app.py
# --- Global Segmentation Model Loading ---
# Captioning model is assumed to be loaded/handled by generate_caption_for_image internally.
segmentation_model_yolo = None
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device for models: {device}")
try:
    if YOLO:  # Only try to load if ultralytics import was successful
        # NOTE(review): ultralytics downloads the .pt weights on first use if
        # not cached locally — confirm deployment hosts allow that.
        segmentation_model_yolo = YOLO('yolov8x-seg.pt')  # YOLOv8x-seg is a large segmentation model
        segmentation_model_yolo.to(device)  # Move model to appropriate device
        logger.info("Segmentation Model (YOLOv8x-seg) loaded successfully.")
    else:
        logger.warning("YOLO library not available, skipping segmentation model loading.")
except Exception as e:
    # Keep the app running without segmentation; routes check for None.
    logger.critical(f"Error loading Segmentation Model (YOLOv8x-seg): {e}", exc_info=True)
    segmentation_model_yolo = None
# --- Helper Functions for Facial Recognition (Copied directly from your working auth_app.py) ---
def get_face_encoding_from_image(image_data_b64):
    """
    Decode a base64-encoded image, locate faces, and return the first face's
    encoding as a plain Python list (JSON-serializable).

    Returns None when the payload cannot be decoded, the image cannot be
    opened, no face is found, or any other processing step fails.
    """
    try:
        print(f"Processing image data of length: {len(image_data_b64)}")
        # Strip an optional data-URL prefix such as "data:image/jpeg;base64,".
        payload = image_data_b64.split(',')[1] if ',' in image_data_b64 else image_data_b64
        # Base64 payloads must be a multiple of 4 chars; re-pad if truncated.
        remainder = len(payload) % 4
        if remainder:
            payload += '=' * (4 - remainder)
        try:
            image_bytes = base64.b64decode(payload)
        except Exception as decode_error:
            print(f"Base64 decode error: {decode_error}")
            return None
        print(f"Decoded image bytes length: {len(image_bytes)}")
        # Load the raw bytes as an image and normalize the color mode to RGB.
        try:
            pil_img = Image.open(BytesIO(image_bytes))
            print(f"Image opened successfully. Format: {pil_img.format}, Size: {pil_img.size}, Mode: {pil_img.mode}")
            if pil_img.mode != 'RGB':
                pil_img = pil_img.convert('RGB')
                print("Converted image to RGB mode")
        except Exception as img_error:
            print(f"Image opening/conversion error: {img_error}")
            return None
        pixels = np.array(pil_img)  # face_recognition operates on numpy arrays
        print(f"Numpy array shape: {pixels.shape}")
        try:
            locations = face_recognition.face_locations(pixels)
            print(f"Found {len(locations)} face location(s)")
            if not locations:
                print("No faces detected in the image")
                return None
            encodings = face_recognition.face_encodings(pixels, locations)
            print(f"Generated {len(encodings)} face encoding(s)")
            if not encodings:
                print("No face encodings generated despite face locations found")
                return None
            first_encoding = encodings[0]
            print(f"Face encoding shape: {first_encoding.shape}")
            # Lists are JSON-serializable; numpy arrays are not.
            return first_encoding.tolist()
        except Exception as face_error:
            print(f"Face recognition processing error: {face_error}")
            return None
    except Exception as e:
        print(f"General error processing image for face encoding: {e}")
        return None
def compare_face_encoding_to_stored(live_encoding, stored_encodings_json):
    """
    Check a live face encoding against a user's stored encodings.

    :param live_encoding: list of floats from get_face_encoding_from_image
    :param stored_encodings_json: JSON-encoded list of stored encodings
    :return: True when any stored encoding matches; False otherwise
    """
    if not live_encoding:
        print("Live encoding is None, cannot compare.")
        return False
    if not stored_encodings_json:
        print("Stored encodings JSON is None, cannot compare.")
        return False
    try:
        candidates = json.loads(stored_encodings_json)
        if not candidates:
            print("No stored encodings found in JSON")
            return False
        known_encodings = [np.array(entry) for entry in candidates]
        print(f"Comparing against {len(known_encodings)} stored encodings")
        # tolerance=0.6 is the face_recognition default; lower means stricter.
        matches = face_recognition.compare_faces(known_encodings, np.array(live_encoding), tolerance=0.6)
        match_found = True in matches
        print(f"Face comparison result: {match_found}. Matches: {matches}")
        return match_found
    except Exception as e:
        print(f"Error comparing face encodings: {e}")
        return False
# --- Segmentation Helper Functions (from web_app.py, adapted from file.py) ---
def calculate_segmentation_metrics(results, segmentation_model_ref):
    """
    Summarize a YOLO segmentation result into a simple metrics dict.

    Returns a dict with keys 'detected_objects' (list of "name (Conf: x)"
    strings), 'num_objects', 'status', and 'error' (None on success).
    """
    metrics = {
        'detected_objects': [],
        'num_objects': 0,
        'status': 'Processed',
        'error': None
    }
    # Guard: without a model we cannot interpret class ids.
    if not segmentation_model_ref:
        metrics['error'] = "Segmentation model not loaded."
        metrics['status'] = "Error: Segmentation model unavailable."
        return metrics
    # Guard: nothing segmented means nothing to summarize.
    if not results or results[0].masks is None or len(results[0].masks) == 0:
        metrics['status'] = "No objects detected."
        return metrics
    try:
        labels = []
        for det in results[0].boxes.data.tolist():
            cls_idx = int(det[5])
            score = round(det[4], 2)
            # Fall back to a generic label when the id is not in model.names.
            label = segmentation_model_ref.names.get(cls_idx, f"Class {cls_idx}")
            labels.append(f"{label} (Conf: {score})")
        metrics['detected_objects'] = labels
        metrics['num_objects'] = len(labels)
    except Exception as e:
        metrics['error'] = f"Metric calculation failed: {str(e)}"
        metrics['status'] = "Error during metric calculation."
        logger.error(f"Metric calculation failed: {e}", exc_info=True)
    return metrics
def perform_segmentation(image_path, model_ref, upload_folder, filename_stem):
    """
    Run YOLO segmentation on the image at *image_path*.

    Saves an annotated copy as "segmented_<stem>.png" under *upload_folder*
    and returns (segmented_image_url, metrics). The URL is None when no
    objects were segmented or an error occurred; details go into *metrics*.
    """
    if not model_ref:
        return None, {'error': "Segmentation model not loaded."}
    segmented_image_url = None
    metrics = {}
    try:
        rgb_pixels = np.array(Image.open(image_path).convert('RGB'))
        bgr_pixels = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)  # YOLO expects BGR
        results = model_ref(bgr_pixels, verbose=False)  # verbose=False keeps the console quiet
        found_masks = bool(results) and results[0].masks is not None and len(results[0].masks) > 0
        if found_masks:
            annotated = results[0].plot()  # numpy array in BGR channel order
            annotated_pil = Image.fromarray(cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB))
            # Always emit .png regardless of the source format.
            segmented_filename = f"segmented_{filename_stem}.png"
            segmented_filepath = os.path.join(upload_folder, segmented_filename)
            annotated_pil.save(segmented_filepath)
            segmented_image_url = url_for('static', filename=f'uploads/{segmented_filename}')
            logger.info(f"Segmented image saved to: {segmented_filepath}")
            metrics = calculate_segmentation_metrics(results, model_ref)
            metrics['status'] = "Segmentation successful."
        else:
            metrics['status'] = "No objects detected for segmentation."
            logger.info(f"No objects detected for segmentation in {image_path}.")
    except Exception as e:
        metrics['error'] = str(e)
        metrics['status'] = "Error during segmentation processing."
        logger.critical(f"Error in perform_segmentation for {image_path}: {e}", exc_info=True)
    return segmented_image_url, metrics
# --- Helper for file extension check ---
def allowed_file(filename):
    """Return whether *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
# --- Before/After Request Hooks ---
@app.before_request
def load_logged_in_user():
    """Attach the current User (or None) to flask.g before every request."""
    uid = session.get('user_id')
    g.user = None if uid is None else User.query.get(uid)
# --- Authentication Decorator ---
def login_required(view):
    """Decorator that redirects anonymous visitors to the auth page."""
    @wraps(view)
    def wrapped_view(**kwargs):
        if g.user is not None:
            return view(**kwargs)
        flash("Please log in to access this page.", "info")
        return redirect(url_for('auth_page'))
    return wrapped_view
# --- Routes (Combined from both previous apps, authentication parts use print for logs) ---
# Authentication Page Route (Root)
@app.route('/')
def auth_page():
    """Serve the login/registration page; redirect logged-in users to main_app."""
    if 'user_id' in session:
        current_user = User.query.get(session['user_id'])
        if current_user is None:
            # Stale session pointing at a deleted/unknown user: reset it.
            session.pop('user_id', None)
            print("Invalid user_id in session, redirecting to auth_page.")
            return redirect(url_for('auth_page'))
        print(f"User {current_user.email} already logged in, redirecting to main_app.")
        return redirect(url_for('main_app'))
    print("Serving auth.html for login/registration.")
    return render_template('auth.html')
@app.route('/register', methods=['POST'])
def register():
    """Handle traditional email/password registration.

    Expects form fields 'email' and 'password'. Responds with JSON:
    400 on validation failure, 409 on duplicate email, 500 on DB error.
    """
    # FIX: use .get() instead of request.form['...'] — a missing field used to
    # raise BadRequestKeyError (werkzeug's HTML 400 page) instead of this
    # route's JSON error contract.
    email = (request.form.get('email') or '').strip()
    password = request.form.get('password') or ''
    print(f"Received traditional registration request for: {email}")
    if not email or not password:
        print("Error: Email or password missing for traditional registration.")
        return jsonify({'success': False, 'message': 'Email and password are required.'}), 400
    import re  # local import kept to match the file's existing style
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not re.match(email_pattern, email):
        print(f"Error: Invalid email format: {email}")
        return jsonify({'success': False, 'message': 'Please enter a valid email address.'}), 400
    if len(password) < 6:
        print("Error: Password too short.")
        return jsonify({'success': False, 'message': 'Password must be at least 6 characters long.'}), 400
    if User.query.filter_by(email=email).first():
        print(f"Error: Email {email} already registered.")
        return jsonify({'success': False, 'message': 'Email already registered.'}), 409
    new_user = User(email=email, password_hash=generate_password_hash(password))
    try:
        db.session.add(new_user)
        db.session.commit()
        print(f"Traditional registration successful for: {email}")
        return jsonify({'success': True, 'message': 'Registration successful. You can now log in.'})
    except Exception as e:
        db.session.rollback()  # keep the session usable after a failed commit
        print(f"Database error during traditional registration: {e}")
        return jsonify({'success': False, 'message': 'Database error during registration.'}), 500
@app.route('/login', methods=['POST'])
def login():
    """Handle traditional email/password login; responds with JSON.

    Returns 401 on bad credentials; sets session['user_id'] on success.
    """
    # FIX: use .get() instead of request.form['...'] so a missing field yields
    # the JSON 401 below rather than werkzeug's HTML BadRequestKeyError page.
    email = (request.form.get('email') or '').strip()
    password = request.form.get('password') or ''
    print(f"Received traditional login request for: {email}")
    user = User.query.filter_by(email=email).first()
    if user and check_password_hash(user.password_hash, password):
        session['user_id'] = user.id
        print(f"Traditional login successful for: {email}")
        return jsonify({'success': True, 'message': 'Login successful.'})
    else:
        # Same message for unknown email and wrong password (no user enumeration).
        print(f"Traditional login failed for: {email}")
        return jsonify({'success': False, 'message': 'Invalid email or password.'}), 401
@app.route('/face_register', methods=['POST'])
def face_register():
    """
    Register a new user with face samples.

    Expects a JSON body with 'email', 'password', and 'images' (a list of
    base64-encoded face images). Each image is converted into a face encoding;
    all successful encodings are stored on the new User as a JSON list.
    Responds with JSON: 400 on validation failure or when no face is found,
    409 on duplicate email, 500 on DB or unexpected errors.
    """
    try:
        if not request.is_json:
            print("Error: Request is not JSON for face_register")  # From auth_app.py
            return jsonify({'success': False, 'message': 'Invalid request format. JSON expected.'}), 400
        data = request.get_json()
        if not data:
            print("Error: No JSON data received for face_register")  # From auth_app.py
            return jsonify({'success': False, 'message': 'No data received.'}), 400
        email = data.get('email', '').strip()  # Use .strip()
        password = data.get('password', '')
        image_data_list = data.get('images', [])
        print(f"Received face registration request for: {email} with {len(image_data_list)} images.")  # From auth_app.py
        if not email or not password or not image_data_list:
            print("Error: Missing email, password or image data for face registration.")  # From auth_app.py
            return jsonify({'success': False, 'message': 'Email, password, and face images are required.'}), 400
        import re  # Make sure re is imported
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, email):
            print(f"Error: Invalid email format: {email}")
            return jsonify({'success': False, 'message': 'Please enter a valid email address.'}), 400
        if len(password) < 6:
            print("Error: Password too short for face registration.")  # From auth_app.py
            return jsonify({'success': False, 'message': 'Password must be at least 6 characters long.'}), 400
        existing_user = User.query.filter_by(email=email).first()
        if existing_user:
            print(f"Error: Email {email} already registered for face registration.")  # From auth_app.py
            return jsonify({'success': False, 'message': 'Email already registered.'}), 409
        # Encode every provided image; skip empties and failed detections so a
        # single bad frame doesn't abort the whole registration.
        all_encodings = []
        for i, img_b64 in enumerate(image_data_list):
            print(f"Processing image {i+1}/{len(image_data_list)} for face encoding...")  # From auth_app.py
            if not img_b64:
                print(f"Warning: Image {i+1} is empty, skipping.")  # From auth_app.py
                continue
            encoding = get_face_encoding_from_image(img_b64)
            if encoding:
                all_encodings.append(encoding)
                print(f"Successfully processed image {i+1}")  # From auth_app.py
            else:
                print(f"Failed to process image {i+1} - no face detected or processing error.")  # From auth_app.py
        if not all_encodings:
            print("Error: No detectable faces found in any of the provided images for face registration.")  # From auth_app.py
            return jsonify({'success': False, 'message': 'No detectable faces in the provided images. Please try again with clearer images showing your face clearly.'}), 400
        # A password is still stored so the user can also log in traditionally.
        hashed_password = generate_password_hash(password)
        face_encodings_json = json.dumps(all_encodings)
        new_user = User(email=email, password_hash=hashed_password, face_encodings_json=face_encodings_json)
        try:
            db.session.add(new_user)
            db.session.commit()
            print(f"Face registration successful for: {email}. Stored {len(all_encodings)} encodings.")  # From auth_app.py
            return jsonify({'success': True, 'message': f'Face registration successful with {len(all_encodings)} face samples. You can now log in with your face.'})
        except Exception as db_error:
            db.session.rollback()
            print(f"Database error during face registration: {db_error}")  # From auth_app.py
            return jsonify({'success': False, 'message': 'Database error during registration. Please try again.'}), 500
    except Exception as e:
        print(f"Unexpected error during face registration: {e}")  # From auth_app.py
        import traceback
        traceback.print_exc()
        return jsonify({'success': False, 'message': 'An unexpected error occurred. Please try again.'}), 500
@app.route('/face_login', methods=['POST'])
def face_login():
    """
    Log a user in from a single live face image.

    Expects a JSON body with 'image' (base64). The live encoding is compared
    against every registered user that has stored face data; the first match
    wins. Responds with JSON: 400 on bad input, 401 when no user matches.
    """
    try:
        if not request.is_json:
            print("Error: Request is not JSON for face_login")
            return jsonify({'success': False, 'message': 'Invalid request format. JSON expected.'}), 400
        payload = request.get_json()
        if not payload:
            print("Error: No JSON data received for face_login")
            return jsonify({'success': False, 'message': 'No data received.'}), 400
        image_b64 = payload.get('image')
        print("Received face login request.")
        if not image_b64:
            print("Error: Face image required for login.")
            return jsonify({'success': False, 'message': 'Face image required for login.'}), 400
        live_encoding = get_face_encoding_from_image(image_b64)
        if not live_encoding:
            print("No face detected in the live image for login.")
            return jsonify({'success': False, 'message': 'No face detected. Please position your face clearly in the camera and ensure good lighting.'}), 400
        # Only consider users that actually have stored face data.
        candidates = User.query.filter(User.face_encodings_json.isnot(None)).all()
        print(f"Attempting to match against {len(candidates)} registered users with face data...")
        for candidate in candidates:
            if not candidate.face_encodings_json:
                continue
            print(f"Comparing live encoding with stored encodings for user: {candidate.email}")
            if compare_face_encoding_to_stored(live_encoding, candidate.face_encodings_json):
                session['user_id'] = candidate.id
                print(f"Face login successful for user: {candidate.email}")
                return jsonify({'success': True, 'message': f'Welcome back, {candidate.email}!'})
        print("Face not recognized against any registered user.")
        return jsonify({'success': False, 'message': 'Face not recognized. Please try again or use email/password login.'}), 401
    except Exception as e:
        print(f"Unexpected error during face login: {e}")
        import traceback
        traceback.print_exc()
        return jsonify({'success': False, 'message': 'An error occurred during face login. Please try again.'}), 500
@app.route('/logout')
def logout():
    """Clear the session and return the visitor to the auth page."""
    departing_uid = session.get('user_id')
    print(f"User {departing_uid} logging out.")
    session.pop('user_id', None)
    flash("You have been logged out.", "info")
    return redirect(url_for('auth_page'))
# Main application route (protected)
@app.route('/main_app')
@login_required
def main_app():
    """Render the image-processing page (authenticated users only)."""
    # Empty context: no results until the user submits an image to /predict.
    empty_context = {
        'caption': None,
        'uploaded_image_url': None,
        'segmentation_image_url': None,
        'segmentation_metrics': {},
    }
    return render_template('index.html', **empty_context)
# Predict route (protected)
@app.route('/predict', methods=['POST'])
@login_required
def predict():
    """
    Handle an uploaded image: save it, generate a caption, run segmentation,
    and render index.html with the combined results.

    Redirects back with a flashed message on any upload validation failure.
    """
    logger.info("Received request to /predict.")
    # Initialize variables for the template
    generated_caption = "N/A"
    uploaded_image_url = None
    segmentation_image_url = None
    segmentation_metrics = {}
    if 'file' not in request.files:
        flash('No file part in the request.', 'error')
        logger.warning("No file part.")
        return redirect(request.url)
    file = request.files['file']
    if file.filename == '':
        flash('No selected file.', 'error')
        logger.warning("Empty filename.")
        return redirect(request.url)
    if not (file and allowed_file(file.filename)):
        flash('Allowed image types are png, jpg, jpeg, gif.', 'error')
        logger.warning(f"Disallowed file type uploaded: {file.filename}")
        return redirect(request.url)
    filename = secure_filename(file.filename)
    # Extract filename stem (without extension) for segmented image naming
    filename_stem, _file_ext = os.path.splitext(filename)
    original_filepath = os.path.join(app.root_path, app.config['UPLOAD_FOLDER'], filename)
    file.save(original_filepath)
    # BUG FIX: the URL was previously built with a literal placeholder
    # ('uploads/(unknown)') instead of the saved filename, so the uploaded
    # image never displayed in the template.
    uploaded_image_url = url_for('static', filename=f'uploads/{filename}')
    logger.info(f"Original image saved to: {original_filepath}")
    # --- Perform Image Captioning ---
    try:
        logger.info(f"Starting caption generation for {original_filepath}...")
        generated_caption = generate_caption_for_image(original_filepath)
        logger.info(f"Caption generated: '{generated_caption}'")
        flash("Image caption generated successfully!", 'success')
    except FileNotFoundError:
        flash(f"Error: Captioning model or vocabulary file not found for {original_filepath}. Check server logs.", 'error')
        logger.error(f"Captioning model/vocab not found during inference for {original_filepath}.")
        generated_caption = "Error: Captioning model/vocab not found."
    except RuntimeError as err:  # renamed from 're' to avoid shadowing the regex module name
        flash(f"Error: Captioning model not initialized or device issue. Check server logs.", 'error')
        logger.critical(f"Captioning model not initialized: {err}", exc_info=True)
        generated_caption = "Error: Captioning service unavailable."
    except Exception as e:
        flash(f"An unexpected error occurred during caption generation. Check server logs.", 'error')
        logger.critical(f"Error generating caption for {original_filepath}: {e}", exc_info=True)
        generated_caption = "Error: Could not generate caption."
    # --- Perform Image Segmentation ---
    if segmentation_model_yolo:
        logger.info(f"Starting segmentation for {original_filepath} using YOLO...")
        segmentation_image_url, segmentation_metrics = perform_segmentation(
            image_path=original_filepath,
            model_ref=segmentation_model_yolo,
            upload_folder=os.path.join(app.root_path, app.config['UPLOAD_FOLDER']),
            filename_stem=filename_stem
        )
        if segmentation_metrics.get('error'):
            flash(f"Segmentation Error: {segmentation_metrics['error']}", 'error')
        elif segmentation_image_url:
            flash("Image segmentation performed successfully!", 'success')
        else:
            flash(f"Segmentation: {segmentation_metrics.get('status', 'No specific status.')}", 'info')
    else:
        flash("Segmentation model not initialized. Ensure 'ultralytics' is installed and model loaded.", 'error')
        logger.error("Segmentation model (YOLO) is not available.")
        segmentation_metrics['error'] = "Segmentation service unavailable."
    # Render the template with results for both tasks
    return render_template('index.html',
                           caption=generated_caption,
                           uploaded_image_url=uploaded_image_url,
                           segmentation_image_url=segmentation_image_url,
                           segmentation_metrics=segmentation_metrics)
if __name__ == '__main__':
    logger.info("Starting Flask web application with integrated auth...")
    # FIX: the app.run call was commented out, so running this module logged a
    # startup message but never served requests. debug is off by default;
    # enable it locally only — never in production.
    app.run(debug=False, host='0.0.0.0', port=5000)