Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import time | |
| import logging | |
| import sys | |
| import io | |
| from datetime import datetime | |
| from ultralytics import YOLO | |
# Layout shared by every log record: time | level | logger name | message.
LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s"
DATE_FORMAT = "%H:%M:%S"

# Records are mirrored to stdout and to a UTF-8 log file in the working directory.
_console_handler = logging.StreamHandler(sys.stdout)
_file_handler = logging.FileHandler("people_counter.log", encoding="utf-8")
logging.basicConfig(
    level=logging.DEBUG,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
    handlers=[_console_handler, _file_handler],
)

# One child logger per subsystem, all below the "PeopleCounter" namespace.
log_main, log_model, log_tracker, log_video, log_ui = (
    logging.getLogger(f"PeopleCounter.{part}")
    for part in ("Main", "Model", "Tracker", "Video", "UI")
)

# Keep Ultralytics' own verbose output out of the logs.
logging.getLogger("ultralytics").setLevel(logging.WARNING)

# Start-up banner.
# NOTE(review): the non-ASCII character in the title below looks mis-encoded
# in this copy of the file; it is reproduced unchanged - confirm against the
# original source.
_rule = "=" * 60
log_main.info(_rule)
log_main.info(" People Counter β Ishga tushmoqda")
log_main.info(f" Sana/Vaqt: {datetime.now():%Y-%m-%d %H:%M:%S}")
log_main.info(_rule)
# ------------------------------------------------------------
# MODEL LOADING - YOLOv11n
# ------------------------------------------------------------
MODEL_NAME = "yolo11n.pt"  # YOLOv11 nano weights file
PERSON_CLASS = 0  # class index passed to the model as the only class to detect

log_model.info("Model yuklanmoqda: %s", MODEL_NAME)
log_model.info("YOLOv11n β A2-Attention mexanizmi, real-time surveillance grade.")
log_model.info("COCO 80-klass, person = class index 0")

# Load the primary weights and time the load; on any failure fall back to
# the YOLOv8 nano weights so the application can still start.
_t0 = time.time()
try:
    model = YOLO(MODEL_NAME)
    load_time = time.time() - _t0
    log_model.info("Model muvaffaqiyatli yuklandi (%.2fs)", load_time)
    log_model.debug(" Model fayl : %s", MODEL_NAME)
    log_model.debug(" Task : %s", model.task)
except Exception as exc:
    log_model.error("Model yuklanmadi: %s", exc)
    log_model.warning("Fallback: yolov8n.pt ga o'tilmoqda...")
    model = YOLO("yolov8n.pt")
    # Keep the reported model name in sync with the weights actually loaded.
    MODEL_NAME = "yolov8n.pt"
    log_model.info("Fallback model yuklandi: yolov8n.pt")
