Kafka is an excellent choice for enterprises that want to get data into their Hadoop cluster quickly. I won't cover what Kafka is here; for that, see my earlier post 《Apache kafka入门篇:工作原理简介》.
This post is written for technical readers. It walks through how I stream data from a relational database management system (RDBMS) into Hive via Kafka, which makes real-time analytics use cases possible. For reference, the component versions used here are Hive 1.2.1, Flume 1.6, and Kafka 0.9.
Overall solution architecture
The diagram below shows the overall solution: transactional data from the RDBMS is written into Hive tables by combining Kafka, Flume, and Hive's transaction support.
Seven steps to stream RDBMS data into Hadoop in real time
Now let's dig into the details of the solution. I will show how, in just a few steps, you can stream data into Hadoop in real time.
1. Extract data from the RDBMS
Every relational database keeps a log file that records its latest transactions. The first step of our streaming solution is to capture these transactions in a format that Hadoop can parse. (The original author does not explain how to parse the transaction log; it may involve proprietary details.)
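The original post leaves this step abstract (readers raise the same question in the comments below). Purely as an illustration of one possible approach for MySQL, and not the author's method, you could tail the binlog with mysqlbinlog and pipe the decoded row events into the console producer once the topic from step 2 exists; a real pipeline would use a dedicated change-data-capture tool and reshape the events into the comma-separated layout used later in this post. The host, user, and binlog file name below are made up:
# Illustrative only: stream decoded MySQL binlog events into the Kafka topic.
# --stop-never keeps reading new events as they are written; -v decodes row changes.
$ mysqlbinlog --read-from-remote-server \
    --host=salesdb.iteblog.com --user=repl --password \
    --base64-output=decode-rows -v --stop-never mysql-bin.000001 \
  | bin/kafka-console-producer.sh \
      --broker-list sandbox.hortonworks.com:6667 \
      --topic SalesDBTransactions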
2. Start the Kafka producer
A process that publishes messages to a Kafka topic is called a producer. A topic groups messages of the same kind together in Kafka, and the transaction messages from the RDBMS are published to such a topic. In our example, there is a sales team database whose transactions are published to a Kafka topic. The steps below are required to start the Kafka producer:
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-topics.sh --create --zookeeper www.iteblog.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$ bin/kafka-topics.sh --list --zookeeper www.iteblog.com:2181
SalesDBTransactions
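As a quick sanity check (not part of the original walkthrough), you can ask Kafka to describe the topic's partition and replica assignment before wiring up Flume:
$ bin/kafka-topics.sh --describe --zookeeper www.iteblog.com:2181 --topic SalesDBTransactions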
3. Set up Hive
We create a table in Hive to receive the sales team database's transactions. In this example we will create a table named customers:
[iteblog@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
create table customers (id string, name string, email string, street_address string, company string)
partitioned by (time string)
clustered by (id) into 5 buckets
stored as orc
location '/user/iteblog/salescust'
TBLPROPERTIES ('transactional'='true');
To enable transactions in Hive, the following configuration is required:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
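On its own, hive.txn.manager is usually not enough for streaming ingest into an ACID table. The settings below are the ones commonly paired with it on Hive 1.2; they are not from the original post, and the values are illustrative (the compactor properties belong in hive-site.xml on the metastore):
hive.support.concurrency = true
hive.enforce.bucketing = true
hive.exec.dynamic.partition.mode = nonstrict
hive.compactor.initiator.on = true
hive.compactor.worker.threads = 1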
4. Start a Flume agent to write Kafka data into Hive
Next we create a Flume agent that sends data from the Kafka topic into the corresponding Hive table. Follow the steps below to set up the environment before starting the Flume agent:
$ pwd
/home/iteblog/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
$ pwd
/home/iteblog/streamingdemo/flume
$ mkdir logs
Then create a log4j properties file:
[iteblog@sandbox conf]$ vi log4j.properties
flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/iteblog/streamingdemo/flume/logs
flume.log.file=flume.log
Finally, our Flume agent configuration is as follows:
$ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
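A note on this configuration: the timestamp interceptor on the Kafka source adds a timestamp header to every event, and the Hive sink uses that header to resolve the %y-%m-%d-%H-%M partition value, so each minute of data lands in its own partition of the customers table created in step 3.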
5. Start the Flume agent
Start the Flume agent with the following command:
$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf
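To confirm the agent started cleanly, you can watch the log file that log4j was pointed at in step 4:
$ tail -f /home/iteblog/streamingdemo/flume/logs/flume.log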
6. Start the Kafka stream
As an example, the messages below simulate transactions; in a real system they would be produced by the database:
$ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"
7. Receive the data in Hive
With all of the steps above in place, data you now send to Kafka will show up in Hive within a few seconds.
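As a quick verification (not shown in the original post), you can query the table from beeline and watch rows and minute-level partitions appear:
[iteblog@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
0: jdbc:hive2://> select count(*) from customers;
0: jdbc:hive2://> show partitions customers;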
This post is translated from: https://dzone.com/articles/kafka-in-action-7-steps-to-real-time-streaming-fro
What about data that has already been loaded from the RDBMS and is then updated?
Can that still be parsed and handled?
Whether it is an Insert, an Update, or a Delete, it can all be parsed.
OK, so it is all done by parsing the WAL? I'm going to give it a try.
The binlog contains this information; you can take a look.
😮 How should updates to the data be handled?
The most crucial step is not explained at all... how does the binlog get written to Kafka in real time?