常用任务管理命令
- 列出所有任务
curl -X GET "http://<es_host>:<es_port>/_tasks?detailed=true&pretty" -H 'Content-Type: application/json'
- 获取特定类型的任务
curl -X GET "http://<es_host>:<es_port>/_tasks?actions=<action_type>" -H 'Content-Type: application/json'
- 列出所有查询任务
curl -X GET "http://<es_host>:<es_port>/_tasks?detailed=true&actions=*search" -H 'Content-Type: application/json'
- 取消所有查询任务
如果 Elasticsearch 因耗时的大查询任务而卡住,可以临时使用下面的命令取消所有查询任务(注意:这会中断所有正在执行的查询,请谨慎使用)
curl -X POST "http://<es_host>:<es_port>/_tasks/_cancel?actions=*search" -H 'Content-Type: application/json'
- 获取特定任务的详细信息
curl -X GET "http://<es_host>:<es_port>/_tasks/<task_id>" -H 'Content-Type: application/json'
- 取消特定任务
curl -X POST "http://<es_host>:<es_port>/_tasks/_cancel?task_id=<task_id>" -H 'Content-Type: application/json'
- 获取特定节点上的任务
curl -X GET "http://<es_host>:<es_port>/_tasks?nodes=<node_id>" -H 'Content-Type: application/json'
实战
定时检测 Elasticsearch 后台运行的查询任务,如果任务运行时间超过 59 秒,则进行企业微信群告警通知(下面的脚本每次运行只检测一次,需配合 cron 等调度工具定时执行)
import requests
import time
# One-shot monitor: fetch the tasks currently running on Elasticsearch and send
# a WeCom (Enterprise WeChat) group-bot alert for every task that has a
# description and has been running longer than the threshold.
# NOTE(review): the surrounding notes talk about *search* tasks, but this
# request lists every task type; append "&actions=*search" to es_url if only
# searches should alert -- confirm the intent.

# URL of the Elasticsearch task-management endpoint (credentials embedded).
es_url = "http://<es_user>:<es_pwd>@<es_host>:<es_port>/_tasks?detailed=true"

# Alert when a task has been running for more than this many seconds.
RUNNING_TIME_THRESHOLD_SECONDS = 59
# Never let a stuck cluster or webhook hang the monitor itself.
REQUEST_TIMEOUT_SECONDS = 10
# WeCom rejects markdown messages whose content exceeds 4096 UTF-8 bytes.
MAX_MARKDOWN_BYTES = 4096
# Pause between notifications to stay under the bot's rate limit.
NOTIFY_INTERVAL_SECONDS = 2

# Webhook settings are loop-invariant, so build them once.
BOT_KEY = "xxxxxxxxxxxxxxxxx"  # WeCom group bot key
webhook_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={BOT_KEY}"
headers = {'Content-Type': 'application/json; charset=utf-8'}

# Fetch the task list. Fail loudly on HTTP errors: an error body (e.g. 401)
# has no 'nodes' key and would otherwise look like "no long-running tasks".
es_response = requests.get(es_url, timeout=REQUEST_TIMEOUT_SECONDS)
es_response.raise_for_status()
tasks_data = es_response.json()

# Walk every node and every task reported on it.
for node_id, node_info in tasks_data.get('nodes', {}).items():
    for task_id, task_info in node_info.get('tasks', {}).items():
        running_time_seconds = task_info.get('running_time_in_nanos', 0) / 1e9
        description = task_info.get('description', '')
        # Skip short tasks and tasks without a description.
        if running_time_seconds <= RUNNING_TIME_THRESHOLD_SECONDS or not description:
            continue

        running_time_formatted = f"{running_time_seconds:.2f}"
        # Markdown body for this single task.
        content = (
            f"# 有大任务在 Elasticsearch 上运行\n"
            f"- **任务 ID**: {task_id}\n"
            f" **查询语句**: {description}\n"
            f" **运行时间**: {running_time_formatted} seconds\n"
        )
        # Long query bodies can exceed the WeCom size limit; cut on a byte
        # budget and drop any partially-cut trailing character.
        encoded_content = content.encode("utf-8")
        if len(encoded_content) > MAX_MARKDOWN_BYTES:
            content = encoded_content[:MAX_MARKDOWN_BYTES].decode("utf-8", errors="ignore")

        QYWX_BODY = {
            "msgtype": "markdown",
            "markdown": {
                "content": content
            }
        }

        # Send to the webhook; a failure for one task must not stop the rest.
        try:
            webhook_response = requests.post(
                webhook_url,
                json=QYWX_BODY,
                headers=headers,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            print(f"Failed to send notification for Task ID {task_id}. Error: {exc}")
        else:
            # WeCom reports application-level failures with HTTP 200 and a
            # non-zero "errcode" in the JSON body, so check both.
            try:
                errcode = webhook_response.json().get("errcode")
            except (ValueError, AttributeError):
                errcode = None
            if webhook_response.status_code == 200 and errcode == 0:
                print(f"Notification for Task ID {task_id} sent successfully.")
            else:
                print(f"Failed to send notification for Task ID {task_id}. Status code: {webhook_response.status_code}, Response: {webhook_response.text}")

        # Wait before sending the next notification.
        time.sleep(NOTIFY_INTERVAL_SECONDS)

print("Processing completed.")