import gradio as gr import cv2 import numpy as np import tempfile import os import time import logging import sys import io from datetime import datetime from ultralytics import YOLO LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s" DATE_FORMAT = "%H:%M:%S" logging.basicConfig( level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT, handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("people_counter.log", encoding="utf-8"), ] ) log_main = logging.getLogger("PeopleCounter.Main") log_model = logging.getLogger("PeopleCounter.Model") log_tracker = logging.getLogger("PeopleCounter.Tracker") log_video = logging.getLogger("PeopleCounter.Video") log_ui = logging.getLogger("PeopleCounter.UI") # Ultralytics verbose chiqishini bostiramiz logging.getLogger("ultralytics").setLevel(logging.WARNING) log_main.info("=" * 60) log_main.info(" People Counter — Ishga tushmoqda") log_main.info(f" Sana/Vaqt: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") log_main.info("=" * 60) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # MODEL YUKLASH — YOLOv11n # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ MODEL_NAME = "yolo11n.pt" # YOLOv11 nano PERSON_CLASS = 0 # COCO dataset: 0 = person log_model.info(f"Model yuklanmoqda: {MODEL_NAME}") log_model.info("YOLOv11n — A2-Attention mexanizmi, real-time surveillance grade.") log_model.info("COCO 80-klass, person = class index 0") _t0 = time.time() try: model = YOLO(MODEL_NAME) load_time = time.time() - _t0 log_model.info(f"Model muvaffaqiyatli yuklandi ({load_time:.2f}s)") log_model.debug(f" Model fayl : {MODEL_NAME}") log_model.debug(f" Task : {model.task}") except Exception as exc: log_model.error(f"Model yuklanmadi: {exc}") log_model.warning("Fallback: yolov8n.pt ga o'tilmoqda...") model = YOLO("yolov8n.pt") MODEL_NAME = "yolov8n.pt" log_model.info("Fallback model yuklandi: yolov8n.pt") # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # IoU TRACKER # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ class IoUTracker: """ Sodda IoU-asosidagi odam tracker. Har bir frame'da: 1. Detection <-> Track IoU solishtirish. 2. Mos kelgan detection — eski track_id saqlanadi. 3. Mos kelmagan — yangi track_id beriladi. 4. max_lost frame ko'rinmasa — track o'chiriladi. """ def __init__(self, iou_threshold: float = 0.3, max_lost: int = 30): self.tracks = {} # {tid: {'bbox': [...], 'lost': int}} self.next_id = 1 self.unique_ids = set() self.iou_thr = iou_threshold self.max_lost = max_lost self.stat_match = 0 self.stat_new = 0 self.stat_del = 0 log_tracker.info( f"IoUTracker yaratildi | iou_thr={iou_threshold} | max_lost={max_lost}" ) @staticmethod def _iou(a, b) -> float: ax1, ay1, ax2, ay2 = a bx1, by1, bx2, by2 = b ix1 = max(ax1, bx1); iy1 = max(ay1, by1) ix2 = min(ax2, bx2); iy2 = min(ay2, by2) inter = max(0, ix2 - ix1) * max(0, iy2 - iy1) if inter == 0: return 0.0 union = (ax2-ax1)*(ay2-ay1) + (bx2-bx1)*(by2-by1) - inter return inter / union if union > 0 else 0.0 def update(self, detections: list, frame_idx: int = 0) -> dict: matched_tids = set() matched_didxs = set() # Step 1 — Match existing tracks for det_idx, det_bbox in enumerate(detections): best_iou, best_tid = 0.0, None for tid, tdata in self.tracks.items(): if tid in matched_tids: continue iou = self._iou(det_bbox, tdata['bbox']) if iou > best_iou: best_iou, best_tid = iou, tid if best_iou >= self.iou_thr and best_tid is not None: self.tracks[best_tid]['bbox'] = det_bbox self.tracks[best_tid]['lost'] = 0 matched_tids.add(best_tid) matched_didxs.add(det_idx) self.stat_match += 1 log_tracker.debug( f"F{frame_idx:04d} | Track #{best_tid:02d} MATCH iou={best_iou:.3f}" ) # Step 2 — New detections -> new tracks for det_idx, det_bbox in enumerate(detections): if det_idx not in matched_didxs: tid = self.next_id self.next_id += 1 self.tracks[tid] = {'bbox': det_bbox, 'lost': 0} self.unique_ids.add(tid) self.stat_new += 1 log_tracker.info( f"F{frame_idx:04d} | Track #{tid:02d} NEW " f"bbox={det_bbox} | Unique jami: {len(self.unique_ids)}" ) # Step 3 — Increment lost counter for tid in list(self.tracks.keys()): if tid not in matched_tids: self.tracks[tid]['lost'] += 1 # Step 4 — Prune dead tracks before = len(self.tracks) self.tracks = {t: v for t, v in self.tracks.items() if v['lost'] < self.max_lost} deleted = before - len(self.tracks) if deleted: self.stat_del += deleted log_tracker.debug( f"F{frame_idx:04d} | {deleted} track silindi (max_lost={self.max_lost})" ) active = {t: v['bbox'] for t, v in self.tracks.items() if v['lost'] == 0} if frame_idx % 25 == 0: log_tracker.info( f"F{frame_idx:04d} | Aktiv={len(active):2d} " f"Unique={len(self.unique_ids):2d} " f"Tracks={len(self.tracks):2d}" ) return active def summary(self): return { "unique_people" : len(self.unique_ids), "stat_match" : self.stat_match, "stat_new" : self.stat_new, "stat_del" : self.stat_del, } # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # ASOSIY HISOBLASH FUNKSIYASI # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ COLORS = [ (255, 80, 80), (80, 200, 120), (80, 160, 255), (255,200, 50), (200, 80, 255), (50, 220, 220), (255,140, 0), ( 0, 200, 200), (180,255, 80), (255, 80, 180), ] def count_people( video_path: str, conf_threshold: float = 0.4, progress=gr.Progress(), stream_handler=None, ) -> tuple: session = datetime.now().strftime("%H%M%S") log_main.info("─" * 55) log_main.info(f"[{session}] YANGI SESSION boshlandi") log_main.info(f"[{session}] Conf threshold : {conf_threshold}") log_main.info(f"[{session}] Model : {MODEL_NAME}") if video_path is None: log_main.warning("Video yuklanmadi.") return None, "Video yuklanmadi." # ── Video ochish log_video.info(f"Video ochilmoqda ...") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): log_video.error("Video fayl ochilmadi!") return None, "Video ochilmadi." total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) dur = total_frames / fps log_video.info(f" Kadrlar : {total_frames}") log_video.info(f" FPS : {fps:.1f}") log_video.info(f" Hajm : {width}x{height} px") log_video.info(f" Davomiylik: {dur:.1f}s") # ── Output video out_path = tempfile.mktemp(suffix="_result.mp4") fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height)) log_video.info(f"Output yozuvchi ochildi -> {os.path.basename(out_path)}") # ── Tracker max_lost = int(fps * 1.5) tracker = IoUTracker(iou_threshold=0.3, max_lost=max_lost) frame_idx = 0 total_dets = 0 t_start = time.time() log_main.info("Frame loop boshlandi ...") log_main.info(f" Har {25} ta kadrda tracker holati ko'rsatiladi.") while True: ret, frame = cap.read() if not ret: break # ── YOLO inference t_inf = time.time() results = model(frame, classes=[PERSON_CLASS], conf=conf_threshold, verbose=False)[0] inf_ms = (time.time() - t_inf) * 1000 detections = [] for box in results.boxes: x1, y1, x2, y2 = map(int, box.xyxy[0]) conf_val = float(box.conf[0]) detections.append([x1, y1, x2, y2]) log_model.debug( f"F{frame_idx:04d} | bbox=[{x1},{y1},{x2},{y2}] " f"conf={conf_val:.3f}" ) total_dets += len(detections) if frame_idx % 25 == 0: log_model.info( f"F{frame_idx:04d} | det={len(detections):2d} | " f"inf={inf_ms:5.1f}ms" ) # ── Tracking active = tracker.update(detections, frame_idx) # ── Frame annotatsiyasi for tid, (x1, y1, x2, y2) in active.items(): color = COLORS[tid % len(COLORS)] cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) lbl = f"#{tid}" (tw, th), _ = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) cv2.rectangle(frame, (x1, y1-th-8), (x1+tw+6, y1), color, -1) cv2.putText(frame, lbl, (x1+3, y1-4), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2) # ── Overlay panel (yuqori chap) total_unique = len(tracker.unique_ids) currently = len(active) cv2.rectangle(frame, (8, 8), (330, 80), (12,12,18), -1) cv2.rectangle(frame, (8, 8), (330, 80), (50,230,120), 1) cv2.putText(frame, f"Jami: {total_unique} ta odam", (14, 37), cv2.FONT_HERSHEY_SIMPLEX, 0.82, (50,230,120), 2) cv2.putText(frame, f"Hozir: {currently} Frame: {frame_idx}", (14, 68), cv2.FONT_HERSHEY_SIMPLEX, 0.52, (160,160,160), 1) # ── Model tegi (quyi o'ng) cv2.putText(frame, f"{MODEL_NAME} + IoUTracker", (width-230, height-10), cv2.FONT_HERSHEY_SIMPLEX, 0.42, (80,80,80), 1) writer.write(frame) frame_idx += 1 if total_frames > 0: progress( frame_idx / total_frames, desc=f"Frame {frame_idx}/{total_frames} | Unique: {total_unique} odam" ) cap.release() writer.release() elapsed = time.time() - t_start avg_fps_p = frame_idx / elapsed if elapsed > 0 else 0 stats = tracker.summary() log_main.info("─" * 55) log_main.info(f"[{session}] YAKUNIY STATISTIKA") log_main.info(f" Jami kadrlar : {frame_idx}") log_main.info(f" Jami detectionlar: {total_dets}") log_main.info(f" Unique odamlar : {stats['unique_people']}") log_main.info(f" Track match : {stats['stat_match']}") log_main.info(f" Yangi track : {stats['stat_new']}") log_main.info(f" O'chirilgan track: {stats['stat_del']}") log_main.info(f" Ishlash vaqti : {elapsed:.2f}s") log_main.info(f" O'rtacha tezlik : {avg_fps_p:.1f} fps") log_main.info("─" * 55) n = stats['unique_people'] if n == 0: result = "Odam yo'q" elif n == 1: result = "1 ta odam" else: result = f"{n} ta odam" log_main.info(f"[{session}] NATIJA: {result}") return out_path, result # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # GRADIO UI # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ log_ui.info("Gradio interfeysi qurilmoqda ...") css = """ @import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&family=Space+Grotesk:wght@400;600;700&display=swap'); body { font-family: 'Space Grotesk', sans-serif; } #ttl h1 { font-family:'JetBrains Mono',monospace; color:#50e37c; text-align:center; } #ttl p { text-align:center; color:#888; font-size:.9rem; } #rbox { font-family:'JetBrains Mono',monospace; font-size:2rem; font-weight:700; text-align:center; padding:1.4rem; background:#0d1a12; color:#50e37c; border-radius:10px; border:1px solid #50e37c55; margin-top:8px; } #logbox textarea { font-family:'JetBrains Mono',monospace !important; font-size:.7rem !important; background:#080d08 !important; color:#7ddb8a !important; border:1px solid #1a2e1a !important; } """ with gr.Blocks(css=css, title="People Counter — YOLOv11n") as demo: gr.Markdown( "# 👁️ Videodagi Odamlar Sonini Hisoblash\n" "**YOLOv11n** (A²-Attention Modules) + **IoU Tracker** — surveillance grade", elem_id="ttl" ) with gr.Row(): with gr.Column(scale=1): video_in = gr.Video(label="📹 Video yuklang", sources=["upload"]) conf_sl = gr.Slider(0.2, 0.85, value=0.4, step=0.05, label="Conf threshold") run_btn = gr.Button("▶ Hisoblashni boshlash", variant="primary", size="lg") with gr.Column(scale=1): video_out = gr.Video(label="📊 Annotated video") result_htm = gr.HTML("