# ------------------------------------------------------------
# IoU TRACKER
# ------------------------------------------------------------
class IoUTracker:
    """Minimal IoU-based tracker for person bounding boxes.

    On every frame ``update`` does the following:

    1. Greedily pairs each detection with the not-yet-claimed track whose
       box overlaps it most (intersection over union).
    2. A pair at or above ``iou_threshold`` keeps the existing track id.
    3. Every unpaired detection opens a track with a fresh id.
    4. A track that stays unpaired for ``max_lost`` consecutive frames is
       dropped.
    """

    def __init__(self, iou_threshold: float = 0.3, max_lost: int = 30):
        """Create an empty tracker.

        Args:
            iou_threshold: Minimum IoU for a detection to continue a track.
            max_lost: Number of consecutive unmatched frames after which a
                track is removed.
        """
        self.tracks = {}  # {tid: {'bbox': [x1, y1, x2, y2], 'lost': int}}
        self.next_id = 1
        self.unique_ids = set()  # every track id ever handed out
        self.iou_thr = iou_threshold
        self.max_lost = max_lost
        # Running counters reported by summary().
        self.stat_match = 0
        self.stat_new = 0
        self.stat_del = 0
        log_tracker.info(
            f"IoUTracker yaratildi | iou_thr={iou_threshold} | max_lost={max_lost}"
        )

    @staticmethod
    def _iou(a, b) -> float:
        """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

        Returns 0.0 when the boxes do not overlap or the union area is not
        positive.
        """
        ax1, ay1, ax2, ay2 = a
        bx1, by1, bx2, by2 = b
        ix1 = max(ax1, bx1)
        iy1 = max(ay1, by1)
        ix2 = min(ax2, bx2)
        iy2 = min(ay2, by2)
        inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        if inter == 0:
            return 0.0
        union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter
        return inter / union if union > 0 else 0.0

    def update(self, detections: list, frame_idx: int = 0) -> dict:
        """Advance the tracker by one frame.

        Args:
            detections: Boxes detected in this frame, each ``[x1, y1, x2, y2]``.
            frame_idx: Frame number, used only in log messages and to emit a
                status line every 25 frames.

        Returns:
            Mapping of track id to box for every track seen in this frame
            (matched or newly created).
        """
        matched_tids = set()
        matched_didxs = set()

        # Step 1 - match detections to existing tracks (greedy, in detection order).
        for det_idx, det_bbox in enumerate(detections):
            best_iou, best_tid = 0.0, None
            for tid, tdata in self.tracks.items():
                if tid in matched_tids:
                    continue
                iou = self._iou(det_bbox, tdata['bbox'])
                if iou > best_iou:
                    best_iou, best_tid = iou, tid
            if best_tid is not None and best_iou >= self.iou_thr:
                self.tracks[best_tid]['bbox'] = det_bbox
                self.tracks[best_tid]['lost'] = 0
                matched_tids.add(best_tid)
                matched_didxs.add(det_idx)
                self.stat_match += 1
                log_tracker.debug(
                    f"F{frame_idx:04d} | Track #{best_tid:02d} MATCH iou={best_iou:.3f}"
                )

        # Step 2 - age the pre-existing tracks that found no detection.
        # This runs before new tracks are created so a track born in this
        # frame is not counted as lost in the same frame.
        for tid, tdata in self.tracks.items():
            if tid not in matched_tids:
                tdata['lost'] += 1

        # Step 3 - every unmatched detection starts a new track.
        for det_idx, det_bbox in enumerate(detections):
            if det_idx in matched_didxs:
                continue
            tid = self.next_id
            self.next_id += 1
            self.tracks[tid] = {'bbox': det_bbox, 'lost': 0}
            self.unique_ids.add(tid)
            self.stat_new += 1
            log_tracker.info(
                f"F{frame_idx:04d} | Track #{tid:02d} NEW "
                f"bbox={det_bbox} | Unique jami: {len(self.unique_ids)}"
            )

        # Step 4 - drop tracks that have been missing for too long.
        before = len(self.tracks)
        self.tracks = {t: v for t, v in self.tracks.items()
                       if v['lost'] < self.max_lost}
        deleted = before - len(self.tracks)
        if deleted:
            self.stat_del += deleted
            log_tracker.debug(
                f"F{frame_idx:04d} | {deleted} track silindi (max_lost={self.max_lost})"
            )

        # Only tracks seen in this very frame are reported as active.
        active = {t: v['bbox'] for t, v in self.tracks.items() if v['lost'] == 0}
        if frame_idx % 25 == 0:
            log_tracker.info(
                f"F{frame_idx:04d} | Aktiv={len(active):2d} "
                f"Unique={len(self.unique_ids):2d} "
                f"Tracks={len(self.tracks):2d}"
            )
        return active

    def summary(self):
        """Return the unique-track count and the match/new/delete counters."""
        return {
            "unique_people": len(self.unique_ids),
            "stat_match": self.stat_match,
            "stat_new": self.stat_new,
            "stat_del": self.stat_del,
        }
