SnapToPano / app.py
HirCoir's picture
Update app.py
893c3dc verified
from flask import Flask, render_template, request, redirect, url_for, flash, send_file, session
from werkzeug.utils import secure_filename
import cv2
import numpy as np
import os
from PIL import Image
import uuid
import json
import threading
import time
# Flask application instance shared by every route in this module.
app = Flask(__name__)
# The secret key signs the session cookie (used for flash messages and the
# per-user job id). Prefer a value supplied through the SECRET_KEY environment
# variable; the historical hard-coded key remains only as a fallback so that
# existing deployments keep working.
app.secret_key = os.environ.get('SECRET_KEY', 'clave_secreta_para_flash')

# Folder configuration (paths are relative to the working directory).
UPLOAD_FOLDER = 'static/uploads'
COMMUNITY_FOLDER = 'static/community'
THUMBNAILS_FOLDER = 'static/thumbnails'
# Lower-case file extensions accepted for uploaded images.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

# Create the directories if they do not exist yet.
for folder in (UPLOAD_FOLDER, COMMUNITY_FOLDER, THUMBNAILS_FOLDER):
    os.makedirs(folder, exist_ok=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
def allowed_file(filename):
    """Tell whether *filename* carries one of the accepted image extensions.

    Only the text after the last dot is considered, compared
    case-insensitively against ALLOWED_EXTENSIONS. A name without any dot is
    rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def create_thumbnail(image_path, thumbnail_path, size=(1280, 720)):
    """Create a compressed JPEG thumbnail for quick preview.

    Args:
        image_path: Path of the source image.
        thumbnail_path: Destination path. The file is always encoded as JPEG,
            whatever extension the path carries.
        size: Maximum (width, height) of the thumbnail; the aspect ratio of
            the source is preserved.

    Returns:
        True when the thumbnail was written, False when the image could not
        be opened, converted or saved (the error is printed).
    """
    try:
        with Image.open(image_path) as img:
            # JPEG cannot store alpha or palette data, so e.g. an RGBA PNG
            # would make save() raise. Normalise to RGB before resizing.
            thumb = img if img.mode in ('RGB', 'L') else img.convert('RGB')
            # Shrinks in place while keeping the aspect ratio.
            thumb.thumbnail(size, Image.Resampling.LANCZOS)
            # Save with strong compression.
            thumb.save(thumbnail_path, 'JPEG', quality=70, optimize=True)
        return True
    except Exception as e:
        print(f"Error creando thumbnail: {e}")
        return False
def _crop_black_borders(pano_rgb):
    """Return *pano_rgb* cropped to the bounding box of its non-black pixels."""
    non_black = np.any(pano_rgb != 0, axis=2)
    rows = np.flatnonzero(non_black.any(axis=1))
    cols = np.flatnonzero(non_black.any(axis=0))
    if rows.size == 0:
        # Entirely black image: nothing to crop against.
        return pano_rgb
    return pano_rgb[rows[0]:rows[-1] + 1, cols[0]:cols[-1] + 1]


def _stitch_status_name(status):
    """Return the name of the cv2 Stitcher status constant equal to *status*."""
    for name in ('ERR_NEED_MORE_IMGS', 'ERR_HOMOGRAPHY_EST_FAIL',
                 'ERR_CAMERA_PARAMS_ADJUST_FAIL'):
        if getattr(cv2, f'Stitcher_{name}', None) == status:
            return name
    return str(status)


def _stitch_and_crop(imgs):
    """Stitch *imgs*; return (status, cropped RGB array or None on failure)."""
    stitcher = cv2.Stitcher_create(mode=cv2.Stitcher_PANORAMA)
    status, pano = stitcher.stitch(imgs)
    if status != cv2.Stitcher_OK:
        return status, None
    return status, _crop_black_borders(cv2.cvtColor(pano, cv2.COLOR_BGR2RGB))


def process_panorama(image_files):
    """Stitch the given image files into a single panorama.

    Args:
        image_files: Iterable of paths to the images to stitch.

    Returns:
        A ``(image, error)`` tuple. On success ``image`` is a PIL image with
        the black borders cropped away and ``error`` is None. On failure
        ``image`` is None and ``error`` is a user-facing message.
    """
    imgs = []
    for file in image_files:
        try:
            # Decode from a byte buffer instead of handing the path to OpenCV.
            img = cv2.imdecode(np.fromfile(file, dtype=np.uint8), cv2.IMREAD_COLOR)
        except (OSError, cv2.error):
            # Missing, unreadable or empty file: treat it like an undecodable
            # image instead of aborting the whole job.
            continue
        if img is not None:
            imgs.append(img)
    if len(imgs) < 2:
        return None, "Se necesitan al menos 2 imágenes válidas."

    try:
        # First attempt with OpenCV's default settings (OpenCL acceleration
        # is used when the runtime provides it).
        status, pano_rgb = _stitch_and_crop(imgs)
        if status != cv2.Stitcher_OK:
            return None, "No se pudo crear el panorama. Asegúrate de que las imágenes tengan suficiente solapamiento."
        return Image.fromarray(pano_rgb), None
    except Exception:
        # The accelerated path crashed: retry once with OpenCL switched off so
        # the second attempt really runs on the CPU.
        try:
            print("Fallo de aceleración por GPU. Intentando procesamiento en CPU...")
            opencl_was_enabled = cv2.ocl.useOpenCL()
            cv2.ocl.setUseOpenCL(False)
            try:
                status, pano_rgb = _stitch_and_crop(imgs)
            finally:
                # Restore the previous setting for later jobs.
                cv2.ocl.setUseOpenCL(opencl_was_enabled)
            if status != cv2.Stitcher_OK:
                return None, f"No se pudo crear el panorama (CPU): {_stitch_status_name(status)}"
            return Image.fromarray(pano_rgb), None
        except Exception as cpu_e:
            return None, f"Error al procesar las imágenes (CPU): {str(cpu_e)}"
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/community')
def community():
    """Community page with YouTube-style previews of the shared panoramas.

    Lists every image in COMMUNITY_FOLDER, lazily creating a missing
    thumbnail for each one, and renders ``community.html`` with a list of
    dicts holding the keys ``filename``, ``thumbnail`` and ``full_image``
    (paths relative to the static folder).
    """
    community_images = []
    # Sorted so the page order is stable; os.listdir order is arbitrary.
    for filename in sorted(os.listdir(COMMUNITY_FOLDER)):
        if not filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            continue
        full_image = f'community/{filename}'
        # Create the thumbnail if it does not exist yet.
        thumbnail_name = f"thumb_{filename}"
        thumbnail_path = os.path.join(THUMBNAILS_FOLDER, thumbnail_name)
        thumbnail_ready = os.path.exists(thumbnail_path) or create_thumbnail(
            os.path.join(COMMUNITY_FOLDER, filename),
            thumbnail_path
        )
        community_images.append({
            'filename': filename,
            # Fall back to the full image rather than linking a thumbnail
            # that could not be generated.
            'thumbnail': f'thumbnails/{thumbnail_name}' if thumbnail_ready else full_image,
            'full_image': full_image
        })
    return render_template('community.html', images=community_images)
@app.route('/viewer/<filename>')
def panorama_viewer(filename):
    """Cylindrical panorama viewer for one shared image.

    Args:
        filename: Name of an image inside COMMUNITY_FOLDER.

    Returns:
        The rendered ``viewer.html`` page, or a redirect to the community
        page (with a flash message) when no such file exists.
    """
    # Same missing-file handling as the result and download routes: never
    # render a viewer for a name that does not map to a real file.
    if not os.path.isfile(os.path.join(COMMUNITY_FOLDER, filename)):
        flash('Archivo no encontrado')
        return redirect(url_for('community'))
    image_path = f'community/{filename}'
    return render_template('viewer.html', image_path=image_path, filename=filename)
# Background stitching jobs keyed by session ID. Each value is a dict with the
# keys 'status', 'log', 'temp_paths' and 'start_time'; 'result_filename' is
# added by the worker once the panorama has been saved.
ongoing_processes = {}


@app.route('/upload', methods=['POST'])
def upload_files():
    """Receive the uploaded images and start stitching them in the background.

    Expects the images in the ``files[]`` form field. Valid images are stored
    temporarily in the upload folder, a job entry is registered under the
    session's user id and a worker thread is started.

    Returns:
        The ``waiting.html`` page while the job runs, or a redirect to the
        index page (with a flash message) when fewer than two valid images
        were supplied.
    """
    if 'files[]' not in request.files:
        flash('No se seleccionaron archivos')
        return redirect(url_for('index'))
    files = request.files.getlist('files[]')
    if not files or files[0].filename == '':
        flash('No se seleccionaron archivos')
        return redirect(url_for('index'))

    # Validate and store the files temporarily.
    temp_paths = []
    for file in files:
        if file and allowed_file(file.filename):
            # The upload folder is shared by all users, so prefix a random id:
            # otherwise equal client filenames (within one upload or across
            # concurrent users) would overwrite each other.
            filename = f"{uuid.uuid4().hex}_{secure_filename(file.filename)}"
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            file.save(filepath)
            temp_paths.append(filepath)

    if len(temp_paths) < 2:
        # No job will run, so nothing else would ever delete these files.
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError:
                # Best-effort cleanup; a leftover file must not block the reply.
                pass
        flash('Se necesitan al menos 2 imágenes')
        return redirect(url_for('index'))

    # Register the job under the session's user id (created on first use).
    session_id = str(session.get('user_id', uuid.uuid4()))
    ongoing_processes[session_id] = {
        'status': 'processing',
        'log': [],
        'temp_paths': temp_paths,
        'start_time': time.time()
    }
    session['user_id'] = session_id

    # Run the stitching in a worker thread and show the waiting page meanwhile.
    thread = threading.Thread(target=process_background, args=(session_id, temp_paths))
    thread.start()
    return render_template('waiting.html')
def process_background(session_id, image_files):
    """Stitch the panorama in a background thread and record the outcome.

    Progress and the final state are written to the job entry stored in
    ``ongoing_processes[session_id]``: ``status`` becomes ``'completed'`` or
    ``'error'``, ``log`` receives progress messages and ``result_filename``
    is set on success.

    Args:
        session_id: Key of the job entry in ``ongoing_processes``.
        image_files: Paths of the temporary input images. They are deleted
            once stitching has finished, whether it succeeded or not.
    """
    process = ongoing_processes[session_id]
    log = process['log']
    try:
        log.append(f"Iniciando procesamiento de {len(image_files)} imágenes...")
        try:
            result_image, error = process_panorama(image_files)
        finally:
            # Remove the temporary files on every outcome; doing this only on
            # success would leave the inputs of failed jobs on disk.
            for path in image_files:
                try:
                    os.remove(path)
                    log.append(f"Archivo temporal eliminado: {path}")
                except Exception as e:
                    log.append(f"Error eliminando archivo temporal: {e}")
        if error:
            process['status'] = 'error'
            log.append(f"Error: {error}")
            return

        # Save the result.
        result_filename = f'result_{uuid.uuid4().hex[:8]}.jpg'
        result_path = os.path.join(app.config['UPLOAD_FOLDER'], result_filename)
        result_image.save(result_path, 'JPEG', quality=95)
        log.append(f"Proceso completado. Resultado guardado como: {result_filename}")
        # Publish the filename before flipping the status, so a poller that
        # sees 'completed' always finds the filename too.
        process['result_filename'] = result_filename
        process['status'] = 'completed'
    except Exception as e:
        process['status'] = 'error'
        log.append(f"Error inesperado: {str(e)}")
@app.route('/check_progress')
def check_progress():
    """Report the state of the current session's background job.

    Returns:
        A JSON string. ``completed`` is True with a ``redirect_url`` when the
        job finished, failed, or does not exist; otherwise ``completed`` is
        False and ``log`` holds the progress messages joined by newlines.
    """
    user_id = session.get('user_id')
    # Look the job up only for a real id; str(None) would yield the
    # always-truthy key 'None'.
    session_id = str(user_id) if user_id else None
    process = ongoing_processes.get(session_id) if session_id else None
    if process is None:
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('index')
        })

    status = process['status']
    result_filename = process.get('result_filename', '')
    # A 'completed' job without a filename is treated as still running, so no
    # redirect is ever built from an empty filename.
    if status == 'completed' and result_filename:
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('result', filename=result_filename)
        })
    if status == 'error':
        # Surface the failure on the page we redirect to, and drop the job so
        # failed entries do not accumulate in memory.
        if process['log']:
            flash(process['log'][-1])
        ongoing_processes.pop(session_id, None)
        return json.dumps({
            'completed': True,
            'redirect_url': url_for('index')
        })

    # Process is still running.
    log_output = "\n".join(process['log'])
    return json.dumps({
        'completed': False,
        'log': log_output
    })
@app.route('/result/<filename>')
def result(filename):
    """Display the page for a finished panorama.

    When the file exists in the upload folder, the session's job entry is
    discarded and ``result.html`` is rendered; otherwise the user is sent
    back to the index page with a flash message.
    """
    try:
        stored_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(stored_path):
            flash('Archivo no encontrado')
            return redirect(url_for('index'))

        # The job is over: forget its tracking entry for this session.
        job_key = str(session.get('user_id'))
        ongoing_processes.pop(job_key, None)

        static_relative = f'uploads/{filename}'
        return render_template('result.html', result_image=static_relative, filename=filename)
    except Exception as e:
        flash(f'Error al mostrar el resultado: {str(e)}')
        return redirect(url_for('index'))
@app.route('/share', methods=['POST'])
def share_panorama():
    """Publish a finished panorama to the community gallery.

    Reads the ``result_path`` form field, copies the referenced result image
    from the upload folder into COMMUNITY_FOLDER under a fresh name, creates
    its thumbnail and deletes the temporary result.

    Returns:
        A redirect to the community page (a flash message reports success or
        failure), or to the index page when no image was given.
    """
    result_path = request.form.get('result_path')
    if not result_path:
        flash('No hay imagen para compartir')
        return redirect(url_for('index'))

    # result_path comes from the client. Keep only a sanitised file name and
    # resolve it inside the upload folder, so a crafted value cannot make the
    # server read or delete files elsewhere.
    original_filename = secure_filename(os.path.basename(result_path))
    if not original_filename:
        flash('No hay imagen para compartir')
        return redirect(url_for('index'))

    try:
        # Generate a unique name for the shared image.
        shared_filename = f'shared_{uuid.uuid4().hex[:8]}.jpg'
        original_path = os.path.join(app.config['UPLOAD_FOLDER'], original_filename)
        shared_path = os.path.join(COMMUNITY_FOLDER, shared_filename)

        # Copy the image into the community folder; the context manager closes
        # the source file before it is removed below.
        with Image.open(original_path) as original_image:
            original_image.save(shared_path, 'JPEG', quality=95)

        # Create the thumbnail for the new image.
        thumbnail_name = f"thumb_{shared_filename}"
        thumbnail_path = os.path.join(THUMBNAILS_FOLDER, thumbnail_name)
        create_thumbnail(shared_path, thumbnail_path)

        # Delete the temporary image.
        os.remove(original_path)
        flash('¡Imagen compartida exitosamente!')
    except Exception as e:
        flash(f'Error al compartir la imagen: {str(e)}')
    return redirect(url_for('community'))
@app.route('/download/<filename>')
def download_file(filename):
    """Send a result file from the upload folder as an attachment.

    The download is offered under the name ``panorama_<filename>``. A missing
    file or any error leads back to the index page with a flash message.
    """
    try:
        stored_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(stored_path):
            flash('Archivo no encontrado')
            return redirect(url_for('index'))
        attachment_name = f'panorama_{filename}'
        return send_file(stored_path, as_attachment=True, download_name=attachment_name)
    except Exception as e:
        flash(f'Error al descargar: {str(e)}')
        return redirect(url_for('index'))
if __name__ == '__main__':
    # The server listens on every interface, so the interactive debugger
    # (which can execute arbitrary code) must not be on by default. Enable it
    # explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860)