Apache Ignite Messaging Example

发布于:2025-08-09 ⋅ 阅读:(28) ⋅ 点赞:(0)
import com.xquant.risk.infra.ignite.properties.IgniteBaseProperties;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import static org.apache.ignite.events.EventType.*;

@Slf4j
@AllArgsConstructor
public class Igniter {
    private final IgniteConfiguration cfg;
    private final IgniteEventListener igniteEventListener;
    private final IgniteBaseProperties basicProperties;


    public Ignite start() {
        Ignite ignite = Ignition.start(this.cfg);
        IgniteCluster cluster = ignite.cluster();

        if (Objects.nonNull(cluster) && !cluster.active()) {
            cluster.active(true);
        }

        List<Integer> joinEvent = Arrays.asList(EVT_NODE_JOINED, EVT_CLIENT_NODE_RECONNECTED);

        List<Integer> leaveEvent = Arrays.asList(EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_SEGMENTED, EVT_CLIENT_NODE_DISCONNECTED);

        if (basicProperties.isListenDiscoveryEvent()) {
            ((TcpDiscoverySpi) cfg.getDiscoverySpi()).getSpiContext().addLocalEventListener(evt -> {
                if (log.isInfoEnabled()) {
                    log.info("当前Ignite回调事件: {}", evt.type());
                }
                try {
                    if (joinEvent.contains(evt.type())) {
                        igniteEventListener.onJoin(cluster, evt);
                    }
                    if (leaveEvent.contains(evt.type())) {
                        igniteEventListener.onLeave(cluster, evt);
                    }
                } catch (Throwable e) {
                    log.error("当前Ignite回调服务发生异常", e);
                    IgniteEventListener.exit("1");
                }
            }, EVTS_DISCOVERY);
        }
        log.info("Ignite start success !");

        return ignite;
    }

}

这是一个 Apache Ignite消息通信示例(Messaging Example),用于展示如何在集群节点之间通过消息传递进行通信。我们来逐步分析这个代码的结构和逻辑,帮助你深入理解其工作原理。


🧩 一、整体目标

该示例演示了:

  • 在 Ignite 集群中发送和接收两种类型的消息:
    • 有序消息(Ordered)
    • 无序消息(Unordered)
  • 使用 IgniteMessaging API 实现跨节点通信。
  • 利用 CountDownLatch 等待所有消息被确认处理完毕。

⚠️ 前提:至少启动两个节点(一个本地 + 一个或多个远程),且远程节点需使用 P2P 类加载配置(如 example-ignite.xml)。


📦 二、关键类与接口说明

类/接口 作用
Ignite 核心入口,代表一个节点实例
Ignition.start(...) 启动一个 Ignite 节点
IgniteMessaging 提供消息发送/监听功能
ClusterGroup 表示一组集群节点(比如所有远程节点)
CountDownLatch 并发工具,用于主线程等待其他线程完成任务

🔍 三、代码逐段解析

1️⃣ 静态常量定义

private static final int MESSAGES_NUM = 10;
  • 每种消息发送 10 条。
private enum TOPIC { ORDERED, UNORDERED }
  • 定义两个消息主题(可以理解为“频道”),分别用于区分有序和无序消息。

2️⃣ 主函数入口 main

try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
  • 启动当前节点,加载配置文件(通常是 example-ignite.xml)。
  • 使用 try-with-resources 自动关闭。
if (!ExamplesUtils.checkMinTopologySize(ignite.cluster(), 2))
    return;
  • 检查集群是否至少有 2 个节点(本节点 + 至少一个远程节点),否则退出。
ClusterGroup rmtGrp = ignite.cluster().forRemotes();
  • 获取除当前节点外的所有远程节点组成的集群组。
int msgCnt = rmtGrp.nodes().size() * MESSAGES_NUM;
CountDownLatch orderedLatch = new CountDownLatch(msgCnt);
CountDownLatch unorderedLatch = new CountDownLatch(msgCnt);
  • 假设有 N 个远程节点,每个会收到 10 条消息 → 共 N * 10 条。
  • Latch 用来等待这些消息被“回执”确认。
localListen(ignite.message(ignite.cluster().forLocal()), orderedLatch, unorderedLatch);
  • 本地节点注册监听器,用来接收来自远程节点的“回声”消息(即确认),每收到一条就 countDown()
startListening(ignite, ignite.message(rmtGrp));
  • 所有远程节点上注册监听器,当它们收到消息时打印并回传给发送方。

✅ 这是关键点:远程节点的监听器是通过 remoteListen() 动态部署的(借助 P2P 类加载)。


3️⃣ 发送消息阶段

发送无序消息(Unordered)
for (int i = 0; i < MESSAGES_NUM; i++)
    ignite.message(rmtGrp).send(TOPIC.UNORDERED, Integer.toString(i));
  • 使用 .send() 发送,不保证顺序。
  • 所有远程节点都会收到编号 0~9 的消息,但可能乱序。
发送有序消息(Ordered)
for (int i = 0; i < MESSAGES_NUM; i++)
    ignite.message(rmtGrp).sendOrdered(TOPIC.ORDERED, Integer.toString(i), 0);
  • 使用 .sendOrdered(...) 发送,保证同一主题下的消息按发送顺序被处理。
  • 第三个参数 0 是“顺序化键”(order key),决定哪些消息共享同一个顺序队列。