# ------------------------------------------------------------
# MAIN COUNTING FUNCTION
# ------------------------------------------------------------
# Drawing palette for track boxes; a track id selects an entry by modulo.
# NOTE(review): presumably in OpenCV channel order since the tuples are
# passed straight to cv2 drawing calls - confirm.
COLORS = [
    (255, 80, 80),
    (80, 200, 120),
    (80, 160, 255),
    (255, 200, 50),
    (200, 80, 255),
    (50, 220, 220),
    (255, 140, 0),
    (0, 200, 200),
    (180, 255, 80),
    (255, 80, 180),
]
def _annotate_frame(frame, active, total_unique, frame_idx, width, height):
    """Draw the tracking overlay onto ``frame`` in place.

    Args:
        frame: Image as returned by ``cv2.VideoCapture.read``.
        active: Mapping of track id to ``(x1, y1, x2, y2)`` for the tracks
            seen in this frame.
        total_unique: Number of distinct track ids created so far.
        frame_idx: Index of the frame, shown in the panel.
        width: Frame width in pixels, used to place the model tag.
        height: Frame height in pixels, used to place the model tag.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX

    # One box plus a filled id label per active track.
    for tid, (x1, y1, x2, y2) in active.items():
        color = COLORS[tid % len(COLORS)]
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        lbl = f"#{tid}"
        (tw, th), _ = cv2.getTextSize(lbl, font, 0.6, 2)
        cv2.rectangle(frame, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
        cv2.putText(frame, lbl, (x1 + 3, y1 - 4), font, 0.6, (255, 255, 255), 2)

    # Counter panel in the top-left corner.
    currently = len(active)
    cv2.rectangle(frame, (8, 8), (330, 80), (12, 12, 18), -1)
    cv2.rectangle(frame, (8, 8), (330, 80), (50, 230, 120), 1)
    cv2.putText(frame, f"Jami: {total_unique} ta odam",
                (14, 37), font, 0.82, (50, 230, 120), 2)
    cv2.putText(frame, f"Hozir: {currently} Frame: {frame_idx}",
                (14, 68), font, 0.52, (160, 160, 160), 1)

    # Model tag in the bottom-right corner.
    cv2.putText(frame, f"{MODEL_NAME} + IoUTracker",
                (width - 230, height - 10), font, 0.42, (80, 80, 80), 1)


def count_people(
    video_path: str,
    conf_threshold: float = 0.4,
    progress=gr.Progress(),
    stream_handler=None,
) -> tuple:
    """Count distinct people in a video and write an annotated copy.

    Every frame is passed to the module-level ``model`` restricted to
    ``PERSON_CLASS``; the resulting boxes are fed to an ``IoUTracker`` and
    the number of track ids it hands out is reported as the people count.

    Args:
        video_path: Path of the input video, or ``None`` when nothing was
            uploaded.
        conf_threshold: Confidence threshold forwarded to the model.
        progress: Callable invoked with ``(fraction, desc=...)`` after each
            frame when the container reports a positive frame count.
        stream_handler: Accepted for interface compatibility; not used by
            this function.

    Returns:
        ``(output_path, text)``. ``output_path`` is the annotated video file,
        or ``None`` when no video was given or it could not be opened.
        ``text`` is the result or error message for the user.
    """
    log_every = 25  # cadence (in frames) of the periodic status lines

    session = datetime.now().strftime("%H%M%S")
    log_main.info("β" * 55)
    log_main.info(f"[{session}] YANGI SESSION boshlandi")
    log_main.info(f"[{session}] Conf threshold : {conf_threshold}")
    log_main.info(f"[{session}] Model : {MODEL_NAME}")

    if video_path is None:
        log_main.warning("Video yuklanmadi.")
        return None, "Video yuklanmadi."

    # Open the input video.
    log_video.info("Video ochilmoqda ...")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        log_video.error("Video fayl ochilmadi!")
        return None, "Video ochilmadi."

    writer = None
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # some containers report 0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        dur = total_frames / fps
        log_video.info(f" Kadrlar : {total_frames}")
        log_video.info(f" FPS : {fps:.1f}")
        log_video.info(f" Hajm : {width}x{height} px")
        log_video.info(f" Davomiylik: {dur:.1f}s")

        # Output video. mkstemp creates the file atomically (unlike the
        # deprecated mktemp); the descriptor is closed right away because
        # OpenCV reopens the file by path.
        fd, out_path = tempfile.mkstemp(suffix="_result.mp4")
        os.close(fd)
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
        log_video.info(f"Output yozuvchi ochildi -> {os.path.basename(out_path)}")

        # Tracker: a track survives roughly 1.5 seconds without a detection.
        max_lost = int(fps * 1.5)
        tracker = IoUTracker(iou_threshold=0.3, max_lost=max_lost)

        frame_idx = 0
        total_dets = 0
        t_start = time.time()
        log_main.info("Frame loop boshlandi ...")
        log_main.info(f" Har {log_every} ta kadrda tracker holati ko'rsatiladi.")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Model inference on the current frame.
            t_inf = time.time()
            results = model(frame, classes=[PERSON_CLASS],
                            conf=conf_threshold, verbose=False)[0]
            inf_ms = (time.time() - t_inf) * 1000

            detections = []
            for box in results.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf_val = float(box.conf[0])
                detections.append([x1, y1, x2, y2])
                log_model.debug(
                    f"F{frame_idx:04d} | bbox=[{x1},{y1},{x2},{y2}] "
                    f"conf={conf_val:.3f}"
                )
            total_dets += len(detections)
            if frame_idx % log_every == 0:
                log_model.info(
                    f"F{frame_idx:04d} | det={len(detections):2d} | "
                    f"inf={inf_ms:5.1f}ms"
                )

            # Tracking and overlay.
            active = tracker.update(detections, frame_idx)
            total_unique = len(tracker.unique_ids)
            _annotate_frame(frame, active, total_unique, frame_idx, width, height)

            writer.write(frame)
            frame_idx += 1

            if total_frames > 0:
                # The reported frame count can be lower than the number of
                # frames actually decoded, so cap the fraction at 1.0.
                progress(
                    min(frame_idx / total_frames, 1.0),
                    desc=f"Frame {frame_idx}/{total_frames} | Unique: {total_unique} odam"
                )
    finally:
        # Release the capture and the writer even when a frame fails.
        cap.release()
        if writer is not None:
            writer.release()

    elapsed = time.time() - t_start
    avg_fps_p = frame_idx / elapsed if elapsed > 0 else 0
    stats = tracker.summary()

    log_main.info("β" * 55)
    log_main.info(f"[{session}] YAKUNIY STATISTIKA")
    log_main.info(f" Jami kadrlar : {frame_idx}")
    log_main.info(f" Jami detectionlar: {total_dets}")
    log_main.info(f" Unique odamlar : {stats['unique_people']}")
    log_main.info(f" Track match : {stats['stat_match']}")
    log_main.info(f" Yangi track : {stats['stat_new']}")
    log_main.info(f" O'chirilgan track: {stats['stat_del']}")
    log_main.info(f" Ishlash vaqti : {elapsed:.2f}s")
    log_main.info(f" O'rtacha tezlik : {avg_fps_p:.1f} fps")
    log_main.info("β" * 55)

    n = stats['unique_people']
    # "1 ta odam" is just the general form with n == 1.
    result = "Odam yo'q" if n == 0 else f"{n} ta odam"
    log_main.info(f"[{session}] NATIJA: {result}")
    return out_path, result
# ------------------------------------------------------------
# GRADIO UI
# ------------------------------------------------------------
log_ui.info("Gradio interfeysi qurilmoqda ...")

# Page styling: #ttl is the title block, #rbox the result banner and
# #logbox the log text area.
css = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;700&family=Space+Grotesk:wght@400;600;700&display=swap');
body { font-family: 'Space Grotesk', sans-serif; }
#ttl h1 { font-family:'JetBrains Mono',monospace; color:#50e37c; text-align:center; }
#ttl p { text-align:center; color:#888; font-size:.9rem; }
#rbox {
font-family:'JetBrains Mono',monospace; font-size:2rem; font-weight:700;
text-align:center; padding:1.4rem; background:#0d1a12; color:#50e37c;
border-radius:10px; border:1px solid #50e37c55; margin-top:8px;
}
#logbox textarea {
font-family:'JetBrains Mono',monospace !important;
font-size:.7rem !important;
background:#080d08 !important;
color:#7ddb8a !important;
border:1px solid #1a2e1a !important;
}
"""

