# Dynamic_routing / app.py
# V8055's picture
# Update app.py
# d8e0173 verified
import gradio as gr
import time
import json
from typing import Dict, List, Optional, Tuple
import networkx as nx
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from io import BytesIO
import base64
class RouteEntry:
    """One routing-table row: the next hop toward a destination and its hop count."""

    def __init__(self, next_hop_ip: str = "", hop_count: float = float('inf')):
        """Store the next-hop address and the distance in hops.

        Args:
            next_hop_ip: Address the traffic is handed to; empty by default.
            hop_count: Distance in hops. Defaults to infinity, which is why
                the parameter is typed ``float`` rather than ``int``.
        """
        self.next_hop_ip = next_hop_ip
        self.hop_count = hop_count

    def __repr__(self):
        """Return a compact debugging representation of the entry."""
        return f"RouteEntry(next_hop={self.next_hop_ip}, hops={self.hop_count})"
class Device:
    """An end host identified by name, IP address and MAC address."""

    def __init__(self, name: str, ip_address: str, mac_address: str):
        """Record the device identity; it starts without a router or gateway."""
        self.name, self.ip_address, self.mac_address = name, ip_address, mac_address
        # Unattached state: empty gateway string and no router object.
        self.gateway, self.connected_router = "", None

    def connect_router(self, router, gateway_ip: str):
        """Attach the device to ``router`` and remember ``gateway_ip`` as its gateway."""
        self.connected_router, self.gateway = router, gateway_ip
class Router:
    """A router holding a distance-vector routing table in the style of RIP.

    Routes are keyed by destination network (see ``get_network``) and are
    pushed to neighbors by ``exchange_routes``.
    """

    # Largest hop count that is still advertised; anything above is dropped.
    MAX_HOPS = 15

    def __init__(self, name: str, ip_address: str):
        """Create a router with an empty routing table and no links."""
        self.name = name
        self.ip_address = ip_address
        # Destination network ("a.b.c.0") -> best route known so far.
        self.routing_table: Dict[str, 'RouteEntry'] = {}
        self.neighbors: List['Router'] = []
        self.connected_devices: List['Device'] = []

    def connect_neighbor(self, neighbor: 'Router'):
        """Add ``neighbor`` to this router's neighbor list (one direction only).

        Adding the same neighbor twice has no effect.
        """
        if neighbor not in self.neighbors:
            self.neighbors.append(neighbor)

    def connect_device(self, device: 'Device'):
        """Attach ``device`` and install a 1-hop route to its network.

        The device is told to use this router's IP as its gateway. Attaching
        the same device again does not duplicate it in ``connected_devices``;
        the route is simply rewritten.
        """
        if device not in self.connected_devices:
            self.connected_devices.append(device)
        device.connect_router(self, self.ip_address)
        # Add route to directly connected network
        network = self.get_network(device.ip_address)
        self.routing_table[network] = RouteEntry(device.ip_address, 1)

    def exchange_routes(self) -> List[str]:
        """Advertise every known route to every neighbor.

        A neighbor adopts an advertised route when it has no route to that
        network yet, or when the advertised hop count (ours plus one) is
        strictly lower than its current one. Neighbor tables are modified
        in place.

        Returns:
            One human-readable message per route a neighbor adopted.
        """
        updates = []
        for neighbor in self.neighbors:
            for dest_net, entry in self.routing_table.items():
                new_hops = entry.hop_count + 1
                if new_hops > self.MAX_HOPS:  # RIP hop limit
                    continue
                known = neighbor.routing_table.get(dest_net)
                if known is None or new_hops < known.hop_count:
                    neighbor.routing_table[dest_net] = RouteEntry(self.ip_address, new_hops)
                    update_msg = f"{neighbor.name} learned route to {dest_net} via {self.ip_address} (hops: {new_hops})"
                    updates.append(update_msg)
        return updates

    def get_routing_table(self) -> Dict[str, Dict[str, object]]:
        """Return the routing table as plain dictionaries.

        Returns:
            A mapping of destination network to
            ``{"next_hop": ..., "hop_count": ...}``; an infinite hop count
            is rendered as the string "∞".
        """
        table = {}
        for dest, entry in self.routing_table.items():
            table[dest] = {
                "next_hop": entry.next_hop_ip,
                "hop_count": entry.hop_count if entry.hop_count != float('inf') else "∞"
            }
        return table

    def get_next_hop(self, dest_ip: str) -> str:
        """Return the next-hop IP for ``dest_ip``, or "unreachable" if no route exists."""
        dest_net = self.get_network(dest_ip)
        if dest_net in self.routing_table:
            return self.routing_table[dest_net].next_hop_ip
        return "unreachable"

    @staticmethod
    def get_network(ip: str) -> str:
        """Return the network for ``ip``: its first three octets followed by ".0".

        Raises:
            IndexError: If ``ip`` has fewer than three dot-separated parts.
        """
        parts = ip.split('.')
        return f"{parts[0]}.{parts[1]}.{parts[2]}.0"
