Apache Ignite流处理StreamTransformer示例程序

发布于:2025-08-07 ⋅ 阅读:(19) ⋅ 点赞:(0)
package org.apache.ignite.examples.streaming;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;
import org.apache.ignite.stream.StreamTransformer;

import java.util.List;
import java.util.Random;

/**
 * Stream random numbers into the streaming cache.
 * To start the example, you should:
 * <ul>
 *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
 *     <li>Start streaming using {@link StreamTransformerExample}.</li>
 * </ul>
 * <p>
 * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
 */
public class StreamTransformerExample {
    /** Random number generator. */
    private static final Random RAND = new Random();

    /** Range within which to generate numbers. */
    private static final int RANGE = 1000;

    /** Cache name. */
    private static final String CACHE_NAME = "randomNumbers";

    /** @param args Command line arguments. */
    public static void main(String[] args) throws Exception {
        // Mark this cluster member as client.
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
            if (!ExamplesUtils.hasServerNodes(ignite))
                return;

            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(CACHE_NAME);

            // Index key and value.
            cfg.setIndexedTypes(Integer.class, Long.class);

            // Auto-close cache at the end of the example.
            try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) {
                try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
                    // Allow data updates.
                    stmr.allowOverwrite(true);

                    // Configure data transformation to count random numbers added to the stream.
                    stmr.receiver(StreamTransformer.from((e, arg) -> {
                        // Get current count.
                        Long val = e.getValue();

                        // Increment count by 1.
                        e.setValue(val == null ? 1L : val + 1);

                        return null;
                    }));

                    // Stream 10 million of random numbers into the streamer cache.
                    for (int i = 1; i <= 10_000_000; i++) {
                        stmr.addData(RAND.nextInt(RANGE), 1L);

                        if (i % 500_000 == 0)
                            System.out.println("Number of tuples streamed into Ignite: " + i);
                    }
                }

                // Query top 10 most popular numbers every.
                SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10");

                // Execute queries.
                List<List<?>> top10 = stmCache.query(top10Qry).getAll();

                System.out.println("Top 10 most popular numbers:");

                // Print top 10 words.
                ExamplesUtils.printQueryResults(top10);
            }
            finally {
                // Distributed cache could be removed from cluster only by #destroyCache() call.
                ignite.destroyCache(CACHE_NAME);
            }
        }
    }
} 

这段代码是 Apache Ignite 的另一个流处理示例程序,名为 StreamTransformerExample。它的核心功能是:

统计在 1000 万个随机生成的数字中,哪些数字出现得最频繁(即“热门数字排行榜”),使用流式处理 + 内存更新的方式实时维护计数。


我们来一步步解析这个程序的逻辑和设计思想。


🔍 一、整体功能概览

  • 生成 10,000,000 个范围在 [0, 999] 的随机整数。
  • 每个数字作为缓存的 key,其出现次数作为 value
  • 使用 IgniteDataStreamer 将这些数据“流”入缓存。
  • 利用 StreamTransformer 在写入时自动进行“累加”操作:
    • 如果这个数字第一次出现,计数设为 1;
    • 如果已存在,则将其值 +1。
  • 最后通过 SQL 查询出 出现次数最多的前 10 个数字

🎯 目标:实现一个高吞吐的实时频次统计系统(类似“热词统计”)。


🧱 二、关键组件说明

1. IgniteDataStreamer<Integer, Long>

  • 高效地将大量数据“流式”注入 Ignite 缓存。
  • 支持批量提交、自动分区、背压控制等,适合大数据量写入。

2. StreamTransformer.from(...)

stmr.receiver(StreamTransformer.from((e, arg) -> {
    Long val = e.getValue();
    e.setValue(val == null ? 1L : val + 1);
    return null;
}));
  • 这是本例的核心!它允许你在数据真正写入缓存之前,对缓存中的值进行原子性修改
  • e 是缓存条目(CacheEntry),代表 (key, value)
  • 当你执行 stmr.addData(key, 1L) 时:
    • 实际上不是简单地 put,而是:
      1. 查看缓存中是否已有该 key;
      2. 如果没有 → 设为 1;
      3. 如果有 → 原值 +1;
      4. 写回缓存。
  • ✅ 这个过程是线程安全且原子的(底层基于 EntryProcessor)。

