package org.apache.ignite.examples.streaming;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;
import org.apache.ignite.stream.StreamTransformer;
import java.util.List;
import java.util.Random;
/**
* Stream random numbers into the streaming cache.
* To start the example, you should:
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
* <li>Start streaming using {@link StreamTransformerExample}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class StreamTransformerExample {
/** Random number generator. */
private static final Random RAND = new Random();
/** Range within which to generate numbers. */
private static final int RANGE = 1000;
/** Cache name. */
private static final String CACHE_NAME = "randomNumbers";
/** @param args Command line arguments. */
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
if (!ExamplesUtils.hasServerNodes(ignite))
return;
CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(CACHE_NAME);
// Index key and value.
cfg.setIndexedTypes(Integer.class, Long.class);
// Auto-close cache at the end of the example.
try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg)) {
try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
// Allow data updates.
stmr.allowOverwrite(true);
// Configure data transformation to count random numbers added to the stream.
stmr.receiver(StreamTransformer.from((e, arg) -> {
// Get current count.
Long val = e.getValue();
// Increment count by 1.
e.setValue(val == null ? 1L : val + 1);
return null;
}));
// Stream 10 million of random numbers into the streamer cache.
for (int i = 1; i <= 10_000_000; i++) {
stmr.addData(RAND.nextInt(RANGE), 1L);
if (i % 500_000 == 0)
System.out.println("Number of tuples streamed into Ignite: " + i);
}
}
// Query top 10 most popular numbers every.
SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10");
// Execute queries.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();
System.out.println("Top 10 most popular numbers:");
// Print top 10 words.
ExamplesUtils.printQueryResults(top10);
}
finally {
// Distributed cache could be removed from cluster only by #destroyCache() call.
ignite.destroyCache(CACHE_NAME);
}
}
}
}
这段代码是 Apache Ignite 的另一个流处理示例程序,名为 StreamTransformerExample
。它的核心功能是:
统计在 1000 万个随机生成的数字中,哪些数字出现得最频繁(即“热门数字排行榜”),使用流式处理 + 内存更新的方式实时维护计数。
我们来一步步解析这个程序的逻辑和设计思想。
🔍 一、整体功能概览
- 生成 10,000,000 个范围在
[0, 999]
的随机整数。 - 每个数字作为缓存的 key,其出现次数作为 value。
- 使用
IgniteDataStreamer
将这些数据“流”入缓存。 - 利用
StreamTransformer
在写入时自动进行“累加”操作:- 如果这个数字第一次出现,计数设为 1;
- 如果已存在,则将其值 +1。
- 最后通过 SQL 查询出 出现次数最多的前 10 个数字。
🎯 目标:实现一个高吞吐的实时频次统计系统(类似“热词统计”)。
🧱 二、关键组件说明
1. IgniteDataStreamer<Integer, Long>
- 高效地将大量数据“流式”注入 Ignite 缓存。
- 支持批量提交、自动分区、背压控制等,适合大数据量写入。
2. StreamTransformer.from(...)
stmr.receiver(StreamTransformer.from((e, arg) -> {
Long val = e.getValue();
e.setValue(val == null ? 1L : val + 1);
return null;
}));
- 这是本例的核心!它允许你在数据真正写入缓存之前,对缓存中的值进行原子性修改。
e
是缓存条目(CacheEntry
),代表(key, value)
。- 当你执行
stmr.addData(key, 1L)
时:- 实际上不是简单地 put,而是:
- 查看缓存中是否已有该 key;
- 如果没有 → 设为 1;
- 如果有 → 原值 +1;
- 写回缓存。
- 实际上不是简单地 put,而是:
- ✅ 这个过程是线程安全且原子的(底层基于
EntryProcessor
)。
💡 类比:就像 Redis 的
INCR
命令,但这里是基于流式输入自动完成的。
3. 缓存配置
CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>(CACHE_NAME);
cfg.setIndexedTypes(Integer.class, Long.class);
- 缓存名:
randomNumbers
- Key:
Integer
(随机数本身) - Value:
Long
(出现次数) - 启用索引:支持 SQL 查询
_key
和_val
📥 三、数据流过程详解
for (int i = 1; i <= 10_000_000; i++) {
stmr.addData(RAND.nextInt(RANGE), 1L);
}
假设某次生成了数字 42
:
步骤 | 操作 |
---|---|
1️⃣ | 调用 addData(42, 1L) |
2️⃣ | StreamTransformer 触发 |
3️⃣ | 查找 key=42 是否在缓存中 |
4a️⃣ | 若不存在 → 设置 value = 1 |
4b️⃣ | 若存在(比如是 5)→ 设置 value = 6 |
5️⃣ | 更新缓存 |
✅ 所以最终每个 key 的 value 就是它在整个流中出现的总次数。
🔎 四、结果查询:Top 10 最常出现的数字
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
"select _key, _val from Long order by _val desc limit 10"
);
_key
: 缓存的 key(也就是那个随机数)_val
: 缓存的 value(出现次数)from Long
: 因为 value 类型是Long
,Ignite 会自动创建类型名为Long
的 SQL 表(也可以自定义)- 按
_val
降序排列,取前 10 条
📌 输出示例:
Top 10 most popular numbers:
[42, 10234]
[123, 10187]
[999, 10056]
...
⚙️ 五、其他重要设置
设置 | 说明 |
---|---|
Ignition.setClientMode(true) |
当前节点是客户端,不存储数据,只连接集群 |
stmr.allowOverwrite(true) |
允许覆盖已有数据(配合 transformer 使用) |
try-with-resources |
自动关闭 dataStreamer 和 cache |
ignite.destroyCache(...) |
示例结束清理资源 |
🧠 六、与上一个例子(StreamVisitor)的区别
特性 | StreamVisitorExample |
StreamTransformerExample |
---|---|---|
数据处理方式 | 自定义逻辑(更新另一个缓存) | 原子性更新当前缓存的 value |
是否写入目标缓存 | ❌ 不写 mktCache |
✅ 写入 randomNumbers 缓存 |
更新逻辑位置 | 在客户端拦截处理 | 在服务端缓存条目上原地更新 |
适用场景 | 流式 ETL、跨缓存更新 | 实时计数、频次统计、累加器 |
性能 | 较低(需要显式 get/put) | 更高(原子操作,减少网络往返) |
✅ StreamTransformer
更高效,适合“计数器”类场景。
✅ 七、总结一句话
这个程序演示了如何使用 Ignite 的流式处理 + 变换器(StreamTransformer),在数据写入的同时自动完成“频次统计”,最终通过 SQL 快速查出最常出现的数字。
💡 类比理解
想象你在做一个“抽奖系统”或“彩票分析”:
- 每秒钟有成千上万次抽奖,产生随机号码;
- 你想知道:历史上哪些号码开出最多?
- 传统做法:先存所有记录,再用 MapReduce 统计 → 慢
- Ignite 做法:每来一个号码,立刻在内存中 +1 → 实时更新排行榜
👉 这就是 内存中的实时聚合分析系统。
🚀 可扩展方向
你可以基于这个例子做很多改进:
- 加时间窗口:只统计最近 1 小时的热门数字(结合
ExpiryPolicy
) - 分组统计:奇数 vs 偶数、大小区间
- 可视化:把结果推送到前端实时展示
- 持久化:将最终结果保存到数据库
需要我帮你把这个例子改成“实时单词计数”或者“用户点击排行榜”吗?