H3C 交换机自动链路追踪脚本:一键定位终端 IP 所在接入端口

H3C 交换机自动化链路追踪

平时排查网络,最烦的就是找某个IP具体插在哪个接入交换机上。

以前我的的方法:先登核心查ARP拿到MAC,再登下级交换机查MAC表,如果中间过好几层汇聚,得一台一台登过去翻,手敲命令敲到手酸。

索性用Python写了个自动化脚本。现在只要输入终端IP,它自己顺着网线(LLDP邻居)一路往下摸,直接把最末端的接入交换机IP和具体端口吐出来。

环境:核心交换机是 S6520X-54QC-EI,接入交换机是 S5024PV5-EI


脚本逻辑

这个脚本的整体思路可以用四个字概括:“顺藤摸瓜”。它模拟了网络工程师手工排查的步骤,并将其自动化。

步骤一:去核心交换机拿第一条线索

程序启动后,你输入一个终端 IP。脚本第一件事是登录到核心交换机(192.168.8.1)上敲下 display arp。 核心交换机会返回两条关键信息:

  • 知道了这个 IP 对应的 MAC 地址。
  • 知道了这个 MAC 地址是从核心交换机的哪个端口跑出去的。

步骤二:开启主循环向下追查

拿到初始端口后,脚本开始进入循环追踪阶段,这里对核心和后续交换机做了区分:

  • 在核心交换机上: 直接使用刚刚 ARP 查出来的端口,不再查核心的 MAC 表(因为核心做三层转发,MAC 表可能刷新不及时导致不准)。
  • 在后续交换机上: 脚本会登录到当前交换机敲 display mac-address,看看这个终端 MAC 地址目前落在哪一个端口上。

步骤三:遇到聚合口(BAGG)自动拆分

无论是核心还是下级交换机,查出来的端口如果是个链路聚合口(以 BAGG 开头),脚本就会自动执行 display link-aggregation verbose。它会把聚合口里包含的物理小端口(比如 GE1/0/1)全部捞出来,因为后面的 LLDP 邻居信息是记录在物理端口上的。如果只是普通单口,就直接使用该接口。

步骤四:通过 LLDP 探测对面是谁

拿到端口(或聚合口的物理成员口)后,脚本会去查 LLDP 邻居信息(display lldp neighbor-information list)。通过邻居表,看看这个端口对面接的是哪台交换机,并从对端设备名称(Sysname)里通过正则匹配出它的管理 IP。

步骤五:判定终点

  • 如果找到了下一跳交换机 IP: 说明还没到底,脚本会把这个 IP 传给下一次循环,自动登录到这台新交换机上,重复前面的查 MAC 流程。
  • 如果查不到任何 LLDP 邻居了: 这意味着该端口对面连接的不再是交换机,而是最终的电脑终端。这时候,当前登录的这台交换机就是接入层交换机,对应的端口就是目标物理端口。脚本打印出定位结果,结束运行。

完整脚本源码

本脚本基于 Python 的 paramiko 库实现 SSH 自动化控制,通过正则表达式解析交换机回显内容。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import re
import paramiko

def ssh_execute(ip, user, pwd, cmd):
    """统一的 SSH 命令执行函数"""
    try:
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(ip, 22, username=user, password=pwd, timeout=5)
        stdin, stdout, stderr = client.exec_command(cmd)
        output = stdout.read().decode()
        client.close()
        return output
    except Exception as e:
        print(f"SSH 连接失败:{e}")
        return ""

def format_mac_with_dash(mac):
    """统一 MAC 地址格式为 xxxx-xxxx-xxxx"""
    mac = mac.lower().replace("-", "").replace(":", "")
    if len(mac) != 12:
        return mac
    return f"{mac[0:4]}-{mac[4:8]}-{mac[8:12]}"

def get_arp_info(ip, user, pwd, target_ip):
    """在核心交换机上通过 ARP 查找 MAC 和初始出口"""
    output = ssh_execute(ip, user, pwd, f"display arp | include {target_ip}")
    if not output.strip():
        print("未找到 ARP 记录")
        return None, None

    for line in output.splitlines():
        line = line.strip()
        if not re.match(r"^\d", line):
            continue
        parts = line.split()
        if len(parts) < 6:
            continue
        if not re.match(r"^[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}$", parts[1]):
            continue
        raw_mac = parts[1]
        mac = format_mac_with_dash(raw_mac)
        interface = parts[3]
        return mac, interface

    print("未找到有效 ARP 行")
    return None, None

