Apache Ignite Data Streaming 案例 StreamWords

发布于:2025-08-02 ⋅ 阅读:(16) ⋅ 点赞:(0)
package org.apache.ignite.examples.streaming.wordcount;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.IgniteConstant;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;

/**
 * Stream words into Ignite cache.
 * To start the example, you should:
 * <ul>
 *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
 *     <li>Start streaming using {@link StreamWords}.</li>
 *     <li>Start querying popular words using {@link QueryWords}.</li>
 * </ul>
 */
public class StreamWords {
    /**
     * Starts words streaming.
     *
     * @param args Command line arguments (none required).
     * @throws Exception If failed.
     */
    public static void main(String[] args) throws Exception {
        // Mark this cluster member as client.
        Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
            if (!ExamplesUtils.hasServerNodes(ignite))
                return;

            // The cache is configured with sliding window holding 1 second of the streaming data.
            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());

            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
                // Stream words from "alice-in-wonderland" book.
                while (true) {
                    InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");

                    try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
                        for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
                            for (String word : line.split(" "))
                                if (!word.isEmpty())
                                    // Stream words into Ignite.
                                    // By using AffinityUuid we ensure that identical
                                    // words are processed on the same cluster node.
                                    stmr.addData(new AffinityUuid(word), word);
                        }
                    }
                }
            }
        }
    }
}

这段代码是使用 Apache Ignite 实现一个 流式数据处理 的示例程序,主要功能是从《爱丽丝梦游仙境》这本书中读取文本内容,逐词提取并持续不断地将这些单词“流式”地发送到 Ignite 集群中进行分布式处理。

下面我们逐段分析并解释其含义和作用:


