|
|
from fastapi import FastAPI, File, UploadFile, Form, Request, BackgroundTasks, Depends, HTTPException |
|
|
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
import uvicorn |
|
|
import os |
|
|
from augment_dataset import augment_session, get_session_stats, AVAILABLE_VARIANTS |
|
|
from PIL import Image, ImageDraw |
|
|
import numpy as np |
|
|
import random |
|
|
import io |
|
|
import base64 |
|
|
import json |
|
|
import zipfile |
|
|
from datetime import datetime |
|
|
import shutil |
|
|
from sqlalchemy.orm import Session |
|
|
|
|
|
|
|
|
from auth.database import create_tables, get_db |
|
|
from auth.models import User, UserSession |
|
|
from auth.routes import router as auth_router |
|
|
from auth.classes_routes import router as classes_router |
|
|
from auth.session_routes import router as hash_sessions_router |
|
|
from auth.dependencies import get_current_user, get_optional_user, verify_session_access |
|
|
|
|
|
|
|
|
# Application object; the metadata below is what FastAPI is given for the API docs.
app = FastAPI(
    title="YOLO Multi-Class Annotator & Visualizer (JWT Auth)",
    description="Sistema de anotación YOLO con autenticación JWT",
    version="2.0.0"
)

# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive CORS policy - confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create the working directories BEFORE anything is mounted: StaticFiles checks
# that its directory exists when it is constructed, so creating "static" after
# the mount (as the previous ordering did) is too late on a fresh checkout.
os.makedirs("static", exist_ok=True)
os.makedirs("templates", exist_ok=True)
os.makedirs("annotations", exist_ok=True)
os.makedirs("temp", exist_ok=True)

# Template engine and static assets.
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

# Routers provided by the auth package.
app.include_router(auth_router)
app.include_router(classes_router)
app.include_router(hash_sessions_router)

# Make sure the database tables exist before the first request is served.
create_tables()
|
|
|
|
|
|
|
|
def create_session_structure(session_name, user_id=None, db=None):
    """Create the on-disk folders for a session and link it to a user.

    Creates ``annotations/<session_name>/images`` and ``.../labels``. When both
    ``user_id`` and ``db`` are truthy, a ``UserSession`` row is added for that
    user unless one with the same name already exists.

    Returns:
        The session directory path as a string (``annotations/<session_name>``).
    """
    session_path = f"annotations/{session_name}"
    for subfolder in ("images", "labels"):
        os.makedirs(f"{session_path}/{subfolder}", exist_ok=True)

    # Without a user id and a database handle there is nothing to register.
    if not (user_id and db):
        return session_path

    already_registered = (
        db.query(UserSession)
        .filter(
            UserSession.user_id == user_id,
            UserSession.session_name == session_name,
        )
        .first()
    )
    if not already_registered:
        db.add(UserSession(session_name=session_name, user_id=user_id))
        db.commit()

    return session_path
|
|
|
|
|
def random_color():
    """Return a random RGB colour as a tuple of three ints in the range 0-255."""
    red = random.randint(0, 255)
    green = random.randint(0, 255)
    blue = random.randint(0, 255)
    return (red, green, blue)
