`

Flume源代码解读一

阅读更多

     [ xcly原创于iteye,见http://xcly.iteye.com ]

 

   年初团队接了搭建公司Hadoop平台的研发计划,负责公司产品日志的收集,分析两个工作。

   日志收集准备搭建flume(0.9.3)这个分布式日志收集集群.背景介绍完毕,马上开始。

 

Flume分为agent,collector,master三个概念节点,agent负责收集日志,发到collector, collector一般负责接收agent发过来的事件流,写到存储器,这里一般是hdfs. master负责配置及通信管理,类似于hadoop中的NameNode,是心脏,是整个集群控制器。

   此次业务收集的日志主要是公司上千台服务器的产品日志,包括访问日志,搜索日志,下载日志,全部要入到hdfs, agent收集日志的source准备用tailDir来处理,今天我们就来看看tailDir的实现。 

 

   tailDir为EventSource的一种方式,source指日志信息产生源,在SourceFactoryImpl定义了我们可以使用的各种事件源,SourceFactoryImpl集成SourceFactory,是典型的工厂方法,通过getSource方法返回EventSource。 这里getSource的实现方法并不是简单的返回具体EventSource实现类,而是调用SourceBuilder的build方法返回。 SourceBuilder的实现类似于工厂方法,只是在每一个EventSource都必须含有builder的静态方法返回具体SourceBuilder的实现,这种方式实现有一定的优雅性,各个EventSource封装了自己对参数的不同处理,又可以根据不同的情况调用不同的构造函数,甚至是调用另一个EventSource的实现。 

 

 

   tailDir的实现类为TailDirSource, 有4个参数,dir要监听的文件夹,regex正则表达式限定命名文件,startFromEnd启动时是否从文件尾开始读取,recurseDepth要监听的文件夹级数。 在配置tailDir的时候容易被flume user guide误导,犯一个错误,参数的传递不需要参数名=,直接用逗号隔开的配置值即可,否则是使用默认值的。而且四个参数的顺序是固定的,必须是dir,regex,startFromEnd,recurseDepth,如果只使用前两个,可只传递两个参数,如果要使用recurseDepth,必须传递4个参数。

    TailDirSource对文件的监听调用 DirWatcher实现,在open方法中调用,每一个文件夹包括子文件夹对应一个DirWatcher实例,而DirWather用Periodic监听文件夹,每250毫秒遍历文件夹下所有文件一次, 同第一次新建DirWatcher实例一样,用fileCreated方法新建Cursor,Cursor是读取遍历文件的关键类。

   TailDirSource open方法还新建了TailSource实例,而TailSource新建了TailThread线程,TailThread线程不断循环从Cursor队列数组中遍历读取每一行事件,放到TailSource的队列中中。而TailDirSource 的next方法调用TailSource的next方法,  每100ms从TailSource的队列中取事件。主线程取数据,分线程放入数据。

    Cursor是读取文件信息的关键类,核心是用RandomAccessFile来读取文件,RandomAccessFile支持任意位置的随机读取,非常强大。默认每次是文件开头或者文件结尾读取,我们此次对于过大的文件加入了文件点读取记录功能,防止重复读取。

 

 

 

 

分享到:
评论
2 楼 cunsky 2012-12-02  
lakeblur 写道
您好,能否请教一个flume的问题,collector的sink我用escapedFormatDfs,如果我想每分钟生成一个文件,配成:escapedFormatDfs("hdfs://localhost:9000/flume/%Y%m%d/%{category}/", "%M.log",raw())' ,如果不加roll参数,hdfs中产生的文件似乎一直都没有关闭,无法读取,而我配置了roll的参数小于1分钟的话,则会造成数据丢失,想问一下roll的作用到底是什么?roll的机制和根据时间段自定义文件夹或文件名的机制是否会有冲突?如能告知,不胜感激!

路过看到了,根据我的了解说下,roll是根据时间长度或者接收日志总量大小对sink进行关闭和重开启,反映到底层就是dfs文件的关闭和重新打开一个文件,你不加roll参数的话,文件必然是无法关闭的,至于数据有丢失,不知道具体情况不敢妄测。
1 楼 lakeblur 2012-09-28  
您好,能否请教一个flume的问题,collector的sink我用escapedFormatDfs,如果我想每分钟生成一个文件,配成:escapedFormatDfs("hdfs://localhost:9000/flume/%Y%m%d/%{category}/", "%M.log",raw())' ,如果不加roll参数,hdfs中产生的文件似乎一直都没有关闭,无法读取,而我配置了roll的参数小于1分钟的话,则会造成数据丢失,想问一下roll的作用到底是什么?roll的机制和根据时间段自定义文件夹或文件名的机制是否会有冲突?如能告知,不胜感激!

相关推荐

Global site tag (gtag.js) - Google Analytics