"""Flask web app that stitches uploaded photos into a panorama.

Users upload two or more overlapping images, the stitching runs in a
background thread while the browser polls ``/check_progress``, and the
finished panorama can be downloaded or shared to a community gallery.
"""
import json
import os
import threading
import time
import uuid

import cv2
import numpy as np
from flask import Flask, render_template, request, redirect, url_for, flash, send_file, session
from PIL import Image
from werkzeug.utils import secure_filename

app = Flask(__name__)
# The secret key signs the session cookie. Allow overriding it through the
# environment; the literal is kept only as a backward-compatible fallback.
app.secret_key = os.environ.get('SECRET_KEY', 'clave_secreta_para_flash')

# Folder configuration
UPLOAD_FOLDER = 'static/uploads'
COMMUNITY_FOLDER = 'static/community'
THUMBNAILS_FOLDER = 'static/thumbnails'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

# Create the directories if they do not exist
for folder in [UPLOAD_FOLDER, COMMUNITY_FOLDER, THUMBNAILS_FOLDER]:
    os.makedirs(folder, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Maps a session id to the state of its background stitching job:
# 'status', 'log', 'temp_paths', 'start_time' and, once known,
# 'result_filename' or 'error'.
ongoing_processes = {}


def allowed_file(filename):
    """Return True if *filename* has one of the ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def create_thumbnail(image_path, thumbnail_path, size=(1280, 720)):
    """Write a compressed JPEG preview of *image_path* to *thumbnail_path*.

    The image is shrunk to fit within *size* while keeping its aspect ratio.

    Returns:
        True on success, False if the image could not be read or written
        (the error is printed, not raised).
    """
    try:
        with Image.open(image_path) as img:
            # JPEG cannot store alpha or palette images; without this
            # conversion PNG sources in those modes fail to save.
            thumb = img if img.mode in ('RGB', 'L') else img.convert('RGB')
            thumb.thumbnail(size, Image.Resampling.LANCZOS)
            thumb.save(thumbnail_path, 'JPEG', quality=70, optimize=True)
        return True
    except Exception as e:
        print(f"Error creando thumbnail: {e}")
        return False


def _stitch(imgs):
    """Run OpenCV's panorama stitcher on *imgs*; return ``(status, pano)``."""
    stitcher = cv2.Stitcher_create(mode=cv2.Stitcher_PANORAMA)
    return stitcher.stitch(imgs)


def _crop_black_borders(pano):
    """Convert a BGR panorama to an RGB PIL image cropped to its content.

    The crop is the bounding box of all pixels that are not pure black.
    A fully black panorama is returned uncropped.
    """
    pano_rgb = cv2.cvtColor(pano, cv2.COLOR_BGR2RGB)
    mask = np.any(pano_rgb != [0, 0, 0], axis=2)
    rows = np.flatnonzero(mask.any(axis=1))
    cols = np.flatnonzero(mask.any(axis=0))
    if rows.size and cols.size:
        pano_rgb = pano_rgb[rows[0]:rows[-1] + 1, cols[0]:cols[-1] + 1]
    return Image.fromarray(pano_rgb)


def process_panorama(image_files):
    """Stitch the images at the given paths into one panorama.

    Args:
        image_files: iterable of file paths; unreadable files are skipped.

    Returns:
        ``(image, None)`` with a PIL image on success, or ``(None, message)``
        with a user-facing error message on failure.
    """
    imgs = []
    for file in image_files:
        # fromfile + imdecode is used instead of cv2.imread to load the file.
        img = cv2.imdecode(np.fromfile(file, dtype=np.uint8), cv2.IMREAD_COLOR)
        if img is not None:
            imgs.append(img)

    if len(imgs) < 2:
        return None, "Se necesitan al menos 2 imágenes válidas."

    try:
        status, pano = _stitch(imgs)
        if status != cv2.Stitcher_OK:
            return None, "No se pudo crear el panorama. Asegúrate de que las imágenes tengan suficiente solapamiento."
        return _crop_black_borders(pano), None
    except Exception:
        # NOTE: both attempts use the same default stitcher; the second one
        # is a plain retry after an unexpected exception, not a different
        # backend, despite the wording of the messages.
        try:
            print("Fallo de aceleración por GPU. Intentando procesamiento en CPU...")
            status, pano = _stitch(imgs)
            if status != cv2.Stitcher_OK:
                # Report the numeric status: the original called a cv2
                # error-to-string helper that does not appear to exist in
                # the Python bindings (TODO confirm).
                return None, f"No se pudo crear el panorama (CPU): código de estado {status}"
            return _crop_black_borders(pano), None
        except Exception as cpu_e:
            return None, f"Error al procesar las imágenes (CPU): {str(cpu_e)}"


def _remove_temp_files(paths, log=None):
    """Delete every file in *paths*, optionally recording the outcome in *log*."""
    for path in paths:
        try:
            os.remove(path)
        except OSError as e:
            message = f"Error eliminando archivo temporal: {e}"
        else:
            message = f"Archivo temporal eliminado: {path}"
        if log is not None:
            log.append(message)


@app.route('/')
def index():
    """Render the upload page."""
    return render_template('index.html')


@app.route('/community')
def community():
    """Render the community gallery, creating missing thumbnails on the fly."""
    community_images = []

    # Sorted so the gallery order is stable between requests.
    for filename in sorted(os.listdir(COMMUNITY_FOLDER)):
        if not filename.endswith(('.jpg', '.jpeg', '.png')):
            continue

        thumbnail_name = f"thumb_{filename}"
        thumbnail_path = os.path.join(THUMBNAILS_FOLDER, thumbnail_name)
        if not os.path.exists(thumbnail_path):
            create_thumbnail(os.path.join(COMMUNITY_FOLDER, filename), thumbnail_path)

        community_images.append({
            'filename': filename,
            'thumbnail': f'thumbnails/{thumbnail_name}',
            'full_image': f'community/{filename}'
        })

    return render_template('community.html', images=community_images)


# The URL variable is required: the view takes a ``filename`` argument.
@app.route('/viewer/<filename>')
def panorama_viewer(filename):
    """Render the panorama viewer for a community image."""
    image_path = f'community/{filename}'
    return render_template('viewer.html', image_path=image_path, filename=filename)


@app.route('/upload', methods=['POST'])
def upload_files():
    """Save the uploaded images and start stitching them in the background.

    Redirects to the index page with a flash message when no files, or
    fewer than two valid images, were submitted; otherwise renders the
    waiting page.
    """
    if 'files[]' not in request.files:
        flash('No se seleccionaron archivos')
        return redirect(url_for('index'))

    files = request.files.getlist('files[]')

    if not files or files[0].filename == '':
        flash('No se seleccionaron archivos')
        return redirect(url_for('index'))

    # Validate and save the files temporarily. A per-request prefix plus the
    # position keeps uploads with the same name (from this or a concurrent
    # request) from overwriting each other.
    batch_id = uuid.uuid4().hex[:8]
    temp_paths = []
    for position, file in enumerate(files):
        if file and allowed_file(file.filename):
            filename = f'{batch_id}_{position}_{secure_filename(file.filename)}'
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)
            temp_paths.append(filepath)

    if len(temp_paths) < 2:
        # Do not leave the lone saved upload behind.
        _remove_temp_files(temp_paths)
        flash('Se necesitan al menos 2 imágenes')
        return redirect(url_for('index'))

    # Register the job under the session id so /check_progress can find it.
    session_id = str(session.get('user_id', uuid.uuid4()))
    ongoing_processes[session_id] = {
        'status': 'processing',
        'log': [],
        'temp_paths': temp_paths,
        'start_time': time.time()
    }
    session['user_id'] = session_id

    thread = threading.Thread(target=process_background, args=(session_id, temp_paths))
    thread.start()

    return render_template('waiting.html')


