Spaces:
Sleeping
Sleeping
File size: 4,449 Bytes
e2e7a3f f65e238 2b1cb8b e2e7a3f f65e238 e2e7a3f 2b1cb8b e2e7a3f f65e238 e2e7a3f 2b1cb8b e2e7a3f f65e238 e2e7a3f 2b1cb8b f65e238 2b1cb8b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
import os
import cv2
import numpy as np
import asyncio
import logging
import time
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.units import inch
from config import CONFIG
logger = logging.getLogger(__name__)
def preprocess_frame(frame):
    """Resize a frame to the configured resolution and boost brightness/contrast.

    Args:
        frame: Image array accepted by ``cv2.resize``.

    Returns:
        The frame resized to ``CONFIG["TARGET_RESOLUTION"]`` (linear
        interpolation) and then passed through ``cv2.convertScaleAbs`` with a
        gain of 1.2 and an offset of 20.
    """
    resized = cv2.resize(
        frame,
        CONFIG["TARGET_RESOLUTION"],
        interpolation=cv2.INTER_LINEAR,
    )
    # Gain (alpha) and offset (beta) are fixed enhancement constants.
    return cv2.convertScaleAbs(resized, alpha=1.2, beta=20)
def draw_detections(frame, detections):
    """Return a copy of ``frame`` annotated with one labelled box per detection.

    Args:
        frame: Image to annotate; it is copied, never modified in place.
        detections: Iterable of dicts. Each may provide ``"violation"``,
            ``"confidence"``, ``"worker_id"`` and ``"bounding_box"``; the box
            is read as ``[center_x, center_y, width, height]``.

    Returns:
        The annotated copy of ``frame``.
    """
    annotated = frame.copy()

    for detection in detections:
        violation = detection.get("violation", "Unknown")
        score = detection.get("confidence", 0.0)
        center_x, center_y, box_w, box_h = detection.get("bounding_box", [0, 0, 0, 0])
        worker = detection.get("worker_id", "Unknown")

        # Convert the centre/size box into integer corner coordinates.
        left = int(center_x - box_w / 2)
        top = int(center_y - box_h / 2)
        right = int(center_x + box_w / 2)
        bottom = int(center_y + box_h / 2)

        box_color = CONFIG["CLASS_COLORS"].get(violation, (0, 0, 255))
        cv2.rectangle(annotated, (left, top), (right, bottom), box_color, 3)

        font = cv2.FONT_HERSHEY_SIMPLEX
        white = (255, 255, 255)
        caption = f"{CONFIG['DISPLAY_NAMES'].get(violation, violation)} (Worker {worker})"
        (caption_w, caption_h), _baseline = cv2.getTextSize(caption, font, 0.6, 2)

        # Filled black strip directly above the box keeps the caption readable.
        cv2.rectangle(
            annotated,
            (left, top - caption_h - 10),
            (left + caption_w + 10, top),
            (0, 0, 0),
            -1,
        )
        cv2.putText(annotated, caption, (left + 5, top - 5), font, 0.6, white, 2)

        # Confidence goes just below the bottom edge of the box.
        cv2.putText(annotated, f"Conf: {score:.2f}", (left + 5, bottom + 20), font, 0.5, white, 2)

    return annotated
def calculate_safety_score(violations):
    """Compute a 0-100 safety score from a list of violation records.

    Each distinct (worker, violation type) pair is penalised once, no matter
    how many times it appears. Violation types without a listed penalty cost
    nothing. The total penalty is subtracted from 100 and floored at 0.

    Args:
        violations: Iterable of dicts with optional ``"worker_id"`` and
            ``"violation"`` keys (both default to ``"Unknown"``).

    Returns:
        The score as an integer between 0 and 100.
    """
    penalty_by_type = {
        "no_helmet": 25,
        "no_harness": 30,
        "unsafe_posture": 20,
        "unsafe_zone": 35,
        "improper_tool_use": 25,
    }

    # A set collapses repeats of the same violation by the same worker.
    distinct_pairs = {
        (record.get("worker_id", "Unknown"), record.get("violation", "Unknown"))
        for record in violations
    }

    deduction = 0
    for _worker, violation_type in distinct_pairs:
        deduction += penalty_by_type.get(violation_type, 0)

    remaining = 100 - deduction
    return remaining if remaining > 0 else 0
