在大数据系统的开发中,数据收集工作无疑是开发者首要解决的一个难题,但由于生产数据的源头丰富多样,其中包含网站日志数据、后台监控数据、用户浏览网页数据等,数据工程师要想将它们分门别类的采集到HDFS系统中,就可以使用Apache Flume(数据采集)系统。
Flume简介
Flume原是Cloudera公司提供的一个高可用的、高可靠的、分布式海量日志采集、聚合和传输系统,而后纳入到了Apache旗下,作为一个顶级开源项目。Apache Flume不仅只限于日志数据的采集,由于Flume采集的数据源是可定制的,因此Flume还可用于传输大量事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息以及几乎任何可能的数据源。
Flume运行机制
Flume的核心是把数据从数据源(例如Web服务器)通过数据采集器(Source)收集过来,再将收集的数据通过缓冲通道(Channel)汇集到指定的接收器(Sink)。
Flume基本架构中有一个Agent(代理),它是Flume的核心角色,Flume Agent是一个JVM进程,它承载着数据从外部源流向下一个目标的三个核心组件:Source、Channel和Sink。
Flume日志采集系统结构图
在实际开发中, Flume需要采集数据的类型多种多样,同时还会进行不同的中间操作,所以根据具体需求,可以将Flume日志采集系统分为简单结构和复杂结构。
复杂的Flume日志采集系统的结构
Flume系统要求、
使用Flume进行开发,必须满足一定的系统要求,这里以官方说明为准,具体要求如下。
Flume安装配置
1.下载 Flume 1.8.0 安装包并解压
2.配置flume-env.sh文件,添加JDK环境变量。
3.配置 /etc/profile文件,添加Flume环境变量。
Flume入门使用
使用Flume系统,只需要创建一个配置文件,用来配置Flume Source、Flume Channel和Flume Sink三大组件的属性即可。例如编写一个采集netcat源数据的采集方案netcat-logger.conf(见教材文件8-1),然后输入启动Flume指令,具体效果如下。
启动Flume程序后,在CRT工具中克隆会话窗口,启动telnet 工具,具体效果如下所示。
Flume Sources
在编写Flume采集方案时,首先必须明确采集的数据源类型、出处;接着,根据这些信息与Flume已提供支持的Flume Sources进行匹配,选择对应的数据采集器类型(即sources.type);再根据选择的数据采集器类型,配置必要和非必要的数据采集器属性,Flume提供并支持的Flume Sources种类如下所示。
监听Avro端口并从外部Avro客户端流中接收event数据,当与另一个Flume Agent上的Avro Sink配对时,可创建分层集合拓扑,利用Avro Source可以实现多级流动、扇出流、扇入流等效果,Avro Source常用配置属性如下。

