欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Flume-ng与Mysql整合开发

  我们知道,Flume可以和许多的系统进行整合,包括了Hadoop、Spark、Kafka、Hbase等等;当然,强悍的Flume也是可以和Mysql进行整合,将分析好的日志存储到Mysql(当然,你也可以存放到pg、oracle等等关系型数据库)。

  不过我这里想多说一些:Flume是分布式收集日志的系统;既然都分布式了,数据量应该很大,为什么你要将Flume分析出来的数据用Mysql进行储存?能否在下面评论处留下你的使用场景呢?

如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

其实,Flume和Mysql进行整合开发的过程也是相当的简单的。代码如下:

package com.iteblog.flume;

/**
 * User: 过往记忆
 * Date: 14-9-4
 * Time: 下午13:16
 * bolg: 
 * 本文地址:/archives/1109
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class MysqlSink extends AbstractSink implements Configurable {

    private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

    @Override
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
    }

    @Override
    public void start() {
        super.start();
        try {
            //调用Class.forName()方法加载驱动程序
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName; 
        //调用DriverManager对象的getConnection()方法,获得一个Connection对象

        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            //创建一个Statement对象
            preparedStatement = conn.prepareStatement("insert into " + tableName + 
                                               " (content) values (?)");

        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;

        List<String> actions = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {
                    content = new String(event.getBody());
                    actions.add(content);
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }

            if (actions.size() > 0) {
                preparedStatement.clearBatch();
                for (String temp : actions) {
                    preparedStatement.setString(1, temp);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();

                conn.commit();
            }
            transaction.commit();
        } catch (Throwable e) {
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            LOG.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }

        return result;
    }
}

pom文件中的依赖:

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

运行程序时,先在Mysql中创建一个表

mysql> create table mysqltest(
    -> id int(11) NOT NULL AUTO_INCREMENT,
    -> content varchar(50000) NOT NULL,
    -> PRIMARY KEY (`id`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;  
Query OK, 0 rows affected, 1 warning (0.05 sec)

然后在flume中创建以下配置

# User: 过往记忆
# Date: 14-9-4
# Time: 下午13:16
# bolg: 
# 本文地址:/archives/1109
# 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
# 过往记忆博客微信公共帐号:iteblog_hadoop

agent.sinks.mysqlSink.type = com.iteblog.flume.MysqlSink
agent.sinks.mysqlSink.hostname=localhost
agent.sinks.mysqlSink.port=3306
agent.sinks.mysqlSink.databaseName=ngmonitor
agent.sinks.mysqlSink.tableName=mysqltest
agent.sinks.mysqlSink.user=root
agent.sinks.mysqlSink.password=123456
agent.sinks.mysqlSink.channel = c1

用下面的命令就可以启动:

bin/flume-ng agent -c conf/ -f conf/mysql_test.conf  -n agent

再看下Mysql中的情况:

mysql> select count(*) from mysqltest;
+----------+
| count(*) |
+----------+
|    98300 | 
+----------+

  好了,开发完成!上面的程序还可以改进,可以用Mybatis进行编写,将Flume处理逻辑和业务的处理逻辑分离开,这样下次只需要处理业务,Flume那块都不需要我们去考虑了,大大降低了编程的难度。具体怎么开发我就不说了,有需要请线下讨论。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flume-ng与Mysql整合开发】(https://www.iteblog.com/archives/1109.html)
喜欢 (40)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(21)个小伙伴在吐槽
  1. 请问不报错也导不进去是什么原因?

    勿忘心安❁҉҉҉҉2016-06-28 16:16 回复
  2. 你好,我想问一下channel和sources怎么配置

    你是我的遇见2016-04-15 15:59 回复
  3. 2015-12-07 01:22:50,250 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
    org.apache.flume.FlumeException: Unable to load sink type: com.flume.flumemysql.FlumeSinkMysql, class: com.flume.flumemysql.FlumeSinkMysql
    at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:71)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:410)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: com.flume.flumemysql.FlumeSinkMysql
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:195)
    at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
    这是我全部改好之后遇到的错误 楼主 这是什么意思

       疚痂2015-12-07 17:32 回复
    • 你这没配对吧?
      应该把编译好的包放到FLUME_HOME/lib目录下。

      w3970907702015-12-08 13:38 回复
      • 配置文件写错了 问题已经解决

           疚痂2015-12-08 13:39 回复
        • 请问什么配置写错了啊

          づ相识☆遇见℃2016-07-25 23:48 回复
        • 希望能参考一下我的邮箱:1048277927@qq.com

          づ相识☆遇见℃2016-07-25 23:49 回复
  4. 我其实有个问题 上面的例子是将收集的日志直接存储到mysql里吧 但是Flume要怎么收集数据呢 初学者 抱歉

    PERFECTISNOT2015-12-04 21:37 回复
  5. 楼主你好,你这把能否提供本项目的源码文件 我最近也在做这个功能,但是由于刚接触flume 很多不会,希望能参考一下我的邮箱:hsq1194273706@qq.com

       疚痂2015-12-04 17:40 回复
    • 上面已经贴出本项目的全部代码呢,你自己去创建工程吧。。

      w3970907702015-12-04 18:29 回复
      • 楼主你好,你是把项目打包成jar包放到flume的lib下面执行flume启动命令么?

           疚痂2015-12-07 09:58 回复
        • 是的,然后可以和配置内置的sink一样去配置musqlsink

          w3970907702015-12-07 10:25 回复
          • 楼主 问你个很白痴的问题 我完全照着你的文档写的 但是我flume启动的时候指定监听的目录有去监听 但是数据库没有数据 试为什么

               疚痂2015-12-07 10:35
          • 楼主 你flume配置文件里面 数据传输配置的是 channel吧 ? 如果在线希望速回

               疚痂2015-12-07 15:03
        • 是channel的,数据库没有数据你看下是不是没有监听到。

          左手牵右手TEL2015-12-07 16:10 回复
          • 那你的数据读取配置的是sources 还是source 还有 如果配置channel 那一条语句该怎么写 我申请了加群 希望楼主能审批下

               疚痂2015-12-07 16:20
          • 监听到了 只是我配置文件写的是channels 这样写是不是上面的代码就找不到?所以没有存储成功

               疚痂2015-12-07 16:30
  6. 我不刚学哈

    PERFECTISNOT2015-12-04 17:31 回复
  7. 请问我要怎么测试呢

    PERFECTISNOT2015-12-04 17:14 回复
  8. 博主写得很好,多谢分享。
    我这边使用Flume,是自定义Source,处理HTTP请求;自定义KafkaSink。Flume很优秀

    aaronHadoop2014-09-06 16:24 回复
    • kafkaSink我也写了。Flume功能是很强大。 😳

      w3970907702014-09-07 09:55 回复