1. 背景
OPPO很多线上业务每天会产生海量数据,如日志数据、监控数据、调用链数据。我们需要把这些数据进行归类、聚合、过滤、存储。例如将不同的日志数据写入到不同的存储系统中。如果这些日志数据同步写入到数据库中,则会降低服务的性能。如果采用异步发送,先将数据写入本地缓存队列,然后再启动一个线程从队列中获取数据,写入到数据库中,这样处理不会将影响对外服务的性能,但是如果数据量过大时容易造成进程OOM,重启时则数据丢失。
DataFlow是由OPPO互联网自研的一款高性能的数据流采集、聚合和传输框架,它通过将日志写入问题件,同时利用文件系统的顺序写入、内存缓存和内存映射文件技术、预写日志WAL等方式来提高写入的效率。
2、架构
DataEvent
DataEvent是DataFlow端到端传输的基本单元,它由body和headers信息构成,由K-V构成的Map信息,主要用于数据信息的传递。
private Map<String, String> headers = new HashMap<>();
private List<T> body = new ArrayList<>();
Source
它是数据源,从特定通道(如Http)接受数据,把消息路由分发到Channel中。开发者通过继承SourceBase实现Source的功能。
Channel
它保存接收到的DataEvent直到它们被所有Sink节点消费完成,Channel传输时需要序列化及反序列化,默认采用的是Kryo,开发者可以根据实际情况使用其它序列化方式,如protobuf。开发者通过继承ChannelBase实现Channel的功能以及序列化和反序列化。
Sink
它主要从Channel中获取数据,将数据传输到下一个目的地,如Elasticsearch、RocksDB。一个Sink有且只有一个Channel。开发者通过继承SinkBase实现Sink的功能。
用户在使用DataFlow时,需要自己实现继承一个SourceBase的类,调用里面的put方法将DataEvent写入到Channel中。Channel默认采用系统自带的FileChannel,将用户调用的put方法写入的数据存储到本地磁盘中。然后用户只需要调用task方法就可以从Channel中获取数据,进行数据的分析、存储。
3、FileChannel
FileChannel写流程
FileChanel在运行之前,需要配置两个文件夹,一个是数据文件夹,用来存放用户写入的数据和数据的索引信息;另一个是checkpoint文件夹,用来定时持久化元数据信息。在FileChannel方法中,调用put方法来写event数据。每一次的写入操作都需要再一个事务中进行。先执行begin开启事务,然后多次执行put操作,将多条数据写入到文件对应的page-cache,然后执行commit,将数据由page-cache刷新到磁盘中,并在文件中写入commit的标志位。如果中途出现异常,则需要执行rollback回滚操作,在已经写入的数据后面写入rollback标志位。
每开启一个事务,都会申请一个事务号,事务号由每一个channel来产生,类似于雪花算法。在put方法中,将事务号,唯一性id,event数据一起序列化,然后写入到数据文件对应的page-cache中。此时会返回文件的id和数据在文件中的offset。数据所在的文件id和offset存放在对象Long对象中(Long对象的前32位表示文件id,后32位表示offset),这个Long对象会写入到一个内存队列putList中。
在commit方法中,会写一个commit的操作记录到page-cache中,然后将之前写在page-cache中的数据刷新到磁盘中,最后将putList中的数据取出来写入到内存队列queue中。
例如在文件中依次写入"a","b"两条数据,事务id为1,会写两条记录(1,1,put a),(1,2,put b)。然后执行commit,会写一个commit操作,这样page-cache中就有(1,1,put a),(1,2,put b),(1,3,commit)这三条数据,最后刷盘就会将这三条数据写入到磁盘中。同时queue中会有2条数据,分别表示写入数据在文件中的位置。
FileChannel消费流程
FileChannel通过task操作获取数据,进行消费。对FileChannel中数据的消费也需要再一个事务中进行,先开启一个事务,然后从内存队列queue中获取数据的元数据信息,即上面提到的Long对象,然后从这个Long对象中解析获取文件id的数据的offset,最后就可以从这个文件中读取到数据。每一次从文件中读取数据之前,需要将一个task的操作记录写入到文件中,然后再从文件中读取数据。最后执行commit操作,将commit的记录写入到page-cache中,最后刷盘。
接上面写入的例子,FileChannel已经写入了"a","b"两个文件,则文件中的记录为:
(1,1,put a),(1,2,put b),(1,3,commit)
然后从FileChannel中执行task操作,消费一个数据a。则先开启一个事务,事务id为2,然后写入一个task操作,
(1,1,put a),(1,2,put b),(1,3,commit),(2,5,take a)
task操作取出数据后消费,然后commit,commit后也要有操作记录,这样文件的内容如下:
(1,1,put a),(1,2,put b),(1,3,commit),(2,5,take a),(2,6,commit)