File size: 4,449 Bytes
e2e7a3f
 
 
f65e238
 
2b1cb8b
e2e7a3f
 
 
 
 
 
 
f65e238
e2e7a3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b1cb8b
e2e7a3f
f65e238
e2e7a3f
 
 
 
2b1cb8b
e2e7a3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f65e238
 
 
e2e7a3f
 
 
 
2b1cb8b
 
 
 
f65e238
2b1cb8b
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import cv2
import numpy as np
import asyncio
import logging
import time
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.units import inch
from config import CONFIG

logger = logging.getLogger(__name__)

def preprocess_frame(frame):
    frame = cv2.resize(frame, CONFIG["TARGET_RESOLUTION"], interpolation=cv2.INTER_LINEAR)
    frame = cv2.convertScaleAbs(frame, alpha=1.2, beta=20)
    return frame

def draw_detections(frame, detections):
    result_frame = frame.copy()
    for det in detections:
        label = det.get("violation", "Unknown")
        confidence = det.get("confidence", 0.0)
        x, y, w, h = det.get("bounding_box", [0, 0, 0, 0])
        worker_id = det.get("worker_id", "Unknown")
        x1, y1 = int(x - w/2), int(y - h/2)
        x2, y2 = int(x + w/2), int(y + h/2)
        color = CONFIG["CLASS_COLORS"].get(label, (0, 0, 255))
        cv2.rectangle(result_frame, (x1, y1), (x2, y2), color, 3)
        display_text = f"{CONFIG['DISPLAY_NAMES'].get(label, label)} (Worker {worker_id})"
        text_size = cv2.getTextSize(display_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
        cv2.rectangle(result_frame, (x1, y1-text_size[1]-10), (x1+text_size[0]+10, y1), (0, 0, 0), -1)
        cv2.putText(result_frame, display_text, (x1+5, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(result_frame, f"Conf: {confidence:.2f}", (x1+5, y2+20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
    return result_frame

def calculate_safety_score(violations):
    penalties = {
        "no_helmet": 25, "no_harness": 30, "unsafe_posture": 20,
        "unsafe_zone": 35, "improper_tool_use": 25
    }
    worker_violations = {}
    for v in violations:
        worker_id = v.get("worker_id", "Unknown")
        worker_violations.setdefault(worker_id, set()).add(v.get("violation", "Unknown"))
    total_penalty = sum(sum(penalties.get(v, 0) for v in worker_violations[wid]) for wid in worker_violations)
    return max(0, 100 - total_penalty)

async def generate_violation_pdf(violations, score, output_dir):
    try:
        pdf_filename = f"violations_{int(time.time())}.pdf"
        pdf_path = os.path.join(output_dir, pdf_filename)
        pdf_file = open(pdf_path, "wb")
        c = canvas.Canvas(pdf_file, pagesize=letter)
        c.setFont("Helvetica-Bold", 16)
        c.drawString(1 * inch, 10 * inch, "Worksite Safety Violation Report")
        c.setFont("Helvetica", 12)
        c.drawString(1 * inch, 9.5 * inch, f"Date: {time.strftime('%Y-%m-%d')}")
        c.setFont("Helvetica-Bold", 14)
        c.drawString(1 * inch, 8.7 * inch, f"Safety Compliance Score: {score}%")
        c.setFont("Helvetica", 10)
        y_position = 8.2 * inch
        worker_violations = {}
        for v in violations:
            worker_id = v.get("worker_id", "Unknown")
            worker_violations.setdefault(worker_id, []).append(v)
        for worker_id, worker_vios in worker_violations.items():
            c.drawString(1 * inch, y_position, f"Worker {worker_id}:")
            y_position -= 0.2 * inch
            for v in worker_vios:
                display_name = CONFIG["DISPLAY_NAMES"].get(v.get("violation", "Unknown"), "Unknown")
                c.drawString(1.2 * inch, y_position, f"  - {display_name} at {v.get('timestamp', 0.0):.2f}s (Conf: {v.get('confidence', 0.0):.2f})")
                y_position -= 0.2 * inch
                if y_position < 1 * inch:
                    c.showPage()
                    y_position = 10 * inch
        c.save()
        pdf_file.close()
        logger.info(f"PDF generated: {pdf_path}")
        return pdf_path
    except Exception as e:
        logger.error(f"Error generating PDF: {e}")
        return ""

def clean_output_directory(max_age_seconds=86400):
    output_dir = os.path.join("static", "output")
    if not os.path.exists(output_dir):
        return
    
    current_time = time.time()
    for filename in os.listdir(output_dir):
        file_path = os.path.join(output_dir, filename)
        if os.path.isfile(file_path):
            file_age = current_time - os.path.getmtime(file_path)
            if file_age > max_age_seconds:
                try:
                    os.remove(file_path)
                    logger.info(f"Removed old output file: {file_path}")
                except Exception as e:
                    logger.error(f"Failed to remove {file_path}: {e}")