daniel-saed committed on
Commit
2d578e2
verified
1 Parent(s): 40a95d8

Update utils/helper.py

Browse files
Files changed (1) hide show
  1. utils/helper.py +550 -549
utils/helper.py CHANGED
@@ -1,549 +1,550 @@
1
- import cv2
2
- import numpy as np
3
- from typing import Tuple
4
- import tempfile
5
- import os
6
- from PIL import Image
7
- import sys
8
- from pymongo import MongoClient
9
- from dotenv import load_dotenv
10
- import os
11
- import streamlit as st
12
-
13
- try:
14
- if getattr(sys, 'frozen', False):
15
- # En el ejecutable, intentar sys._MEIPASS
16
- BASE_DIR = getattr(sys, '_MEIPASS', os.path.dirname(sys.executable))
17
- print(f"Executable mode - Initial BASE_DIR: {BASE_DIR} (_MEIPASS: {hasattr(sys, '_MEIPASS')})")
18
- # Verificar si BASE_DIR contiene los archivos esperados
19
- expected_dirs = ['navigation', 'models', 'assets', 'img', 'utils']
20
- if not any(os.path.exists(os.path.join(BASE_DIR, d)) for d in expected_dirs):
21
- print(f"Warning: Expected directories not found in {BASE_DIR}")
22
- # Buscar _MEI<random> en el directorio padre
23
- temp_dir = os.path.dirname(BASE_DIR) if BASE_DIR != os.path.dirname(sys.executable) else BASE_DIR
24
- for d in os.listdir(temp_dir):
25
- if d.startswith('_MEI'):
26
- candidate = os.path.join(temp_dir, d)
27
- if any(os.path.exists(os.path.join(candidate, ed)) for ed in expected_dirs):
28
- BASE_DIR = candidate
29
- print(f"Adjusted BASE_DIR to _MEI directory: {BASE_DIR}")
30
- break
31
- else:
32
- print(f"No _MEI directory found in {temp_dir}, using {BASE_DIR}")
33
- else:
34
- # En desarrollo, usar el directorio del proyecto
35
- current_file = os.path.abspath(os.path.realpath(__file__))
36
- print(f"Development mode - Current file: {current_file}")
37
- BASE_DIR = os.path.dirname(os.path.dirname(current_file)) # Subir de utils/ a F1-machine-learning-webapp/
38
- print(f"Development mode - BASE_DIR: {BASE_DIR}")
39
- except Exception as e:
40
- print(f"Error setting BASE_DIR: {e}")
41
- # Fallback
42
- BASE_DIR = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
43
- BASE_DIR = os.path.dirname(BASE_DIR)
44
- print(f"Fallback BASE_DIR: {BASE_DIR}")
45
-
46
- BASE_DIR = os.path.normpath(BASE_DIR)
47
- print(f"Final BASE_DIR: {BASE_DIR}")
48
-
49
-
50
-
51
-
52
- #load_dotenv() # Carga las variables desde .env
53
- #mongo_uri = os.getenv("MONGO_URI")
54
@st.cache_resource
def get_mongo_client():
    """Return the app-wide MongoDB client built from the Streamlit secret ``MONGO_URI``.

    ``st.cache_resource`` guarantees a single shared instance per process.
    """
    uri = st.secrets["MONGO_URI"]
    return MongoClient(uri)

# Shared client used by the metrics helpers below.
client = get_mongo_client()
58
-
59
-
60
def get_metrics_collections():
    """Return the usage-metrics collection, the visits collection and the DB handle."""
    database = client["f1_data"]
    return database["usage_metrics"], database["visits"], database

# Module-level handles used across the app.
metrics_collection, metrics_page, db = get_metrics_collections()
68
- '''if not metrics_page.find_one({"page": "inicio"}):
69
- metrics_page.insert_one({"page": "inicio", "visits": 0})
70
- if not metrics_collection.find_one({"action": "descargar_app"}):
71
- metrics_collection.insert_one({"action": "descargar_app", "count": 0})'''
72
- '''except:
73
- print("Error loading MongoDB URI from .env file. Please check your configuration.")
74
- client = None
75
- metrics_collection = None
76
- metrics_page = None
77
- db = None'''
78
-
79
-
80
- #-------------YOLO ONNX HELPERS-------------------
81
-
82
def preprocess_image_tensor(image_rgb: np.ndarray) -> np.ndarray:
    """Preprocess an RGB image into an NCHW float tensor (Ultralytics YOLOv8 style).

    Args:
        image_rgb: HxWx3 array in RGB channel order (typically uint8).
            The model expects 224x224, but any spatial size is accepted now —
            the previous hard-coded ``reshape(1, 3, 224, 224)`` raised for
            other sizes.

    Returns:
        1x3xHxW float32 array with values scaled to [0, 1].
    """
    # HWC -> CHW, then add the batch axis.
    chw = np.transpose(image_rgb, (2, 0, 1))
    input_data = np.expand_dims(chw, axis=0)

    # Convert to float32 and normalize to [0, 1].
    return input_data.astype(np.float32) / 255.0