def process_background(session_id, image_files):
    """Stitch *image_files* and record the outcome in ``ongoing_processes``.

    Runs in a worker thread. The uploaded temporary files are removed
    whether or not stitching succeeds.
    """
    process = ongoing_processes[session_id]
    log = process['log']
    try:
        log.append(f"Iniciando procesamiento de {len(image_files)} imágenes...")

        try:
            result_image, error = process_panorama(image_files)
        finally:
            # Clean up the uploads on every path, including failures.
            _remove_temp_files(image_files, log)

        if error:
            log.append(f"Error: {error}")
            process['error'] = error
            process['status'] = 'error'
            return

        # Save the result
        result_filename = f'result_{uuid.uuid4().hex[:8]}.jpg'
        result_path = os.path.join(app.config['UPLOAD_FOLDER'], result_filename)
        result_image.save(result_path, 'JPEG', quality=95)

        # Publish the filename BEFORE flipping the status so a concurrent
        # poll never sees 'completed' without a result to redirect to.
        process['result_filename'] = result_filename
        log.append(f"Proceso completado. Resultado guardado como: {result_filename}")
        process['status'] = 'completed'

    except Exception as e:
        log.append(f"Error inesperado: {str(e)}")
        process['error'] = f"Error inesperado: {str(e)}"
        process['status'] = 'error'


