import gradio as gr
import cv2
import numpy as np
import tempfile
import os
import time
import logging
import sys
import io
from datetime import datetime
from ultralytics import YOLO
LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s"
DATE_FORMAT = "%H:%M:%S"
logging.basicConfig(
level=logging.DEBUG,
format=LOG_FORMAT,
datefmt=DATE_FORMAT,
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("people_counter.log", encoding="utf-8"),
]
)
log_main = logging.getLogger("PeopleCounter.Main")
log_model = logging.getLogger("PeopleCounter.Model")
log_tracker = logging.getLogger("PeopleCounter.Tracker")
log_video = logging.getLogger("PeopleCounter.Video")
log_ui = logging.getLogger("PeopleCounter.UI")
# Suppress Ultralytics' verbose output
logging.getLogger("ultralytics").setLevel(logging.WARNING)
log_main.info("=" * 60)
log_main.info(" People Counter β€” Ishga tushmoqda")
log_main.info(f" Sana/Vaqt: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
log_main.info("=" * 60)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# MODEL LOADING - YOLOv11n
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
MODEL_NAME = "yolo11n.pt" # YOLOv11 nano
PERSON_CLASS = 0 # COCO dataset: 0 = person
log_model.info(f"Model yuklanmoqda: {MODEL_NAME}")
log_model.info("YOLOv11n β€” A2-Attention mexanizmi, real-time surveillance grade.")
log_model.info("COCO 80-klass, person = class index 0")
_t0 = time.time()
try:
model = YOLO(MODEL_NAME)
load_time = time.time() - _t0
log_model.info(f"Model muvaffaqiyatli yuklandi ({load_time:.2f}s)")
log_model.debug(f" Model fayl : {MODEL_NAME}")
log_model.debug(f" Task : {model.task}")
except Exception as exc:
log_model.error(f"Model yuklanmadi: {exc}")
log_model.warning("Fallback: yolov8n.pt ga o'tilmoqda...")
model = YOLO("yolov8n.pt")
MODEL_NAME = "yolov8n.pt"
log_model.info("Fallback model yuklandi: yolov8n.pt")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# IoU TRACKER
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class IoUTracker:
"""
Sodda IoU-asosidagi odam tracker.
Har bir frame'da:
1. Detection <-> Track IoU solishtirish.
2. Mos kelgan detection β€” eski track_id saqlanadi.
3. Mos kelmagan β€” yangi track_id beriladi.
4. max_lost frame ko'rinmasa β€” track o'chiriladi.
"""
def __init__(self, iou_threshold: float = 0.3, max_lost: int = 30):
self.tracks = {} # {tid: {'bbox': [...], 'lost': int}}
self.next_id = 1
self.unique_ids = set()
self.iou_thr = iou_threshold
self.max_lost = max_lost
self.stat_match = 0
self.stat_new = 0
self.stat_del = 0
log_tracker.info(
f"IoUTracker yaratildi | iou_thr={iou_threshold} | max_lost={max_lost}"
)
@staticmethod
def _iou(a, b) -> float:
ax1, ay1, ax2, ay2 = a
bx1, by1, bx2, by2 = b
ix1 = max(ax1, bx1); iy1 = max(ay1, by1)
ix2 = min(ax2, bx2); iy2 = min(ay2, by2)
inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
if inter == 0:
return 0.0
union = (ax2-ax1)*(ay2-ay1) + (bx2-bx1)*(by2-by1) - inter
return inter / union if union > 0 else 0.0
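    # Worked example (illustrative): _iou([0, 0, 10, 10], [5, 5, 15, 15])
    # -> intersection 5 * 5 = 25, union 100 + 100 - 25 = 175, IoU = 25 / 175 ≈ 0.143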
def update(self, detections: list, frame_idx: int = 0) -> dict:
matched_tids = set()
matched_didxs = set()
# Step 1 β€” Match existing tracks
for det_idx, det_bbox in enumerate(detections):
best_iou, best_tid = 0.0, None
for tid, tdata in self.tracks.items():
if tid in matched_tids:
continue
iou = self._iou(det_bbox, tdata['bbox'])
if iou > best_iou:
best_iou, best_tid = iou, tid
if best_iou >= self.iou_thr and best_tid is not None:
self.tracks[best_tid]['bbox'] = det_bbox
self.tracks[best_tid]['lost'] = 0
matched_tids.add(best_tid)
matched_didxs.add(det_idx)
self.stat_match += 1
log_tracker.debug(
f"F{frame_idx:04d} | Track #{best_tid:02d} MATCH iou={best_iou:.3f}"
)
# Step 2 β€” New detections -> new tracks
for det_idx, det_bbox in enumerate(detections):
if det_idx not in matched_didxs:
tid = self.next_id
self.next_id += 1
self.tracks[tid] = {'bbox': det_bbox, 'lost': 0}
self.unique_ids.add(tid)
self.stat_new += 1
log_tracker.info(
f"F{frame_idx:04d} | Track #{tid:02d} NEW "
f"bbox={det_bbox} | Unique jami: {len(self.unique_ids)}"
)
# Step 3 β€” Increment lost counter
for tid in list(self.tracks.keys()):
if tid not in matched_tids:
self.tracks[tid]['lost'] += 1
# Step 4 β€” Prune dead tracks
before = len(self.tracks)
self.tracks = {t: v for t, v in self.tracks.items()
if v['lost'] < self.max_lost}
deleted = before - len(self.tracks)
if deleted:
self.stat_del += deleted
log_tracker.debug(
f"F{frame_idx:04d} | {deleted} track silindi (max_lost={self.max_lost})"
)
active = {t: v['bbox'] for t, v in self.tracks.items() if v['lost'] == 0}
if frame_idx % 25 == 0:
log_tracker.info(
f"F{frame_idx:04d} | Aktiv={len(active):2d} "
f"Unique={len(self.unique_ids):2d} "
f"Tracks={len(self.tracks):2d}"
)
return active
def summary(self):
return {
"unique_people" : len(self.unique_ids),
"stat_match" : self.stat_match,
"stat_new" : self.stat_new,
"stat_del" : self.stat_del,
}
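# Minimal usage sketch of IoUTracker (illustrative only): update() takes a list of
# [x1, y1, x2, y2] detections for one frame and returns {track_id: bbox} for active tracks.
#
#   _t = IoUTracker(iou_threshold=0.3, max_lost=5)
#   _t.update([[10, 10, 50, 80]], frame_idx=0)   # -> {1: [10, 10, 50, 80]} (new track #1)
#   _t.update([[12, 11, 52, 82]], frame_idx=1)   # overlapping box keeps track id 1
#   _t.summary()                                 # -> {'unique_people': 1, ...}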
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# MAIN COUNTING FUNCTION
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
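# Per-track box colors in BGR order (OpenCV's channel convention); each track id picks
# its color via COLORS[tid % len(COLORS)], so ids simply cycle through the palette.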
COLORS = [
(255, 80, 80), (80, 200, 120), (80, 160, 255),
(255,200, 50), (200, 80, 255), (50, 220, 220),
(255,140, 0), ( 0, 200, 200), (180,255, 80),
(255, 80, 180),
]
def count_people(
video_path: str,
conf_threshold: float = 0.4,
progress=gr.Progress(),
stream_handler=None,
) -> tuple:
session = datetime.now().strftime("%H%M%S")
log_main.info("─" * 55)
log_main.info(f"[{session}] YANGI SESSION boshlandi")
log_main.info(f"[{session}] Conf threshold : {conf_threshold}")
log_main.info(f"[{session}] Model : {MODEL_NAME}")
if video_path is None:
log_main.warning("Video yuklanmadi.")
return None, "Video yuklanmadi."
# ── Video ochish
log_video.info(f"Video ochilmoqda ...")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
log_video.error("Video fayl ochilmadi!")
return None, "Video ochilmadi."
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
dur = total_frames / fps
log_video.info(f" Kadrlar : {total_frames}")
log_video.info(f" FPS : {fps:.1f}")
log_video.info(f" Hajm : {width}x{height} px")
log_video.info(f" Davomiylik: {dur:.1f}s")
# ── Output video
    out_path = tempfile.NamedTemporaryFile(suffix="_result.mp4", delete=False).name
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
log_video.info(f"Output yozuvchi ochildi -> {os.path.basename(out_path)}")
# ── Tracker
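    # max_lost scales with FPS: a track may go unmatched for roughly 1.5 seconds
    # (e.g. a brief occlusion) before the tracker prunes it.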
max_lost = int(fps * 1.5)
tracker = IoUTracker(iou_threshold=0.3, max_lost=max_lost)
frame_idx = 0
total_dets = 0
t_start = time.time()
log_main.info("Frame loop boshlandi ...")
log_main.info(f" Har {25} ta kadrda tracker holati ko'rsatiladi.")
while True:
ret, frame = cap.read()
if not ret:
break
# ── YOLO inference
t_inf = time.time()
results = model(frame, classes=[PERSON_CLASS],
conf=conf_threshold, verbose=False)[0]
inf_ms = (time.time() - t_inf) * 1000
detections = []
for box in results.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
conf_val = float(box.conf[0])
detections.append([x1, y1, x2, y2])
log_model.debug(
f"F{frame_idx:04d} | bbox=[{x1},{y1},{x2},{y2}] "
f"conf={conf_val:.3f}"
)
total_dets += len(detections)
if frame_idx % 25 == 0:
log_model.info(
f"F{frame_idx:04d} | det={len(detections):2d} | "
f"inf={inf_ms:5.1f}ms"
)
# ── Tracking
active = tracker.update(detections, frame_idx)
        # ── Frame annotation
for tid, (x1, y1, x2, y2) in active.items():
color = COLORS[tid % len(COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
lbl = f"#{tid}"
(tw, th), _ = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
cv2.rectangle(frame, (x1, y1-th-8), (x1+tw+6, y1), color, -1)
cv2.putText(frame, lbl, (x1+3, y1-4),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
        # ── Overlay panel (top left)
total_unique = len(tracker.unique_ids)
currently = len(active)
cv2.rectangle(frame, (8, 8), (330, 80), (12,12,18), -1)
cv2.rectangle(frame, (8, 8), (330, 80), (50,230,120), 1)
cv2.putText(frame, f"Jami: {total_unique} ta odam",
(14, 37), cv2.FONT_HERSHEY_SIMPLEX, 0.82, (50,230,120), 2)
cv2.putText(frame, f"Hozir: {currently} Frame: {frame_idx}",
(14, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.52, (160,160,160), 1)
        # ── Model tag (bottom right)
cv2.putText(frame, f"{MODEL_NAME} + IoUTracker",
(width-230, height-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.42, (80,80,80), 1)
writer.write(frame)
frame_idx += 1
if total_frames > 0:
progress(
frame_idx / total_frames,
desc=f"Frame {frame_idx}/{total_frames} | Unique: {total_unique} odam"
)
cap.release()
writer.release()
elapsed = time.time() - t_start
avg_fps_p = frame_idx / elapsed if elapsed > 0 else 0
stats = tracker.summary()
log_main.info("─" * 55)
log_main.info(f"[{session}] YAKUNIY STATISTIKA")
log_main.info(f" Jami kadrlar : {frame_idx}")
log_main.info(f" Jami detectionlar: {total_dets}")
log_main.info(f" Unique odamlar : {stats['unique_people']}")
log_main.info(f" Track match : {stats['stat_match']}")
log_main.info(f" Yangi track : {stats['stat_new']}")
log_main.info(f" O'chirilgan track: {stats['stat_del']}")
log_main.info(f" Ishlash vaqti : {elapsed:.2f}s")
log_main.info(f" O'rtacha tezlik : {avg_fps_p:.1f} fps")
log_main.info("─" * 55)
    n = stats['unique_people']
    if n == 0:
        result = "No people detected"
    elif n == 1:
        result = "1 person"
    else:
        result = f"{n} people"
    log_main.info(f"[{session}] RESULT: {result}")
return out_path, result
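# Direct (non-UI) usage sketch, assuming a local clip "sample.mp4" exists; note that the
# default gr.Progress() argument may need an active Gradio event context to update:
#   annotated_path, summary_text = count_people("sample.mp4", conf_threshold=0.4)
#   print(summary_text)   # e.g. "3 people"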
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# GRADIO UI
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
log_ui.info("Gradio interfeysi qurilmoqda ...")
css = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&family=Space+Grotesk:wght@400;600;700&display=swap');
body { font-family: 'Space Grotesk', sans-serif; }
#ttl h1 { font-family:'JetBrains Mono',monospace; color:#50e37c; text-align:center; }
#ttl p { text-align:center; color:#888; font-size:.9rem; }
#rbox {
font-family:'JetBrains Mono',monospace; font-size:2rem; font-weight:700;
text-align:center; padding:1.4rem; background:#0d1a12; color:#50e37c;
border-radius:10px; border:1px solid #50e37c55; margin-top:8px;
}
#logbox textarea {
font-family:'JetBrains Mono',monospace !important;
font-size:.7rem !important;
background:#080d08 !important;
color:#7ddb8a !important;
border:1px solid #1a2e1a !important;
}
"""
with gr.Blocks(css=css, title="People Counter - YOLOv11n") as demo:
gr.Markdown(
"# πŸ‘οΈ Videodagi Odamlar Sonini Hisoblash\n"
"**YOLOv11n** (AΒ²-Attention Modules) + **IoU Tracker** β€” surveillance grade",
elem_id="ttl"
)
with gr.Row():
with gr.Column(scale=1):
            video_in = gr.Video(label="📹 Upload a video", sources=["upload"])
conf_sl = gr.Slider(0.2, 0.85, value=0.4, step=0.05,
label="Conf threshold")
            run_btn = gr.Button("▶ Start counting",
variant="primary", size="lg")
with gr.Column(scale=1):
            video_out = gr.Video(label="📊 Annotated video")
            result_htm = gr.HTML("<div id='rbox'>- result -</div>")
            log_box = gr.Textbox(label="Logs", lines=14, elem_id="logbox",
                                 interactive=False)
def run(video, conf):
log_ui.info(f"UI: Run bosildi | conf={conf}")
# In-memory stream handler to capture logs for UI
buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setLevel(logging.DEBUG)
handler.setFormatter(
logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)-18s | %(message)s",
datefmt="%H:%M:%S"
)
)
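        # Child loggers ("PeopleCounter.Main", ".Model", ...) propagate records to the
        # "PeopleCounter" parent, so one handler here captures every module's output.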
root_pc = logging.getLogger("PeopleCounter")
root_pc.addHandler(handler)
try:
out_vid, text = count_people(video, conf)
finally:
root_pc.removeHandler(handler)
html = f"<div id='rbox'>{'βœ… ' if 'ta odam' in text or '1 ta' in text else '🚢 '}{text}</div>"
return out_vid, html, buf.getvalue()
run_btn.click(
fn=run,
inputs=[video_in, conf_sl],
        outputs=[video_out, result_htm, log_box]
)
log_ui.info("Gradio tayyor.")
if __name__ == "__main__":
log_main.info("demo.launch() ...")
demo.launch()