【作者主页】Francek Chen
【专栏介绍】 ⌈ ⌈ ⌈智能大数据分析 ⌋ ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘,以提取有价值的信息和洞察。它结合了大数据技术、人工智能(AI)、机器学习(ML)和数据挖掘等多种方法,旨在通过自动化的方式分析复杂数据集,发现潜在的价值和关联性,实现数据的自动化处理和分析,从而支持决策和优化业务流程。与传统的人工分析相比,智能大数据分析具有自动化、深度挖掘、实时性和可视化等特点。智能大数据分析广泛应用于各个领域,包括金融服务、医疗健康、零售、市场营销等,帮助企业做出更为精准的决策,提升竞争力。
【GitCode】专栏资源保存在我的GitCode仓库:https://gitcode.com/Morse_Chen/Intelligent_bigdata_analysis。
文章目录
一、实验目的
掌握如何用 Java 代码来实现 Storm 任务的拓扑,掌握一个拓扑中 Spout 和 Bolt 的关系及如何组织它们之间的关系,掌握如何将 Storm 任务提交到集群。
二、实验要求
编写一个 Storm 拓扑,一个 Spout 每个一秒钟随机生成一个单词并发射给 Bolt,Bolt 统计接收到的每个单词出现的频率并每隔一秒钟实时打印一次统计结果,最后将任务提交到集群运行,并通过日志查看任务运行结果。
三、实验原理
Storm 集群和 Hadoop 集群表面上看很类似。但是 Hadoop 上运行的是 MapReduce jobs,而在 Storm 上运行的是拓扑(topology
),这两者之间是非常不一样的。一个关键的区别是: 一个 MapReduce job 最终会结束, 而一个 topology 永远会运行(除非你手动 kill 掉)。
(一)Topologies
一个 topology 是spouts
和bolts
组成的图,通过 stream groupings 将图中的 spouts 和 bolts 连接起来,如图所示。
一个 topology 会一直运行直到你手动 kill 掉,Storm 自动重新分配执行失败的任务, 并且 Storm 可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。
运行一个 topology 很简单。首先,把你所有的代码以及所依赖的 jar 打进一个 jar 包。然后运行类似下面的这个命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类:backtype.strom.MyTopology
,参数是arg1
,arg2
。这个类的 main 函数定义这个 topology 并且把它提交给 Nimbus。storm jar
负责连接到 Nimbus 并且上传 jar 包。
Topology 的定义是一个 Thrift 结构,并且 Nimbus 就是一个 Thrift 服务, 你可以提交由任何语言创建的 topology。上面的方面是用 JVM-based 语言提交的最简单的方法。
(二)Spouts
消息源 spout 是 Storm 里面一个 topology 里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向 topology 里面发出消息:tuple。Spout 可以是可靠的也可以是不可靠的。如果这个 tuple 没有被 storm 成功处理,可靠的消息源 spouts 可以重新发射一个 tuple, 但是不可靠的消息源 spouts 一旦发出一个 tuple 就不能重发了。
消息源可以发射多条消息流 stream。使用OutputFieldsDeclarer.declareStream
来定义多个 stream,然后使用SpoutOutputCollector
来发射指定的 stream。
Spout
类里面最重要的方法是nextTuple
。要么发射一个新的 tuple 到 topology 里面或者简单的返回如果已经没有新的 tuple。要注意的是 nextTuple 方法不能阻塞,因为 storm 在同一个线程上面调用所有消息源 spout 的方法。
另外两个比较重要的 spout 方法是ack
和fail
。storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack,否则调用 fail。storm 只对可靠的 spout 调用 ack 和 fail。
(三)Bolts
所有的消息处理逻辑被封装在 bolts 里面。Bolts 可以做很多事情:过滤,聚合,查询数据库等等。Bolts 可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多 bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
Bolts 可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream
定义 stream,使用OutputCollector.emit
来选择要发射的 stream。
Bolts 的主要方法是execute
,它以一个 tuple 作为输入,bolts 使用OutputCollector
来发射 tuple,bolts 必须要为它处理的每一个 tuple 调用OutputCollector
的 ack 方法,以通知 Storm 这个 tuple 被处理完成了,从而通知这个 tuple 的发射者 spouts。 一般的流程是: bolts 处理一个输入 tuple,发射0个或者多个 tuple,然后调用 ack 通知 storm 自己已经处理过这个 tuple 了。storm 提供了一个 IBasicBolt 会自动调用 ack。
四、实验环境
云创大数据实验平台:
Java 版本:jdk1.7.0_79
Hadoop 版本:hadoop-2.7.1
ZooKeeper 版本:zookeeper-3.4.6
Storm 版本:storm-0.10.0
五、实验内容和步骤
本实验主要演示一个完整的 Storm 拓扑编码过程,主要包含 Spout、Bolt 和构建 Topology 几个步骤。
(一)启动 Storm 集群
首先,启动 Storm 集群。
实验的准备工作是:域名映射、免密登录、JDK 配置、部署 ZooKeeper、部署 Storm 等。该实验可以点击一键搭建后能看到搭建成功,即可自动搭建好环境。
(二)导入依赖 jar 包
其次,将 Storm 安装包的 lib 目录内如下 jar 包导入到开发工具:
然后再在 Eclipse 中对每个 jar 执行如下操作进行添加配置:
出现这样即可:
(三)编写程序
我们在项目的 src 中首先创建一个cproc.word
包。
然后,编写代码,实现一个完整的 Topology,内容如下:
Spout 随机发送单词,代码实现:
package cproc.word;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class WordReaderSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
this.collector = collector;
}
@Override
public void nextTuple() {
//这个方法会不断被调用,为了降低它对CPU的消耗,让它sleep一下
Utils.sleep(1000);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
Random rand = new Random();
String word = words[rand.nextInt(words.length)];
collector.emit(new Values(word));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
Bolt 单词计数,并每隔一秒打印一次,代码实现:
package cproc.word;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounterBolt extends BaseBasicBolt {
private static final long serialVersionUID = 5683648523524179434L;
private HashMap<String, Integer> counters = new HashMap<String, Integer>();
private volatile boolean edit = false;
@Override
public void prepare(Map stormConf, TopologyContext context) {
//定义一个线程1秒钟打印一次统计的信息
new Thread(new Runnable() {
public void run() {
while (true) {
if (edit) {
for (Entry<String, Integer> entry : counters.entrySet())
{
System.out.println(entry.getKey() + " : " + entry.getValue());
}
edit = false;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
if (!counters.containsKey(str)) {
counters.put(str, 1);
} else {
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
edit = true;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
构建 Topology 并提交到集群主函数,代码实现:
package cproc.word;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
public class WordCountTopo {
public static void main(String[] args) throws Exception{
//构建Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReaderSpout());
builder.setBolt("word-counter", new WordCounterBolt())
.shuffleGrouping("word-reader");
Config conf = new Config();
//集群方式提交
StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
builder.createTopology());
}
}
(四)打包上传并运行
将 Storm 代码打成wordCount-Storm.jar
(打包的时候不要包含 storm 中的 jar,不然会报错的,将无法运行,即:wordCount-Storm.jar
中只包含上面三个类的代码) 上传到主节点的/usr/cstor/storm/bin
目录下。
这里需要注意的是我们不勾选上图框选的选项,这样就不会打包项目中的jar包。
在主节点进入 Storm 安装目录的 bin 下面用以下命令提交任务:
cd /usr/cstor/storm/bin
./storm jar wordCount-Storm.jar cproc.word.WordCountTopo wordCount
因为 topology 会永远运行,需要手动 kill 掉,使用以下命令结束 storm 任务:
./storm kill wordCount
六、实验结果
Storm 任务执行时,可以查看 Storm 日志文件,日志里面打印了统计的单词结果,日志内容如图。注意要到 slave1 服务器上查看日志文件。
cd /usr/cstor/storm/logs
ls
cat wordCount-1-1728785733-worker-6703.log
……
七、实验心得
在本次 Storm 实验中,我深入了解了如何使用 Apache Storm 实现一个实时 WordCountTopology。Apache Storm 是一个开源的分布式实时计算系统,用于处理大量的数据流。通过本次实验,我不仅掌握了 Storm 的基本概念,还学会了如何使用 Java 代码来实现 Storm 任务的拓扑,以及如何将 Storm 任务提交到集群中运行。
实验的核心是创建一个能够实时统计单词频率的 Topology。这个 Topology 由一个 Spout 和多个 Bolt 组成。Spout 负责生成或接收外部数据流,并将其转换为 Storm 内部的 Tuple(消息传递的基本单元)。在这个实验中,Spout 每秒随机生成一个单词,并将其发送给 Bolt。Bolt 则负责处理接收到的 Tuple,进行单词统计,并每隔一秒打印一次统计结果。
在实验过程中,我首先通过 Eclipse 创建了一个 StormTest 项目,并导入了所需的依赖 jar 包。然后,我创建了三个 Java 类:WordReaderSpout
、WordCounterBolt
和WordCountTopo
。WordReaderSpout 负责生成单词,WordCounterBolt 负责将单词拆分和统计单词频率。最后,在 WordCountTopo 类中定义了 Topology 的结构,并将这些组件组织起来。
在将 Topology 提交到 Storm 集群之前,我首先在本地模式下进行了测试。通过运行storm jar
命令,我成功地将 Topology 提交给 Nimbus(Storm 的主节点),并在本地机器上模拟了 Storm 集群的运行环境。测试结果显示,Topology 能够正确地生成单词、拆分单词并统计单词频率。
接下来,我将 Topology 提交到了实际的 Storm 集群中运行。在集群模式下,我需要注意一些额外的配置,如设置 worker 的数量、executor 的数量以及 task 的数量等。这些配置对于优化 Topology 的性能至关重要。通过合理地配置并行度,我成功地提高了 Topology 的处理效率。
总的来说,这次 Storm 实验让我对分布式实时计算系统有了更深入的了解。通过实践,我不仅掌握了 Storm 的基本概念和操作方法,还学会了如何优化 Topology 的性能和解决实际问题。我相信这些经验和知识将对我未来的学习和工作产生积极的影响。
附:以上文中的数据文件及相关资源下载地址:
链接:https://pan.quark.cn/s/002139a4dc81
提取码:V7B3