|
|
|
|
|
def create_canvas_with_image(image_bytes, size, x, y, change_bg=True, max_size=800):
    """Paste an uploaded image onto a freshly created RGB canvas.

    Args:
        image_bytes: Raw encoded image data.
        size: ``(width, height)`` of the canvas.
        x, y: Requested paste position; ``(0, 0)`` means "centre the image".
        change_bg: Use a random background colour instead of light grey.
        max_size: Images with a side longer than this are scaled down,
            keeping their aspect ratio.

    Returns:
        The canvas as a PIL image in ``RGB`` mode.
    """
    background_rgb = random_color() if change_bg else (200, 200, 200)
    canvas = Image.new('RGB', size, background_rgb)

    picture = Image.open(io.BytesIO(image_bytes))

    if picture.mode in ('RGBA', 'LA', 'P'):
        # Flatten transparency / palette images onto a white backdrop.
        flattened = Image.new('RGB', picture.size, (255, 255, 255))
        if picture.mode == 'P':
            picture = picture.convert('RGBA')
        alpha_mask = picture.split()[-1] if picture.mode in ('RGBA', 'LA') else None
        flattened.paste(picture, mask=alpha_mask)
        picture = flattened
    elif picture.mode != 'RGB':
        picture = picture.convert('RGB')

    # Shrink oversized images so the longest side fits within max_size.
    source_w, source_h = picture.size
    if max(source_w, source_h) > max_size:
        scale = min(max_size / source_w, max_size / source_h)
        target = (int(source_w * scale), int(source_h * scale))
        picture = picture.resize(target, Image.Resampling.LANCZOS)

    canvas_w, canvas_h = size
    pic_w, pic_h = picture.size

    if x == 0 and y == 0:
        # No explicit position requested: centre the image on the canvas.
        offset = ((canvas_w - pic_w) // 2, (canvas_h - pic_h) // 2)
    else:
        # Keep the requested position inside the canvas and never negative.
        offset = (
            max(0, min(x, canvas_w - pic_w)),
            max(0, min(y, canvas_h - pic_h)),
        )

    canvas.paste(picture, offset)
    return canvas
|
|
|
|
|
def image_to_base64(pil_image):
    """Encode a PIL image as a ``data:image/jpeg;base64,...`` URI.

    Args:
        pil_image: Image to encode. Images in an alpha or palette mode
            (``RGBA``, ``LA``, ``P``) are converted to ``RGB`` first, because
            the JPEG writer rejects those modes.

    Returns:
        The data URI string containing the JPEG (quality 90) payload.
    """
    if pil_image.mode in ('RGBA', 'LA', 'P'):
        pil_image = pil_image.convert('RGB')

    buffer = io.BytesIO()
    pil_image.save(buffer, format='JPEG', quality=90)
    encoded = base64.b64encode(buffer.getvalue()).decode()
    return f"data:image/jpeg;base64,{encoded}"
|
|
|
|
|
def get_user_sessions_list(user: User, db: Session):
    """Return the session names visible to ``user`` (kept for compatibility).

    Non-admin users get the names of their own active ``UserSession`` rows.
    Admins and anonymous callers (``user`` falsy) get every sub-directory of
    ``annotations``.
    """
    if user and not user.is_admin:
        owned_rows = db.query(UserSession).filter(
            UserSession.user_id == user.id,
            UserSession.is_active == True
        ).all()
        return [row.session_name for row in owned_rows]

    # Admin or anonymous: list what is on disk.
    root = "annotations"
    if not os.path.exists(root):
        return []
    return [
        entry
        for entry in os.listdir(root)
        if os.path.isdir(os.path.join(root, entry))
    ]
|
|
|
|
|
|
|
|
def get_user_sessions_with_info(user: User, db: Session):
    """Return the sessions visible to ``user`` as dicts (for API use).

    Each entry has the keys ``name``, ``session_hash`` and ``is_private``.
    Admins see every active ``UserSession`` row, other users only their own
    active rows, and anonymous callers (``user`` falsy) get one entry per
    sub-directory of ``annotations`` with no hash.
    """
    if not user:
        root = "annotations"
        if not os.path.exists(root):
            return []
        return [
            {
                'name': entry,
                'session_hash': None,
                'is_private': False
            }
            for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        ]

    if user.is_admin:
        rows = db.query(UserSession).filter(
            UserSession.is_active == True
        ).all()
    else:
        rows = db.query(UserSession).filter(
            UserSession.user_id == user.id,
            UserSession.is_active == True
        ).all()

    return [
        {
            'name': row.session_name,
            'session_hash': row.session_hash,
            # A session counts as private as soon as it carries a hash.
            'is_private': bool(row.session_hash)
        }
        for row in rows
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """HTTP middleware that is meant to guard non-public routes.

    Requests whose path starts with one of the public prefixes are passed
    straight through. Any other non-``/api/`` request without an
    ``Authorization`` header is redirected to ``/login``; everything else is
    forwarded to the next handler.
    """
    path = request.url.path

    # NOTE(review): "/" is matched as a prefix, and URL paths begin with "/",
    # so this check appears to let every request through and the redirect below
    # looks unreachable. Tightening it would redirect plain browser navigations
    # that carry no Authorization header - confirm the intended behaviour first.
    public_prefixes = (
        "/",
        "/login",
        "/register",
        "/auth/",
        "/static/",
        "/docs",
        "/redoc",
        "/openapi.json",
    )
    if path.startswith(public_prefixes):
        return await call_next(request)

    # HTML pages (anything outside /api/) need an Authorization header.
    if not path.startswith("/api/") and not request.headers.get("authorization"):
        return RedirectResponse(url="/login", status_code=302)

    return await call_next(request)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def main(request: Request):
    """Render the public landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login page."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
|
|
|
|
|
@app.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
    """Render the registration page."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
|
|
|
|
|
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(
    request: Request,
    current_user: User = Depends(get_optional_user),
    db: Session = Depends(get_db)
):
    """Render the main dashboard; authentication is verified by the frontend.

    The template receives the user's session names, or an empty list when no
    user could be resolved from the request.
    """
    user_sessions = get_user_sessions_list(current_user, db) if current_user else []

    context = {
        "request": request,
        "user": current_user,
        "sessions": user_sessions
    }
    return templates.TemplateResponse("dashboard.html", context)
|
|
|
|
|
@app.get("/sessions", response_class=HTMLResponse)
async def sessions_page(
    request: Request,
    current_user: User = Depends(get_optional_user),
    db: Session = Depends(get_db)
):
    """Render the session management page (authentication is optional).

    For every visible session that exists on disk, the template receives its
    name, image count, label count and path.
    """
    try:
        sessions_dir = "annotations"
        image_suffixes = ('.jpg', '.jpeg', '.png')
        sessions = []

        for session_name in get_user_sessions_list(current_user, db):
            session_path = os.path.join(sessions_dir, session_name)
            if not os.path.isdir(session_path):
                # Known to the database but missing on disk: leave it out.
                continue

            images_path = os.path.join(session_path, "images")
            labels_path = os.path.join(session_path, "labels")

            image_count = 0
            if os.path.exists(images_path):
                image_count = sum(
                    1 for entry in os.listdir(images_path)
                    if entry.lower().endswith(image_suffixes)
                )

            label_count = 0
            if os.path.exists(labels_path):
                label_count = sum(
                    1 for entry in os.listdir(labels_path)
                    if entry.lower().endswith('.txt')
                )

            sessions.append({
                'name': session_name,
                'images': image_count,
                'labels': label_count,
                'path': session_path
            })

        context = {
            "request": request,
            "sessions": sessions,
            "current_user": current_user
        }
        return templates.TemplateResponse("sessions.html", context)

    except Exception as e:
        return HTMLResponse(content=f"<h1>Error</h1><p>Error: {str(e)}</p><a href='/dashboard'>← Volver</a>")
|
|
|
|
|
@app.get("/annotator", response_class=HTMLResponse)
async def annotator_page(
    request: Request,
    current_user: User = Depends(get_optional_user)
):
    """Render the classic annotator; authentication is verified by the frontend."""
    context = {
        "request": request,
        "user": current_user
    }
    return templates.TemplateResponse("annotator.html", context)
|
|
|
|
|
@app.get("/visualizer", response_class=HTMLResponse)
async def visualizer_page(
    request: Request,
    session: str = None,
    current_user: User = Depends(get_optional_user),
    db: Session = Depends(get_db)
):
    """Render the dataset visualizer; authentication is verified by the frontend.

    Raises:
        HTTPException: 403 when a user is resolved, a ``session`` is requested
            and ``verify_session_access`` denies it.
    """
    try:
        user_sessions = []
        if current_user:
            user_sessions = get_user_sessions_list(current_user, db)

            # A specific session was requested: make sure this user may see it.
            if session and not verify_session_access(current_user, session, db):
                raise HTTPException(status_code=403, detail="No access to this session")

        context = {
            "request": request,
            "sessions": user_sessions,
            "current_session": session,
            "user": current_user
        }
        return templates.TemplateResponse("visualizer.html", context)

    except HTTPException:
        # Let FastAPI turn the 403 into a proper response.
        raise
    except Exception as e:
        return HTMLResponse(content=f"<h1>Error</h1><p>Error en visualizador: {str(e)}</p><a href='/dashboard'>← Volver</a>")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/session/{session_hash}", response_class=HTMLResponse)
async def session_by_hash_page(
    request: Request,
    session_hash: str,
    db: Session = Depends(get_db)
):
    """Landing page for a session addressed by its hash.

    No authentication dependency is declared: anyone holding the hash can open
    the page. Responds with a 404 HTML page when ``get_session_by_hash``
    returns nothing for the hash.
    """
    from html import escape

    from auth.session_utils import get_session_by_hash

    session = get_session_by_hash(db, session_hash)
    if not session:
        # The hash comes straight from the URL, so escape it before echoing it
        # back into HTML; otherwise the 404 page is a reflected-XSS vector.
        safe_hash = escape(session_hash)
        return HTMLResponse(
            content=f"""
            <h1>🔍 Sesión no encontrada</h1>
            <p>La sesión con hash <code>{safe_hash}</code> no existe o está inactiva.</p>
            <a href="/">← Ir al inicio</a>
            """,
            status_code=404
        )

    return templates.TemplateResponse("session_access.html", {
        "request": request,
        "session": session,
        "session_hash": session_hash,
        "annotator_url": f"/session/{session_hash}/annotator",
        "visualizer_url": f"/session/{session_hash}/visualizer"
    })
|
|
|
|
|
@app.get("/session/{session_hash}/annotator", response_class=HTMLResponse)
async def session_annotator_by_hash(
    request: Request,
    session_hash: str,
    db: Session = Depends(get_db)
):
    """Annotator page for a session addressed by its hash.

    No authentication dependency is declared. Responds with a plain 404 when
    the hash does not resolve to a session.
    """
    from auth.session_utils import get_session_by_hash

    session = get_session_by_hash(db, session_hash)
    if not session:
        return HTMLResponse(content="Sesión no encontrada", status_code=404)

    context = {
        "request": request,
        "session_hash": session_hash,
        "session_name": session.session_name,
        "public_access": True,
        "user": None
    }
    return templates.TemplateResponse("annotator.html", context)
|
|
|
|
|
@app.get("/session/{session_hash}/visualizer", response_class=HTMLResponse)
async def session_visualizer_by_hash(
    request: Request,
    session_hash: str,
    db: Session = Depends(get_db)
):
    """Visualizer page for a session addressed by its hash.

    No authentication dependency is declared. Responds with a plain 404 when
    the hash does not resolve to a session.
    """
    from auth.session_utils import get_session_by_hash

    session = get_session_by_hash(db, session_hash)
    if not session:
        return HTMLResponse(content="Sesión no encontrada", status_code=404)

    resolved_name = session.session_name
    context = {
        "request": request,
        "session_hash": session_hash,
        "session_name": resolved_name,
        "current_session": resolved_name,
        "public_access": True,
        # Only the session behind this hash is offered to the template.
        "sessions": [session],
        "user": None
    }
    return templates.TemplateResponse("visualizer.html", context)
|
|
@app.get("/api/sessions")
async def list_sessions_api(
    request: Request,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List the current user's sessions with image and label counts.

    Returns a dict with ``success``, the sessions sorted by name, and the
    caller's ``username`` / ``is_admin``. On any error a
    ``{"success": False, "message": ...}`` dict is returned instead.
    """
    try:
        sessions_dir = "annotations"
        image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')
        sessions = []

        # NOTE(review): get_user_sessions_list yields plain names, so the dict
        # branch below (and therefore session_hash / share_url) is never taken
        # here. get_user_sessions_with_info may have been intended - confirm.
        for entry in get_user_sessions_list(current_user, db):
            if isinstance(entry, dict):
                session_name = entry['name']
                session_hash = entry.get('session_hash')
                is_private = entry.get('is_private', False)
            else:
                session_name, session_hash, is_private = entry, None, False

            session_path = os.path.join(sessions_dir, session_name)

            # Counts stay at zero when the session has no directory on disk.
            images_count = 0
            labels_count = 0
            if os.path.isdir(session_path):
                images_path = os.path.join(session_path, "images")
                labels_path = os.path.join(session_path, "labels")

                if os.path.exists(images_path):
                    images_count = sum(
                        1 for name in os.listdir(images_path)
                        if name.lower().endswith(image_suffixes)
                    )
                if os.path.exists(labels_path):
                    labels_count = sum(
                        1 for name in os.listdir(labels_path)
                        if name.lower().endswith('.txt')
                    )

            session_data = {
                'name': session_name,
                'images_count': images_count,
                'labels_count': labels_count,
                'is_private': is_private
            }

            # Sharing details are only included for sessions that have a hash.
            if session_hash:
                session_data['session_hash'] = session_hash
                session_data['share_url'] = f"{request.base_url}session/{session_hash}"

            sessions.append(session_data)

        sessions.sort(key=lambda item: item['name'])

        return {
            "success": True,
            "sessions": sessions,
            "user": {
                "username": current_user.username,
                "is_admin": current_user.is_admin
            }
        }
    except Exception as e:
        return {"success": False, "message": f"Error al listar sesiones: {str(e)}"}
|
|
|
|
|
def _parse_yolo_label_file(label_path, label_filename, width, height):
    """Parse one YOLO label file into a list of annotation dicts.

    Each non-empty line with at least five fields is read as
    ``class_id x_center y_center width height``; the normalised box is also
    converted to pixel corners using ``width`` / ``height``. Lines that fail
    to parse are reported with ``print`` and skipped. A missing file yields an
    empty list.
    """
    annotations = []
    if not os.path.exists(label_path):
        return annotations

    with open(label_path, 'r') as f:
        for line_num, line in enumerate(f):
            line = line.strip()
            if not line:
                continue
            try:
                parts = line.split()
                if len(parts) < 5:
                    continue

                class_id = int(parts[0])
                x_center = float(parts[1])
                y_center = float(parts[2])
                bbox_width = float(parts[3])
                bbox_height = float(parts[4])

                # Normalised centre/size -> pixel corner coordinates.
                x1 = int((x_center - bbox_width/2) * width)
                y1 = int((y_center - bbox_height/2) * height)
                x2 = int((x_center + bbox_width/2) * width)
                y2 = int((y_center + bbox_height/2) * height)
            except (ValueError, IndexError) as e:
                print(f"Error procesando línea {line_num} en {label_filename}: {e}")
                continue

            annotations.append({
                'class_id': class_id,
                'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2,
                'x_center': x_center, 'y_center': y_center,
                'width': bbox_width, 'height': bbox_height
            })

    return annotations


@app.get("/api/session/{session_name}/visualize")
async def get_session_visualize_data(
    session_name: str,
    limit: int = None,
    offset: int = 0,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return the images of a session together with their parsed annotations.

    Args:
        session_name: Session directory name under ``annotations``.
        limit: When positive, return at most this many images starting at
            ``offset``; otherwise every image is returned.
        offset: Index of the first image to return when ``limit`` is used.

    Returns:
        A dict with totals, paging information and the image entries, or a
        ``{"success": False, "message": ...}`` dict when access is denied, the
        session is missing or an error occurs.
    """
    try:
        if not verify_session_access(current_user, session_name, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        session_path = os.path.join("annotations", session_name)
        images_path = os.path.join(session_path, "images")
        labels_path = os.path.join(session_path, "labels")

        if not os.path.exists(session_path):
            return {"success": False, "message": f"Sesión '{session_name}' no encontrada"}

        images_data = []
        total_labels = 0

        if os.path.exists(images_path):
            # os.listdir returns entries in arbitrary order; sorting keeps
            # offset/limit paging stable from one request to the next.
            for filename in sorted(os.listdir(images_path)):
                if not filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    continue

                image_path = os.path.join(images_path, filename)
                try:
                    with Image.open(image_path) as img:
                        width, height = img.size
                except Exception:
                    # Unreadable image: fall back to a default size so the
                    # entry is still listed.
                    width, height = 640, 640

                label_filename = os.path.splitext(filename)[0] + '.txt'
                label_path = os.path.join(labels_path, label_filename)
                annotations = _parse_yolo_label_file(
                    label_path, label_filename, width, height
                )

                total_labels += len(annotations)
                images_data.append({
                    'name': filename,
                    'labels': len(annotations),
                    'annotations': annotations,
                    'width': width,
                    'height': height
                })

        # Paging is applied only when a positive limit was given.
        images_to_return = images_data
        if limit is not None and limit > 0:
            end_index = offset + limit
            images_to_return = images_data[offset:end_index]

        return {
            "success": True,
            "session_name": session_name,
            "total_images": len(images_data),
            "total_labels": total_labels,
            "returned_images": len(images_to_return),
            "offset": offset,
            "has_more": limit is not None and limit > 0 and offset + limit < len(images_data),
            "images": images_to_return
        }

    except Exception as e:
        return {"success": False, "message": f"Error: {str(e)}"}
|
|
|
|
|
@app.post("/api/session/{session_name}/create")
async def create_session_api(
    session_name: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Create a new session for the current user.

    The requested name is reduced to alphanumerics, ``_`` and ``-``. The
    request is rejected when nothing is left or when the user already owns a
    session with that name. Returns a ``success`` / ``message`` dict.
    """
    try:
        if not (session_name and session_name.strip()):
            return {"success": False, "message": "Nombre de sesión inválido"}

        # Keep only characters that are safe to use in a directory name.
        allowed_extra = ('_', '-')
        safe_session_name = "".join(
            ch for ch in session_name if ch.isalnum() or ch in allowed_extra
        ).strip()
        if not safe_session_name:
            return {"success": False, "message": "Nombre de sesión contiene caracteres inválidos"}

        duplicate = (
            db.query(UserSession)
            .filter(
                UserSession.user_id == current_user.id,
                UserSession.session_name == safe_session_name,
            )
            .first()
        )
        if duplicate:
            return {"success": False, "message": "Ya tienes una sesión con este nombre"}

        # Creates the folders and registers the session for this user.
        create_session_structure(safe_session_name, current_user.id, db)

        return {
            "success": True,
            "message": f"Sesión '{safe_session_name}' creada exitosamente",
            "session_name": safe_session_name
        }

    except Exception as e:
        return {"success": False, "message": f"Error al crear sesión: {str(e)}"}
|
|
|
|
|
@app.post("/api/upload")
async def upload_image(
    session: str = Form(...),
    canvas_width: int = Form(640),
    canvas_height: int = Form(640),
    x: int = Form(0),
    y: int = Form(0),
    change_bg: bool = Form(True),
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Upload an image into one of the user's sessions.

    The upload is composed onto a canvas of the requested size, stored under
    ``annotations/<session>/images`` with a timestamped name, and returned as
    a base64 preview.

    Returns:
        A dict with ``success``, the stored ``filename``, the ``preview`` data
        URI and a message, or ``{"success": False, "message": ...}`` on
        failure.
    """
    try:
        if not verify_session_access(current_user, session, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        # The client controls file.filename: keep only its last path component
        # so that separators or ".." cannot steer the write outside the
        # session's images folder.
        original_name = os.path.basename((file.filename or "").replace("\\", "/"))
        if not original_name:
            return {"success": False, "message": "Nombre de archivo inválido"}

        image_bytes = await file.read()

        canvas_image = create_canvas_with_image(
            image_bytes, (canvas_width, canvas_height), x, y, change_bg
        )

        # The timestamp keeps repeated uploads of the same file apart.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        image_filename = f"{session}_{timestamp}_{original_name}"

        session_path = os.path.join("annotations", session)
        image_path = os.path.join(session_path, "images", image_filename)

        os.makedirs(os.path.dirname(image_path), exist_ok=True)
        canvas_image.save(image_path)

        preview_b64 = image_to_base64(canvas_image)

        return {
            "success": True,
            "filename": image_filename,
            "preview": preview_b64,
            "message": f"Imagen subida a sesión '{session}'"
        }

    except Exception as e:
        return {"success": False, "message": f"Error al subir imagen: {str(e)}"}
|
|
|
|
|
@app.post("/api/save_annotations")
async def save_annotations(
    session: str = Form(...),
    filename: str = Form(...),
    annotations: str = Form(...),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Save the annotations of one image as a YOLO label file.

    Args:
        session: Target session name.
        filename: Image file name; the label file uses the same stem with a
            ``.txt`` extension.
        annotations: JSON list of objects with ``class_id``, ``x_center``,
            ``y_center``, ``width`` and ``height``.

    Returns:
        A ``success`` / ``message`` dict; on success it also carries the
        number of annotation lines written.
    """
    try:
        if not verify_session_access(current_user, session, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        try:
            annotations_data = json.loads(annotations)
        except json.JSONDecodeError:
            return {"success": False, "message": "Formato de anotaciones inválido"}

        # Anything other than a JSON array cannot be a list of annotations.
        if not isinstance(annotations_data, list):
            return {"success": False, "message": "Formato de anotaciones inválido"}

        # The client controls the filename: keep only its last path component
        # so the label file cannot be written outside the labels folder.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        if not safe_name:
            return {"success": False, "message": "Nombre de archivo inválido"}

        required_keys = ('class_id', 'x_center', 'y_center', 'width', 'height')
        label_content = []
        for ann in annotations_data:
            if not isinstance(ann, dict) or not all(key in ann for key in required_keys):
                continue
            try:
                # Coerce to numbers so arbitrary text (spaces, newlines) can
                # never be written into the label file.
                class_id = int(ann['class_id'])
                coords = [float(ann[key]) for key in required_keys[1:]]
            except (TypeError, ValueError):
                continue
            label_content.append(" ".join([str(class_id), *(str(v) for v in coords)]))

        session_path = os.path.join("annotations", session)
        label_filename = os.path.splitext(safe_name)[0] + '.txt'
        label_path = os.path.join(session_path, "labels", label_filename)

        os.makedirs(os.path.dirname(label_path), exist_ok=True)

        with open(label_path, 'w') as f:
            f.write('\n'.join(label_content))

        return {
            "success": True,
            "message": f"Anotaciones guardadas para {filename}",
            "annotations_count": len(label_content)
        }

    except Exception as e:
        return {"success": False, "message": f"Error al guardar anotaciones: {str(e)}"}
|
|
|
|
|
@app.post("/api/augment")
async def augment_dataset_api(
    background_tasks: BackgroundTasks,
    session: str = Form(...),
    variants: list = Form(None),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Queue dataset augmentation as a background task (requires authentication).

    When no variants are supplied, every key of ``AVAILABLE_VARIANTS`` is used.
    The request is rejected when the session or its images are missing, or
    when an unknown variant is requested. Returns a ``success`` / ``message``
    dict.
    """
    try:
        if not verify_session_access(current_user, session, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        session_path = os.path.join("annotations", session)
        if not os.path.exists(session_path):
            return {"success": False, "message": f"Sesión '{session}' no encontrada"}

        # There must be at least one entry in the images folder to augment.
        images_path = os.path.join(session_path, "images")
        if not (os.path.exists(images_path) and os.listdir(images_path)):
            return {"success": False, "message": f"No hay imágenes en la sesión '{session}'"}

        selected_variants = variants or list(AVAILABLE_VARIANTS.keys())

        invalid_variants = [
            variant for variant in selected_variants
            if variant not in AVAILABLE_VARIANTS
        ]
        if invalid_variants:
            return {"success": False, "message": f"Variantes inválidas: {invalid_variants}"}

        # The work is scheduled to run after the response has been sent.
        background_tasks.add_task(augment_session, session, selected_variants)

        return {
            "success": True,
            "message": f"Aumentación iniciada para sesión '{session}' con {len(selected_variants)} variantes",
            "session": session,
            "selected_variants": selected_variants,
            "available_variants": AVAILABLE_VARIANTS
        }

    except Exception as e:
        return {"success": False, "message": f"Error al iniciar augmentación: {str(e)}"}
|
|
|
|
|
@app.get("/api/augment/progress/{session}")
async def get_augment_progress(
    session: str,
    current_user: User = Depends(get_optional_user),
    db: Session = Depends(get_db)
):
    """Report augmentation progress read from ``temp/progress_<session>.json``.

    The access check only runs when a user could be resolved; requests without
    a user skip it. Returns the JSON file's contents merged with
    ``success: True``, or a ``success: False`` dict when no progress file
    exists or an error occurs.
    """
    try:
        if current_user and not verify_session_access(current_user, session, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        progress_file = f"temp/progress_{session}.json"
        if not os.path.exists(progress_file):
            return {
                "success": False,
                "message": "No hay proceso de augmentación activo"
            }

        with open(progress_file, 'r') as handle:
            progress_data = json.load(handle)

        return {"success": True, **progress_data}

    except Exception as e:
        return {"success": False, "message": f"Error al obtener progreso: {str(e)}"}
|
|
|
|
|
@app.get("/api/stats/{session}")
async def get_session_stats_api(
    session: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return the statistics of a session the current user may access.

    The statistics come from ``get_session_stats``. Errors are reported as a
    ``success: False`` dict.
    """
    try:
        if not verify_session_access(current_user, session, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        payload = {
            "success": True,
            "session": session,
            "stats": get_session_stats(session)
        }
        return payload

    except Exception as e:
        return {"success": False, "message": f"Error al obtener estadísticas: {str(e)}"}
|
|
|
|
|
@app.get("/api/download/{session}")
async def download_session(
    session: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Bundle a session directory into a ZIP archive and send it.

    The archive is written to ``temp/<session>_dataset.zip`` with paths
    relative to the session directory.

    Raises:
        HTTPException: 403 when access is denied, 404 when the session
            directory is missing, 500 for any other failure.
    """
    try:
        if not verify_session_access(current_user, session, db):
            raise HTTPException(status_code=403, detail="No tienes acceso a esta sesión")

        session_path = os.path.join("annotations", session)
        if not os.path.exists(session_path):
            raise HTTPException(status_code=404, detail=f"Sesión '{session}' no encontrada")

        zip_filename = f"{session}_dataset.zip"
        zip_path = os.path.join("temp", zip_filename)
        os.makedirs("temp", exist_ok=True)

        # NOTE(review): nothing in this function deletes the archive after it
        # has been sent - confirm whether temp/ is cleaned up elsewhere.
        with zipfile.ZipFile(zip_path, 'w') as archive:
            for folder, _subfolders, filenames in os.walk(session_path):
                for entry in filenames:
                    absolute = os.path.join(folder, entry)
                    archive.write(absolute, os.path.relpath(absolute, session_path))

        return FileResponse(
            path=zip_path,
            media_type='application/zip',
            filename=zip_filename
        )

    except HTTPException:
        # Keep the 403/404 raised above instead of turning them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error al descargar: {str(e)}")
|
|
|
|
|
@app.delete("/api/session/{session_name}")
async def delete_session_api(
    session_name: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Delete a session: the caller's database row and the directory on disk.

    Returns a ``success`` / ``message`` dict; errors are reported as
    ``success: False``.
    """
    try:
        if not verify_session_access(current_user, session_name, db):
            return {"success": False, "message": "No tienes acceso a esta sesión"}

        # Only a row owned by the caller is looked up and removed here.
        owned_row = (
            db.query(UserSession)
            .filter(
                UserSession.user_id == current_user.id,
                UserSession.session_name == session_name,
            )
            .first()
        )
        if owned_row:
            db.delete(owned_row)
            db.commit()

        # Remove the session's files as well.
        session_path = os.path.join("annotations", session_name)
        if os.path.exists(session_path):
            shutil.rmtree(session_path)

        return {
            "success": True,
            "message": f"Sesión '{session_name}' eliminada exitosamente"
        }

    except Exception as e:
        return {"success": False, "message": f"Error al eliminar sesión: {str(e)}"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/api/admin/users")
async def list_all_users(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List every user (admins only).

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Acceso denegado")

    serialized = []
    for account in db.query(User).all():
        created = account.created_at.isoformat() if account.created_at else None
        serialized.append({
            "id": account.id,
            "username": account.username,
            "email": account.email,
            "is_admin": account.is_admin,
            "created_at": created
        })

    return {
        "success": True,
        "users": serialized
    }
|
|
|
|
|
@app.get("/api/admin/sessions")
async def list_all_sessions(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List every session row in the system (admins only).

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Acceso denegado")

    serialized = []
    for row in db.query(UserSession).all():
        created = row.created_at.isoformat() if row.created_at else None
        serialized.append({
            "session_name": row.session_name,
            "user_id": row.user_id,
            "created_at": created,
            "is_active": row.is_active
        })

    return {
        "success": True,
        "sessions": serialized
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/image/{session_name}/{image_name}")
async def serve_session_image(
    session_name: str,
    image_name: str,
    current_user: User = Depends(get_optional_user),
    db: Session = Depends(get_db)
):
    """Serve an image file from a session's ``images`` folder.

    When the file is missing, or either name is not a plain file/directory
    name, a placeholder SVG is returned instead; unexpected errors produce an
    error SVG.

    NOTE(review): ``current_user`` and ``db`` are injected but not used in this
    function, so no per-user access check happens here - confirm whether one
    is expected.
    """
    from html import escape

    try:
        image_path = os.path.join("annotations", session_name, "images", image_name)
        print(f"🔍 Buscando imagen en: {image_path}")
        print(f"🔍 Existe archivo: {os.path.exists(image_path)}")

        # Both names come from the URL: accept only plain names so that "."
        # / ".." or embedded separators cannot point outside the folder.
        names_are_plain = all(
            name not in ("", ".", "..") and os.path.basename(name) == name
            for name in (session_name, image_name)
        )

        # isfile (not exists) so that a directory is never handed to FileResponse.
        if names_are_plain and os.path.isfile(image_path):
            print(f"✅ Sirviendo imagen: {image_path}")
            return FileResponse(image_path)

        # The names are echoed into SVG markup, so escape them first.
        safe_image_name = escape(image_name)
        safe_session_name = escape(session_name)
        svg_content = f"""
        <svg width="300" height="200" xmlns="http://www.w3.org/2000/svg">
        <rect width="300" height="200" fill="#f8f9fa" stroke="#dee2e6"/>
        <text x="150" y="90" text-anchor="middle" fill="#6c757d" font-family="Arial" font-size="14">
        📷 Imagen no encontrada
        </text>
        <text x="150" y="110" text-anchor="middle" fill="#adb5bd" font-family="Arial" font-size="12">
        {safe_image_name}
        </text>
        <text x="150" y="130" text-anchor="middle" fill="#adb5bd" font-family="Arial" font-size="10">
        Sesión: {safe_session_name}
        </text>
        </svg>
        """
        return HTMLResponse(content=svg_content, media_type="image/svg+xml")
    except HTTPException:
        raise
    except Exception as e:
        # Only the first 30 characters of the error are shown, escaped for markup.
        safe_error = escape(str(e)[:30])
        svg_error = f"""
        <svg width="300" height="200" xmlns="http://www.w3.org/2000/svg">
        <rect width="300" height="200" fill="#f8d7da" stroke="#f5c6cb"/>
        <text x="150" y="100" text-anchor="middle" fill="#721c24" font-family="Arial" font-size="12">
        ❌ Error: {safe_error}
        </text>
        </svg>
        """
        return HTMLResponse(content=svg_error, media_type="image/svg+xml")
|
|
|
|
|
if __name__ == "__main__":
    # Print the startup banner, then serve the app on localhost:8002.
    for banner_line in (
        "🚀 Iniciando YOLO Image Annotator con JWT Auth",
        "📍 Abre tu navegador en: http://localhost:8002",
        "🔐 Primera cuenta registrada será ADMIN",
    ):
        print(banner_line)

    uvicorn.run("app_auth:app", host="127.0.0.1", port=8002, reload=False)
|
|
|