1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
| import re
import paramiko
def ssh_execute(ip, user, pwd, cmd):
"""统一的 SSH 命令执行函数"""
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, 22, username=user, password=pwd, timeout=5)
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().decode()
client.close()
return output
except Exception as e:
print(f"SSH 连接失败:{e}")
return ""
def format_mac_with_dash(mac):
"""统一 MAC 地址格式为 xxxx-xxxx-xxxx"""
mac = mac.lower().replace("-", "").replace(":", "")
if len(mac) != 12:
return mac
return f"{mac[0:4]}-{mac[4:8]}-{mac[8:12]}"
def get_arp_info(ip, user, pwd, target_ip):
"""在核心交换机上通过 ARP 查找 MAC 和初始出口"""
output = ssh_execute(ip, user, pwd, f"display arp | include {target_ip}")
if not output.strip():
print("未找到 ARP 记录")
return None, None
for line in output.splitlines():
line = line.strip()
if not re.match(r"^\d", line):
continue
parts = line.split()
if len(parts) < 6:
continue
if not re.match(r"^[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}$", parts[1]):
continue
raw_mac = parts[1]
mac = format_mac_with_dash(raw_mac)
interface = parts[3]
return mac, interface
print("未找到有效 ARP 行")
return None, None
def get_mac_interface(sw_ip, user, pwd, mac):
"""在非核心交换机上查询 MAC 地址表定位端口"""
output = ssh_execute(sw_ip, user, pwd, f"display mac-address | include {mac}")
for line in output.splitlines():
line = line.strip()
parts = line.split()
if len(parts) >= 4 and mac in parts[0]:
return parts[3]
return None
def get_bagg_members(sw_ip, user, pwd, bagg):
"""当接口为动态聚合口(BAGG)时,解析出其含有的物理成员端口"""
bagg_id = bagg.replace("BAGG", "")
cmd = f"display link-aggregation verbose Bridge-Aggregation {bagg_id}"
output = ssh_execute(sw_ip, user, pwd, cmd)
members = []
for line in output.splitlines():
line = line.strip()
m = re.match(r"^(X?GE\d+/\d+/\d+)", line)
if m:
members.append(m.group(1))
return members
def find_lldp_neighbor(sw_ip, user, pwd, interfaces):
"""通过 LLDP 邻居信息查找下一跳交换机的管理 IP"""
output = ssh_execute(sw_ip, user, pwd, "display lldp neighbor-information list")
for line in output.splitlines():
line = line.strip()
m = re.match(r"^(X?GE\d+/\d+/\d+)\s+[0-9a-fA-F\-]+\s+\S+\s+(.+)$", line)
if m:
local_if = m.group(1)
sysname = m.group(2).strip()
if local_if in interfaces:
# 假设交换机命名规范中自带 IP 尾数,如 SW-8.56
ip_match = re.search(r"8\.(\d+)", sysname)
if ip_match:
next_ip = f"192.168.8.{ip_match.group(1)}"
return sysname, next_ip
return None, None
def trace_path(sw_ip, user, pwd, mac, terminal_ip, core_interface):
"""链路追踪的核心主循环流程"""
CORE_IP = "192.168.8.1"
while True:
print(f"\n正在 {sw_ip} 上查询路径...")
# 1. 核心交换机直接采用初始 ARP 端口,避免三层 MAC 表不准的问题
if sw_ip == CORE_IP:
interface = core_interface
if interface.upper().startswith("BAGG"):
members = get_bagg_members(sw_ip, user, pwd, interface)
else:
members = [interface]
sysname, next_ip = find_lldp_neighbor(sw_ip, user, pwd, members)
else:
# 2. 非核心交换机则正常查找 MAC 地址表
interface = get_mac_interface(sw_ip, user, pwd, mac)
if not interface:
print("当前交换机未发现该 MAC 地址")
return
if interface.upper().startswith("BAGG"):
members = get_bagg_members(sw_ip, user, pwd, interface)
else:
members = [interface]
sysname, next_ip = find_lldp_neighbor(sw_ip, user, pwd, members)
# 3. 如果找不到 LLDP 邻居,说明已经到达接入层末端
if not next_ip:
print("\n最终定位结果:")
print(f"终端 IP:{terminal_ip}")
print(f"终端 MAC:{mac}")
print(f"接入层交换机 IP:{sw_ip}")
print(f"接入层具体端口:{interface}")
return
# 4. 将下一跳 IP 赋给 sw_ip,进入下一次循环
sw_ip = next_ip
if __name__ == "__main__":
core_ip = "192.168.8.1"
username = "admin"
password = "maifeng2021"
target_ip = input("请输入想查询的终端 IP:").strip()
mac, interface = get_arp_info(core_ip, username, password, target_ip)
if mac and interface:
print(f"\n初始查询成功:MAC 是 {mac},核心出口是 {interface}")
trace_path(core_ip, username, password, mac, target_ip, interface)
input('\n查询结束,按回车键退出。')
|