当前位置: 首页 > news >正文

自动化运维实验

目录

一、实验拓扑

二、实验目的

三、实验步骤

实验思路:

代码部分:

四、实验结果:


一、实验拓扑

二、实验目的

利用python脚本,在本地,或者虚拟机里实现,设备CRC数量统计,并输出成表格

三、实验步骤

实验开始之前先搭好环境,测试无误再开始

实验思路:

利用ip.txt,存放需要检查的设备IP

再一台一台登录,匹配并记录接口名,和CRC数量,最终再输出保存

代码部分:

#!/usr/bin/env python3# -*- coding: utf-8 -*-import argparseimport osimport reimport timeimport socketimport getpassimport paramikoimport pandas as pdfrom paramiko import SSHException, AuthenticationExceptionDEFAULT_IP_FILE = 'ip.txt'DEFAULT_OUTPUT = 'crc_results.xlsx'DEFAULT_COMMAND = 'dis interface'DEFAULT_CONNECT_TIMEOUT = 5DEFAULT_CMD_TIMEOUT = 30MORE_PROMPTS = [b'--More--', b'---- More ----', b'--More--', b'More:', b'== More ==', b'--More--', b'\x1b\[K--More--']def read_ip_list(path):ips = []if not os.path.exists(path):print(f"[!] IP file not found: {path}")return ipswith open(path, 'r', encoding='utf-8') as f:for raw in f:line = raw.strip()if not line:continueif line.startswith('#'):continueparts = re.split(r'[\s,]+', line)if parts and parts[0]:ips.append(parts[0])return ipsdef recv_with_paging(shell, timeout=DEFAULT_CMD_TIMEOUT):"""从交互 shell 中读取输出,自动处理分页(遇到 --More-- 等发送空格继续)。返回解码后的文本。"""buf = b''start = time.time()last_recv = time.time()shell.settimeout(1.0)while True:try:if shell.recv_ready():chunk = shell.recv(65536)if not chunk:breakbuf += chunklast_recv = time.time()# 检查是否包含常见分页提示(原始字节)for mp in MORE_PROMPTS:if mp in chunk:try:shell.send(' ')except Exception:pass# 也处理常见 ASCII 控制字符后跟 More 文本(部分设备会带 escape)# 继续循环读取else:time.sleep(0.1)except socket.timeout:# no data right nowpassexcept Exception:# 其他异常则跳出breakif (time.time() - start) > timeout:break# 当已有数据并且很久没读到新数据,认为输出结束if buf and (time.time() - last_recv) > 1.2:breaktry:return buf.decode(errors='ignore')except:return buf.decode('utf-8', 'ignore')def run_command_interactive(client, command, cmd_timeout=DEFAULT_CMD_TIMEOUT):shell = client.invoke_shell()time.sleep(0.2)# 尝试关闭分页# 兼容多厂商:terminal length 0, screen-length disable, page-off 等try:shell.send('terminal length 0\n')time.sleep(0.15)shell.send('screen-length 0 temporary\n')time.sleep(0.15)shell.send('page off\n')time.sleep(0.15)except Exception:pass# 清空 bannertry:if shell.recv_ready():_ = shell.recv(65536)except Exception:pass# 发送命令并读取,自动处理分页try:shell.send(command + '\n')except Exception:passoutput = recv_with_paging(shell, timeout=cmd_timeout)try:shell.close()except Exception:passreturn outputdef split_interface_blocks(output):"""更严格的 H3C 专用分割:- 识别常见 H3C 前缀:GigabitEthernet, Ethernet, Ten-GigabitEthernet, Eth-Trunk,Vlan-interface / Vlanif, Loopback, FortyGigabitEthernet 等。- 要求接口名包含数字,避免把 'Flow-control' 等字段当接口头。返回列表 (iface_name, block_text)"""lines = output.splitlines()blocks = []cur_iface = Nonecur_lines = []# 列出常见的接口前缀(可按需扩展)iface_prefixes = [r'GigabitEthernet', r'GigEth', r'Ten-GigabitEthernet', r'FortyGigabitEthernet',r'Ethernet', r'Eth-Trunk', r'Vlan-interface', r'Vlanif', r'Loopback', r'VEth', r'Veth']prefix_pat = r'(' + '|'.join(iface_prefixes) + r')'# 可能的 header 样式:# "GigabitEthernet1/0/1 current state : UP"# "Interface: GigabitEthernet1/0/1"# "GigabitEthernet1/0/1 is down"p1 = re.compile(r'^\s*(?:Interface[:\s]+)?' + prefix_pat + r'([A-Za-z0-9\/\-\._]*\d+)\b', re.IGNORECASE)# 另一种:全名连续(例如 "GigabitEthernet1/0/1 current state : UP")p2 = re.compile(r'^\s*' + prefix_pat + r'([A-Za-z0-9\/\-\._]*\d+)\b.*(?:current state|is up|is down|UP|DOWN|administratively down)', re.IGNORECASE)for ln in lines:ln_str = ln.rstrip('\r\n')m = Nonefor p in (p1, p2):mm = p.match(ln_str)if mm:# 名字由 mm.group(1)+group(2) 或 mm.group(1)+group(2) 组成,取匹配到的完整接口名# 某些 pattern 捕获了两部分(前缀和后缀),拼接起来groups = [g for g in mm.groups() if g]name = ''.join(groups).strip()# 最终再确认包含数字if not re.search(r'\d', name):continuem = namebreakif m:name = m.rstrip(',:')if cur_iface is not None:blocks.append((cur_iface, '\n'.join(cur_lines)))cur_iface = namecur_lines = [ln_str]else:if cur_iface is not None:cur_lines.append(ln_str)else:cur_lines.append(ln_str)if cur_iface is not None:blocks.append((cur_iface, '\n'.join(cur_lines)))else:blocks = [('Unknown', '\n'.join(cur_lines).strip())] if cur_lines else [('Unknown', output.strip())]return blocksdef extract_crc_from_block(block_text):"""多种方式提取 CRC 数字:- '4 input errors, 2 CRC, 0 frame' -> 2- 'CRC: 2' or 'CRC 2' or '2 CRC' -> 2- 'input errors: 2, CRC: 2' etc.返回整数(若能解析)或 None。"""# 常见组合patterns = [re.compile(r'(\d+)\s+input errors.*?(\d+)\s+CRC', re.IGNORECASE | re.DOTALL),re.compile(r'input errors[:\s]*\d+[,;\s]*CRC[:\s]*(\d+)', re.IGNORECASE),re.compile(r'CRC[:\s]+(\d+)', re.IGNORECASE),re.compile(r'\b(\d+)\s+CRC\b', re.IGNORECASE),re.compile(r'CRC errors[:\s]*(\d+)', re.IGNORECASE),re.compile(r'CRC\s+(\d+)', re.IGNORECASE)]for rg in patterns:m = rg.search(block_text)if m:# 找最后一组数字(某些 pattern 有两个捕获组)for g in reversed(m.groups()):if g and g.isdigit():try:return int(g)except:pass# 如果找不到数字,但出现单词 CRC,则标记为 Unknown (存在 CRC 词但无数字)if re.search(r'\bCRC\b', block_text, re.IGNORECASE):return None  # Unknown count but CRC mentioned# 否则认为没有 CRC(记录为 0)return 0def check_one_device(ip, username, password, command, connect_timeout, cmd_timeout):client = paramiko.SSHClient()client.set_missing_host_key_policy(paramiko.AutoAddPolicy())try:client.connect(hostname=ip,username=username,password=password,timeout=connect_timeout,banner_timeout=connect_timeout,auth_timeout=connect_timeout)except AuthenticationException as e:return False, f'Authentication failed: {e}'except socket.timeout:return False, 'Connection timed out'except SSHException as e:return False, f'SSHException: {e}'except Exception as e:return False, f'Connection failed: {e}'try:output = run_command_interactive(client, command, cmd_timeout=cmd_timeout)except Exception as e:try:client.close()except:passreturn False, f'Failed to run command: {e}'try:client.close()except:passif not output:return True, []  # 没有输出也返回成功但没有接口blocks = split_interface_blocks(output)results = []for iface, block in blocks:crc_val = extract_crc_from_block(block)# 规范输出值:数字 -> string of number; None -> 'Unknown'; 0 -> '0'if crc_val is None:crc_str = 'Unknown'else:crc_str = str(int(crc_val))results.append({'interface': iface, 'crc': crc_str})return True, resultsdef main():parser = argparse.ArgumentParser(description='Check interfaces CRC on devices listed in ip.txt')parser.add_argument('-i', '--ipfile', default=os.environ.get('IP_FILE', DEFAULT_IP_FILE), help='IP list file (default ip.txt)')parser.add_argument('-u', '--username', help='SSH username (or set SSH_USER env)')parser.add_argument('-p', '--password', help='SSH password (or set SSH_PASS env) - warning: visible in process list')parser.add_argument('-c', '--command', default=os.environ.get('CRC_CMD', DEFAULT_COMMAND), help=f'Command to run on device (default: "{DEFAULT_COMMAND}")')parser.add_argument('-o', '--output', default=DEFAULT_OUTPUT, help='Output Excel filename')parser.add_argument('--connect-timeout', type=int, default=int(os.environ.get('SSH_CONNECT_TIMEOUT', DEFAULT_CONNECT_TIMEOUT)), help='SSH connect timeout seconds (default 5)')parser.add_argument('--cmd-timeout', type=int, default=int(os.environ.get('SSH_CMD_TIMEOUT', DEFAULT_CMD_TIMEOUT)), help='Command read timeout seconds (default 30)')args = parser.parse_args()ips = read_ip_list(args.ipfile)if not ips:print(f'[!] No IPs found in {args.ipfile}')returnusername = args.username or os.environ.get('SSH_USER') or input('SSH username: ')password = args.password or os.environ.get('SSH_PASS') or getpass.getpass('SSH password: ')all_rows = []failures = []print(f'[+] Loaded {len(ips)} IPs, connecting with timeout {args.connect_timeout}s, command timeout {args.cmd_timeout}s')for ip in ips:print(f'[{ip}] connecting...')success, data = check_one_device(ip, username, password, args.command, args.connect_timeout, args.cmd_timeout)if not success:print(f'[{ip}] FAILED: {data}')failures.append({'ip': ip, 'reason': data})continueparsed = dataif not parsed:print(f'[{ip}] OK, but no interface blocks parsed.')# still add a row to indicate device had no data? We skip adding rows for this device (user requested only IP/interface/crc)continueprint(f'[{ip}] OK, {len(parsed)} interfaces parsed.')for ent in parsed:all_rows.append({'ip': ip, 'interface': ent['interface'], 'crc': ent['crc']})# 写 Excel:只包含 ip, interface, crc;另一个 sheet 记录失败df = pd.DataFrame(all_rows, columns=['ip', 'interface', 'crc'])df_fail = pd.DataFrame(failures, columns=['ip', 'reason'])with pd.ExcelWriter(args.output, engine='openpyxl') as writer:df.to_excel(writer, sheet_name='crc_results', index=False)df_fail.to_excel(writer, sheet_name='ssh_failures', index=False)print(f'[+] Done. Results saved to {args.output}')print(f'[+] Devices checked: {len(ips)}, interfaces rows: {len(all_rows)}, ssh failures: {len(failures)}')if __name__ == '__main__':main()

