清理skywalking历史索引

发布于:2025-05-28 ⋅ 阅读:(24) ⋅ 点赞:(0)
import requests
from datetime import datetime, timedelta
import os
import re

# 配置参数
ES_HOST = os.getenv("ES_HOST", "http://192.168.0.250:9200")  # ES地址
ES_USER = os.getenv("ES_USER", "")                      # 用户名(无认证则留空)
ES_PASS = os.getenv("ES_PASS", "")                      # 密码(无认证则留空)
RETENTION_DAYS = 3                                     # 保留天数
INDEX_PATTERN = r"^(.*)-(\d{8})$"                       # 匹配所有以日期结尾的索引(兼容SkyWalking所有类型)

def get_indices():
    """获取所有Elasticsearch索引"""
    auth = (ES_USER, ES_PASS) if ES_USER and ES_PASS else None
    url = f"{ES_HOST}/_cat/indices?format=json"
    try:
        response = requests.get(url, auth=auth, timeout=30)
        response.raise_for_status()
        return [idx["index"] for idx in response.json()]
    except Exception as e:
        print(f"获取索引失败: {e}")
        return []

def is_skywalking_index(index_name):
    """判断是否为SkyWalking索引(名称格式:任意前缀-8位日期)"""
    return re.match(INDEX_PATTERN, index_name) is not None

def parse_index_date(index_name):
    """从索引名称中提取日期"""
    match = re.match(INDEX_PATTERN, index_name)
    return match.group(2) if match else None  # group(2) 对应日期部分

def delete_indices(indices_to_delete):
    """删除指定索引(同上个版本,保持不变)"""
    auth = (ES_USER, ES_PASS) if ES_USER and ES_PASS else None
    for index in indices_to_delete:
        try:
            url = f"{ES_HOST}/{index}"
            response = requests.delete(url, auth=auth, timeout=30)
            if response.status_code == 200:
                print(f"已删除索引: {index}")
            else:
                print(f"删除失败({response.status_code}): {index}")
        except Exception as e:
            print(f"删除异常: {index} - {e}")

def main():
    # 计算15天前的日期
    cutoff_date = (datetime.utcnow() - timedelta(days=RETENTION_DAYS)).strftime("%Y%m%d")
    print(f"清理SkyWalking索引,保留日期 >= {cutoff_date}")

    # 获取并过滤索引
    all_indices = get_indices()
    skywalking_indices = [idx for idx in all_indices if is_skywalking_index(idx)]
    indices_to_delete = []
    
    for index in skywalking_indices:
        index_date = parse_index_date(index)
        if index_date and index_date < cutoff_date:
            indices_to_delete.append(index)

    # 执行删除
    if indices_to_delete:
        print(f"待删除索引: {', '.join(indices_to_delete)}")
        delete_indices(indices_to_delete)
    else:
        print("无符合清理条件的索引")

if __name__ == "__main__":
    main()


网站公告

今日签到

点亮在社区的每一天
去签到