# NOTE(review): several UI literals below (title separator, emoji prefixes)
# look like UTF-8 text that was decoded with the wrong codepage in this copy
# of the file. They are reproduced unchanged - confirm against the original.
with gr.Blocks(css=css, title="People Counter β YOLOv11n") as demo:
    gr.Markdown(
        "# ποΈ Videodagi Odamlar Sonini Hisoblash\n"
        "**YOLOv11n** (AΒ²-Attention Modules) + **IoU Tracker** β surveillance grade",
        elem_id="ttl"
    )
    with gr.Row():
        with gr.Column(scale=1):
            video_in = gr.Video(label="πΉ Video yuklang", sources=["upload"])
            conf_sl = gr.Slider(0.2, 0.85, value=0.4, step=0.05,
                                label="Conf threshold")
            run_btn = gr.Button("βΆ Hisoblashni boshlash",
                                variant="primary", size="lg")
        with gr.Column(scale=1):
            video_out = gr.Video(label="π Annotated video")
            result_htm = gr.HTML("<div id='rbox'>β natija β</div>")
    # Receives the third value returned by run(); the "logbox" id is the one
    # the stylesheet above targets.
    log_out = gr.Textbox(label="Log", lines=14, max_lines=14,
                         interactive=False, elem_id="logbox")

    def run(video, conf):
        """Handle a click on the run button.

        Calls ``count_people`` while a temporary in-memory handler captures
        everything logged under the "PeopleCounter" logger.

        Returns:
            ``(annotated video path or None, result HTML, captured log text)``,
            matching the three output components wired below.
        """
        log_ui.info(f"UI: Run bosildi | conf={conf}")

        # In-memory stream handler to capture logs for the UI.
        buf = io.StringIO()
        handler = logging.StreamHandler(buf)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s | %(levelname)-8s | %(name)-18s | %(message)s",
                datefmt="%H:%M:%S"
            )
        )
        # NOTE(review): the handler is attached to the shared "PeopleCounter"
        # logger, so simultaneous runs would capture each other's records.
        root_pc = logging.getLogger("PeopleCounter")
        root_pc.addHandler(handler)
        try:
            out_vid, text = count_people(video, conf)
        finally:
            # Always detach, otherwise handlers pile up across runs.
            root_pc.removeHandler(handler)

        # Successful counts contain "ta odam" ("1 ta odam" included); every
        # other message gets the second prefix.
        prefix = 'β ' if 'ta odam' in text else 'πΆ '
        html = f"<div id='rbox'>{prefix}{text}</div>"
        return out_vid, html, buf.getvalue()

    run_btn.click(
        fn=run,
        inputs=[video_in, conf_sl],
        outputs=[video_out, result_htm, log_out]
    )

log_ui.info("Gradio tayyor.")

if __name__ == "__main__":
    log_main.info("demo.launch() ...")
    demo.launch()