Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, request, redirect, url_for, flash, send_file, session | |
| from werkzeug.utils import secure_filename | |
| import cv2 | |
| import numpy as np | |
| import os | |
| from PIL import Image | |
| import uuid | |
| import json | |
| import threading | |
| import time | |
app = Flask(__name__)
# Key used to sign the session cookie and flashed messages. Read it from the
# SECRET_KEY environment variable when provided so deployments do not have to
# rely on the value committed in the source; the original literal is kept as
# a fallback so existing setups keep working unchanged.
app.secret_key = os.environ.get('SECRET_KEY', 'clave_secreta_para_flash')

# Folder configuration (paths are relative to the working directory)
UPLOAD_FOLDER = 'static/uploads'
COMMUNITY_FOLDER = 'static/community'
THUMBNAILS_FOLDER = 'static/thumbnails'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

# Create the directories if they do not exist yet
for folder in [UPLOAD_FOLDER, COMMUNITY_FOLDER, THUMBNAILS_FOLDER]:
    os.makedirs(folder, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
def allowed_file(filename):
    """Return True when `filename` has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    Names without any dot are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def create_thumbnail(image_path, thumbnail_path, size=(1280, 720)):
    """Create a compressed JPEG preview of an image.

    Args:
        image_path: Path of the source image.
        thumbnail_path: Path where the JPEG thumbnail is written.
        size: Maximum (width, height); the aspect ratio is preserved.

    Returns:
        True when the thumbnail was written, False if anything failed
        (the error is printed, never raised).
    """
    try:
        with Image.open(image_path) as img:
            # JPEG cannot store alpha or palette data, so images in modes
            # such as RGBA, LA or P (common for PNG) must be converted
            # before saving; otherwise save() raises and no thumbnail is
            # ever produced for them.
            if img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')
            # Shrink in place, keeping the aspect ratio
            img.thumbnail(size, Image.Resampling.LANCZOS)
            # Save with strong compression
            img.save(thumbnail_path, 'JPEG', quality=70, optimize=True)
        return True
    except Exception as e:
        print(f"Error creando thumbnail: {e}")
        return False
def _crop_black_borders(rgb):
    """Return `rgb` cropped to the bounding box of its non-black pixels."""
    mask = np.any(rgb != [0, 0, 0], axis=2)
    # Project the mask onto each axis instead of materialising the
    # coordinates of every non-black pixel.
    rows = np.flatnonzero(mask.any(axis=1))
    cols = np.flatnonzero(mask.any(axis=0))
    if rows.size == 0:
        # Entirely black image: nothing to crop against.
        return rgb
    return rgb[rows[0]:rows[-1] + 1, cols[0]:cols[-1] + 1]


def _stitch_and_crop(imgs):
    """Stitch `imgs` and return (status, PIL image or None if status is not OK)."""
    stitcher = cv2.Stitcher_create(mode=cv2.Stitcher_PANORAMA)
    status, pano = stitcher.stitch(imgs)
    if status != cv2.Stitcher_OK:
        return status, None
    pano_rgb = cv2.cvtColor(pano, cv2.COLOR_BGR2RGB)
    return status, Image.fromarray(_crop_black_borders(pano_rgb))


def process_panorama(image_files):
    """Stitch the given image files into a single panorama.

    Args:
        image_files: Iterable of paths to the images to stitch. Files that
            cannot be decoded are skipped.

    Returns:
        A `(image, error)` tuple: on success `image` is a PIL image with the
        black borders cropped and `error` is None; on failure `image` is
        None and `error` is a user-facing message.
    """
    imgs = []
    for file in image_files:
        # Decode from raw bytes rather than passing the path to OpenCV
        img = cv2.imdecode(np.fromfile(file, dtype=np.uint8), cv2.IMREAD_COLOR)
        if img is not None:
            imgs.append(img)
    if len(imgs) < 2:
        return None, "Se necesitan al menos 2 imágenes válidas."

    try:
        status, panorama = _stitch_and_crop(imgs)
        if status != cv2.Stitcher_OK:
            return None, "No se pudo crear el panorama. Asegúrate de que las imágenes tengan suficiente solapamiento."
        return panorama, None
    except Exception as e:
        # One retry after an unexpected exception. Both attempts use the
        # same stitcher configuration; the messages keep their original
        # wording for the UI.
        try:
            print("Fallo de aceleración por GPU. Intentando procesamiento en CPU...")
            # Record why the first attempt failed instead of dropping it.
            print(f"Detalle del primer intento: {e}")
            status, panorama = _stitch_and_crop(imgs)
            if status != cv2.Stitcher_OK:
                # NOTE(review): confirm cv2 exposes Stitcher_err_code_to_str;
                # if it does not, the AttributeError is reported by the
                # handler below.
                return None, f"No se pudo crear el panorama (CPU): {cv2.Stitcher_err_code_to_str(status)}"
            return panorama, None
        except Exception as cpu_e:
            return None, f"Error al procesar las imágenes (CPU): {str(cpu_e)}"
def index():
    """Render the landing page template."""
    # NOTE(review): no route decorator is visible for this view in the
    # source as received; confirm how it is registered with the app.
    template_name = 'index.html'
    return render_template(template_name)
def community():
    """Render the community gallery of shared panoramas.

    Lists the images in COMMUNITY_FOLDER, creating a thumbnail for each one
    that does not have it yet, and passes the resulting entries to the
    'community.html' template as `images`.
    """
    community_images = []
    # Sorted so the gallery order does not depend on the file system.
    for filename in sorted(os.listdir(COMMUNITY_FOLDER)):
        # Case-insensitive so files such as "PHOTO.JPG" are listed too.
        if not filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue

        thumbnail_name = f"thumb_{filename}"
        thumbnail_path = os.path.join(THUMBNAILS_FOLDER, thumbnail_name)
        full_image = f'community/{filename}'

        # Create the thumbnail lazily the first time the image is listed.
        has_thumbnail = os.path.exists(thumbnail_path) or create_thumbnail(
            os.path.join(COMMUNITY_FOLDER, filename),
            thumbnail_path,
        )

        community_images.append({
            'filename': filename,
            # Fall back to the full image when no thumbnail could be
            # generated, rather than linking to a file that does not exist.
            'thumbnail': f'thumbnails/{thumbnail_name}' if has_thumbnail else full_image,
            'full_image': full_image,
        })
    return render_template('community.html', images=community_images)
def panorama_viewer(filename):
    """Render the panorama viewer template for a community image."""
    context = {
        'image_path': 'community/{}'.format(filename),
        'filename': filename,
    }
    return render_template('viewer.html', **context)
# Registry of background stitching jobs, keyed by the session ID string.
# upload_files creates each entry as a dict with 'status', 'log',
# 'temp_paths' and 'start_time'; process_background updates 'status' and
# 'log' and adds 'result_filename' on success; result() removes the entry.
ongoing_processes = {}
def upload_files():
    """Receive the uploaded images and start stitching them in the background.

    Reads the 'files[]' field of the request, stores every file with an
    allowed extension in the upload folder, registers a job for the current
    session in `ongoing_processes` and starts a worker thread.

    Returns:
        The rendered 'waiting.html' page, or a redirect to the index page
        (with a flashed message) when fewer than two usable images arrive.
    """
    if 'files[]' not in request.files:
        flash('No se seleccionaron archivos')
        return redirect(url_for('index'))

    files = request.files.getlist('files[]')
    if not files or files[0].filename == '':
        flash('No se seleccionaron archivos')
        return redirect(url_for('index'))

    # Validate and store the files temporarily
    temp_paths = []
    for file in files:
        if file and allowed_file(file.filename):
            # Prefix with a random token: the upload folder is shared, so
            # the bare sanitized name could overwrite another upload with
            # the same name (from this or a concurrent request).
            filename = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)
            temp_paths.append(filepath)

    if len(temp_paths) < 2:
        # Do not leave the already-saved files behind when the request is
        # rejected.
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError as e:
                print(f"Error eliminando archivo temporal: {e}")
        flash('Se necesitan al menos 2 imágenes')
        return redirect(url_for('index'))

    # Register the job under the session's ID (reusing it when present)
    session_id = str(session.get('user_id', uuid.uuid4()))
    ongoing_processes[session_id] = {
        'status': 'processing',
        'log': [],
        'temp_paths': temp_paths,
        'start_time': time.time()
    }
    session['user_id'] = session_id

    # Start the worker and show the waiting page
    thread = threading.Thread(target=process_background, args=(session_id, temp_paths))
    thread.start()
    return render_template('waiting.html')
def process_background(session_id, image_files):
    """Stitch the uploaded images in a worker thread and record the outcome.

    Progress messages are appended to the job's 'log'. On success the
    panorama is saved in the upload folder and the job gets
    'result_filename' and status 'completed'; on failure the status becomes
    'error'. The temporary input files are removed in every case.

    Args:
        session_id: Key of the job entry in `ongoing_processes`.
        image_files: Paths of the temporary input images.
    """
    try:
        ongoing_processes[session_id]['log'].append(f"Iniciando procesamiento de {len(image_files)} imágenes...")
        try:
            result_image, error = process_panorama(image_files)
        finally:
            # Remove the temporary files whether stitching succeeded,
            # returned an error or raised; they are not needed afterwards.
            for path in image_files:
                try:
                    os.remove(path)
                    ongoing_processes[session_id]['log'].append(f"Archivo temporal eliminado: {path}")
                except Exception as e:
                    ongoing_processes[session_id]['log'].append(f"Error eliminando archivo temporal: {e}")

        if error:
            ongoing_processes[session_id]['status'] = 'error'
            ongoing_processes[session_id]['log'].append(f"Error: {error}")
            return

        # Save the result
        result_filename = f'result_{uuid.uuid4().hex[:8]}.jpg'
        result_path = os.path.join(app.config['UPLOAD_FOLDER'], result_filename)
        result_image.save(result_path, 'JPEG', quality=95)

        # Publish the filename before flipping the status: a poller that
        # sees 'completed' must always find 'result_filename' in place.
        ongoing_processes[session_id]['result_filename'] = result_filename
        ongoing_processes[session_id]['log'].append(f"Proceso completado. Resultado guardado como: {result_filename}")
        ongoing_processes[session_id]['status'] = 'completed'
    except Exception as e:
        ongoing_processes[session_id]['status'] = 'error'
        ongoing_processes[session_id]['log'].append(f"Error inesperado: {str(e)}")
def check_progress():
    """Report the state of the current session's background job as JSON.

    Returns:
        A JSON string. While the job runs: `{'completed': False, 'log': ...}`
        with the log lines joined by newlines. Otherwise
        `{'completed': True, 'redirect_url': ...}` pointing to the result
        page on success, or to the index page when the job failed or no job
        is registered for this session.
    """
    # Look the job up without str()-ing a missing ID: str(None) is the
    # truthy string 'None', which made the "no session" check ineffective.
    user_id = session.get('user_id')
    session_id = str(user_id) if user_id is not None else None
    process = ongoing_processes.get(session_id) if session_id else None
    if process is None:
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('index')
        })

    if process['status'] == 'completed':
        result_filename = process.get('result_filename', '')
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('result', filename=result_filename)
        })
    elif process['status'] == 'error':
        # Surface the failure reason on the page we redirect to, and drop
        # the finished job so it does not linger in the registry.
        if process['log']:
            flash(process['log'][-1])
        ongoing_processes.pop(session_id, None)
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('index')
        })

    # The job is still running
    log_output = "\n".join(process['log'])
    return json.dumps({
        'completed': False,
        'log': log_output
    })
def result(filename):
    """Render the result page for a stitched panorama.

    When the file exists in the upload folder, the session's job entry is
    dropped from `ongoing_processes` and 'result.html' is rendered;
    otherwise (or on any error) a message is flashed and the user is
    redirected to the index page.
    """
    try:
        stored_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(stored_path):
            flash('Archivo no encontrado')
            return redirect(url_for('index'))

        # The job is finished: stop tracking it for this session
        ongoing_processes.pop(str(session.get('user_id')), None)

        static_relative = f'uploads/{filename}'
        return render_template('result.html', result_image=static_relative, filename=filename)
    except Exception as e:
        flash(f'Error al mostrar el resultado: {str(e)}')
        return redirect(url_for('index'))
def share_panorama():
    """Move a finished panorama from the upload folder to the community gallery.

    Reads the 'result_path' form field, re-saves that image as a new
    'shared_*.jpg' in COMMUNITY_FOLDER, creates its thumbnail and deletes
    the original. Always ends on the community page, flashing either a
    success or an error message (or on the index page when no path is sent).
    """
    result_path = request.form.get('result_path')
    if not result_path:
        flash('No hay imagen para compartir')
        return redirect(url_for('index'))

    try:
        # The path comes from the client: keep only its sanitized base name
        # and resolve it inside the upload folder, so the open/delete below
        # cannot be pointed at files elsewhere on disk.
        original_filename = secure_filename(os.path.basename(result_path))
        if not original_filename:
            raise ValueError('nombre de archivo no válido')
        original_path = os.path.join(app.config['UPLOAD_FOLDER'], original_filename)

        # Unique name for the shared image
        shared_filename = f'shared_{uuid.uuid4().hex[:8]}.jpg'
        shared_path = os.path.join(COMMUNITY_FOLDER, shared_filename)

        # Copy the image into the community folder; the context manager
        # closes the source file before it is removed.
        with Image.open(original_path) as img:
            img.save(shared_path, 'JPEG', quality=95)

        # Create the thumbnail for the new image
        thumbnail_name = f"thumb_{shared_filename}"
        thumbnail_path = os.path.join(THUMBNAILS_FOLDER, thumbnail_name)
        create_thumbnail(shared_path, thumbnail_path)

        # Remove the temporary result
        os.remove(original_path)
        flash('¡Imagen compartida exitosamente!')
    except Exception as e:
        flash(f'Error al compartir la imagen: {str(e)}')
    return redirect(url_for('community'))
def download_file(filename):
    """Send a result file from the upload folder as a download.

    Args:
        filename: Name of the file requested by the client.

    Returns:
        The file as an attachment named 'panorama_<name>', or a redirect to
        the index page with a flashed message when it is missing or an
        error occurs.
    """
    try:
        # The name comes from the client: sanitize it so the lookup cannot
        # leave the upload folder.
        safe_name = secure_filename(filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
        if safe_name and os.path.isfile(file_path):
            # Pass an absolute path so the file that is sent is exactly the
            # one checked above, however send_file resolves relative paths.
            return send_file(
                os.path.abspath(file_path),
                as_attachment=True,
                download_name=f'panorama_{safe_name}',
            )
        flash('Archivo no encontrado')
        return redirect(url_for('index'))
    except Exception as e:
        flash(f'Error al descargar: {str(e)}')
        return redirect(url_for('index'))
if __name__ == '__main__':
    # The server listens on every interface, so the interactive debugger
    # (which allows running arbitrary code) must not be on by default.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in {'1', 'true', 'yes'}
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860)