Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import time | |
| import json | |
| from typing import Dict, List, Optional, Tuple | |
| import networkx as nx | |
| import matplotlib.pyplot as plt | |
| import matplotlib.patches as patches | |
| from io import BytesIO | |
| import base64 | |
| class RouteEntry: | |
| def __init__(self, next_hop_ip: str = "", hop_count: int = float('inf')): | |
| self.next_hop_ip = next_hop_ip | |
| self.hop_count = hop_count | |
| def __repr__(self): | |
| return f"RouteEntry(next_hop={self.next_hop_ip}, hops={self.hop_count})" | |
| class Device: | |
| def __init__(self, name: str, ip_address: str, mac_address: str): | |
| self.name = name | |
| self.ip_address = ip_address | |
| self.mac_address = mac_address | |
| self.gateway = "" | |
| self.connected_router = None | |
| def connect_router(self, router, gateway_ip: str): | |
| self.connected_router = router | |
| self.gateway = gateway_ip | |
| class Router: | |
| def __init__(self, name: str, ip_address: str): | |
| self.name = name | |
| self.ip_address = ip_address | |
| self.routing_table: Dict[str, RouteEntry] = {} | |
| self.neighbors: List['Router'] = [] | |
| self.connected_devices: List[Device] = [] | |
| def connect_neighbor(self, neighbor: 'Router'): | |
| if neighbor not in self.neighbors: | |
| self.neighbors.append(neighbor) | |
| def connect_device(self, device: Device): | |
| self.connected_devices.append(device) | |
| device.connect_router(self, self.ip_address) | |
| # Add route to directly connected network | |
| network = self.get_network(device.ip_address) | |
| self.routing_table[network] = RouteEntry(device.ip_address, 1) | |
| def exchange_routes(self) -> List[str]: | |
| updates = [] | |
| for neighbor in self.neighbors: | |
| for dest_net, entry in self.routing_table.items(): | |
| new_hops = entry.hop_count + 1 | |
| if new_hops > 15: # RIP hop limit | |
| continue | |
| if (dest_net not in neighbor.routing_table or | |
| new_hops < neighbor.routing_table[dest_net].hop_count): | |
| neighbor.routing_table[dest_net] = RouteEntry(self.ip_address, new_hops) | |
| update_msg = f"{neighbor.name} learned route to {dest_net} via {self.ip_address} (hops: {new_hops})" | |
| updates.append(update_msg) | |
| return updates | |
| def get_routing_table(self) -> Dict[str, Dict[str, any]]: | |
| table = {} | |
| for dest, entry in self.routing_table.items(): | |
| table[dest] = { | |
| "next_hop": entry.next_hop_ip, | |
| "hop_count": entry.hop_count if entry.hop_count != float('inf') else "∞" | |
| } | |
| return table | |
| def get_next_hop(self, dest_ip: str) -> str: | |
| dest_net = self.get_network(dest_ip) | |
| if dest_net in self.routing_table: | |
| return self.routing_table[dest_net].next_hop_ip | |
| return "unreachable" | |
| def get_network(ip: str) -> str: | |
| parts = ip.split('.') | |
| return f"{parts[0]}.{parts[1]}.{parts[2]}.0" | |
| class NetworkSimulator: | |
| def __init__(self): | |
| self.routers: List[Router] = [] | |
| self.devices: List[Device] = [] | |
| self.simulation_log: List[str] = [] | |
| self.setup_default_topology() | |
| def setup_default_topology(self): | |
| # Create routers | |
| router1 = Router("Router1", "192.168.1.1") | |
| router2 = Router("Router2", "192.168.2.1") | |
| router3 = Router("Router3", "192.168.3.1") | |
| # Connect routers | |
| router1.connect_neighbor(router2) | |
| router2.connect_neighbor(router1) | |
| router2.connect_neighbor(router3) | |
| router3.connect_neighbor(router2) | |
| # Create devices | |
| dev1 = Device("PC1", "192.168.1.2", "00:1A:2B:3C:4D:01") | |
| dev2 = Device("PC2", "192.168.2.2", "00:1A:2B:3C:4D:02") | |
| dev3 = Device("PC3", "192.168.3.2", "00:1A:2B:3C:4D:03") | |
| # Connect devices to routers | |
| router1.connect_device(dev1) | |
| router2.connect_device(dev2) | |
| router3.connect_device(dev3) | |
| self.routers = [router1, router2, router3] | |
| self.devices = [dev1, dev2, dev3] | |
| self.simulation_log = ["Network topology initialized"] | |
| def run_rip_simulation(self, rounds: int = 3) -> Tuple[str, Dict]: | |
| log = [] | |
| log.append("=== Starting RIP Simulation ===\n") | |
| for round_num in range(1, rounds + 1): | |
| log.append(f"--- RIP Round {round_num} ---") | |
| round_updates = [] | |
| for router in self.routers: | |
| updates = router.exchange_routes() | |
| round_updates.extend(updates) | |
| if round_updates: | |
| log.extend(round_updates) | |
| else: | |
| log.append("No new route updates") | |
| log.append("") | |
| # Generate routing tables | |
| routing_tables = {} | |
| for router in self.routers: | |
| routing_tables[router.name] = router.get_routing_table() | |
| log.append("=== Final Routing Tables ===") | |
| for router_name, table in routing_tables.items(): | |
| log.append(f"\n{router_name} Routing Table:") | |
| for dest, info in table.items(): | |
| log.append(f" {dest} → Next Hop: {info['next_hop']}, Hops: {info['hop_count']}") | |
| return "\n".join(log), routing_tables | |
| def simulate_data_transfer(self, src_device_name: str, dst_device_name: str) -> str: | |
| src_device = next((d for d in self.devices if d.name == src_device_name), None) | |
| dst_device = next((d for d in self.devices if d.name == dst_device_name), None) | |
| if not src_device or not dst_device: | |
| return "Error: Device not found" | |
| log = [] | |
| log.append(f"\n=== Data Transfer: {src_device.name} → {dst_device.name} ===") | |
| log.append(f"Source: {src_device.ip_address}") | |
| log.append(f"Destination: {dst_device.ip_address}") | |
| next_hop = src_device.connected_router.get_next_hop(dst_device.ip_address) | |
| if next_hop == "unreachable": | |
| log.append("❌ Destination unreachable") | |
| else: | |
| log.append(f"✅ Next hop: {next_hop}") | |
| log.append(f"✅ Data successfully routed to {dst_device.name}") | |
| return "\n".join(log) | |
| def create_network_diagram(self) -> str: | |
| # Create network graph | |
| G = nx.Graph() | |
| pos = {} | |
| # Add routers | |
| for i, router in enumerate(self.routers): | |
| G.add_node(router.name, type='router') | |
| pos[router.name] = (i * 3, 1) # Position routers horizontally | |
| # Add devices | |
| for i, device in enumerate(self.devices): | |
| G.add_node(device.name, type='device') | |
| pos[device.name] = (i * 3, 0) # Position devices below routers | |
| # Add edges between routers | |
| for router in self.routers: | |
| for neighbor in router.neighbors: | |
| G.add_edge(router.name, neighbor.name) | |
| # Add edges between devices and routers | |
| for device in self.devices: | |
| if device.connected_router: | |
| G.add_edge(device.name, device.connected_router.name) | |
| # Create matplotlib figure | |
| plt.figure(figsize=(12, 8)) | |
| # Draw nodes | |
| router_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'router'] | |
| device_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'device'] | |
| nx.draw_networkx_nodes(G, pos, nodelist=router_nodes, | |
| node_color='lightblue', node_size=1500, | |
| node_shape='s', label='Routers') | |
| nx.draw_networkx_nodes(G, pos, nodelist=device_nodes, | |
| node_color='lightgreen', node_size=1000, | |
| node_shape='o', label='Devices') | |
| # Draw edges | |
| nx.draw_networkx_edges(G, pos, alpha=0.6, width=2) | |
| # Draw labels | |
| nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold') | |
| # Add IP addresses as labels | |
| ip_labels = {} | |
| for router in self.routers: | |
| ip_labels[router.name] = router.ip_address | |
| for device in self.devices: | |
| ip_labels[device.name] = device.ip_address | |
| label_pos = {k: (v[0], v[1] - 0.2) for k, v in pos.items()} | |
| nx.draw_networkx_labels(G, label_pos, ip_labels, font_size=8, font_color='red') | |
| plt.title("Network Topology", size=16, weight='bold') | |
| plt.legend() | |
| plt.axis('off') | |
| plt.tight_layout() | |
| # Convert to base64 string | |
| buffer = BytesIO() | |
| plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight') | |
| buffer.seek(0) | |
| image_base64 = base64.b64encode(buffer.getvalue()).decode() | |
| plt.close() | |