import os
import re
from datetime import datetime, timedelta, timezone

import requests
# Connection settings are read from the environment once, at import time.
# Base URL of the Elasticsearch HTTP endpoint (no trailing slash expected,
# because request paths are appended as "/...").
ES_HOST = os.environ.get("ES_HOST", "http://192.168.0.250:9200")
# Basic-auth credentials; the request helpers in this file only send auth
# when BOTH values are non-empty.
ES_USER = os.environ.get("ES_USER", "")
ES_PASS = os.environ.get("ES_PASS", "")

# Number of days of indices to keep, counted back from the current UTC date.
RETENTION_DAYS = 3

# "<anything>-<8 digits>": group 1 is the prefix, group 2 the date suffix.
INDEX_PATTERN = r"^(.*)-(\d{8})$"
def get_indices():
    """Fetch the names of all Elasticsearch indices.

    Queries ``{ES_HOST}/_cat/indices?format=json`` (with basic auth when both
    ``ES_USER`` and ``ES_PASS`` are set) and collects the ``"index"`` field of
    every returned entry.

    Returns:
        A list of index names. On any failure (network error, non-2xx status,
        unexpected payload) the error is printed and an empty list is
        returned, so callers cannot tell "no indices" from "lookup failed".
    """
    credentials = None
    if ES_USER and ES_PASS:
        credentials = (ES_USER, ES_PASS)
    endpoint = f"{ES_HOST}/_cat/indices?format=json"
    try:
        reply = requests.get(endpoint, auth=credentials, timeout=30)
        reply.raise_for_status()
        names = []
        for entry in reply.json():
            names.append(entry["index"])
        return names
    except Exception as err:
        # Best-effort: report the problem and let the caller see "nothing".
        print(f"获取索引失败: {err}")
        return []
def is_skywalking_index(index_name):
    """Tell whether *index_name* has the ``<any prefix>-<8 digits>`` shape.

    The check is purely syntactic (``INDEX_PATTERN``); it does not verify that
    the eight digits form a valid date.

    Returns:
        ``True`` if the name matches the pattern, ``False`` otherwise.
    """
    matched = re.match(INDEX_PATTERN, index_name)
    # A match object is always truthy, so bool() is equivalent to "is not None".
    return bool(matched)
def parse_index_date(index_name):
    """Extract the trailing ``YYYYMMDD`` date suffix from an index name.

    Args:
        index_name: Elasticsearch index name, e.g. ``"metrics-20240101"``.

    Returns:
        The eight-digit suffix as a string when the name matches
        ``INDEX_PATTERN`` *and* the digits form a real calendar date;
        ``None`` otherwise.
    """
    match = re.match(INDEX_PATTERN, index_name)
    if match is None:
        return None

    suffix = match.group(2)
    try:
        # Eight digits are not necessarily a date (e.g. "12345678" or
        # "00000000"). Such suffixes would compare as "older than the cutoff"
        # in a plain string comparison and get the index deleted, so reject
        # anything that is not a valid calendar day.
        datetime.strptime(suffix, "%Y%m%d")
    except ValueError:
        return None
    return suffix
def delete_indices(indices_to_delete):
    """Delete each named index, one HTTP DELETE request per index.

    Args:
        indices_to_delete: Iterable of index names to remove.

    The outcome of every request is printed. Only HTTP 200 is reported as
    success; any other status or any exception is reported as a failure and
    processing continues with the next index. Nothing is returned.
    """
    credentials = None
    if ES_USER and ES_PASS:
        credentials = (ES_USER, ES_PASS)

    for name in indices_to_delete:
        try:
            reply = requests.delete(f"{ES_HOST}/{name}", auth=credentials, timeout=30)
            if reply.status_code != 200:
                print(f"删除失败({reply.status_code}): {name}")
            else:
                print(f"已删除索引: {name}")
        except Exception as err:
            # One failing index must not stop the rest of the cleanup.
            print(f"删除异常: {name} - {err}")
def main():
    """Delete date-suffixed indices that fall outside the retention window.

    Computes the cutoff as "today (UTC) minus ``RETENTION_DAYS``", lists all
    indices, keeps those whose name matches the ``<prefix>-<8 digits>``
    pattern, and deletes every one whose date suffix is strictly older than
    the cutoff. Progress is reported via ``print``.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # resulting UTC calendar date is the same.
    cutoff = datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)
    cutoff_date = cutoff.strftime("%Y%m%d")
    print(f"清理SkyWalking索引,保留日期 >= {cutoff_date}")

    indices_to_delete = []
    for index in get_indices():
        if not is_skywalking_index(index):
            continue
        index_date = parse_index_date(index)
        # Zero-padded YYYYMMDD strings sort lexicographically in date order,
        # so a plain string comparison is sufficient here.
        if index_date and index_date < cutoff_date:
            indices_to_delete.append(index)

    if indices_to_delete:
        print(f"待删除索引: {', '.join(indices_to_delete)}")
        delete_indices(indices_to_delete)
    else:
        print("无符合清理条件的索引")


# Run the cleanup only when executed as a script, not when imported.
if __name__ == "__main__":
    main()