async def generate_violation_pdf(violations, score, output_dir):
    """Write a per-worker safety violation report as a PDF.

    The report has a title, the current date, the compliance score and, for
    each worker, one line per violation with its timestamp and confidence.

    Note: although declared ``async``, the body contains no ``await``; file
    and PDF work run synchronously inside the coroutine.

    Args:
        violations: Iterable of dicts with optional ``"worker_id"``,
            ``"violation"``, ``"timestamp"`` and ``"confidence"`` keys.
        score: Value printed as the safety compliance score percentage.
        output_dir: Directory in which the PDF is created.

    Returns:
        The path of the generated PDF, or ``""`` if anything failed (the
        error is logged). A partially written file may remain on failure.
    """
    try:
        pdf_filename = f"violations_{int(time.time())}.pdf"
        pdf_path = os.path.join(output_dir, pdf_filename)

        # Group by worker before touching disk, so malformed input fails
        # without leaving an empty file behind.
        worker_violations = {}
        for v in violations:
            worker_id = v.get("worker_id", "Unknown")
            worker_violations.setdefault(worker_id, []).append(v)

        left_margin = 1 * inch
        line_height = 0.2 * inch

        # The context manager closes the handle even if drawing raises.
        with open(pdf_path, "wb") as pdf_file:
            c = canvas.Canvas(pdf_file, pagesize=letter)

            c.setFont("Helvetica-Bold", 16)
            c.drawString(left_margin, 10 * inch, "Worksite Safety Violation Report")
            c.setFont("Helvetica", 12)
            c.drawString(left_margin, 9.5 * inch, f"Date: {time.strftime('%Y-%m-%d')}")
            c.setFont("Helvetica-Bold", 14)
            c.drawString(left_margin, 8.7 * inch, f"Safety Compliance Score: {score}%")

            c.setFont("Helvetica", 10)
            y_position = 8.2 * inch
            for worker_id, worker_vios in worker_violations.items():
                c.drawString(left_margin, y_position, f"Worker {worker_id}:")
                y_position -= line_height
                for v in worker_vios:
                    display_name = CONFIG["DISPLAY_NAMES"].get(v.get("violation", "Unknown"), "Unknown")
                    c.drawString(
                        1.2 * inch,
                        y_position,
                        f" - {display_name} at {v.get('timestamp', 0.0):.2f}s (Conf: {v.get('confidence', 0.0):.2f})",
                    )
                    y_position -= line_height
                    if y_position < 1 * inch:
                        c.showPage()
                        # showPage() discards canvas state, including the
                        # font, so restore the body font for the new page.
                        c.setFont("Helvetica", 10)
                        y_position = 10 * inch

            c.save()

        logger.info(f"PDF generated: {pdf_path}")
        return pdf_path
    except Exception as e:
        # Best-effort by design: callers get "" instead of an exception.
        logger.error(f"Error generating PDF: {e}")
        return ""
def clean_output_directory(max_age_seconds=86400, output_dir=None):
    """Delete regular files in the output directory that are too old.

    Only direct children that are regular files are considered;
    subdirectories are left alone. A failure on one file is logged and does
    not stop the sweep.

    Args:
        max_age_seconds: Files whose modification time is more than this many
            seconds in the past are removed. Defaults to 86400.
        output_dir: Directory to clean. Defaults to ``static/output``
            relative to the current working directory.

    Returns:
        None. Returns immediately if the directory does not exist.
    """
    if output_dir is None:
        output_dir = os.path.join("static", "output")
    if not os.path.isdir(output_dir):
        return

    current_time = time.time()
    for filename in os.listdir(output_dir):
        file_path = os.path.join(output_dir, filename)
        if not os.path.isfile(file_path):
            continue
        try:
            # getmtime is inside the try: the file may disappear between
            # listdir() and here, which must not abort the whole sweep.
            file_age = current_time - os.path.getmtime(file_path)
            if file_age > max_age_seconds:
                os.remove(file_path)
                logger.info(f"Removed old output file: {file_path}")
        except OSError as e:
            logger.error(f"Failed to remove {file_path}: {e}")