阿里云-ECS实例信息统计并发送统计报告到企业微信
文章目录
- 一、脚本功能描述
- 1. 配置管理
- 2. 阿里云ECS监控
- 3. 数据分析功能
- 4. 企业微信集成
- 二、脚本内容
- 三、示例
- 四、使用场景
免费个人运维知识库,欢迎您的订阅:literator_ray.flowus.cn
一、脚本功能描述
1. 配置管理
-
支持两种配置文件格式(
.config.py
或config.py
) -
优先使用隐藏配置文件
.config.py
-
需要配置阿里云访问密钥和企业微信Webhook
2. 阿里云ECS监控
-
区域监控:专门监控北京区域的ECS实例
-
分页查询:自动处理分页,获取所有ECS实例
-
实例信息收集:获取实例ID、名称、IP地址、创建时间等关键信息
3. 数据分析功能
-
总量统计:统计全部ECS服务器数量
-
新增检测:识别近1个月内新增的ECS实例
-
时间格式处理:支持多种时间格式解析,确保兼容性
4. 企业微信集成
-
消息推送:将监控结果推送到企业微信机器人
-
格式化输出:生成清晰的表格格式报告
-
错误通知:执行过程中的错误也会发送到企业微信
可以根据自己的实际需求修改脚本
二、脚本内容
脚本共182行,动动你的小手滚动查看
#!/usr/bin/env python3
# -*- coding: utf-8 -*-import sys
import json
import os
import argparse
from dotenv import load_dotenv # 用于加载.env文件# 检查并尝试导入所需模块
try:from aliyunsdkcore.client import AcsClientfrom aliyunsdkcore.acs_exception.exceptions import ClientException, ServerExceptionfrom aliyunsdkalidns.request.v20150109.DescribeDomainRecordsRequest import DescribeDomainRecordsRequest
except ImportError as e:print(f"导入模块失败: {e}")print("请确保已安装阿里云SDK: pip install aliyun-python-sdk-core aliyun-python-sdk-alidns")sys.exit(1)# 加载.env文件(如果存在)
load_dotenv()def split_domain(full_domain):"""拆分完整域名为主域名和子域名部分例如: www.example.com -> (example.com, www)example.com -> (example.com, '@')"""parts = full_domain.split('.')# 如果域名部分少于2,则无法拆分if len(parts) < 2:raise ValueError("无效的域名格式")# 尝试识别主域名(通常为最后两部分,但需要考虑如.co.uk的情况)# 这里使用简单策略:取最后两部分作为主域名domain = '.'.join(parts[-2:])subdomain = '.'.join(parts[:-2]) if len(parts) > 2 else '@'return domain, subdomaindef query_dns_records(access_key_id, access_key_secret, region_id, full_domain):"""查询阿里云DNS记录"""try:# 拆分域名domain, rr_keyword = split_domain(full_domain)print(f"查询域名: {domain}, 子域名: {rr_keyword}")# 创建AcsClient实例client = AcsClient(access_key_id, access_key_secret, region_id)# 创建请求对象request = DescribeDomainRecordsRequest()request.set_accept_format('json')# 设置请求参数request.set_DomainName(domain)if rr_keyword != '@':request.set_RRKeyWord(rr_keyword)# 发起请求并获取响应response = client.do_action_with_exception(request)result = json.loads(response.decode('utf-8'))# 处理响应records = result.get('DomainRecords', {}).get('Record', [])if not records:print("未找到匹配的DNS记录")returnprint(f"\n找到 {len(records)} 条DNS记录:")print("-" * 80)for record in records:# 只输出指定的字段print(f"子域名: {record.get('RR')}")print(f"记录类型: {record.get('Type')}")print(f"记录值: {record.get('Value')}")print(f"TTL: {record.get('TTL')}")print(f"状态: {record.get('Status')}")# 备注字段可能不存在,使用get方法安全获取print(f"备注: {record.get('Remark', '无')}")print("-" * 80)except ClientException as e:print(f"客户端异常: {e}")print(f"错误代码: {e.error_code}")print(f"错误消息: {e.message}")print(f"请求ID: {e.request_id}")if "Forbidden" in str(e) or "not authorized" in str(e):print("\n权限错误提示:")print("1. 请确认权限策略已正确附加到RAM用户")print("2. 确认权限策略内容正确: alidns:*, pubdns:*")print("3. 权限策略更改后可能需要几分钟生效")print("4. 检查是否有其他限制策略")print("5. 尝试使用主账户AccessKey测试")except ServerException as e:print(f"服务器异常: {e}")print(f"错误代码: {e.error_code}")print(f"错误消息: {e.message}")print(f"请求ID: {e.request_id}")if "Forbidden" in str(e) or "not authorized" in str(e):print("\n权限错误提示:")print("1. 请确认权限策略已正确附加到RAM用户")print("2. 确认权限策略内容正确: alidns:*, pubdns:*")print("3. 权限策略更改后可能需要几分钟生效")print("4. 检查是否有其他限制策略")print("5. 尝试使用主账户AccessKey测试")except ValueError as e:print(f"域名格式错误: {e}")except Exception as e:print(f"发生未知错误: {e}")def get_credentials(cred_file=None):"""获取阿里云凭证:从文件、环境变量或用户输入"""# 1. 如果提供了凭证文件if cred_file:try:with open(cred_file, 'r') as f:data = json.load(f)access_key_id = data.get('access_key_id')access_key_secret = data.get('access_key_secret')if not access_key_id or not access_key_secret:print(f"凭证文件格式错误:缺少access_key_id或access_key_secret字段")return None, Noneprint(f"已从文件 {cred_file} 读取凭证")return access_key_id, access_key_secretexcept FileNotFoundError:print(f"错误:凭证文件 {cred_file} 不存在")return None, Noneexcept json.JSONDecodeError:print(f"错误:凭证文件 {cred_file} 不是有效的JSON格式")return None, Noneexcept Exception as e:print(f"读取凭证文件时出错: {e}")return None, None# 2. 尝试从环境变量获取(包括从.env文件加载的变量)access_key_id = os.environ.get('ALIYUN_ACCESS_KEY_ID')access_key_secret = os.environ.get('ALIYUN_ACCESS_KEY_SECRET')if access_key_id and access_key_secret:print("已从环境变量读取凭证")return access_key_id, access_key_secret# 3. 手动输入凭证print("请手动输入阿里云凭证")access_key_id = input("请输入阿里云AccessKey ID: ").strip()access_key_secret = input("请输入阿里云AccessKey Secret: ").strip()return access_key_id, access_key_secretdef main():# 配置阿里云区域REGION_ID = "global" # 阿里云区域ID# 设置命令行参数解析parser = argparse.ArgumentParser(description='阿里云DNS查询工具')parser.add_argument('domain', help='要查询的完整域名 (例如: www.example.com)')parser.add_argument('-f', '--file', metavar='FILE', help='从JSON文件读取凭证的路径')args = parser.parse_args()# 获取凭证access_key_id, access_key_secret = get_credentials(args.file)if not access_key_id or not access_key_secret:print("无法获取有效的凭证,退出程序")sys.exit(1)query_dns_records(access_key_id, access_key_secret, REGION_ID, args.domain)if __name__ == "__main__":main()
三、示例
四、使用场景
-
日常监控:定期检查ECS资源使用情况
-
成本控制:监控新增实例,避免资源浪费
-
安全审计:及时发现未经授权的实例创建
-
自动化运维:集成到CI/CD流程中实现自动监控
欢迎您提出问题,并指正代码中的不足
请不要以此视为定论,这只是我的个人经验