class NetworkSimulator:
def __init__(self):
    """Create the simulator and immediately load the default topology."""
    # Begin with empty containers; setup_default_topology() assigns the
    # real router list, device list and log.
    self.routers: List[Router] = list()
    self.devices: List[Device] = list()
    self.simulation_log: List[str] = list()
    self.setup_default_topology()
def setup_default_topology(self):
    """Build the built-in network of three chained routers with one PC each.

    Routers are linked Router1 - Router2 - Router3 in both directions.
    Replaces ``self.routers``, ``self.devices`` and ``self.simulation_log``.
    """
    # One row per site: router name, router IP, PC name, PC IP, PC MAC.
    sites = (
        ("Router1", "192.168.1.1", "PC1", "192.168.1.2", "00:1A:2B:3C:4D:01"),
        ("Router2", "192.168.2.1", "PC2", "192.168.2.2", "00:1A:2B:3C:4D:02"),
        ("Router3", "192.168.3.1", "PC3", "192.168.3.2", "00:1A:2B:3C:4D:03"),
    )
    routers = [Router(site[0], site[1]) for site in sites]

    # Link every router with the next one in the chain, both ways.
    for left, right in zip(routers, routers[1:]):
        left.connect_neighbor(right)
        right.connect_neighbor(left)

    devices = [Device(*site[2:]) for site in sites]

    # Each PC hangs off the router defined in the same row.
    for router, device in zip(routers, devices):
        router.connect_device(device)

    self.routers = routers
    self.devices = devices
    self.simulation_log = ["Network topology initialized"]
def run_rip_simulation(self, rounds: int = 3) -> Tuple[str, Dict]:
log = []
log.append("=== Starting RIP Simulation ===\n")
for round_num in range(1, rounds + 1):
log.append(f"--- RIP Round {round_num} ---")
round_updates = []
for router in self.routers:
updates = router.exchange_routes()
round_updates.extend(updates)
if round_updates:
log.extend(round_updates)
else:
log.append("No new route updates")
log.append("")
# Generate routing tables
routing_tables = {}
for router in self.routers:
routing_tables[router.name] = router.get_routing_table()
log.append("=== Final Routing Tables ===")
for router_name, table in routing_tables.items():
log.append(f"\n{router_name} Routing Table:")
for dest, info in table.items():
log.append(f" {dest} → Next Hop: {info['next_hop']}, Hops: {info['hop_count']}")
return "\n".join(log), routing_tables
def simulate_data_transfer(self, src_device_name: str, dst_device_name: str) -> str:
src_device = next((d for d in self.devices if d.name == src_device_name), None)
dst_device = next((d for d in self.devices if d.name == dst_device_name), None)
if not src_device or not dst_device:
return "Error: Device not found"
log = []
log.append(f"\n=== Data Transfer: {src_device.name}{dst_device.name} ===")
log.append(f"Source: {src_device.ip_address}")
log.append(f"Destination: {dst_device.ip_address}")
next_hop = src_device.connected_router.get_next_hop(dst_device.ip_address)
if next_hop == "unreachable":
log.append("❌ Destination unreachable")
else:
log.append(f"✅ Next hop: {next_hop}")
log.append(f"✅ Data successfully routed to {dst_device.name}")
return "\n".join(log)
def create_network_diagram(self) -> str:
# Create network graph
G = nx.Graph()
pos = {}
# Add routers
for i, router in enumerate(self.routers):
G.add_node(router.name, type='router')
pos[router.name] = (i * 3, 1) # Position routers horizontally
# Add devices
for i, device in enumerate(self.devices):
G.add_node(device.name, type='device')
pos[device.name] = (i * 3, 0) # Position devices below routers
# Add edges between routers
for router in self.routers:
for neighbor in router.neighbors:
G.add_edge(router.name, neighbor.name)
# Add edges between devices and routers
for device in self.devices:
if device.connected_router:
G.add_edge(device.name, device.connected_router.name)
# Create matplotlib figure
plt.figure(figsize=(12, 8))
# Draw nodes
router_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'router']
device_nodes = [n for n, d in G.nodes(data=True) if d['type'] == 'device']
nx.draw_networkx_nodes(G, pos, nodelist=router_nodes,
node_color='lightblue', node_size=1500,
node_shape='s', label='Routers')
nx.draw_networkx_nodes(G, pos, nodelist=device_nodes,
node_color='lightgreen', node_size=1000,
node_shape='o', label='Devices')
# Draw edges
nx.draw_networkx_edges(G, pos, alpha=0.6, width=2)
# Draw labels
nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')
# Add IP addresses as labels
ip_labels = {}
for router in self.routers:
ip_labels[router.name] = router.ip_address
for device in self.devices:
ip_labels[device.name] = device.ip_address
label_pos = {k: (v[0], v[1] - 0.2) for k, v in pos.items()}
nx.draw_networkx_labels(G, label_pos, ip_labels, font_size=8, font_color='red')
plt.title("Network Topology", size=16, weight='bold')
plt.legend()
plt.axis('off')
plt.tight_layout()
# Convert to base64 string
buffer = BytesIO()
plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode()
plt.close()