四、实验结果:

建议在终端运行,pycharm运行,getpass会卡住

最终生成excel表:

http://www.dtcms.com/a/327960.html

相关文章:

  • Baumer高防护相机如何通过YoloV8深度学习模型实现纸箱的实时检测计数(C#代码UI界面版)
  • 备份单表的方法
  • 工业相机镜头选型
  • HTTPS加密与私有CA配置全攻略
  • AI智能体平台大爆发,2025AI智能体平台TOP30
  • 【Unity3D实例-功能-下蹲】角色下蹲(二)穿越隧道
  • Python爬虫获取淘宝店铺所有商品信息API接口
  • IoTDB与传统数据库的核心区别
  • 【Linux系列】服务器 IP 地址查询
  • OpenBMC中C++单例模式架构与实现全解析
  • 站在Vue的角度,对比鸿蒙开发中的递归渲染
  • 线缆桥架、管道设计规范详解
  • 异步并发×编译性能:Dart爬虫的实战突围
  • USB 2.0 3.0 插拔 ftrace 详解
  • MySQL相关概念和易错知识点(5)(索引、事务、MVCC)
  • LintCode第1526-N叉树的前序遍历
  • MongoDB 入门指南(一):从 0 到 1 学会文档数据库
  • QT之问题解决记录1:上下位机通信中断而不自知
  • react+redux+toolkit来实现公共数据的处理-对比vuex
  • 深度学习日志及可视化过程
  • 【机器学习深度学习】归一化层
  • Java 编程每日一题:实现一个简易的 LRU 缓存
  • JavaSE:数据类型与变量
  • 13-docker的轻量级私有仓库之docker-registry
  • 网络安全第1—2天笔记
  • 【19】万集科技——万集科技嵌入式,校招 一面,二面,面试问答记录
  • 数据分析与可视化
  • Unity数据可视化图表插件XCharts
  • CS2服务器是何方神圣
  • 21.Linux HTTPS服务