package org.apache.ignite.examples.streaming.wordcount;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
/**
* Stream words into Ignite cache.
* To start the example, you should:
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start streaming using {@link StreamWords}.</li>
* <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
*/
public class StreamWords {
/**
* Starts words streaming.
*
* @param args Command line arguments (none required).
* @throws Exception If failed.
*/
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
if (!ExamplesUtils.hasServerNodes(ignite))
return;
// The cache is configured with sliding window holding 1 second of the streaming data.
IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
// Stream words from "alice-in-wonderland" book.
while (true) {
InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");
try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
for (String word : line.split(" "))
if (!word.isEmpty())
// Stream words into Ignite.
// By using AffinityUuid we ensure that identical
// words are processed on the same cluster node.
stmr.addData(new AffinityUuid(word), word);
}
}
}
}
}
}
}
这段代码是使用 Apache Ignite 实现一个 流式数据处理 的示例程序,主要功能是从《爱丽丝梦游仙境》这本书中读取文本内容,逐词提取并持续不断地将这些单词“流式”地发送到 Ignite 集群中进行分布式处理。
下面我们逐段分析并解释其含义和作用:
✅ 1. public static void main(String[] args) throws Exception {
标准的 Java 主函数入口,允许抛出异常(用于简化示例)。
✅ 2. Ignition.setClientMode(true);
Ignition.setClientMode(true);
- 作用:将当前这个节点设置为 客户端模式(Client Node)。
- 在 Ignite 集群中,有两种角色:
- Server Node:负责存储数据、执行计算任务。
- Client Node:不存储数据,仅用于连接集群、发送请求或接收数据。
- 这行代码表示:本程序不会参与数据存储,只作为生产者向集群发送数据。
📌 注意:通常在部署时,服务端节点会先启动,然后客户端再连接上去。
✅ 3. try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
- 使用
Ignition.start()
启动一个 Ignite 实例(这里是客户端节点),并加载配置文件。 IgniteConstant.IGNITE_CONFIG_LOCATION
是一个常量,指向 XML 或其他格式的 Ignite 配置文件路径(如example-ignite.xml
)。- 使用了 try-with-resources 语法,确保
Ignite
实例在退出时自动关闭,释放资源。
✅ 4. if (!ExamplesUtils.hasServerNodes(ignite)) return;
if (!ExamplesUtils.hasServerNodes(ignite))
return;
- 工具方法判断当前集群中是否有 Server 节点存在。
- 如果没有 Server 节点(即集群为空或只启动了客户端),则直接退出程序。
- 目的:避免向空集群发送数据,防止数据丢失或错误。
✅ 5. 创建缓存(Cache)
IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
- 获取或创建一个名为
wordCache
的分布式缓存。 - 缓存的键类型是
AffinityUuid
,值类型是String
(即单词本身)。 CacheConfig.wordCache()
返回一个预定义的缓存配置对象,可能设置了如下属性:- 缓存名称
- 是否为流式缓存
- 过期策略(比如滑动窗口保留 1 秒数据)
- 分区分布方式等
💡 提示:这个缓存被设计成一个“滑动窗口”(sliding window),只保留最近 1 秒的数据 —— 常用于实时流处理场景(如统计高频词)。
✅ 6. 使用 IgniteDataStreamer
流式写入数据
try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
IgniteDataStreamer
是 Ignite 提供的高性能工具,用于将大量数据高效地注入缓存,特别适合流式场景。- 它内部会批量提交、自动重试、控制内存使用,比直接
put
更高效。 stmr
是一个流处理器,负责把单词逐步“灌入”缓存。
✅ 7. 循环读取文本文件并发送数据
while (true) {
InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");
try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
for (String word : line.split(" "))
if (!word.isEmpty())
stmr.addData(new AffinityUuid(word), word);
}
}
}
🔍 详细解释:
while (true)
:无限循环 —— 表示程序会不断重复发送这本书的内容,模拟持续的数据流。getResourceAsStream(...)
:从类路径下读取名为alice-in-wonderland.txt
的文本文件(通常是资源文件夹resources/
下)。LineNumberReader
:可以读取每一行,并知道当前行号(调试用)。- 对每一行进行
split(" ")
拆分成单词。 - 跳过空字符串(避免多个空格导致的空词)。
stmr.addData(...)
:将每个单词添加到数据流中。
✅ 8. 使用 AffinityUuid
实现 数据亲和性(Affinity Routing)
new AffinityUuid(word)
这是关键点!
AffinityUuid
是 Ignite 提供的一个特殊 ID 类型,结合 缓存键(key)的哈希值 来决定这条数据应该分配到哪个节点上。- 所有相同的单词(如 “the”)会被分配到 同一个集群节点 上。
- 这样做的好处是:
- 可以在固定节点上对某个单词做聚合操作(比如计数)
- 避免跨节点通信开销
- 实现“相同单词 → 相同处理节点”的一致性哈希机制
🎯 应用场景举例:如果你想统计每个单词出现的频率,就可以利用这个特性,在每个节点本地统计自己负责的单词数量。
🧠 总结:整体逻辑流程
- 启动一个 Ignite 客户端节点。
- 连接到已有的 Ignite 集群。
- 检查是否存在 Server 节点,若无则退出。
- 获取或创建一个用于流式处理的缓存
wordCache
。 - 创建一个
DataStreamer
用于高效写入数据。 - 不断循环读取《爱丽丝梦游仙境》文本文件。
- 将每行拆分为单词,去除空白。
- 使用
AffinityUuid(word)
作为 key,保证相同单词总被发送到同一节点。 - 通过
stmr.addData()
把单词推入缓存,供后续处理(如实时词频统计)。
📌 典型应用场景
这是一段典型的 流式数据生成器(Streaming Producer) 示例,常用于:
- 实时日志分析
- 文本流处理(如舆情监控)
- 单词计数(Word Count)—— 类似 MapReduce 经典案例
- 实时推荐系统中的行为流注入
⚠️ 注意事项(潜在问题)
问题 | 说明 |
---|---|
while(true) |
程序永远不会停止,除非手动中断(Ctrl+C)。生产环境应加退出条件或信号监听。 |
文件编码 | InputStreamReader 默认编码可能与文件不一致,建议显式指定 UTF-8。 |
分词粗糙 | 仅按空格分割,未处理标点符号(如 “hello!” 和 “hello” 被视为不同词)。 |
内存压力 | DataStreamer 虽然有缓冲机制,但无限发送仍可能导致背压(backpressure),需配置合理参数。 |
✅ 改进建议(可选)
// 显式指定字符集
new InputStreamReader(in, StandardCharsets.UTF_8)
// 更智能的分词(去掉标点)
Arrays.stream(line.split("\\W+")).filter(w -> !w.isEmpty())
// 控制流速(防止太快)
Thread.sleep(10); // 每处理完一轮暂停一下
📚 相关概念图解(简化)
[ alice-in-wonderland.txt ]
↓
逐行读取 + 分词
↓
每个单词 → AffinityUuid(word)
↓
DataStreamer 批量发送
↓
Ignite Cluster (Server Nodes)
┌─────────────────┐
│ Node 1: "the", "a" ... ← 相同单词始终到同一节点
│ Node 2: "hello", "world" ...
└─────────────────┘
↓
可用于实时计算:词频、趋势...
如果你还想知道如何 消费这些流数据(比如统计词频),我可以继续提供对应的 Continuous Query
或 Stream Listener
示例代码。需要吗?