Taildir Source用于观察指定的文件,几乎可以实时监测到添加到每个文件的新行。如果文件正在写入新行,则此采集器将重试采集它们以等待写入完成,Source常用配置属性如下所示。
HTTP Source可以通过HTTP POST和GET请求方式接收event数据,GET通常只能用于测试使用,POST请求发送的所有的events都被认为是一个批次,会在一个事务中插入channel,Taildir Source常用配置属性如下所示。
Flume Channels
Channels通道是event在Agent上暂存的存储库,Source向Channel中添加event,Sink在读取完数据后再删除它。在配置Channels时,需要明确的就是将要传输的sources数据源类型;根据这些信息结合开发中的实际需求,选择Flume已提供的支持的Flume Channels;再根据选择的Channel类型,配置必要和非必要的Channel属性,Flume提供并支持的Flume Channels种类如下所示。
Memory Channel会将event存储在具有可配置最大尺寸的内存队列中,适用于需要更高吞吐量的流量,但在Agent发生故障时会丢失部分阶段数据,下表为Memory Channel常用配置属性。
File Channel是Flume的持久通道,它将所有event写入磁盘,因此不会丢失进程或机器关机、崩溃时的数据。File Channel通过在一次事务中提交多个event来提高吞吐量,做到了只要事务被提交,那么数据就不会有丢失,File Channel常用配置属性如下所示。
Flume Sinks
Flume Soures采集的数据通过Channels通道流向Sink中,此时Sink类似一个集结的递进中心,它需要根据需求进行配置,从而最终选择发送目的地。
配置Sinks时,明确将要传输的数据目的地、结果类型;然后根据实际需求信息,选择Flume已提供支持的Flume Sinks;再根据选择的Sinks类型,配置必要和非必要的Sinks属性。Flume提供并支持的Flume Sinks种类如下所示。
HDFS Sink将event写入Hadoop分布式文件系统(HDFS),它目前支持创建文本和序列文件,以及两种类型的压缩文件,下表为HDFS Sink常用配置属性。
Logger Sink用于记录INFO级别event,它通常用于调试。Logger Sink接收器的不同之处是它不需要在“记录原始数据”部分中说明额外的配置,Logger Sink常用配置属性如下所示。
Avro Sink形成Flume分层收集支持的一半,发送到此接收器的Flume event转换为Avro event,并发送到对应配置的主机名/端口,event将从配置的channel中批量获取配置的批处理大小,Avro Sink常用配置属性如下所示。
负载均衡
配置的采集方案是通过唯一一个Sink作为接收器接收后续需要的数据,但会出现当前Sink故障或数据收集请求量较大的情况,这时单一Sink配置可能就无法保证Flume开发的可靠性。因此,Flume 提供Flume Sink Processors解决上述问题。
Sink处理器允许定义Sink groups,将多个sink分组到一个实体中,Sink处理器就可通过组内多个sink为服务提供负载均衡功能。
负载均衡接收器处理器(Load balancing sink processor)提供了在多个sink上进行负载均衡流量的功能,它维护一个活跃的sink索引列表,需在其上分配负载,还支持round_robin(轮询)和random(随机)选择机制进行流量分配,默认选择机制为round_robin。Load balancing sink processor提供的配置属性如下所示。
故障转移
故障转移接收器处理器(Failover Sink Processor)维护一个具有优先级的sink列表,保证在处理event时,只需有一个可用的sink即可。
故障转移机制工作原理是将故障的sink降级到故障池中,在池中为它们分配一个冷却期,在重试之前冷却时间会增加,当sink成功发送event后,它将恢复到活跃池中。Failover Sink Processor提供的配置属性如下所示。
拦截器
Flume Interceptors(拦截器)用于对Flume系统数据流中event的修改操作。使用Flume拦截器时,只需参考官方配置属性在采集方案中选择性的配置即可,当涉及到配置多个拦截器时,拦截器名称间需用空格分隔,且拦截器配置顺序就是拦截顺序。Flume 1.8.0版本中,Flume提供并支持的拦截器有很多,具体如下所示。
Timestamp Interceptor(时间戳拦截器)将流程执行时间插入到event的header头部,此拦截器插入带有timestamp键的标头,其值为对应时间戳。若配置中已存在时间戳时,此拦截器可保留现有时间戳,Timestamp Interceptor提供的常用配置属性如下所示。
Static Interceptor(静态拦截器)允许用户将具有静态值的静态头附加到所有event。当前不支持一次指定多个header头,但是用户可定义多个Static Interceptor来为每一个拦截器都追加一个header,Static Interceptor提供的常用配置属性如下所示。
日志数据采集流程图
1.服务系统搭建与配置
根据案例需求启动3台服务器,同时搭建Flume系统和Hadoop集群。此案例将hadoop02和hadoop03分别作为A服务器和B服务器进行第一阶段的日志数据采集,将hadoop01作为C服务器进行日志数据汇总并上传至HDFS。
2.配置采集方案
在hadoop02和hadoop03的/flume/conf目录下编写相同日志采集方案exec-avro_logCollection.conf(参考教材中文件8-5的内容)。
在hadoop01机器的/flume/conf目录下编写第二级日志采集方案avro-hdfs_logCollection.conf(参考教材中文件8-6的内容)。
3.启动日志采集系统
1.在Hadoop集群主节点hadoop01机器上启动Hadoop集群。
2.在hadoop01、hadoop02及hadoop03服务器上分别启动Flume系统。
3.查看hadoop01界面启动Flume效果(见下页)。
4.在hadoop02和hadoop03机器上分别克隆/新建3个会话窗口,生产日志数据。
4.日志采集系统测试
(1)查看hadoop01会话窗口信息。hadoop01的Flume系统已将hadoop2和hadoop03上采集的数据进行汇总并上传至HDSF。
(2)查看Hadoop集群UI界面。Hadoop集群下已新添加一个source目录。
(3)进入source目录,查看其内部文件存储结构,完成日志采集需求。