import os import cv2 import numpy as np import asyncio import logging import time from reportlab.lib.pagesizes import letter from reportlab.pdfgen import canvas from reportlab.lib.units import inch from config import CONFIG logger = logging.getLogger(__name__) def preprocess_frame(frame): frame = cv2.resize(frame, CONFIG["TARGET_RESOLUTION"], interpolation=cv2.INTER_LINEAR) frame = cv2.convertScaleAbs(frame, alpha=1.2, beta=20) return frame def draw_detections(frame, detections): result_frame = frame.copy() for det in detections: label = det.get("violation", "Unknown") confidence = det.get("confidence", 0.0) x, y, w, h = det.get("bounding_box", [0, 0, 0, 0]) worker_id = det.get("worker_id", "Unknown") x1, y1 = int(x - w/2), int(y - h/2) x2, y2 = int(x + w/2), int(y + h/2) color = CONFIG["CLASS_COLORS"].get(label, (0, 0, 255)) cv2.rectangle(result_frame, (x1, y1), (x2, y2), color, 3) display_text = f"{CONFIG['DISPLAY_NAMES'].get(label, label)} (Worker {worker_id})" text_size = cv2.getTextSize(display_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0] cv2.rectangle(result_frame, (x1, y1-text_size[1]-10), (x1+text_size[0]+10, y1), (0, 0, 0), -1) cv2.putText(result_frame, display_text, (x1+5, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) cv2.putText(result_frame, f"Conf: {confidence:.2f}", (x1+5, y2+20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2) return result_frame def calculate_safety_score(violations): penalties = { "no_helmet": 25, "no_harness": 30, "unsafe_posture": 20, "unsafe_zone": 35, "improper_tool_use": 25 } worker_violations = {} for v in violations: worker_id = v.get("worker_id", "Unknown") worker_violations.setdefault(worker_id, set()).add(v.get("violation", "Unknown")) total_penalty = sum(sum(penalties.get(v, 0) for v in worker_violations[wid]) for wid in worker_violations) return max(0, 100 - total_penalty) async def generate_violation_pdf(violations, score, output_dir): try: pdf_filename = f"violations_{int(time.time())}.pdf" pdf_path = os.path.join(output_dir, pdf_filename) pdf_file = open(pdf_path, "wb") c = canvas.Canvas(pdf_file, pagesize=letter) c.setFont("Helvetica-Bold", 16) c.drawString(1 * inch, 10 * inch, "Worksite Safety Violation Report") c.setFont("Helvetica", 12) c.drawString(1 * inch, 9.5 * inch, f"Date: {time.strftime('%Y-%m-%d')}") c.setFont("Helvetica-Bold", 14) c.drawString(1 * inch, 8.7 * inch, f"Safety Compliance Score: {score}%") c.setFont("Helvetica", 10) y_position = 8.2 * inch worker_violations = {} for v in violations: worker_id = v.get("worker_id", "Unknown") worker_violations.setdefault(worker_id, []).append(v) for worker_id, worker_vios in worker_violations.items(): c.drawString(1 * inch, y_position, f"Worker {worker_id}:") y_position -= 0.2 * inch for v in worker_vios: display_name = CONFIG["DISPLAY_NAMES"].get(v.get("violation", "Unknown"), "Unknown") c.drawString(1.2 * inch, y_position, f" - {display_name} at {v.get('timestamp', 0.0):.2f}s (Conf: {v.get('confidence', 0.0):.2f})") y_position -= 0.2 * inch if y_position < 1 * inch: c.showPage() y_position = 10 * inch c.save() pdf_file.close() logger.info(f"PDF generated: {pdf_path}") return pdf_path except Exception as e: logger.error(f"Error generating PDF: {e}") return "" def clean_output_directory(max_age_seconds=86400): output_dir = os.path.join("static", "output") if not os.path.exists(output_dir): return current_time = time.time() for filename in os.listdir(output_dir): file_path = os.path.join(output_dir, filename) if os.path.isfile(file_path): file_age = current_time - os.path.getmtime(file_path) if file_age > max_age_seconds: try: os.remove(file_path) logger.info(f"Removed old output file: {file_path}") except Exception as e: logger.error(f"Failed to remove {file_path}: {e}")