# ATOA_LDobj / app1.py
# asrcoddeploy's picture
# Rename app.py to app1.py
# eab8672 verified
import os
import subprocess
import tempfile
import time
import warnings

import cv2
import gradio as gr
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms
# --- 1. MODEL ARCHITECTURE ---
class LDobjModel(nn.Module):
    """Two-level encoder/decoder CNN producing a one-channel sigmoid map.

    The network has two encoder stages (each followed by 2x max pooling), a
    bottleneck, and two decoder stages. Each decoder stage upsamples with a
    stride-2 transposed convolution and concatenates the matching encoder
    output along the channel axis before convolving.

    Input is a ``(N, 3, H, W)`` tensor and output is ``(N, 1, H, W)`` with
    values squashed by a sigmoid. Because the features are pooled twice and
    then concatenated with the un-pooled skips, ``H`` and ``W`` need to be
    multiples of 4 for the concatenations to line up.
    """

    def __init__(self):
        super().__init__()

        # Encoder path: 3 -> 16 -> 32 channels, halving resolution twice.
        self.enc1 = self.conv_block(3, 16)
        self.pool1 = nn.MaxPool2d(2)
        self.enc2 = self.conv_block(16, 32)
        self.pool2 = nn.MaxPool2d(2)

        # Deepest features at 1/4 resolution.
        self.bottleneck = self.conv_block(32, 64)

        # Decoder path: each conv block takes upsampled + skip channels,
        # hence 32 + 32 = 64 and 16 + 16 = 32 input channels.
        self.up1 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.dec1 = self.conv_block(64, 32)
        self.up2 = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2)
        self.dec2 = self.conv_block(32, 16)

        # 1x1 projection to a single channel, squashed to (0, 1).
        self.final = nn.Sequential(nn.Conv2d(16, 1, kernel_size=1), nn.Sigmoid())

    def conv_block(self, in_c, out_c):
        """Return two 3x3, stride-1, padding-1 convolutions, each followed by ReLU."""
        layers = [
            nn.Conv2d(in_c, out_c, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_c, out_c, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the encoder/decoder and return the per-pixel sigmoid map."""
        skip_full = self.enc1(x)
        skip_half = self.enc2(self.pool1(skip_full))
        deepest = self.bottleneck(self.pool2(skip_half))

        # Skip features go first in the concatenation, upsampled ones second.
        merged_half = torch.cat((skip_half, self.up1(deepest)), dim=1)
        decoded_half = self.dec1(merged_half)

        merged_full = torch.cat((skip_full, self.up2(decoded_half)), dim=1)
        decoded_full = self.dec2(merged_full)

        return self.final(decoded_full)
# --- 2. INITIALIZATION ---
# Inference runs on the CPU.
device = torch.device('cpu')
model = LDobjModel().to(device)
if os.path.exists('LDobj_weights.pth'):
    # weights_only=True limits unpickling to tensors and plain containers,
    # which is all a state_dict needs, so a tampered checkpoint cannot run
    # arbitrary code while loading.
    model.load_state_dict(
        torch.load('LDobj_weights.pth', map_location=device, weights_only=True)
    )
else:
    # Without the checkpoint the network keeps its random initial weights;
    # the app still starts, but say so instead of failing silently.
    warnings.warn(
        "LDobj_weights.pth not found: the model is running with randomly "
        "initialised weights and its lane predictions will be meaningless.",
        RuntimeWarning,
    )
# Switch to evaluation mode for inference.
model.eval()

# Per-frame preprocessing: array -> PIL image -> 288x800 (height x width)
# -> float tensor as produced by ToTensor.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((288, 800)),
    transforms.ToTensor()
])
# --- 3. ADVANCED PROCESSING LOGIC (With Progress & Analytics) ---
def _remove_quietly(path):
    """Delete *path* if possible; cleanup is best-effort, so errors are ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def _transcode_for_web(raw_path, web_path):
    """Re-encode *raw_path* to H.264/yuv420p at *web_path* using ffmpeg.

    Returns True only when ffmpeg ran, exited with status 0 and the output
    file exists. Arguments are passed as a list, so no shell is involved.
    """
    command = [
        "ffmpeg", "-y", "-i", raw_path,
        "-c:v", "libx264", "-preset", "fast",
        "-pix_fmt", "yuv420p", "-movflags", "+faststart",
        web_path,
    ]
    try:
        result = subprocess.run(command, check=False)
    except OSError:
        # ffmpeg is missing or not executable.
        return False
    return result.returncode == 0 and os.path.exists(web_path)


def analyze_video(input_video_path, sensitivity, progress=gr.Progress()):
    """Run lane segmentation on every frame of a video and flag lane departures.

    For each frame the model's sigmoid output is thresholded at 0.5, resized
    to the frame size and cleaned with a 5x5 morphological opening. The
    horizontal centroid of the mask pixels in the bottom quarter of the frame
    is compared with the frame centre; when the offset exceeds
    ``width * sensitivity / 100`` the frame is tinted red over the mask and a
    "LANE DEPARTURE DETECTED" banner is drawn.

    Args:
        input_video_path: Path of the video to analyse; falsy means "nothing
            uploaded".
        sensitivity: Allowed centroid drift as a percentage of frame width.
        progress: Gradio progress tracker used to report status.

    Returns:
        A ``(video_path, report)`` tuple. ``video_path`` is the annotated
        video (H.264 when the ffmpeg step succeeds, otherwise the raw
        ``mp4v`` file) or ``None`` when nothing could be processed;
        ``report`` is a human-readable status/telemetry string.
    """
    if not input_video_path:
        return None, "⚠️ Please upload a video first."

    start_time = time.time()
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        return None, "⚠️ Could not open the uploaded video."

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if width <= 0 or height <= 0:
        cap.release()
        return None, "⚠️ Could not read the video dimensions."
    if not (fps > 0):
        # Covers 0, negative and NaN frame rates reported by some containers.
        fps = 30.0

    # A private directory per run keeps simultaneous or repeated requests from
    # overwriting each other's files.
    work_dir = tempfile.mkdtemp(prefix="ldobj_")
    raw_output = os.path.join(work_dir, "temp_raw.mp4")
    web_output = os.path.join(work_dir, "ldobj_final.mp4")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(raw_output, fourcc, fps, (width, height))

    morph_kernel = np.ones((5, 5), np.uint8)
    drift_threshold = width * (sensitivity / 100.0)
    roi_top = int(height * 0.75)  # only the bottom quarter drives the alert
    frame_center_x = width // 2

    frame_count = 0
    alerts_triggered = 0

    try:
        if not out.isOpened():
            return None, "⚠️ Could not create the output video file."

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % 5 == 0:
                if total_frames > 0:
                    # Frame work fills the first 90% of the bar so the later
                    # 0.95 "optimizing" step never moves it backwards; clamp
                    # in case the container under-reports its frame count.
                    fraction = 0.9 * min(frame_count / total_frames, 1.0)
                else:
                    # Frame count unknown: avoid dividing by zero.
                    fraction = 0.0
                progress(fraction, desc=f"Processing AI Vision: Frame {frame_count}/{total_frames}")

            # AI Prediction
            input_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img_tensor = transform(input_img).unsqueeze(0).to(device)
            with torch.no_grad():
                pred = model(img_tensor).squeeze().cpu().numpy()

            # Mask Cleaning
            mask = (pred > 0.5).astype(np.uint8)
            mask_full = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)
            mask_full = cv2.morphologyEx(mask_full, cv2.MORPH_OPEN, morph_kernel)

            # Departure Alert Logic: centroid x = m10 / m00 of the ROI mask.
            moments = cv2.moments(mask_full[roi_top:, :])
            if moments["m00"] > 0:
                centroid_x = int(moments["m10"] / moments["m00"])
                if abs(centroid_x - frame_center_x) > drift_threshold:
                    alerts_triggered += 1
                    overlay = frame.copy()
                    overlay[mask_full > 0] = (0, 0, 255)
                    frame = cv2.addWeighted(frame, 0.7, overlay, 0.3, 0)
                    # Draw sleek UI elements
                    cv2.rectangle(frame, (0, 0), (width, 120), (0, 0, 0), -1)  # Black header bar
                    cv2.putText(frame, "LANE DEPARTURE DETECTED", (30, 80), cv2.FONT_HERSHEY_DUPLEX, 1.5, (0, 0, 255), 3)

            out.write(frame)
    finally:
        # Always release the native handles, even if a frame raised.
        cap.release()
        out.release()

    if frame_count == 0:
        _remove_quietly(raw_output)
        return None, "⚠️ No frames could be read from the video."

    progress(0.95, desc="Optimizing Video for Web...")
    if _transcode_for_web(raw_output, web_output):
        _remove_quietly(raw_output)
        result_path = web_output
        transcode_note = ""
    else:
        # Hand back the raw file rather than a path that does not exist.
        _remove_quietly(web_output)
        result_path = raw_output
        transcode_note = "\n⚠️ ffmpeg conversion failed; returning the unoptimized video."

    process_time = time.time() - start_time
    avg_fps = frame_count / process_time if process_time > 0 else 0

    telemetry_report = (
        f"✅ Analysis Complete\n"
        f"------------------------\n"
        f"⏱️ Processing Time: {process_time:.1f} seconds\n"
        f"🎞️ Total Frames: {frame_count}\n"
        f"🚀 AI Speed: {avg_fps:.1f} FPS\n"
        f"🚨 Alerts Triggered: {alerts_triggered} frames"
        f"{transcode_note}"
    )

    return result_path, telemetry_report
# --- 4. ULTIMATE FRONTEND DESIGN ---
# Stylesheet handed to Gradio at launch, one rule per entry. The empty first
# and last entries give the joined string its leading and trailing newline.
custom_css = "\n".join((
    "",
    # Video panes addressed by their elem_id.
    "#video-in, #video-out { min-height: 450px; border-radius: 10px; border: 1px solid #333; }",
    # Page width cap and centring.
    ".gradio-container { max-width: 1200px !important; margin: auto; }",
    "footer { visibility: hidden; }",
    # Classes used by the header HTML.
    ".glow-title { color: #ff4a4a; text-shadow: 0px 0px 15px rgba(255, 74, 74, 0.5); text-align: center; margin-bottom: 5px; }",
    ".sub-title { text-align: center; color: #888; margin-top: 0px; margin-bottom: 30px; }",
    "",
))
# IMPORTANT: No theme/css parameters inside gr.Blocks() for Gradio 6.0!
with gr.Blocks() as app:
    # Page header; the class names are the ones styled by custom_css.
    gr.HTML("<h1 class='glow-title'>🛡️ LDobj ADAS Command Center</h1>")
    gr.HTML("<h3 class='sub-title'>Advanced Driver Assistance System • Neural Lane Tracking</h3>")

    with gr.Group(), gr.Row():
        # Left column: upload, sensitivity control and the run button.
        with gr.Column(scale=4):
            gr.Markdown("### 1. Input Source")
            video_in = gr.Video(elem_id="video-in", label="Dashcam Feed")

            gr.Markdown("### 2. AI Parameters")
            sensitivity_slider = gr.Slider(
                label="Drift Sensitivity Threshold (%)",
                info="Lower % = Stricter alerts. Higher % = Allows more drift before alerting.",
                minimum=5,
                maximum=25,
                step=1,
                value=10,
            )
            run_btn = gr.Button("INITIALIZE SCAN", size="lg", variant="primary")

        # Right column: annotated video and the text report.
        with gr.Column(scale=5):
            gr.Markdown("### Live Output Feed")
            video_out = gr.Video(
                elem_id="video-out",
                label="LDobj Processed Feed",
                autoplay=True,
                interactive=False,
            )

            gr.Markdown("### System Telemetry")
            telemetry_out = gr.Textbox(
                label="Analytics Console",
                interactive=False,
                lines=6,
            )

    # Clicking the button sends the video and slider value to analyze_video;
    # its two return values fill the output video and the console.
    run_btn.click(
        fn=analyze_video,
        inputs=[video_in, sensitivity_slider],
        outputs=[video_out, telemetry_out],
    )
if __name__ == "__main__":
    # IMPORTANT: Theme and CSS MUST go inside the launch method!
    launch_options = {
        # Built-in 'Glass' theme with a red primary hue.
        "theme": gr.themes.Glass(primary_hue="red"),
        "css": custom_css,
        # Empty list: no footer links requested.
        "footer_links": [],
    }
    app.launch(**launch_options)