脚本
import logging
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch
# 配置日志记录
logging.basicConfig(
filename='delete_uat_indices.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Elasticsearch 集群的连接信息
ELASTICSEARCH_HOST = "http://192.178.18.209:9200" # 修改为你的 Elasticsearch 地址
USERNAME = "elastic" # 修改为你的用户名
PASSWORD = "7F9L%mYWjWtb" # 修改为你的密码
INDEX_PATTERN = "*uat*" # 匹配带有 "uat" 的索引
def delete_old_indices():
# 连接到 Elasticsearch 集群
es = Elasticsearch([ELASTICSEARCH_HOST], basic_auth=(USERNAME, PASSWORD))
# 检查连接是否成功
if not es.ping():
logging.error("无法连接到 Elasticsearch 集群")
return
logging.info("成功连接到 Elasticsearch 集群")
# 计算 5 天前的日期
five_days_ago = datetime.now() - timedelta(days=2)
date_str = five_days_ago.strftime('%Y.%m.%d')
# 获取所有索引(确保使用关键字参数)
try:
indices = es.indices.get(index="*uat*") # 使用关键字参数
except Exception as e:
logging.error(f"获取索引列表失败: {e}")
return
# 过滤出符合条件的索引
indices_to_delete = []
for index_name in indices.keys():
if not index_name.startswith(".") and "uat" in index_name:
# 提取索引名称中的日期部分
try:
index_date_str = index_name.split("-")[-1] # 假设日期在索引名称的最后部分
index_date = datetime.strptime(index_date_str, '%Y.%m.%d')
if index_date < five_days_ago:
indices_to_delete.append(index_name)
except Exception as e:
logging.warning(f"无法解析索引名称 {index_name} 的日期部分: {e}")
if not indices_to_delete:
logging.info(f"未找到符合条件的索引: 包含 'uat' 且日期为 {date_str}")
return
# 删除符合条件的索引
for index in indices_to_delete:
try:
es.indices.delete(index=index)
logging.info(f"成功删除索引: {index}")
except Exception as e:
logging.error(f"删除索引 {index} 时失败: {e}")
if __name__ == "__main__":
delete_old_indices()
定时任务
crontab -e
#每天23点30定时删除5天前的uat环境日志
30 23 * * * /usr/local/python-3.9/bin/python3.9 /data/delete_old_indices/delete_uat_old_indices.py >/dev/null 2>&1