写在前面的话,最近发现有很多网站转载我博客的文章,这个我都不介意的,但是这些网站转载我博客都将文章的出处去掉了,直接变成自己的文章了!!我强烈谴责他们,鄙视那些转载文章去掉出处的人!所以为了防止这些,我以后发表文章的时候,将会在文章里面加入一些回复之后才可见的内容!!请大家不要介意,本博客Hadoop相关的文章都是原创,每篇文章都花了很长时间编写,这些都是无偿的,所以转载本文章请标明出处:
本篇博客地址:《基于flume-ng 1.4.0的TailSource程序开发》: /archives/1034
博客地址:过往记忆:
本篇博客地址:《基于flume-ng 1.4.0的TailSource程序开发》: /archives/1034
博客地址:过往记忆:
用过Flume 0.9.4的用户应该知道,在它里面自带了一个TailSource以及TailDirSource,这个Source是负责读取一个文件,并一行一行的发送到sink端,而在flume-ng 1.4.0里面没有自带TailSource,更别说TailDirSource了,虽然我们可以在flume-ng 1.4.0里面用exec的tail -F来一行一行的发送文件,但是还是不太好。而且公司最近flume需要升级,之前的程序用到了TailSource和TailDirSource,没办法只能再次开发了。在flume-ng里面开发一个source需要遵循一定的规则,我们可以通过以下两种方法来开发自己的source:
package org.apache.flume; import org.apache.flume.conf.Configurable; import org.apache.flume.source.AbstractSource; /** * User: wyp * /archives/1034 * 版权所有,翻版不究 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干活 * Date: 14-5-20 * Time: 下午11:15 */ public class TailSource extends AbstractSource implements EventDrivenSource, Configurable { @Override public void configure(Context context) { } @Override public synchronized void start() { } @Override public synchronized void stop() { } }
或者
package org.apache.flume; import org.apache.flume.conf.Configurable; import org.apache.flume.source.AbstractSource; /** * User: wyp * /archives/1034 * 版权所有,翻版不究 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干活 * Date: 14-5-20 * Time: 下午11:15 */ public class TailSource extends AbstractSource implements Configurable, PollableSource { @Override public void configure(Context context) { } @Override public Status process() throws EventDeliveryException { return null; } @Override public synchronized void start() { } @Override public synchronized void stop() { } }
这两个类虽然继承关系不一样,但是都可以实现。下面的方法中多了一个process()方法,这个方法在PollableSourceRunner类中会被一个线程不断的调用。
关于如何具体开发TailSource,这个我就不介绍了。下面主要介绍一下如何使用我这个TailSource,在本文的后面会附上TailSource插件的下载地址,大家如有需要可以自行下载使用。TailSource支持waitTime、file、startFromEnd以及offset四个参数的配置。
- waitTime:指的是隔多久读一次文件,程序是按照一块一块的读取文件内容到缓冲区,然后一行一行的解析;可以不配置,默认1秒读一次。
- file:指的是需要读取的文件绝对路径,一定需要配置;
- startFromEnd:是否从文件结尾处读取,默认为false;
- offset:从offset偏移量开始读取,可以不配置,默认是从偏移量为0开始读。
目前本版本的TailSource不支持正则表达式的配置。如你需要使用,可以按照下面的配置进行:
agent.sources = r1 agent.sources.r1.type = org.apache.flume.TailSource agent.sources.r1.channels = channel agent.sources.r1.waitTime = 100 agent.sources.r1.file = /export1/spark/apache-flume-1.4.0-bin/test agent.sources.r1.startFromEnd = false agent.sources.r1.offset = 0
然后就可以监听到test文件的变化,并放到channel里面。下面是TailSource的下载地址
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【基于flume-ng 1.4.0的TailSource程序开发】(https://www.iteblog.com/archives/1034.html)
学习一下 💡
学习一下,flume企业中不知道常是监控目录还是文件。
都有,看你具体使用情况。本文的TailSource是监控文件的。
具体情况?能不能举个场景?谢谢。
请问代码能共享吗?
学习一下
学习一下
遇到需要TailDirSource的场景,借鉴一下,谢谢
正好最近开始搞flume,能放出tailsource的源码么?
希望楼主分享下设计思路,之前自己尝试的写过,效率好低。。。。
楼主能说下的设计思路吗???
最近也想自己尝试下写一个tailsource实现一些特定的功能,要是楼主能将开发过程记录下来参考下就好了
用楼主的tailSource会出现Implementing class的问题,不知道应该如何解决呢?
很好,希望楼主能够详细说说开发过程
嗯 不错啊
被救一命
能否提供一下src
tail -F支持变量后,不能自动切换
写得不错
hao~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LZ好人
看看
学习,谢谢分享
感谢分享
看看,谢谢分享!!
谢谢分享!!
刚学习Flume
想看一下评论
最近在学习这个软件产品,感谢楼主写了这么多文章。
看看,支持楼主
回复了一遍,没看到啊
看看,支持楼主
支持楼主!!!!
我在使用最新的Flume遇到Tail -F命令时,会无缘无故停止发送数据,一直纠结不知道该如何解决,不知道楼主的这个自己实现Tail -F的功能是否可以解决该问题?“
我这个只要文件里面有数据就可以发送。
点赞。
不错
看看怎么弄的
能把tailDirSource也放出来么?
TailDirSource已经开发完成,不过我们做了修改,加入了断点续传以及恢复等功能,不过我可以将最原始的放出来。
那很强大啊,期待……
请问你们开发的tailSource支持断点续传吗?
我开发的TailSource支持断点续传,每条日志都可以追踪来源。
现在我需要使用flume实时收集日志,使用tail -F 会遇到,如果agent死掉重启后不能从断点处继续收集日志的问题,请问你的tailSource是否在agent重启时,可以继续从断点处收集呢?
支持断点续传意思就是无论是你Flume挂掉了,还是你机器挂掉了,都可以从断点恢复继续发送数据。
感谢,强大
可为何我测试的并不支持断点续传呢?可以加QQ,详聊吗
Sorry,忘了和你说,这个公开的版本不支持断点续传,只能支持简单的实时发送数据。
what ? 还有收费版的?
不收费,只不过这个版本公司不让公布。
可以加QQ 私聊下吗,想请教几个问题QQ:314109012
看看
很不错,了解一下和exec source的区别
exec 使用tail -F + file的方式太不靠谱了,经常自己就停止采集数据,没有遇到过么?
很不错,支持一下
·看看
看看
写的很好