@app.route('/check_progress')
def check_progress():
    """Report the state of the caller's background job as a JSON string.

    The payload has ``completed`` plus either ``redirect_url`` (job done,
    failed or unknown) or ``log`` (job still running).
    """
    user_id = session.get('user_id')
    session_id = str(user_id) if user_id else None
    process = ongoing_processes.get(session_id) if session_id else None

    if process is None:
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('index')
        })

    if process['status'] == 'completed':
        result_filename = process.get('result_filename', '')
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('result', filename=result_filename)
        })

    if process['status'] == 'error':
        # Surface the failure to the user and drop the finished job so the
        # registry does not grow without bound.
        ongoing_processes.pop(session_id, None)
        flash(process.get('error') or 'Error al procesar las imágenes')
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('index')
        })

    # Process is still running
    log_output = "\n".join(process['log'])
    return json.dumps({
        'completed': False,
        'log': log_output
    })


# The URL variable is required: the view takes a ``filename`` argument.
@app.route('/result/<filename>')
def result(filename):
    """Show the finished panorama stored under *filename*."""
    try:
        # Sanitize so the name cannot escape the upload folder.
        filename = secure_filename(filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if filename and os.path.isfile(file_path):
            # Clean up process tracking for this session
            ongoing_processes.pop(str(session.get('user_id')), None)

            return render_template('result.html',
                                   result_image=f'uploads/{filename}',
                                   filename=filename)

        flash('Archivo no encontrado')
        return redirect(url_for('index'))
    except Exception as e:
        flash(f'Error al mostrar el resultado: {str(e)}')
        return redirect(url_for('index'))


@app.route('/share', methods=['POST'])
def share_panorama():
    """Move a result image into the community gallery.

    Reads ``result_path`` from the form. Only its final path component is
    used, and it is resolved inside the upload folder.
    """
    result_path = request.form.get('result_path')
    if not result_path:
        flash('No hay imagen para compartir')
        return redirect(url_for('index'))

    # SECURITY: result_path is client-controlled and the file is deleted
    # afterwards, so never join it to a directory as-is. Presumably the form
    # sends 'uploads/<name>' — TODO confirm against result.html.
    original_filename = secure_filename(os.path.basename(result_path))
    if not original_filename:
        flash('No hay imagen para compartir')
        return redirect(url_for('index'))

    try:
        # Generate a unique name for the shared image
        shared_filename = f'shared_{uuid.uuid4().hex[:8]}.jpg'
        original_path = os.path.join(app.config['UPLOAD_FOLDER'], original_filename)
        shared_path = os.path.join(COMMUNITY_FOLDER, shared_filename)

        # Copy the image into the community folder; the context manager
        # closes the source file before it is removed below.
        with Image.open(original_path) as img:
            img.save(shared_path, 'JPEG', quality=95)

        # Create the thumbnail for the new image
        thumbnail_name = f"thumb_{shared_filename}"
        thumbnail_path = os.path.join(THUMBNAILS_FOLDER, thumbnail_name)
        create_thumbnail(shared_path, thumbnail_path)

        # Remove the temporary image
        os.remove(original_path)

        flash('¡Imagen compartida exitosamente!')
    except Exception as e:
        flash(f'Error al compartir la imagen: {str(e)}')

    return redirect(url_for('community'))


# The URL variable is required: the view takes a ``filename`` argument.
@app.route('/download/<filename>')
def download_file(filename):
    """Send the result file *filename* as an attachment."""
    try:
        # Sanitize so the name cannot escape the upload folder.
        filename = secure_filename(filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if filename and os.path.isfile(file_path):
            return send_file(file_path, as_attachment=True, download_name=f'panorama_{filename}')

        flash('Archivo no encontrado')
        return redirect(url_for('index'))
    except Exception as e:
        flash(f'Error al descargar: {str(e)}')
        return redirect(url_for('index'))


if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution, so it must not
    # be on by default while listening on all interfaces. Set FLASK_DEBUG=1
    # to enable it for local development.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1', host='0.0.0.0', port=7860)