import requests
from datetime import datetime, timedelta
import os
import re
ES_HOST = os.getenv("ES_HOST", "http://192.168.0.250:9200")
ES_USER = os.getenv("ES_USER", "")
ES_PASS = os.getenv("ES_PASS", "")
RETENTION_DAYS = 3
INDEX_PATTERN = r"^(.*)-(\d{8})$" def get_indices():"""获取所有Elasticsearch索引"""auth = (ES_USER, ES_PASS) if ES_USER and ES_PASS else Noneurl = f"{ES_HOST}/_cat/indices?format=json"try:response = requests.get(url, auth=auth, timeout=30)response.raise_for_status()return [idx["index"] for idx in response.json()]except Exception as e:print(f"获取索引失败: {e}")return []def is_skywalking_index(index_name):"""判断是否为SkyWalking索引(名称格式:任意前缀-8位日期)"""return re.match(INDEX_PATTERN, index_name) is not Nonedef parse_index_date(index_name):"""从索引名称中提取日期"""match = re.match(INDEX_PATTERN, index_name)return match.group(2) if match else None def delete_indices(indices_to_delete):"""删除指定索引(同上个版本,保持不变)"""auth = (ES_USER, ES_PASS) if ES_USER and ES_PASS else Nonefor index in indices_to_delete:try:url = f"{ES_HOST}/{index}"response = requests.delete(url, auth=auth, timeout=30)if response.status_code == 200:print(f"已删除索引: {index}")else:print(f"删除失败({response.status_code}): {index}")except Exception as e:print(f"删除异常: {index} - {e}")def main():cutoff_date = (datetime.utcnow() - timedelta(days=RETENTION_DAYS)).strftime("%Y%m%d")print(f"清理SkyWalking索引,保留日期 >= {cutoff_date}")all_indices = get_indices()skywalking_indices = [idx for idx in all_indices if is_skywalking_index(idx)]indices_to_delete = []for index in skywalking_indices:index_date = parse_index_date(index)if index_date and index_date < cutoff_date:indices_to_delete.append(index)if indices_to_delete:print(f"待删除索引: {', '.join(indices_to_delete)}")delete_indices(indices_to_delete)else:print("无符合清理条件的索引")if __name__ == "__main__":main()