💡 类比:就像 Redis 的 INCR 命令,但这里是基于流式输入自动完成的。


3. 缓存配置

CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(CACHE_NAME);
cfg.setIndexedTypes(Integer.class, Long.class);
  • 缓存名:randomNumbers
  • Key:Integer(随机数本身)
  • Value:Long(出现次数)
  • 启用索引:支持 SQL 查询 _key_val

📥 三、数据流过程详解

for (int i = 1; i <= 10_000_000; i++) {
    stmr.addData(RAND.nextInt(RANGE), 1L);
}

假设某次生成了数字 42

步骤 操作
1️⃣ 调用 addData(42, 1L)
2️⃣ StreamTransformer 触发
3️⃣ 查找 key=42 是否在缓存中
4a️⃣ 若不存在 → 设置 value = 1
4b️⃣ 若存在(比如是 5)→ 设置 value = 6
5️⃣ 更新缓存

✅ 所以最终每个 key 的 value 就是它在整个流中出现的总次数。


🔎 四、结果查询:Top 10 最常出现的数字

SqlFieldsQuery top10Qry = new SqlFieldsQuery(
    "select _key, _val from Long order by _val desc limit 10"
);
  • _key: 缓存的 key(也就是那个随机数)
  • _val: 缓存的 value(出现次数)
  • from Long: 因为 value 类型是 Long,Ignite 会自动创建类型名为 Long 的 SQL 表(也可以自定义)
  • _val 降序排列,取前 10 条

📌 输出示例:

Top 10 most popular numbers:
[42, 10234]
[123, 10187]
[999, 10056]
...

⚙️ 五、其他重要设置

设置 说明
Ignition.setClientMode(true) 当前节点是客户端,不存储数据,只连接集群
stmr.allowOverwrite(true) 允许覆盖已有数据(配合 transformer 使用)
try-with-resources 自动关闭 dataStreamercache
ignite.destroyCache(...) 示例结束清理资源

🧠 六、与上一个例子(StreamVisitor)的区别

特性 StreamVisitorExample StreamTransformerExample
数据处理方式 自定义逻辑(更新另一个缓存) 原子性更新当前缓存的 value
是否写入目标缓存 ❌ 不写 mktCache ✅ 写入 randomNumbers 缓存
更新逻辑位置 在客户端拦截处理 在服务端缓存条目上原地更新
适用场景 流式 ETL、跨缓存更新 实时计数、频次统计、累加器
性能 较低(需要显式 get/put) 更高(原子操作,减少网络往返)

StreamTransformer 更高效,适合“计数器”类场景。


✅ 七、总结一句话

这个程序演示了如何使用 Ignite 的流式处理 + 变换器(StreamTransformer),在数据写入的同时自动完成“频次统计”,最终通过 SQL 快速查出最常出现的数字。


💡 类比理解

想象你在做一个“抽奖系统”或“彩票分析”:

  • 每秒钟有成千上万次抽奖,产生随机号码;
  • 你想知道:历史上哪些号码开出最多?
  • 传统做法:先存所有记录,再用 MapReduce 统计 → 慢
  • Ignite 做法:每来一个号码,立刻在内存中 +1 → 实时更新排行榜

👉 这就是 内存中的实时聚合分析系统


🚀 可扩展方向

你可以基于这个例子做很多改进:

  1. 加时间窗口:只统计最近 1 小时的热门数字(结合 ExpiryPolicy
  2. 分组统计:奇数 vs 偶数、大小区间
  3. 可视化:把结果推送到前端实时展示
  4. 持久化:将最终结果保存到数据库

需要我帮你把这个例子改成“实时单词计数”或者“用户点击排行榜”吗?


网站公告

今日签到

点亮在社区的每一天
去签到