1.服务端
"""
@File: rabbitmq_server.py
@Date: 2025/6/26 10:42
@Author: xxx
@Description:
1. RabbitMQ服务端,支持多节点命令执行
2. 作为被控节点运行,可接收定向命令并返回结果
"""
import ssl
import pika
import time
import json
import socket
import logging
import subprocess
import configparser
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('rabbitmq_server.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)
RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"
class RabbitMQServer:
"""
RabbitMQ RPC服务器类
功能:接收并执行来自客户端的定向命令
"""
def __init__(self, node_name=None, mq_user="rabbitmq", mq_password="rabbitmq@123",
mq_virtual_host="/", mq_host=None, mq_port=5671,
mq_ca="/opt/ssl/ca_certificate.pem"):
"""
初始化RabbitMQ服务端
:param node_name: 节点名称标识(唯一)
:param mq_user: RabbitMQ用户名
:param mq_password: RabbitMQ密码
:param mq_virtual_host: 虚拟主机
:param mq_host: RabbitMQ服务器IP
:param mq_port: RabbitMQ服务端口
:param mq_ca: SSL证书路径
"""
self.NODE_NAME = node_name if node_name else socket.gethostname()
self.RABBITMQ_USER = mq_user
self.RABBITMQ_UNLOCK_CODE = mq_password
self.RABBITMQ_VIRTUAL_HOST = mq_virtual_host
self.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")
self.RABBITMQ_PORT = mq_port
self.SSL_CA_PATH = mq_ca
self._setup_connection()
def get_option(self, file_path, section, option):
"""
获取 file_path 配置项值,若配置文件没有,返回空字符串
:param section: section字符串,例如:'global'
:param option: key值,例如:'manage_nodes'
:return: 字符串类型数据
"""
parser = configparser.ConfigParser()
parser.read(file_path)
if not parser.has_option(section, option):
return ""
else:
return parser.get(section, option)
def _get_ssl_options(self):
"""配置SSL安全连接选项"""
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.load_verify_locations(self.SSL_CA_PATH)
return pika.SSLOptions(
context,
"localhost"
)
def _setup_connection(self):
"""建立RabbitMQ连接并设置队列"""
credentials = pika.PlainCredentials(
self.RABBITMQ_USER,
self.RABBITMQ_UNLOCK_CODE
)
connection_params = pika.ConnectionParameters(
host=self.RABBITMQ_HOST,
port=self.RABBITMQ_PORT,
virtual_host=self.RABBITMQ_VIRTUAL_HOST,
credentials=credentials,
ssl_options=self._get_ssl_options(),
heartbeat=600
)
self.connection = pika.BlockingConnection(connection_params)
self.channel = self.connection.channel()
self.channel.queue_declare(
queue=self.NODE_NAME,
durable=True
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=self.NODE_NAME,
on_message_callback=self._execute_command,
auto_ack=False
)
def _execute_command(self, ch, method, props, body):
"""执行接收到的命令并返回结果"""
try:
message = json.loads(body.decode('utf-8'))
command = message.get('command', '')
target = message.get('target', '')
logger.info(f" [x] 收到({target})命令:{command}")
if target != self.NODE_NAME:
logger.warning(f" [x] 收到非本节点({self.NODE_NAME})命令,已忽略")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
logger.info(f" [*] 执行命令 【{command}】...")
try:
output = subprocess.check_output(
command,
shell=True,
stderr=subprocess.STDOUT,
timeout=60
)
response = output.decode('utf-8')
except subprocess.TimeoutExpired:
response = "Error: Command timed out"
except subprocess.CalledProcessError as e:
response = f"Error: {e.output.decode('utf-8')}"
except Exception as e:
response = f"System Error: {str(e)}"
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id=props.correlation_id,
delivery_mode=2
),
body=response.encode('utf-8')
)
logger.info(f" [*] 命令执行完成")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.exception(f" [x] 消息处理异常: {str(e)}")
ch.basic_nack(delivery_tag=method.delivery_tag)
def start(self, max_retries=5, retry_delay=10):
"""
启动RabbitMQ服务并持续监听消息
功能:管理服务生命周期,处理连接异常和重试逻辑
:param max_retries: 最大重试次数,默认5次
:param retry_delay: 重试间隔时间(秒),默认10秒
:return:
"""
retry_count = 0
while True:
try:
logger.info(f" [*] {self.NODE_NAME} 节点服务启动 (尝试 {retry_count + 1}/{max_retries})")
logger.info(f" [*] 等待队列 {self.NODE_NAME} 中的请求...")
if not hasattr(self, 'connection') or self.connection.is_closed:
self._setup_connection()
self.channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
retry_count += 1
logger.exception(f"连接失败: {str(e)}")
if retry_count >= max_retries:
logger.error(" [x] 达到最大重试次数,终止服务")
self.close()
break
logger.warning(f" [*] {retry_delay}秒后尝试重新连接...")
time.sleep(retry_delay)
except KeyboardInterrupt:
logger.error("\n [x] 接收到终止信号")
self.close()
logger.error(" [x] 服务已停止")
break
except Exception as e:
logger.exception(f"服务异常: {str(e)}")
time.sleep(retry_delay)
def close(self):
"""
安全关闭RabbitMQ连接
功能:清理资源,确保连接被正确关闭
:return:
"""
if hasattr(self, 'connection') and not self.connection.is_closed:
self.connection.close()
logger.info(" [x] 连接已安全关闭")
if __name__ == '__main__':
server = RabbitMQServer()
try:
server.start()
except KeyboardInterrupt:
logger.error("\n [x] 接收到终止信号")
server.close()
logger.error(" [x] 服务已停止")
2.客户端
"""
@File: rabbitmq_client.py
@Date: 2025/6/26 10:43
@Author: xxx
@Description:
1. RabbitMQ客户端类,支持向指定节点发送SSH命令
2. 作为控制端运行,可定向发送命令并接收执行结果
"""
import ssl
import pika
import time
import uuid
import json
import socket
import logging
import configparser
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('rabbitmq_client.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)
RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"
class RabbitMQClient:
"""
RabbitMQ RPC客户端类
功能:向指定节点发送命令并获取执行结果
"""
def __init__(self, mq_user="rabbitmq", mq_password="rabbitmq@123", mq_virtual_host="/",
mq_host=None, mq_port=5671, mq_ca="/opt/ssl/ca_certificate.pem"):
"""
初始化RabbitMQ客户端
:param mq_user: RabbitMQ用户名
:param mq_password: RabbitMQ密码
:param mq_virtual_host: 虚拟主机
:param mq_host: RabbitMQ服务器IP
:param mq_port: RabbitMQ服务端口
:param mq_ca: SSL证书路径
"""
self.RABBITMQ_USER = mq_user
self.RABBITMQ_UNLOCK_CODE = mq_password
self.RABBITMQ_VIRTUAL_HOST = mq_virtual_host
self.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")
self.RABBITMQ_PORT = mq_port
self.SSL_CA_PATH = mq_ca
self.response = None
self.corr_id = None
logger.info(" [x] 正在建立连接 ...")
self._connect()
logger.info(" [x] 连接建立成功")
def get_option(self, file_path, section, option):
"""
获取 file_path 配置项值,若配置文件没有,返回空字符串
:param section: section字符串,例如:'global'
:param option: key值,例如:'manage_nodes'
:return: 字符串类型数据
"""
parser = configparser.ConfigParser()
parser.read(file_path)
if not parser.has_option(section, option):
return ""
else:
return parser.get(section, option)
def _connect(self):
"""
建立RabbitMQ连接并初始化回调队列
功能:配置安全连接参数、创建通信信道、设置消息回调处理
:return:
"""
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations(self.SSL_CA_PATH)
ssl_options = pika.SSLOptions(ssl_context, "localhost")
credentials = pika.PlainCredentials(
self.RABBITMQ_USER,
self.RABBITMQ_UNLOCK_CODE
)
connection_params = pika.ConnectionParameters(
host=self.RABBITMQ_HOST,
port=self.RABBITMQ_PORT,
virtual_host=self.RABBITMQ_VIRTUAL_HOST,
credentials=credentials,
ssl_options=ssl_options,
heartbeat=60
)
self.connection = pika.BlockingConnection(connection_params)
self.channel = self.connection.channel()
result = self.channel.queue_declare(
queue='',
exclusive=True
)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self._on_response,
auto_ack=False
)
def _on_response(self, ch, method, props, body):
"""
RPC模式下的响应消息回调处理函数
功能:匹配并接收服务端返回的命令执行结果
处理逻辑:
1.通过correlation_id匹配对应的请求
2.将二进制消息体解码为字符串
3.存储结果供execute_command方法获取
:param ch: (pika.channel.Channel): 接收到消息的信道对象
:param method: (pika.spec.Basic.Deliver): 包含投递信息(如delivery_tag)
:param props: (pika.spec.BasicProperties): 消息属性(含correlation_id等)
:param body: (bytes): 消息体内容(服务端返回的执行结果)
:return:
"""
try:
if self.corr_id == props.correlation_id:
self.response = body.decode('utf-8')
except UnicodeDecodeError as e:
self.response = f"解码失败: {str(e)}"
def execute_command(self, command, target_node=None, timeout=60):
"""
向指定RabbitMQ节点发送命令并获取执行结果(RPC模式)
:param command (str): 要执行的shell命令字符串(如"ls -l")
:param target_node (str): 目标节点标识,对应服务端的队列名
- 默认None表示发送到当前主机节点
:param timeout (int): 等待响应的超时时间(秒),默认60秒
:return str: 命令执行结果文本
异常:
TimeoutError: 超过指定时间未收到响应时抛出
AMQP相关异常: 消息发送失败时抛出
向指定节点执行远程命令
"""
self.response = None
self.corr_id = str(uuid.uuid4())
if not target_node:
target_node = socket.gethostname()
message = {
"command": command,
"target": target_node,
"timestamp": time.time()
}
self.channel.basic_publish(
exchange='',
routing_key=target_node,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(message).encode('utf-8')
)
start_time = time.time()
while self.response is None:
self.connection.process_data_events()
if time.time() - start_time > timeout:
raise TimeoutError(f"等待节点 {target_node} 响应超时")
time.sleep(0.1)
return self.response
def close(self):
"""
安全关闭RabbitMQ连接
功能:
1. 清理网络连接资源
2. 自动删除临时队列(exclusive队列)
3. 防止资源泄漏
:return:
"""
if self.connection and not self.connection.is_closed:
self.connection.close()
logger.warning(" [x] 连接已关闭")
if __name__ == '__main__':
client = RabbitMQClient()
try:
nodes = ["node247", "node248", "node249"]
for node in nodes:
try:
logger.info(f"\n向节点 {node} 执行命令: hostname")
logger.info(client.execute_command(command="hostname", target_node=node))
except Exception as e:
logger.exception(f"节点 {node} 执行失败: {str(e)}")
try:
logger.info(f"\n向节点 {node} 执行命令: ls -l /opt/")
logger.info(client.execute_command(command="ls -l /opt/", target_node=node))
except Exception as e:
logger.exception(f"节点 {node} 执行失败: {str(e)}")
try:
logger.info(f"\n向节点 {node} 执行命令: date")
logger.info(client.execute_command(command="date", target_node=node))
except Exception as e:
logger.exception(f"节点 {node} 执行失败: {str(e)}")
finally:
client.close()
3.调用结果
192.168.120.17 node17
192.168.120.18 node18
192.168.120.19 node19
python3 rabbitmq_server.py
192.168.120.17 node17
python3 rabbitmq_client.py