96
-
97
def postprocess_outputs(outputs: list, height: int, width: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Decode ONNX YOLOv8-seg outputs for the single-class ("helmet") model.

    Args:
        outputs: [detection head, mask prototypes] as returned by the ONNX
            session; outputs[1] flattens to (32, 56*56) prototype masks.
        height: original image height, used to rescale boxes from 224x224.
        width: original image width.

    Returns:
        (True, detections) where each detection is
        [x1, y1, x2, y2, label, prob, mask, polygon], after NMS at IoU < 0.7.
    """
    res_size = 56
    detections = outputs[0][0].transpose()               # (N, 5 + 32)
    protos = outputs[1][0].reshape(32, res_size * res_size)

    boxes = detections[:, 0:5]
    # Combine the 32 per-box mask coefficients with the prototypes.
    masks = detections[:, 5:] @ protos
    boxes = np.hstack([boxes, masks])

    yolo_classes = [
        "helmet"
    ]

    # Parse and filter all candidate boxes.
    objects = []
    for row in boxes:
        xc, yc, w, h = row[:4]
        # Rescale from the 224x224 model space to the original image size.
        x1 = (xc - w / 2) / 224 * width
        y1 = (yc - h / 2) / 224 * height
        x2 = (xc + w / 2) / 224 * width
        y2 = (yc + h / 2) / 224 * height
        prob = row[4:5].max()            # single class: column 4 is its score
        if prob < 0.2:
            continue
        class_id = row[4:5].argmax()     # always 0 for this one-class model
        label = yolo_classes[class_id]

        # NOTE: the slice end exceeds the row length; NumPy clips it, so this
        # is effectively row[5:] (the flattened 56x56 mask values).
        mask = get_mask(row[5:25684], (x1, y1, x2, y2), width, height)
        try:
            polygon = get_polygon(mask)
        except Exception:
            # No contour could be extracted (e.g. empty mask) — skip the box.
            continue
        objects.append([x1, y1, x2, y2, label, prob, mask, polygon])

    # Non-maximum suppression: keep the best box, drop heavy overlaps.
    objects.sort(key=lambda o: o[5], reverse=True)
    result = []
    while len(objects) > 0:
        best = objects[0]
        result.append(best)
        objects = [obj for obj in objects if iou(obj, best) < 0.7]

    return True, result
152
-
153
def intersection(box1, box2):
    """Return the overlap area of two [x1, y1, x2, y2, ...] boxes.

    Bug fix: disjoint boxes previously multiplied two negative extents,
    which could yield a spurious *positive* "intersection" area. Each
    extent is now clamped at zero, so non-overlapping boxes return 0.
    """
    box1_x1, box1_y1, box1_x2, box1_y2 = box1[:4]
    box2_x1, box2_y1, box2_x2, box2_y2 = box2[:4]
    x1 = max(box1_x1, box2_x1)
    y1 = max(box1_y1, box2_y1)
    x2 = min(box1_x2, box2_x2)
    y2 = min(box1_y2, box2_y2)
    # A negative extent means no overlap along that axis.
    return max(0, x2 - x1) * max(0, y2 - y1)
161
-
162
def union(box1, box2):
    """Area of the union of two [x1, y1, x2, y2, ...] boxes."""
    ax1, ay1, ax2, ay2 = box1[:4]
    bx1, by1, bx2, by2 = box2[:4]
    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    return area_a + area_b - intersection(box1, box2)
168
-
169
def iou(box1, box2):
    """Intersection-over-union of two boxes (first four values are coords)."""
    overlap = intersection(box1, box2)
    return overlap / union(box1, box2)
171
-
172
def sigmoid(z):
    """Elementwise logistic function 1 / (1 + e^-z); works on scalars and arrays."""
    denom = 1 + np.exp(-z)
    return 1 / denom
174
-
175
- # parse segmentation mask
176
def get_mask(row, box, img_width, img_height):
    """Build the binary mask for one detection, resized to its bounding box.

    ``row`` holds the flattened 56x56 mask logits for the detection and
    ``box`` is (x1, y1, x2, y2) in original-image coordinates.
    """
    res_size = 56
    # Logits -> probabilities -> binary 0/255 mask at prototype resolution.
    prob_map = sigmoid(row.reshape(res_size, res_size))
    binary = (prob_map > 0.2).astype("uint8") * 255
    # Crop the region covered by the box, expressed in mask coordinates.
    x1, y1, x2, y2 = box
    cx1 = round(x1 / img_width * res_size)
    cy1 = round(y1 / img_height * res_size)
    cx2 = round(x2 / img_width * res_size)
    cy2 = round(y2 / img_height * res_size)
    cropped = binary[cy1:cy2, cx1:cx2]
    # Upscale the crop to the box size in the original image.
    resized = Image.fromarray(cropped, "L").resize((round(x2 - x1), round(y2 - y1)))
    return np.array(resized)
194
-
195
-
196
-
197
- # calculate bounding polygon from mask
198
def get_polygon(mask):
    """Return the first contour of ``mask`` as a list of [x, y] points.

    Raises IndexError when no contour exists — callers catch and skip the box.
    """
    found = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    first_contour = found[0][0]
    return [[point[0][0], point[0][1]] for point in first_contour]
202
-
203
-
204
-
205
-
206
-
207
-
208
-
209
-
210
-
211
-
212
- #------------------VIDEO CONVERSION------------------
213
-
214
def convert_video_to_10fps(video_file):
    """
    Convert an uploaded video to 10 FPS.

    Args:
        video_file: Streamlit uploaded file object (readable bytes).

    Returns:
        io.BytesIO with the converted MP4 (``.name`` set to
        "converted_10fps.mp4") on success; a {"success": False, ...} dict if
        the video cannot be opened; ``None`` on any other error.
        NOTE(review): three different return types — confirm callers handle all.
    """
    try:
        # Spill the upload to disk so cv2.VideoCapture can open it by path.
        orig_tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        orig_tfile.write(video_file.read())
        orig_tfile.close()

        # Open the original video to read its properties.
        orig_cap = cv2.VideoCapture(orig_tfile.name)

        if not orig_cap.isOpened():
            return {"success": False, "error": "Could not open video file"}

        orig_fps = orig_cap.get(cv2.CAP_PROP_FPS)
        width = int(orig_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(orig_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        orig_total_frames = int(orig_cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Duration / expected frame count at the target rate.
        # NOTE(review): orig_fps can be 0 for some containers — this would
        # raise ZeroDivisionError and fall into the generic except below.
        duration_seconds = orig_total_frames / orig_fps
        expected_frames = int(duration_seconds * 10)  # 10 fps

        # Output path. NOTE(review): tempfile.mktemp is deprecated/racy —
        # consider tempfile.mkstemp instead.
        converted_path = tempfile.mktemp(suffix='.mp4')

        # Writer at a fixed 10 FPS with the original frame size.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(converted_path, fourcc, 10, (width, height))

        # Frame sampling plan.
        if orig_fps <= 10:
            # Source is slower than target: keep every frame, duplicate each
            # one int(10 / fps) times to approximate 10 FPS.
            step = 1
            duplication = int(10 / orig_fps)
        else:
            # Source is faster: keep roughly every (fps/10)-th frame.
            step = orig_fps / 10
            duplication = 1

        # Convert the video.
        frame_count = 0
        output_count = 0

        while orig_cap.isOpened():
            ret, frame = orig_cap.read()
            if not ret:
                break

            # Keep the frame when the fractional phase wraps around.
            # (modulo < 1 handles non-integer step values)
            if frame_count % step < 1:
                # Write the frame, possibly multiple times.
                for _ in range(duplication):
                    out.write(frame)
                    output_count += 1

            frame_count += 1

        # Release resources and delete the on-disk original.
        orig_cap.release()
        out.release()
        os.unlink(orig_tfile.name)

        # Read the converted file back into memory so no temp file leaks out.
        with open(converted_path, "rb") as f:
            video_data = f.read()

        # Clean up the temporary output file.
        os.unlink(converted_path)

        # Return a named in-memory file-like object.
        from io import BytesIO
        video_io = BytesIO(video_data)
        video_io.name = "converted_10fps.mp4"
        return video_io

    except Exception as e:
        print(f"Error converting video: {e}")
        return None
301
-
302
def recortar_imagen(image, starty_dic, axes_dic):
    """Black out everything except the lower region of a color image.

    The kept region is a full-width rectangle starting slightly above
    ``starty_dic`` (a fraction of the height), topped with a half-ellipse
    whose vertical radius is ``axes_dic`` times the height.
    """
    h, w, _ = image.shape
    roi_mask = np.zeros((h, w), dtype=np.uint8)
    top = int((starty_dic - .02) * h)
    cv2.rectangle(roi_mask, (0, top), (w, h), 255, -1)
    # Half-ellipse (180..360 degrees) sitting on the rectangle's top edge.
    ellipse_center = (w // 2, top)
    ellipse_axes = (w // 2, int(axes_dic * h))
    cv2.ellipse(roi_mask, ellipse_center, ellipse_axes, 0, 180, 360, 255, -1)
    return cv2.bitwise_and(image, image, mask=roi_mask)
312
-
313
def recortar_imagen_again(image, starty_dic, axes_dic):
    """Lower-region crop like ``recortar_imagen`` but grayscale-tolerant.

    Differences from ``recortar_imagen``: accepts 2-D (grayscale) images,
    and the rectangle starts exactly at ``starty_dic`` (no -0.02 offset).
    """
    try:
        height, width, _ = image.shape
    except ValueError:
        # Grayscale input: shape is (height, width) only.
        # (Narrowed from a bare except, which also hid real errors such as
        # image being None.)
        height, width = image.shape

    mask = np.zeros((height, width), dtype=np.uint8)

    start_y = int(starty_dic * height)
    cv2.rectangle(mask, (0, start_y), (width, height), 255, -1)
    center = (width // 2, start_y)
    axes = (width // 2, int(axes_dic * height))
    # Half-ellipse on top of the rectangle's upper edge.
    cv2.ellipse(mask, center, axes, 0, 180, 360, 255, -1)
    result = cv2.bitwise_and(image, image, mask=mask)
    return result
329
-
330
def calculate_black_pixels_percentage(image):
    """Score an image by how *non-black* it is.

    Despite the name, the raw percentage of black pixels is not returned:
    it is mapped to ``(100 - black_pct) * 0.06``, i.e. 6.0 for a fully
    bright image and 0.0 for a fully black one (presumably a weighting used
    by the caller — confirm against call sites).

    Args:
        image: cv2 image, BGR or already grayscale; ``None`` is tolerated.

    Returns:
        float: the transformed score above, or 0 when ``image`` is None.
    """
    if image is None:
        print(f"Error loading image")
        return 0

    # "Black" means intensity below 10 on the grayscale image.
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image.copy()

    rows, cols = gray.shape[:2]
    total_pixels = rows * cols

    black_pixels = np.sum(gray < 10)

    # Percentage of black pixels, then the caller-facing transform.
    percentage = (black_pixels / total_pixels) * 100
    return (100.00 - float(percentage)) * .06
365
-
366
def create_rectangular_roi(height, width, x1=0, y1=0, x2=None, y2=None):
    """Return a uint8 mask with a filled white rectangle (defaults to the whole frame)."""
    right = width if x2 is None else x2
    bottom = height if y2 is None else y2
    roi = np.zeros((height, width), dtype=np.uint8)
    cv2.rectangle(roi, (x1, y1), (right, bottom), 255, -1)
    return roi
374
-
375
def preprocess_image(image, mask=None):
    """Denoise, "sharpen" and min-max normalize a grayscale version of ``image``,
    optionally restricted to ``mask``."""
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image.copy()

    smooth = cv2.bilateralFilter(gray, d=3, sigmaColor=20, sigmaSpace=10)
    # NOTE(review): both addWeighted inputs are the same image, so this is
    # 3x - 2x = x computed in one pass — effectively a no-op; confirm intent.
    crisp = cv2.addWeighted(smooth, 3.0, smooth, -2.0, 0)
    scaled = cv2.normalize(crisp, None, 0, 255, cv2.NORM_MINMAX)

    if mask is None:
        return scaled
    return cv2.bitwise_and(scaled, scaled, mask=mask)
388
-
389
def calculate_robust_rms_contrast(image, mask=None, bright_threshold=240):
    """RMS contrast (std / 255) of the masked region, ignoring near-saturated
    pixels (>= ``bright_threshold``).

    Falls back to whole-image statistics whenever the selection is empty.
    """
    if len(image.shape) == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Pixels under consideration: masked region, or the entire image.
    pixels = image[mask > 0] if mask is not None else image.ravel()

    # Drop near-saturated pixels so highlights don't inflate the contrast.
    if len(pixels) > 0:
        pixels = pixels[pixels < bright_threshold]

    if len(pixels) == 0:
        # Nothing left to measure — fall back to the whole image.
        mu = np.mean(image)
        sigma = np.sqrt(np.mean((image - mu) ** 2))
    else:
        mu = np.mean(pixels)
        sigma = np.sqrt(np.mean((pixels - mu) ** 2))
    return sigma / 255.0
411
-
412
def adaptive_clahe_iterative(image, roi_mask, initial_clip_limit=1.0, max_clip_limit=10.0, iterations=20, target_rms_min=0.199, target_rms_max=0.5, bright_threshold=230):
    """Search for a CLAHE clip limit that puts the ROI's RMS contrast into
    [target_rms_min, target_rms_max].

    Returns the first enhanced image whose contrast lands inside the band;
    otherwise the highest-contrast candidate seen (which may be the
    unmodified grayscale input).
    """
    if len(image.shape) == 3:
        original_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        original_gray = image.copy()

    # Fallback candidate starts as the untouched grayscale image.
    best_image = original_gray.copy()
    best_rms = calculate_robust_rms_contrast(original_gray, roi_mask, bright_threshold)
    clip_limit = initial_clip_limit

    for i in range(iterations):
        # CLAHE is always applied to the *original* gray image, so each
        # iteration tests one clip limit rather than compounding.
        clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))
        current_image = clahe.apply(original_gray)

        rms_contrast = calculate_robust_rms_contrast(current_image, roi_mask, bright_threshold)

        # Inside the target band: done.
        if target_rms_min <= rms_contrast <= target_rms_max:
            return current_image
        # Track the highest-contrast candidate as the fallback result.
        if rms_contrast > best_rms:
            best_rms = rms_contrast
            best_image = current_image.copy()
        if rms_contrast > target_rms_max:
            # Overshot: cap the clip limit at 1.0.
            # NOTE(review): min(clip_limit, 1.0) can pin the search at the same
            # value for every remaining iteration — confirm this is intended.
            clip_limit = min(clip_limit, 1.0)
        else:
            # Undershot: ramp the clip limit up linearly with the iteration.
            clip_limit = min(initial_clip_limit + (i * 0.5), max_clip_limit)

    return best_image
441
-
442
def adaptive_edge_detection(imagen, min_edge_percentage=5.5, max_edge_percentage=6.5, target_percentage=6.0, max_attempts=5,mode="Default"):
    """
    Edge detection with progressively adjusted parameters, aiming for an
    optimal percentage of edge pixels (~``target_percentage``) in the image.

    Returns (best_enhanced, best_edges, original, best_config), or four
    ``None`` values when ``imagen`` is None.
    """
    # Input image (already expected to be grayscale).
    original = imagen
    if original is None:
        print(f"Error loading image")
        return None, None, None, None

    # Convert to grayscale — assumed already single-channel; no conversion done.
    gray = original

    # Pixel budgets derived from the requested percentages.
    total_pixels = gray.shape[0] * gray.shape[1]
    min_edge_pixels = int((min_edge_percentage / 100) * total_pixels)
    max_edge_pixels = int((max_edge_percentage / 100) * total_pixels)
    target_edge_pixels = int((target_percentage / 100) * total_pixels)

    # Per-attempt parameter schedules.
    # NOTE(review): clip_limits and grid_sizes have only ONE entry but are
    # indexed by `attempt` below — any second attempt raises IndexError
    # unless the early-exit break fires on attempt 0. Confirm/extend.
    clip_limits = [1]
    grid_sizes = [(2, 2)]
    # Start with higher thresholds to restrict the number of edges.
    canny_thresholds = [(55, 170), (45, 160), (35, 150), (25, 140), (20, 130),(20, 130),(20, 130)]

    best_edges = None
    best_enhanced = None
    best_config = None
    best_edge_score = float('inf')  # lower is better (distance to target)
    edge_percentage = 0


    # Try progressively more aggressive parameters.
    for attempt in range(max_attempts):
        # Parameters for this attempt.
        clip_limit = clip_limits[attempt]
        grid_size = grid_sizes[attempt]
        low_threshold, high_threshold = canny_thresholds[attempt]

        if edge_percentage <= max_edge_percentage:
            clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=grid_size)
        elif edge_count > max_edge_percentage:
            # NOTE(review): two problems here — `edge_count` is a pixel COUNT
            # compared against a PERCENTAGE (unit mismatch), and it is unbound
            # on the first iteration that reaches this branch. In practice the
            # `if` above is always taken first (edge_percentage starts at 0).
            clahe = cv2.createCLAHE(clipLimit=1, tileGridSize=grid_size)

        enhanced = clahe.apply(gray)

        # Derive Canny thresholds from the median intensity of the denoised image.
        # NOTE(review): both branches are identical, and `denoised` is unbound
        # for any other `mode` value — the Canny call below would then raise.
        if mode == "Default":
            denoised = cv2.bilateralFilter(enhanced, d=5, sigmaColor=200, sigmaSpace=200)
            median_intensity = np.median(denoised)
            low_threshold = max(20, (1.0 - .3) * median_intensity)
            high_threshold = max(80, (1.0 + .8) * median_intensity)
        elif mode == "Low ilumination":
            denoised = cv2.bilateralFilter(enhanced, d=5, sigmaColor=200, sigmaSpace=200)
            median_intensity = np.median(denoised)
            low_threshold = max(20, (1.0 - .3) * median_intensity)
            high_threshold = max(80, (1.0 + .8) * median_intensity)

        # Edge detection.
        edges = cv2.Canny(denoised, low_threshold, high_threshold)
        std_intensity = np.std(edges)

        # Reduce noise with a morphological opening (more iterations when noisy).
        kernel = np.ones((1, 1), np.uint8)
        edges = cv2.morphologyEx(
            edges,
            cv2.MORPH_OPEN,
            kernel,
            iterations=0 if std_intensity < 60 else 1
        )


        # Count edge pixels (vectorized).
        edge_count = np.count_nonzero(edges)
        edge_percentage = (edge_count / total_pixels) * 100

        # Distance to the target pixel budget.
        edge_score = abs(edge_count - target_edge_pixels)

        # Record the best attempt (closest to the target percentage).
        if edge_score < best_edge_score:
            best_edge_score = edge_score
            best_edges = edges.copy()  # copy to avoid later overwrites
            best_enhanced = enhanced.copy()
            best_config = {
                'attempt': attempt + 1,
                'clip_limit': clip_limit,
                'grid_size': grid_size,
                'canny_thresholds': (low_threshold, high_threshold),
                'edge_pixels': edge_count,
                'edge_percentage': edge_percentage
            }

        # Early exit when close enough to the target.
        if abs(edge_percentage - target_percentage) < 0.1:
            break

    print(f"Mejor intento: {best_config['attempt']}, porcentaje de bordes: {edge_percentage:.2f}%")
    return best_enhanced, best_edges, original, best_config
 
 
1
+ import cv2
2
+ import numpy as np
3
+ from typing import Tuple
4
+ import tempfile
5
+ import os
6
+ from PIL import Image
7
+ import sys
8
+ from pymongo import MongoClient
9
+ from dotenv import load_dotenv
10
+ import os
11
+ import streamlit as st
12
+
13
+ try:
14
+ if getattr(sys, 'frozen', False):
15
+ # En el ejecutable, intentar sys._MEIPASS
16
+ BASE_DIR = getattr(sys, '_MEIPASS', os.path.dirname(sys.executable))
17
+ print(f"Executable mode - Initial BASE_DIR: {BASE_DIR} (_MEIPASS: {hasattr(sys, '_MEIPASS')})")
18
+ # Verificar si BASE_DIR contiene los archivos esperados
19
+ expected_dirs = ['navigation', 'models', 'assets', 'img', 'utils']
20
+ if not any(os.path.exists(os.path.join(BASE_DIR, d)) for d in expected_dirs):
21
+ print(f"Warning: Expected directories not found in {BASE_DIR}")
22
+ # Buscar _MEI<random> en el directorio padre
23
+ temp_dir = os.path.dirname(BASE_DIR) if BASE_DIR != os.path.dirname(sys.executable) else BASE_DIR
24
+ for d in os.listdir(temp_dir):
25
+ if d.startswith('_MEI'):
26
+ candidate = os.path.join(temp_dir, d)
27
+ if any(os.path.exists(os.path.join(candidate, ed)) for ed in expected_dirs):
28
+ BASE_DIR = candidate
29
+ print(f"Adjusted BASE_DIR to _MEI directory: {BASE_DIR}")
30
+ break
31
+ else:
32
+ print(f"No _MEI directory found in {temp_dir}, using {BASE_DIR}")
33
+ else:
34
+ # En desarrollo, usar el directorio del proyecto
35
+ current_file = os.path.abspath(os.path.realpath(__file__))
36
+ print(f"Development mode - Current file: {current_file}")
37
+ BASE_DIR = os.path.dirname(os.path.dirname(current_file)) # Subir de utils/ a F1-machine-learning-webapp/
38
+ print(f"Development mode - BASE_DIR: {BASE_DIR}")
39
+ except Exception as e:
40
+ print(f"Error setting BASE_DIR: {e}")
41
+ # Fallback
42
+ BASE_DIR = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
43
+ BASE_DIR = os.path.dirname(BASE_DIR)
44
+ print(f"Fallback BASE_DIR: {BASE_DIR}")
45
+
46
+ BASE_DIR = os.path.normpath(BASE_DIR)
47
+ print(f"Final BASE_DIR: {BASE_DIR}")
48
+
49
+
50
+
51
+
52
+ #load_dotenv() # Carga las variables desde .env
53
+ #mongo_uri = os.getenv("MONGO_URI")
54
@st.cache_resource
def get_mongo_client():
    """Return a cached MongoDB client connected via the MONGO_URI env variable.

    Bug fix: this returned the raw URI string from ``os.getenv``; the
    module-level ``client["f1_data"]`` lookup in ``get_metrics_collections``
    then fails, because a str cannot be indexed by name. The URI must be
    wrapped in a MongoClient.
    """
    return MongoClient(os.getenv('MONGO_URI'))

client = get_mongo_client()
59
+
60
+
61
+ def get_metrics_collections():
62
+
63
+ db = client["f1_data"]
64
+ metrics_collection = db["usage_metrics"]
65
+ metrics_page = db["visits"]
66
+ return metrics_collection, metrics_page, db
67
+
68
+ metrics_collection, metrics_page, db = get_metrics_collections()
69
+ '''if not metrics_page.find_one({"page": "inicio"}):
70
+ metrics_page.insert_one({"page": "inicio", "visits": 0})
71
+ if not metrics_collection.find_one({"action": "descargar_app"}):
72
+ metrics_collection.insert_one({"action": "descargar_app", "count": 0})'''
73
+ '''except:
74
+ print("Error loading MongoDB URI from .env file. Please check your configuration.")
75
+ client = None
76
+ metrics_collection = None
77
+ metrics_page = None
78
+ db = None'''
79
+
80
+
81
+ #-------------YOLO ONNX HELPERS-------------------
82
+
83
+ def preprocess_image_tensor(image_rgb: np.ndarray) -> np.ndarray:
84
+ """Preprocess image to match Ultralytics YOLOv8."""
85
+
86
+ '''input = np.array(image_rgb)
87
+ input = input.transpose(2, 0, 1)
88
+ input = input.reshape(1,3,224,224).astype("float32")
89
+ input = input/255.0'''
90
+
91
+ input_data = image_rgb.transpose(2, 0, 1).reshape(1, 3, 224, 224)
92
+
93
+ # Convert to float32 and normalize to [0, 1]
94
+ input_data = input_data.astype(np.float32) / 255.0
95
+
96
+ return input_data
97
+
98
+ def postprocess_outputs(outputs: list, height: int, width: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
99
+ """Process ONNX model outputs for a single-class model."""
100
+ res_size = 56
101
+ output0 = outputs[0]
102
+ output1 = outputs[1]
103
+
104
+ output0 = output0[0].transpose()
105
+ output1 = output1[0]
106
+
107
+ boxes = output0[:,0:5]
108
+ masks = output0[:,5:]
109
+
110
+ output1 = output1.reshape(32,res_size*res_size)
111
+
112
+ masks = masks @ output1
113
+
114
+ boxes = np.hstack([boxes,masks])
115
+
116
+ yolo_classes = [
117
+ "helmet"
118
+ ]
119
+
120
+ # parse and filter all boxes
121
+ objects = []
122
+ for row in boxes:
123
+ xc,yc,w,h = row[:4]
124
+ x1 = (xc-w/2)/224*width
125
+ y1 = (yc-h/2)/224*height
126
+ x2 = (xc+w/2)/224*width
127
+ y2 = (yc+h/2)/224*height
128
+ prob = row[4:5].max()
129
+ if prob < 0.2:
130
+ continue
131
+ class_id = row[4:5].argmax()
132
+ label = yolo_classes[class_id]
133
+
134
+ mask = get_mask(row[5:25684], (x1,y1,x2,y2), width, height)
135
+ try:
136
+ polygon = get_polygon(mask)
137
+ except:
138
+ continue
139
+ objects.append([x1,y1,x2,y2,label,prob,mask,polygon])
140
+
141
+
142
+
143
+ # apply non-maximum suppression
144
+ objects.sort(key=lambda x: x[5], reverse=True)
145
+ result = []
146
+ while len(objects)>0:
147
+ result.append(objects[0])
148
+ objects = [object for object in objects if iou(object,objects[0])<0.7]
149
+
150
+
151
+
152
+ return True,result
153
+
154
+ def intersection(box1,box2):
155
+ box1_x1,box1_y1,box1_x2,box1_y2 = box1[:4]
156
+ box2_x1,box2_y1,box2_x2,box2_y2 = box2[:4]
157
+ x1 = max(box1_x1,box2_x1)
158
+ y1 = max(box1_y1,box2_y1)
159
+ x2 = min(box1_x2,box2_x2)
160
+ y2 = min(box1_y2,box2_y2)
161
+ return (x2-x1)*(y2-y1)
162
+
163
+ def union(box1,box2):
164
+ box1_x1,box1_y1,box1_x2,box1_y2 = box1[:4]
165
+ box2_x1,box2_y1,box2_x2,box2_y2 = box2[:4]
166
+ box1_area = (box1_x2-box1_x1)*(box1_y2-box1_y1)
167
+ box2_area = (box2_x2-box2_x1)*(box2_y2-box2_y1)
168
+ return box1_area + box2_area - intersection(box1,box2)
169
+
170
+ def iou(box1,box2):
171
+ return intersection(box1,box2)/union(box1,box2)
172
+
173
+ def sigmoid(z):
174
+ return 1/(1 + np.exp(-z))
175
+
176
+ # parse segmentation mask
177
+ def get_mask(row, box, img_width, img_height):
178
+ # convert mask to image (matrix of pixels)
179
+ res_size = 56
180
+ mask = row.reshape(res_size,res_size)
181
+ mask = sigmoid(mask)
182
+ mask = (mask > 0.2).astype("uint8")*255
183
+ # crop the object defined by "box" from mask
184
+ x1,y1,x2,y2 = box
185
+ mask_x1 = round(x1/img_width*res_size)
186
+ mask_y1 = round(y1/img_height*res_size)
187
+ mask_x2 = round(x2/img_width*res_size)
188
+ mask_y2 = round(y2/img_height*res_size)
189
+ mask = mask[mask_y1:mask_y2,mask_x1:mask_x2]
190
+ # resize the cropped mask to the size of object
191
+ img_mask = Image.fromarray(mask,"L")
192
+ img_mask = img_mask.resize((round(x2-x1),round(y2-y1)))
193
+ mask = np.array(img_mask)
194
+ return mask
195
+
196
+
197
+
198
+ # calculate bounding polygon from mask
199
+ def get_polygon(mask):
200
+ contours = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
201
+ polygon = [[contour[0][0],contour[0][1]] for contour in contours[0][0]]
202
+ return polygon
203
+
204
+
205
+
206
+
207
+
208
+
209
+
210
+
211
+
212
+
213
+ #------------------VIDEO CONVERSION------------------
214
+
215
+ def convert_video_to_10fps(video_file):
216
+ """
217
+ Convert an uploaded video file to 10 FPS and return metadata
218
+
219
+ Args:
220
+ video_file: Streamlit uploaded file object
221
+
222
+ Returns:
223
+ Dictionary with video metadata and path to converted file
224
+ """
225
+ try:
226
+ # Create temporary file for the original upload
227
+ orig_tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
228
+ orig_tfile.write(video_file.read())
229
+ orig_tfile.close()
230
+
231
+ # Open the original video to get properties
232
+ orig_cap = cv2.VideoCapture(orig_tfile.name)
233
+
234
+ if not orig_cap.isOpened():
235
+ return {"success": False, "error": "Could not open video file"}
236
+
237
+ orig_fps = orig_cap.get(cv2.CAP_PROP_FPS)
238
+ width = int(orig_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
239
+ height = int(orig_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
240
+ orig_total_frames = int(orig_cap.get(cv2.CAP_PROP_FRAME_COUNT))
241
+
242
+ # Calculate duration
243
+ duration_seconds = orig_total_frames / orig_fps
244
+ expected_frames = int(duration_seconds * 10) # 10 fps
245
+
246
+ # Create output temp file
247
+ converted_path = tempfile.mktemp(suffix='.mp4')
248
+
249
+ # Create VideoWriter
250
+ fourcc = cv2.VideoWriter_fourcc(*'mp4v')
251
+ out = cv2.VideoWriter(converted_path, fourcc, 10, (width, height))
252
+
253
+ # Calculate frame sampling
254
+ if orig_fps <= 10:
255
+ # If original is slower than target, duplicate frames
256
+ step = 1
257
+ duplication = int(10 / orig_fps)
258
+ else:
259
+ # If original is faster, skip frames
260
+ step = orig_fps / 10
261
+ duplication = 1
262
+
263
+ # Convert the video
264
+ frame_count = 0
265
+ output_count = 0
266
+
267
+ while orig_cap.isOpened():
268
+ ret, frame = orig_cap.read()
269
+ if not ret:
270
+ break
271
+
272
+ # Determine if we should include this frame
273
+ if frame_count % step < 1: # Using modulo < 1 for floating point step values
274
+ # Write frame (possibly multiple times)
275
+ for _ in range(duplication):
276
+ out.write(frame)
277
+ output_count += 1
278
+
279
+ frame_count += 1
280
+
281
+ # Release resources
282
+ orig_cap.release()
283
+ out.release()
284
+ os.unlink(orig_tfile.name) # Delete original temp file
285
+
286
+ # Instead of returning a dictionary, read the file back into memory
287
+ with open(converted_path, "rb") as f:
288
+ video_data = f.read()
289
+
290
+ # Clean up the temporary file
291
+ os.unlink(converted_path)
292
+
293
+ # Return a file-like object
294
+ from io import BytesIO
295
+ video_io = BytesIO(video_data)
296
+ video_io.name = "converted_10fps.mp4"
297
+ return video_io
298
+
299
+ except Exception as e:
300
+ print(f"Error converting video: {e}")
301
+ return None
302
+
303
+ def recortar_imagen(image,starty_dic, axes_dic):
304
+ height, width, _ = image.shape
305
+ mask = np.zeros((height, width), dtype=np.uint8)
306
+ start_y = int((starty_dic-.02) * height)
307
+ cv2.rectangle(mask, (0, start_y), (width, height), 255, -1)
308
+ center = (width // 2, start_y)
309
+ axes = (width // 2, int(axes_dic * height))
310
+ cv2.ellipse(mask, center, axes, 0, 180, 360, 255, -1)
311
+ result = cv2.bitwise_and(image, image, mask=mask)
312
+ return result
313
+
314
+ def recortar_imagen_again(image,starty_dic, axes_dic):
315
+
316
+ try:
317
+ height, width,_ = image.shape
318
+ except :
319
+ height, width = image.shape
320
+
321
+ mask = np.zeros((height, width), dtype=np.uint8)
322
+
323
+ start_y = int(starty_dic * height)
324
+ cv2.rectangle(mask, (0, start_y), (width, height), 255, -1)
325
+ center = (width // 2, start_y)
326
+ axes = (width // 2, int(axes_dic * height))
327
+ cv2.ellipse(mask, center, axes, 0, 180, 360, 255, -1)
328
+ result = cv2.bitwise_and(image, image, mask=mask)
329
+ return result
330
+
331
+ def calculate_black_pixels_percentage(image):
332
+ """
333
+ Calcula el porcentaje de p铆xeles totalmente negros en la imagen.
334
+
335
+ Args:
336
+ image: Imagen cargada con cv2 (BGR o escala de grises).
337
+ is_grayscale: True si la imagen ya est谩 en escala de gruises, False si es a color.
338
+
339
+ Returns:
340
+ float: Porcentaje de p铆xeles negros.
341
+ """
342
+ # Obtener dimensiones
343
+ '''image = cv2.imread(image_path)
344
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)'''
345
+ if image is None:
346
+ print(f"Error loading image")
347
+ return 0
348
+
349
+ if len(image.shape) == 3:
350
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
351
+ else:
352
+ image = image.copy()
353
+ h, w = image.shape[:2]
354
+ total_pixels = h * w
355
+
356
+ black_pixels = np.sum(image < 10)
357
+
358
+ # Calcular porcentaje
359
+ percentage = (black_pixels / total_pixels) * 100
360
+
361
+
362
+ percentage = (100.00 - float(percentage)) * .06
363
+
364
+
365
+ return percentage
366
+
367
+ def create_rectangular_roi(height, width, x1=0, y1=0, x2=None, y2=None):
368
+ if x2 is None:
369
+ x2 = width
370
+ if y2 is None:
371
+ y2 = height
372
+ mask = np.zeros((height, width), dtype=np.uint8)
373
+ cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
374
+ return mask
375
+
376
+ def preprocess_image(image, mask=None):
377
+ if len(image.shape) == 3:
378
+ gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
379
+ else:
380
+ gray = image.copy()
381
+
382
+ denoised = cv2.bilateralFilter(gray, d=3, sigmaColor=20, sigmaSpace=10)
383
+ sharpened = cv2.addWeighted(denoised, 3.0, denoised, -2.0, 0)
384
+ normalized = cv2.normalize(sharpened, None, 0, 255, cv2.NORM_MINMAX)
385
+
386
+ if mask is not None:
387
+ return cv2.bitwise_and(normalized, normalized, mask=mask)
388
+ return normalized
389
+
390
def calculate_robust_rms_contrast(image, mask=None, bright_threshold=240):
    """
    RMS contrast (std / 255) of an image, robust to glare.

    Pixels at or above ``bright_threshold`` are excluded so that saturated
    highlights do not inflate the contrast estimate. When the ROI (or the
    ROI after glare removal) is empty, the statistic falls back to the
    whole image, unfiltered.

    Args:
        image: BGR or grayscale uint8 image.
        mask: Optional ROI mask; only pixels where mask > 0 are considered.
        bright_threshold: Intensity cutoff for the glare filter.

    Returns:
        float: RMS contrast normalized to [0, 1].
    """
    if len(image.shape) == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    pixels = image[mask > 0] if mask is not None else image.ravel()
    if len(pixels) > 0:
        pixels = pixels[pixels < bright_threshold]

    # Fallback: empty ROI or everything filtered out as glare.
    if len(pixels) == 0:
        pixels = image.ravel()

    # np.std computes sqrt(mean((x - mean)**2)) in float64 for integer input,
    # identical to the original hand-rolled formula.
    return float(np.std(pixels)) / 255.0
412
+
413
def adaptive_clahe_iterative(image, roi_mask, initial_clip_limit=1.0, max_clip_limit=10.0, iterations=20, target_rms_min=0.199, target_rms_max=0.5, bright_threshold=230):
    """
    Apply CLAHE repeatedly, adapting the clip limit, until the robust RMS
    contrast of the ROI falls inside [target_rms_min, target_rms_max].

    Returns the first equalized image whose contrast lands in the target
    band, or otherwise the highest-contrast image seen over all iterations
    (which may be the untouched grayscale input).
    """
    base = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image.copy()

    best_image = base.copy()
    best_rms = calculate_robust_rms_contrast(base, roi_mask, bright_threshold)
    clip_limit = initial_clip_limit

    for step in range(iterations):
        equalized = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8)).apply(base)
        rms = calculate_robust_rms_contrast(equalized, roi_mask, bright_threshold)

        # Contrast in band: done.
        if target_rms_min <= rms <= target_rms_max:
            return equalized

        # Keep the best (highest-contrast) candidate seen so far.
        if rms > best_rms:
            best_rms = rms
            best_image = equalized.copy()

        # Overshot the band: cap the clip limit at 1.0; otherwise ramp it
        # up on a fixed schedule bounded by max_clip_limit.
        if rms > target_rms_max:
            clip_limit = min(clip_limit, 1.0)
        else:
            clip_limit = min(initial_clip_limit + step * 0.5, max_clip_limit)

    return best_image
442
+
443
def adaptive_edge_detection(imagen, min_edge_percentage=5.5, max_edge_percentage=6.5, target_percentage=6.0, max_attempts=5, mode="Default"):
    """
    Detect edges with progressive parameter adjustment until the percentage
    of edge pixels is close to ``target_percentage`` of the image.

    Args:
        imagen: Grayscale uint8 image, or None.
        min_edge_percentage: Lower bound of the acceptable edge-pixel band
            (kept for interface compatibility; not used by the heuristic).
        max_edge_percentage: Upper bound; exceeding it switches to the
            stronger re-equalization branch on the next attempt.
        target_percentage: Desired edge-pixel percentage.
        max_attempts: Maximum number of detection attempts.
        mode: "Default" or "Low ilumination"; both previously used identical
            threshold logic, and unknown modes crashed with an unbound
            variable — all modes now share the same median-based thresholds.

    Returns:
        (best_enhanced, best_edges, original, best_config) where best_config
        is a dict describing the winning attempt, or (None, None, None, None)
        when ``imagen`` is None.
    """
    original = imagen
    if original is None:
        print("Error loading image")
        return None, None, None, None

    gray = original  # assumed already single-channel -- TODO confirm callers

    total_pixels = gray.shape[0] * gray.shape[1]
    target_edge_pixels = int((target_percentage / 100) * total_pixels)

    # Parameter schedules. Indexing is clamped to the last entry so that
    # lists shorter than max_attempts no longer raise IndexError (the
    # original code crashed on the second attempt).
    clip_limits = [1]
    grid_sizes = [(2, 2)]

    best_edges = None
    best_enhanced = None
    best_config = None
    best_edge_score = float('inf')
    edge_percentage = 0.0

    for attempt in range(max_attempts):
        clip_limit = clip_limits[min(attempt, len(clip_limits) - 1)]
        grid_size = grid_sizes[min(attempt, len(grid_sizes) - 1)]

        # If the previous attempt produced too many edges, re-equalize with a
        # fixed low clip limit; otherwise use the scheduled one. (The original
        # elif compared a pixel COUNT against a PERCENTAGE and could leave
        # `clahe` unbound.)
        if edge_percentage <= max_edge_percentage:
            clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=grid_size)
        else:
            clahe = cv2.createCLAHE(clipLimit=1, tileGridSize=grid_size)
        enhanced = clahe.apply(gray)

        # Median-based Canny thresholds on the denoised image. Both modes
        # used identical parameters, so the duplicated branches are merged.
        denoised = cv2.bilateralFilter(enhanced, d=5, sigmaColor=200, sigmaSpace=200)
        median_intensity = np.median(denoised)
        low_threshold = max(20, (1.0 - .3) * median_intensity)
        high_threshold = max(80, (1.0 + .8) * median_intensity)

        edges = cv2.Canny(denoised, low_threshold, high_threshold)
        std_intensity = np.std(edges)

        # Morphological opening to drop isolated edge noise; skipped
        # (iterations=0) when the edge map is already quiet.
        kernel = np.ones((1, 1), np.uint8)
        edges = cv2.morphologyEx(
            edges,
            cv2.MORPH_OPEN,
            kernel,
            iterations=0 if std_intensity < 60 else 1
        )

        edge_count = np.count_nonzero(edges)
        edge_percentage = (edge_count / total_pixels) * 100
        edge_score = abs(edge_count - target_edge_pixels)

        # Track the attempt closest to the target edge density.
        if edge_score < best_edge_score:
            best_edge_score = edge_score
            best_edges = edges.copy()
            best_enhanced = enhanced.copy()
            best_config = {
                'attempt': attempt + 1,
                'clip_limit': clip_limit,
                'grid_size': grid_size,
                'canny_thresholds': (low_threshold, high_threshold),
                'edge_pixels': edge_count,
                'edge_percentage': edge_percentage
            }

        # Early exit when close enough to the target.
        if abs(edge_percentage - target_percentage) < 0.1:
            break

    # Report the BEST attempt's percentage (the original printed the last
    # attempt's value). Guarded so max_attempts=0 does not crash.
    if best_config is not None:
        print(f"Mejor intento: {best_config['attempt']}, porcentaje de bordes: {best_config['edge_percentage']:.2f}%")
    return best_enhanced, best_edges, original, best_config