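As we know, Flume integrates with many systems, including Hadoop, Spark, Kafka, and HBase. It can also be integrated with MySQL, so that processed log data is stored in MySQL (or in another relational database such as PostgreSQL or Oracle).
One aside first: Flume is a distributed log collection system. If the setup is distributed, the data volume is presumably large, so why store the data Flume produces in MySQL? Please leave your use case in the comments below.
Integrating Flume with MySQL is actually quite simple: it comes down to writing a custom sink. The code is as follows: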
package com.iteblog.flume;

/**
 * User: 过往记忆
 * Date: 14-9-4
 * Time: 13:16
 * blog:
 * Article URL: /archives/1109
 * 过往记忆 blog: a technical blog focused on hadoop, hive, spark, shark and flume, with plenty of practical content
 * 过往记忆 blog WeChat public account: iteblog_hadoop
 */

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class MysqlSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);

    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

    @Override
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        // checkArgument, not checkNotNull: a boxed boolean is never null, so the old check always passed
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    @Override
    public void start() {
        try {
            // Load the JDBC driver with Class.forName()
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // Fail the sink instead of continuing without a driver
            throw new FlumeException("MySQL JDBC driver not found on the classpath", e);
        }

        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
        try {
            // Obtain a Connection from DriverManager.getConnection()
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            // Prepare the insert statement once; tableName comes from the agent configuration
            preparedStatement = conn.prepareStatement("insert into " + tableName + " (content) values (?)");
        } catch (SQLException e) {
            // Throw instead of System.exit(1), which would kill the whole agent JVM
            throw new FlumeException("Unable to connect to " + url, e);
        }
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close the prepared statement.", e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close the connection.", e);
            }
        }
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<String> actions = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // The channel is empty: tell Flume to back off before calling process() again
                    result = Status.BACKOFF;
                    break;
                }
                // Decode explicitly as UTF-8 (matching the table charset) rather than the platform default
                actions.add(new String(event.getBody(), StandardCharsets.UTF_8));
            }

            if (!actions.isEmpty()) {
                preparedStatement.clearBatch();
                for (String temp : actions) {
                    preparedStatement.setString(1, temp);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            transaction.commit();
        } catch (Throwable e) {
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been successful.", e2);
            }
            try {
                // Discard any rows of the failed batch that were not committed
                conn.rollback();
            } catch (SQLException e3) {
                LOG.error("Exception in JDBC rollback.", e3);
            }
            LOG.error("Failed to commit transaction. Transaction rolled back.", e);
            if (e instanceof Error) {
                throw (Error) e;
            }
            throw new EventDeliveryException("Failed to write events to MySQL", e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
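The batching loop inside process() is easier to reason about in isolation. The following is a minimal sketch of that loop only, with a plain java.util.Queue standing in for the Flume channel. BatchDrainSketch, Batch, and drain are hypothetical names invented for this illustration, and the local Status enum only mimics Flume's Sink.Status; none of it is Flume API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the loop inside MysqlSink.process(); not Flume API.
final class BatchDrainSketch {

    /** Mimics Sink.Status: READY = call again at once, BACKOFF = the channel ran dry. */
    enum Status { READY, BACKOFF }

    /** Result of one drain: the rows to insert plus the status to hand back. */
    static final class Batch {
        final List<String> rows;
        final Status status;

        Batch(List<String> rows, Status status) {
            this.rows = rows;
            this.status = status;
        }
    }

    /** Takes up to batchSize items; a null from poll() plays the role of an empty channel. */
    static Batch drain(Queue<String> channel, int batchSize) {
        List<String> rows = new ArrayList<String>();
        Status status = Status.READY;
        for (int i = 0; i < batchSize; i++) {
            String body = channel.poll();
            if (body == null) {
                status = Status.BACKOFF;
                break;
            }
            rows.add(body);
        }
        return new Batch(rows, status);
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<String>(Arrays.asList("a", "b", "c", "d", "e"));
        Batch first = drain(channel, 3);
        Batch second = drain(channel, 3);
        System.out.println(first.rows + " " + first.status);
        System.out.println(second.rows + " " + second.status);
    }
}
```

With five queued items and a batch size of 3, the first call fills a whole batch and stays READY, while the second call takes the remaining two items, hits the empty queue, and reports BACKOFF. A partial batch is still written; BACKOFF only tells the caller to wait before polling again.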
Dependencies in the pom file:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Before running the program, create a table in MySQL:
mysql> create table mysqltest(
    -> id int(11) NOT NULL AUTO_INCREMENT,
    -> content varchar(50000) NOT NULL,
    -> PRIMARY KEY (`id`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
Query OK, 0 rows affected, 1 warning (0.05 sec)
Then create the following configuration in Flume:
# User: 过往记忆
# Date: 14-9-4
# Time: 13:16
# blog:
# Article URL: /archives/1109
# 过往记忆 blog: a technical blog focused on hadoop, hive, spark, shark and flume, with plenty of practical content
# 过往记忆 blog WeChat public account: iteblog_hadoop

agent.sinks.mysqlSink.type = com.iteblog.flume.MysqlSink
agent.sinks.mysqlSink.hostname=localhost
agent.sinks.mysqlSink.port=3306
agent.sinks.mysqlSink.databaseName=ngmonitor
agent.sinks.mysqlSink.tableName=mysqltest
agent.sinks.mysqlSink.user=root
agent.sinks.mysqlSink.password=123456
agent.sinks.mysqlSink.channel = c1
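The properties above cover only the sink. A running agent also has to declare its sources and channels and wire them together. The fragment below is one possible complete file, offered as a sketch: the source name r1, the choice of a spooling-directory source, the path /var/log/flume-spool, and the capacity numbers are all assumptions for illustration, not part of the original setup. Only the sink properties and the channel name c1 come from the article.

```properties
# Hypothetical complete agent file; r1, the spooldir source and its path are placeholders
agent.sources = r1
agent.channels = c1
agent.sinks = mysqlSink

# Source: picks up completed log files dropped into a directory (path is an assumption)
agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /var/log/flume-spool
# A source can feed several channels, so this key is plural: channels
agent.sources.r1.channels = c1

# Channel: in-memory buffer between the source and the sink (sizes are assumptions)
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000

# Sink: the properties from the article, plus the optional batchSize read by configure()
agent.sinks.mysqlSink.type = com.iteblog.flume.MysqlSink
agent.sinks.mysqlSink.hostname = localhost
agent.sinks.mysqlSink.port = 3306
agent.sinks.mysqlSink.databaseName = ngmonitor
agent.sinks.mysqlSink.tableName = mysqltest
agent.sinks.mysqlSink.user = root
agent.sinks.mysqlSink.password = 123456
agent.sinks.mysqlSink.batchSize = 100
# A sink reads from exactly one channel, so this key is singular: channel
agent.sinks.mysqlSink.channel = c1
```

Two details are easy to get wrong here. The source uses channels while the sink uses channel, and the memory channel's transactionCapacity should be at least as large as the sink's batchSize, because the sink takes up to batchSize events inside a single channel transaction.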
Start the agent with the following command:
bin/flume-ng agent -c conf/ -f conf/mysql_test.conf -n agent
Now check the result in MySQL:
mysql> select count(*) from mysqltest;
+----------+
| count(*) |
+----------+
|    98300 |
+----------+
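That's it, development is done! The program above can still be improved: it could be written with MyBatis so that the Flume handling logic is separated from the business logic. Next time you would only need to write the business part and would not have to think about the Flume side at all, which makes the programming much easier. I won't go into how to do that here; if you need it, feel free to discuss it offline.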
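To make the idea of separating the two kinds of logic concrete, here is a minimal plain-Java sketch. It does not use MyBatis; it only illustrates the split, with the business side describing columns and row contents and the sink side building the SQL. RowMappingSketch, RowMapper, LevelMessageMapper, insertSql, and the "level|message" line format are all hypothetical, invented for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical names throughout; none of this is Flume or MyBatis API.
final class RowMappingSketch {

    /** Business side: decides which columns exist and how a raw event body fills them. */
    interface RowMapper {
        List<String> columns();

        List<String> toRow(byte[] body);
    }

    /** Example mapper for lines shaped like "level|message" (an assumed log format). */
    static final class LevelMessageMapper implements RowMapper {
        public List<String> columns() {
            return Arrays.asList("level", "message");
        }

        public List<String> toRow(byte[] body) {
            String line = new String(body, StandardCharsets.UTF_8);
            // Split on the first '|' only, so the message may itself contain '|'
            String[] parts = line.split("\\|", 2);
            String message = parts.length > 1 ? parts[1] : "";
            return Arrays.asList(parts[0], message);
        }
    }

    /** Sink side: builds the parameterized INSERT once, whatever the mapper is. */
    static String insertSql(String table, List<String> columns) {
        requireIdentifier(table);
        StringBuilder names = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (String column : columns) {
            requireIdentifier(column);
            if (names.length() > 0) {
                names.append(", ");
                marks.append(", ");
            }
            names.append(column);
            marks.append("?");
        }
        return "insert into " + table + " (" + names + ") values (" + marks + ")";
    }

    /** Table and column names cannot be bound as parameters, so restrict them instead. */
    static void requireIdentifier(String name) {
        if (!name.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("not a plain SQL identifier: " + name);
        }
    }

    public static void main(String[] args) {
        RowMapper mapper = new LevelMessageMapper();
        System.out.println(insertSql("mysqltest", mapper.columns()));
        System.out.println(mapper.toRow("ERROR|disk full".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Under this split, a sink like MysqlSink would prepare the statement from insertSql(tableName, mapper.columns()) in start() and bind mapper.toRow(event.getBody()) in process(), so supporting a new log format would mean writing only a new mapper.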
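Unless otherwise stated, all articles on this blog are original. Copyright of original articles belongs to 过往记忆大数据 (过往记忆); reproduction without permission is prohibited.
Link to this article: [Integrating Flume-ng with MySQL] (https://www.iteblog.com/archives/1109.html)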
What could be the reason that there is no error but nothing gets imported?
Hello, could I ask how the channel and sources should be configured?
2015-12-07 01:22:50,250 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load sink type: com.flume.flumemysql.FlumeSinkMysql, class: com.flume.flumemysql.FlumeSinkMysql
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:71)
at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:410)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.flume.flumemysql.FlumeSinkMysql
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:195)
at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
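This is the error I get after changing everything. OP, what does it mean?
You probably haven't configured it correctly?
You should put the compiled jar in the FLUME_HOME/lib directory.
The configuration file was wrong. The problem is solved.
May I ask which configuration was wrong?
I'd appreciate something to refer to. My email: 1048277927@qq.com
I actually have a question. The example above stores the collected logs directly in MySQL, right? But how does Flume collect the data in the first place? I'm a beginner, sorry.
Hello OP, could you provide the source files for this project? I've been working on this feature recently, but I've only just started with Flume and there's a lot I don't know. I'd appreciate something to refer to. My email: hsq1194273706@qq.com
All of the project's code is already posted above; just create the project yourself.
Hello OP, do you package the project as a jar, put it under Flume's lib directory, and then run the Flume start command?
Yes. After that you can configure MysqlSink the same way you configure a built-in sink.
OP, a really silly question. I followed your article exactly. When Flume starts, it does watch the directory I specified, but there is no data in the database. Why?
OP, in your Flume configuration file, the data transfer setting is channel, right? If you're online, please reply soon.
It is channel. If there's no data in the database, check whether the directory is actually being picked up.
Then is your data-reading setting sources or source? And if I configure channel, how should that line be written? I've applied to join the group; I hope you can approve it.
It is being picked up. It's just that I wrote channels in my configuration file. Written that way, would the code above fail to find it, so that nothing gets stored?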
I've only just started learning.
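How should I test this?
Very well written, thanks for sharing.
On my side I use Flume with a custom Source that handles HTTP requests, plus a custom KafkaSink. Flume is excellent.
I've written a KafkaSink as well. Flume really is powerful. 😳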