✅ 1. public static void main(String[] args) throws Exception {

标准的 Java 主函数入口,允许抛出异常(用于简化示例)。


✅ 2. Ignition.setClientMode(true);

Ignition.setClientMode(true);
  • 作用:将当前这个节点设置为 客户端模式(Client Node)
  • 在 Ignite 集群中,有两种角色:
    • Server Node:负责存储数据、执行计算任务。
    • Client Node:不存储数据,仅用于连接集群、发送请求或接收数据。
  • 这行代码表示:本程序不会参与数据存储,只作为生产者向集群发送数据

📌 注意:通常在部署时,服务端节点会先启动,然后客户端再连接上去。


✅ 3. try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {

try (Ignite ignite = Ignition.start(IgniteConstant.IGNITE_CONFIG_LOCATION)) {
  • 使用 Ignition.start() 启动一个 Ignite 实例(这里是客户端节点),并加载配置文件。
  • IgniteConstant.IGNITE_CONFIG_LOCATION 是一个常量,指向 XML 或其他格式的 Ignite 配置文件路径(如 example-ignite.xml)。
  • 使用了 try-with-resources 语法,确保 Ignite 实例在退出时自动关闭,释放资源。

✅ 4. if (!ExamplesUtils.hasServerNodes(ignite)) return;

if (!ExamplesUtils.hasServerNodes(ignite))
    return;
  • 工具方法判断当前集群中是否有 Server 节点存在
  • 如果没有 Server 节点(即集群为空或只启动了客户端),则直接退出程序。
  • 目的:避免向空集群发送数据,防止数据丢失或错误。

✅ 5. 创建缓存(Cache)

IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
  • 获取或创建一个名为 wordCache 的分布式缓存。
  • 缓存的键类型是 AffinityUuid,值类型是 String(即单词本身)。
  • CacheConfig.wordCache() 返回一个预定义的缓存配置对象,可能设置了如下属性:
    • 缓存名称
    • 是否为流式缓存
    • 过期策略(比如滑动窗口保留 1 秒数据)
    • 分区分布方式等

💡 提示:这个缓存被设计成一个“滑动窗口”(sliding window),只保留最近 1 秒的数据 —— 常用于实时流处理场景(如统计高频词)。


✅ 6. 使用 IgniteDataStreamer 流式写入数据

try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
  • IgniteDataStreamer 是 Ignite 提供的高性能工具,用于将大量数据高效地注入缓存,特别适合流式场景。
  • 它内部会批量提交、自动重试、控制内存使用,比直接 put 更高效。
  • stmr 是一个流处理器,负责把单词逐步“灌入”缓存。

✅ 7. 循环读取文本文件并发送数据

while (true) {
    InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");
    try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
        for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
            for (String word : line.split(" "))
                if (!word.isEmpty())
                    stmr.addData(new AffinityUuid(word), word);
        }
    }
}
🔍 详细解释:
  • while (true):无限循环 —— 表示程序会不断重复发送这本书的内容,模拟持续的数据流。
  • getResourceAsStream(...):从类路径下读取名为 alice-in-wonderland.txt 的文本文件(通常是资源文件夹 resources/ 下)。
  • LineNumberReader:可以读取每一行,并知道当前行号(调试用)。
  • 对每一行进行 split(" ") 拆分成单词。
  • 跳过空字符串(避免多个空格导致的空词)。
  • stmr.addData(...):将每个单词添加到数据流中。

✅ 8. 使用 AffinityUuid 实现 数据亲和性(Affinity Routing)

new AffinityUuid(word)

这是关键点!

  • AffinityUuid 是 Ignite 提供的一个特殊 ID 类型,结合 缓存键(key)的哈希值 来决定这条数据应该分配到哪个节点上。
  • 所有相同的单词(如 “the”)会被分配到 同一个集群节点 上。
  • 这样做的好处是:
    • 可以在固定节点上对某个单词做聚合操作(比如计数)
    • 避免跨节点通信开销
    • 实现“相同单词 → 相同处理节点”的一致性哈希机制

🎯 应用场景举例:如果你想统计每个单词出现的频率,就可以利用这个特性,在每个节点本地统计自己负责的单词数量。


🧠 总结:整体逻辑流程

  1. 启动一个 Ignite 客户端节点
  2. 连接到已有的 Ignite 集群。
  3. 检查是否存在 Server 节点,若无则退出。
  4. 获取或创建一个用于流式处理的缓存 wordCache
  5. 创建一个 DataStreamer 用于高效写入数据。
  6. 不断循环读取《爱丽丝梦游仙境》文本文件。
  7. 将每行拆分为单词,去除空白。
  8. 使用 AffinityUuid(word) 作为 key,保证相同单词总被发送到同一节点。
  9. 通过 stmr.addData() 把单词推入缓存,供后续处理(如实时词频统计)。

📌 典型应用场景

这是一段典型的 流式数据生成器(Streaming Producer) 示例,常用于:

  • 实时日志分析
  • 文本流处理(如舆情监控)
  • 单词计数(Word Count)—— 类似 MapReduce 经典案例
  • 实时推荐系统中的行为流注入

⚠️ 注意事项(潜在问题)

问题 说明
while(true) 程序永远不会停止,除非手动中断(Ctrl+C)。生产环境应加退出条件或信号监听。
文件编码 InputStreamReader 默认编码可能与文件不一致,建议显式指定 UTF-8。
分词粗糙 仅按空格分割,未处理标点符号(如 “hello!” 和 “hello” 被视为不同词)。
内存压力 DataStreamer 虽然有缓冲机制,但无限发送仍可能导致背压(backpressure),需配置合理参数。

✅ 改进建议(可选)

// 显式指定字符集
new InputStreamReader(in, StandardCharsets.UTF_8)

// 更智能的分词(去掉标点)
Arrays.stream(line.split("\\W+")).filter(w -> !w.isEmpty())

// 控制流速(防止太快)
Thread.sleep(10); // 每处理完一轮暂停一下

📚 相关概念图解(简化)

[ alice-in-wonderland.txt ]
           ↓
     逐行读取 + 分词
           ↓
   每个单词 → AffinityUuid(word)
           ↓
   DataStreamer 批量发送
           ↓
Ignite Cluster (Server Nodes)
   ┌─────────────────┐
   │ Node 1: "the", "a" ...     ← 相同单词始终到同一节点
   │ Node 2: "hello", "world" ...
   └─────────────────┘
           ↓
   可用于实时计算:词频、趋势...

如果你还想知道如何 消费这些流数据(比如统计词频),我可以继续提供对应的 Continuous QueryStream Listener 示例代码。需要吗?