💡 举例:如果多个线程发消息,但用相同的 order key,则这些消息会被串行化处理。


4️⃣ 等待确认

orderedLatch.await();
unorderedLatch.await();
  • 主线程阻塞,直到所有远程节点发回来的“回执”都被本地监听器接收完毕。
  • 每个远程节点收到消息后会回传一次,所以总共要等 N * 10 次。

🎯 四、核心方法详解

startListening(...) —— 远程节点监听设置

imsg.remoteListen(TOPIC.ORDERED, (nodeId, msg) -> { ... });
  • 在远程节点上注册一个持久化监听器,一旦收到 TOPIC.ORDERED 的消息就执行 Lambda。
  • 打印日志,并回传消息给原节点(作为 ACK 确认)。
  • 返回 true 表示继续监听。

🔁 回传路径:ignite.message(ignite.cluster().forNodeId(nodeId)).send(...)

这一步实现了“消息回响(Echo)”,让发送方知道消息已被接收。


localListen(...) —— 本地监听确认消息

imsg.localListen(TOPIC.ORDERED, (nodeId, msg) -> {
    orderedLatch.countDown();
    return orderedLatch.getCount() > 0;
});
  • 监听本地收到的来自远程节点的“回执”消息。
  • 每收到一条,计数器减一。
  • 当计数归零时自动停止监听(返回 false)。

🔄 五、消息流动流程图

[本地节点] 
   │
   ├── send(TOPIC.UNORDERED, "0") ──────────────┐
   ├── send(TOPIC.UNORDERED, "1") ──────────────┤
   │                                            ↓
   │                                    [远程节点A/B/C...]
   │                                           │
   │                                           ├── 打印:"Received unordered message [msg=0]"
   │                                           └── 回传 "0" 给本地节点
   │
   ├── sendOrdered(TOPIC.ORDERED, "0") ────────┐
   ├── sendOrdered(TOPIC.ORDERED, "1") ────────┤
   │                                           ↓
   │                                    [远程节点A/B/C...]
   │                                           │
   │                                           ├── 打印:"Received ordered message [msg=0]"
   │                                           └── 回传 "0" 给本地节点
   │
   ←────────────────────────────────────────────
   ← 回执消息到达本地节点 → latch.countDown()
   │
   ↓
当 latch 归零 → 输出 “Messaging example finished.”

⚖️ 六、有序 vs 无序 消息对比

特性 无序消息 .send() 有序消息 .sendOrdered()
是否保证顺序 ❌ 不保证 ✅ 同一 order key 下保证顺序
性能 高(异步并行) 稍低(需排队)
适用场景 日志广播、通知等 事件流、状态变更序列等

🛠️ 七、注意事项 & 常见问题

  1. P2P 类加载必须开启

    • 远程节点必须使用 example-ignite.xml 启动,其中启用了 <property name="peerClassLoadingEnabled" value="true"/>
    • 否则 remoteListen() 无法部署监听器类(序列化失败)
  2. 防火墙/网络问题

    • 多节点通信需要端口互通(默认 47100+)
  3. 消息可靠性

    • Ignite 消息是“最多一次”(at-most-once),不保证送达。
    • 若需可靠传递,应结合缓存事件或自定义重试机制。
  4. 资源清理

    • remoteListen() 注册的监听器会在节点重启或显式取消前一直存在。
    • 可通过返回的 UUID 调用 stopRemoteListen() 清理。

✅ 八、如何运行这个例子?

  1. 先启动一个或多个远程节点:

    ignite.sh examples/config/example-ignite.xml
    
  2. 再运行本例(MessagingExample.java)作为客户端或另一个服务端节点。

  3. 观察控制台输出,你会看到:

    • 本地节点打印:“Finished sending…”
    • 远程节点打印:“Received unordered/ordered message…”
    • 最终本地节点打印:“Messaging example finished.”

🧠 九、实际应用场景

场景 说明
分布式事件通知 某个节点状态变化,通知其他节点刷新缓存
日志聚合 收集各节点日志到中心节点
分布式协调 触发所有节点执行某操作(如重新加载配置)
实时消息推送 构建轻量级发布/订阅系统

✅ 总结

这个例子展示了 Apache Ignite 的 分布式消息通信能力,核心要点如下:

要点 说明
✅ 消息分类 支持有序/无序消息
✅ 动态监听 remoteListen() 可远程部署监听器
✅ 主题机制 使用枚举或字符串作为消息主题
✅ 回调确认 结合 CountDownLatch 实现同步等待
✅ 易于集成 适合做集群内轻量级通信

💬 一句话总结
这是一个典型的“广播 + 回执 + 同步等待”的分布式消息模式,利用 Ignite 的 Messaging API 实现跨节点通信,适用于需要节点间松耦合交互的场景。


如果你还想了解:

  • 如何实现“可靠消息”?
  • 如何用 Topic 做发布订阅?
  • 如何监听缓存事件?

欢迎继续提问!


网站公告

今日签到

点亮在社区的每一天
去签到