import time import threading import random from collections import deque from dataclasses import dataclass from typing import List, Optional import numpy as np @dataclass class RandomBufferStats: """র‍্যান্ডম বাফারের পরিসংখ্যান""" buffer_size: int reads_per_second: float fills_per_second: float empty_reads: int average_read_time: float class HardwareTimerRNG: """ 6522 VIA টাইমার-ভিত্তিক র‍্যান্ডম নম্বর জেনারেটরের Python ইমুলেশন মূল বৈশিষ্ট্য: - প্রতি সেকেন্ডে ১,০০০,০০০ বার কাউন্ট ডাউন - অপ্রেডিক্টেবল সময়ে পড়া - ডিটারমিনিস্টিক হার্ডওয়্যার কিন্তু র‍্যান্ডম আচরণ """ def __init__(self, buffer_size: int = 1000, timer_frequency: int = 1_000_000): """ Args: buffer_size: র‍্যান্ডম বাফারের সাইজ timer_frequency: টাইমার ফ্রিকোয়েন্সি (Hz) """ self.buffer_size = buffer_size self.timer_frequency = timer_frequency self.buffer = deque(maxlen=buffer_size) self.timer_value = random.randint(0, timer_frequency) self.lock = threading.Lock() # পরিসংখ্যান self.total_reads = 0 self.total_fills = 0 self.empty_reads = 0 self.read_times = [] # টাইমার থ্রেড self.running = True self.timer_thread = threading.Thread(target=self._timer_loop, daemon=True) self.timer_thread.start() # প্রাথমিক বাফার ফিল self._fill_buffer() def _timer_loop(self): """ টাইমার লুপ - প্রতি সেকেন্ডে timer_frequency বার কাউন্ট ডাউন এবং র‍্যান্ডম সময়ে বাফার ফিল করে """ last_fill_time = time.time() while self.running: # টাইমার কাউন্ট ডাউন self.timer_value = (self.timer_value - 1) % self.timer_frequency # প্রতি ০.১ সেকেন্ডে বাফার ফিল করার চেষ্টা current_time = time.time() if current_time - last_fill_time > 0.1: self._fill_buffer() last_fill_time = current_time # টাইমার স্পিড সিমুলেশন (sleep ছাড়া খুব দ্রুত) time.sleep(1 / self.timer_frequency / 10) # ১০ গুণ ধীরে (সিমুলেশনের জন্য) def _fill_buffer(self): """বাফারে নতুন র‍্যান্ডম ভ্যালু যোগ করে""" with self.lock: while len(self.buffer) < self.buffer_size: # টাইমার ভ্যালু পড়ে self.buffer.append(self.timer_value) self.total_fills += 1 def next_random(self) -> int: """ পরবর্তী র‍্যান্ডম ভ্যালু রিটার্ন করে SetRandomNumber/NextRandomNumber ফাংশনের সমতুল্য """ start_time = time.time() with self.lock: if len(self.buffer) == 0: self.empty_reads += 1 self._fill_buffer() value = self.buffer.popleft() self.total_reads += 1 read_time = time.time() - start_time self.read_times.append(read_time) if len(self.read_times) > 1000: self.read_times.pop(0) return value def next_random_float(self, min_val: float = 0.0, max_val: float = 1.0) -> float: """ফ্লোট র‍্যান্ডম ভ্যালু (০-১)""" return min_val + (self.next_random() % 1000000) / 1000000 * (max_val - min_val) def next_random_range(self, min_val: int, max_val: int) -> int: """নির্দিষ্ট রেঞ্জের মধ্যে র‍্যান্ডম ভ্যালু""" return min_val + (self.next_random() % (max_val - min_val + 1)) def get_stats(self) -> RandomBufferStats: """বাফারের পরিসংখ্যান""" with self.lock: return RandomBufferStats( buffer_size=len(self.buffer), reads_per_second=self.total_reads / max(1, time.time()), fills_per_second=self.total_fills / max(1, time.time()), empty_reads=self.empty_reads, average_read_time=np.mean(self.read_times) if self.read_times else 0 ) def reset(self): """সবকিছু রিসেট""" with self.lock: self.buffer.clear() self.total_reads = 0 self.total_fills = 0 self.empty_reads = 0 self.read_times = [] self._fill_buffer() def stop(self): """টাইমার থ্রেড বন্ধ""" self.running = False if self.timer_thread.is_alive(): self.timer_thread.join(timeout=1) class RandomBufferMonitor: """ র‍্যান্ডম বাফার মনিটর - বাফারের স্বাস্থ্য পরীক্ষা করে """ def __init__(self, rng: HardwareTimerRNG): self.rng = rng self.history = [] def check_health(self) -> dict: """বাফারের স্বাস্থ্য পরীক্ষা""" stats = self.rng.get_stats() # স্বাস্থ্য স্কোর ক্যালকুলেশন health_score = 100 if stats.buffer_size < 100: health_score -= 30 if stats.empty_reads > 100: health_score -= 20 if stats.average_read_time > 0.001: # 1ms এর বেশি হলে health_score -= 10 return { 'health_score': health_score, 'buffer_level': stats.buffer_size, 'empty_reads': stats.empty_reads, 'avg_read_time_ms': stats.average_read_time * 1000, 'reads_per_sec': stats.reads_per_second, 'fills_per_sec': stats.fills_per_second, 'status': 'Healthy' if health_score > 70 else 'Warning' if health_score > 50 else 'Critical' } def log_status(self): """স্ট্যাটাস লগ""" health = self.check_health() print(f"[Buffer Monitor] {health['status']} - Level: {health['buffer_level']}, " f"Empty Reads: {health['empty_reads']}, Read Time: {health['avg_read_time_ms']:.3f}ms") self.history.append(health) # সিঙ্গলটন RNG ইনস্ট্যান্স (গোটা অ্যাপে একটাই) _rng_instance = None def get_rng() -> HardwareTimerRNG: """গ্লোবাল RNG ইনস্ট্যান্স রিটার্ন করে""" global _rng_instance if _rng_instance is None: _rng_instance = HardwareTimerRNG() return _rng_instance