def get_mac_interface(sw_ip, user, pwd, mac):
    """在非核心交换机上查询 MAC 地址表定位端口"""
    output = ssh_execute(sw_ip, user, pwd, f"display mac-address | include {mac}")
    for line in output.splitlines():
        line = line.strip()
        parts = line.split()
        if len(parts) >= 4 and mac in parts[0]:
            return parts[3]
    return None

def get_bagg_members(sw_ip, user, pwd, bagg):
    """当接口为动态聚合口(BAGG)时,解析出其含有的物理成员端口"""
    bagg_id = bagg.replace("BAGG", "")
    cmd = f"display link-aggregation verbose Bridge-Aggregation {bagg_id}"
    output = ssh_execute(sw_ip, user, pwd, cmd)
    members = []
    for line in output.splitlines():
        line = line.strip()
        m = re.match(r"^(X?GE\d+/\d+/\d+)", line)
        if m:
            members.append(m.group(1))
    return members

def find_lldp_neighbor(sw_ip, user, pwd, interfaces):
    """通过 LLDP 邻居信息查找下一跳交换机的管理 IP"""
    output = ssh_execute(sw_ip, user, pwd, "display lldp neighbor-information list")
    for line in output.splitlines():
        line = line.strip()
        m = re.match(r"^(X?GE\d+/\d+/\d+)\s+[0-9a-fA-F\-]+\s+\S+\s+(.+)$", line)
        if m:
            local_if = m.group(1)
            sysname = m.group(2).strip()
            if local_if in interfaces:
                # 假设交换机命名规范中自带 IP 尾数,如 SW-8.56
                ip_match = re.search(r"8\.(\d+)", sysname)
                if ip_match:
                    next_ip = f"192.168.8.{ip_match.group(1)}"
                    return sysname, next_ip
    return None, None

def trace_path(sw_ip, user, pwd, mac, terminal_ip, core_interface):
    """链路追踪的核心主循环流程"""
    CORE_IP = "192.168.8.1"
    while True:
        print(f"\n正在 {sw_ip} 上查询路径...")
        
        # 1. 核心交换机直接采用初始 ARP 端口,避免三层 MAC 表不准的问题
        if sw_ip == CORE_IP:
            interface = core_interface
            if interface.upper().startswith("BAGG"):
                members = get_bagg_members(sw_ip, user, pwd, interface)
            else:
                members = [interface]
            sysname, next_ip = find_lldp_neighbor(sw_ip, user, pwd, members)
        else:
            # 2. 非核心交换机则正常查找 MAC 地址表
            interface = get_mac_interface(sw_ip, user, pwd, mac)
            if not interface:
                print("当前交换机未发现该 MAC 地址")
                return
            if interface.upper().startswith("BAGG"):
                members = get_bagg_members(sw_ip, user, pwd, interface)
            else:
                members = [interface]
            sysname, next_ip = find_lldp_neighbor(sw_ip, user, pwd, members)

        # 3. 如果找不到 LLDP 邻居,说明已经到达接入层末端
        if not next_ip:
            print("\n最终定位结果:")
            print(f"终端 IP:{terminal_ip}")
            print(f"终端 MAC:{mac}")
            print(f"接入层交换机 IP:{sw_ip}")
            print(f"接入层具体端口:{interface}")
            return

        # 4. 将下一跳 IP 赋给 sw_ip,进入下一次循环
        sw_ip = next_ip

if __name__ == "__main__":
    core_ip = "192.168.8.1"
    username = "admin"
    password = "maifeng2021"

    target_ip = input("请输入想查询的终端 IP:").strip()
    mac, interface = get_arp_info(core_ip, username, password, target_ip)

    if mac and interface:
        print(f"\n初始查询成功:MAC 是 {mac},核心出口是 {interface}")
        trace_path(core_ip, username, password, mac, target_ip, interface)
    
    input('\n查询结束,按回车键退出。')
Created by aka.g