Spark和Kafka都是比较常用的两个大数据框架,Spark里面提供了对Kafka读写的支持。默认情况下我们Kafka只能写Byte数组到Topic里面,如果我们想往Topic里面读写String类型的消息,可以分别使用Kafka里面内置的StringEncoder编码类和StringDecoder解码类。那如果我们想往Kafka里面写对象怎么办?
别担心,Kafka中的kafka.serializer里面有Decoder和Encoder两个trait,这两个trait就是Kafka Topic消息相关的解码类和编码类,内置的StringDecoder和StringEncoder类分别都是继承那两个trait的。直接将String对象用给定的编码转换成Byte数组。来看下Decoder和Encoder两个trait的实现:
/** * A decoder is a method of turning byte arrays into objects. * An implementation is required to provide a constructor that * takes a VerifiableProperties instance. */ trait Decoder[T] { def fromBytes(bytes: Array[Byte]): T } /** * An encoder is a method of turning objects into byte arrays. * An implementation is required to provide a constructor that * takes a VerifiableProperties instance. */ trait Encoder[T] { def toBytes(t: T): Array[Byte] }
也就是说,我们自定义的编码和解码类只需要分别实现toBytes和fromBytes函数即可。那我们如何将对象转换成Byte数组,并且如何将Byte数组转换回对象呢?记得Java中写对象的类没?我们可以用ByteArrayOutputStream
并结合ObjectOutputStream
类将对象转换成Byte数组;并用ByteArrayInputStream
结合ObjectInputStream
类将Byte数组转换回对象。这不就实现了吗??废话不多说,来看看怎么实现:
class IteblogDecoder[T](props: VerifiableProperties = null) extends Decoder[T] { def fromBytes(bytes: Array[Byte]): T = { var t: T = null.asInstanceOf[T] var bi: ByteArrayInputStream = null var oi: ObjectInputStream = null try { bi = new ByteArrayInputStream(bytes) oi = new ObjectInputStream(bi) t = oi.readObject().asInstanceOf[T] } catch { case e: Exception => { e.printStackTrace(); null } } finally { bi.close() oi.close() } t } } class IteblogEncoder[T](props: VerifiableProperties = null) extends Encoder[T] { override def toBytes(t: T): Array[Byte] = { if (t == null) null else { var bo: ByteArrayOutputStream = null var oo: ObjectOutputStream = null var byte: Array[Byte] = null try { bo = new ByteArrayOutputStream() oo = new ObjectOutputStream(bo) oo.writeObject(t) byte = bo.toByteArray } catch { case ex: Exception => return byte } finally { bo.close() oo.close() } byte } } }
这样我们就定义了自己的编码和解码器。那如何使用呢??假设我们有一个Person类。如下:
case class Person(var name: String, var age: Int)
我们可以在发送数据这么使用:
def getProducerConfig(brokerAddr: String): Properties = { val props = new Properties() props.put("metadata.broker.list", brokerAddr) props.put("serializer.class", classOf[IteblogEncoder[Person]].getName) props.put("key.serializer.class", classOf[StringEncoder].getName) props } def sendMessages(topic: String, messages: List[Person], brokerAddr: String) { val producer = new Producer[String, Person]( new ProducerConfig(getProducerConfig(brokerAddr))) producer.send(messages.map { new KeyedMessage[String, Person](topic, "Iteblog", _) }: _*) producer.close() } def main(args: Array[String]) { val sparkConf = new S parkConf().setAppName(this.getClass.getSimpleName) val ssc = new StreamingContext(sparkConf, Milliseconds(500)) val topic = args(0) val brokerAddr = ":9092" val data = List(Person("wyp", 23), Person("spark", 34), Person("kafka", 23), Person("iteblog", 23)) sendMessages(topic, data, brokerAddr) }
在接收端可以这么使用
val sparkConf = new S parkConf().setAppName(this.getClass.getSimpleName) val ssc = new StreamingContext(sparkConf, Milliseconds(500)) val (topic, groupId) = (args(0), args(1)) val kafkaParams = Map("zookeeper.connect" -> ":2181", "group.id" -> groupId, "auto.offset.reset" -> "smallest") val stream = KafkaUtils.createStream[String, Person, StringDecoder, IteblogDecoder[Person]](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY) stream.foreachRDD(rdd => { if (rdd.count() != 0) { rdd.foreach(item => if (item != null) println(item)) } else { println("Empty rdd!!") } }) ssc.start() ssc.awaitTermination() ssc.stop()
这样可以发送任意可序列化的对象了。下面是效果:
Empty rdd!! (Iteblog,Person(wyp,23)) (Iteblog,Person(spark,34)) (Iteblog,Person(kafka,23)) (Iteblog,Person(iteblog,23)) Empty rdd!! Empty rdd!! Empty rdd!! Empty rdd!!
在例子中我们只是简单的将接收到的消息打印到控制台。如果没有接收到消息,则打印Empty rdd!!。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【用Spark往Kafka里面写对象设计与实现】(https://www.iteblog.com/archives/1296.html)
producer.send(messages.map {
}: _*)
大哥这块代码没看懂 {new KeyedMessage[String, Person](topic, "Iteblog", _)} 后面这个: _*怎么理解
这个是scala语法, :_* 作为一个整体,告诉编译器你希望将某个参数当作参数序列处理!例如val s = sum(1 to 5:_*)就是将1 to 5当作参数序列处理。可以看下下面的例子:
您好,我用java实现了一个类似的功能,但是spark streaming却不能接收kafka中自定义的对象,能否帮忙看下是什么问题呢?具体情况我在帖子中描述了,http://www....com/thread-13405-1-1.html, 谢谢
不能接收是不能解析接收到的内容,还是根本就没读到数?如果是前者,那么很有可能是因为你的Encoder没起作用;如果是后者,那么你得看看你Kafka里面是否真的有内容。
除了这些,你运行程序里面是否有什么异常信息呢?
日志上出现了以下错误:
我使用的SerializationUtils.serialize和SerializationUtils.deserialize进行序列化和反序列化,为什么会报上面的错误?
从你的错误看,是反序列化的时候出错;好像是在解析‘;’符号出现问题的,我建议你看下你日志数据本身是否有问题,是否有字符集的问题。
好的,谢谢,我再检查下
您好,我修改了字符集之后,可以正常接收到数据,但是解码的时候出现了新的问题:
15/05/30 11:09:03 ERROR KafkaReceiver: Error handling message; exiting
org.apache.commons.lang.SerializationException: java.lang.ClassNotFoundException: com.chinatime.spark.streaming.kafka.simpleTest.Student
at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:166)
at org.apache.commons.lang.SerializationUtils.deserialize(SerializationUtils.java:193)
at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:66)
at com.chinatime.spark.streaming.kafka.simpleTest.StudentDecoder.fromBytes(StudentDecoder.java:1)
at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:134)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
我是使用如下命令提交的任务,而且上面所说的Student.class就在KafkaTest.jar包中
bin/spark-submit --jars /opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar --class com.chinatime.spark.streaming.kafka.simpleTest.KafkaTest /tmp/KafkaTest.jar
并且在构建sparkContext时,使用sparkConf.setJars(new String[] { "/tmp/KafkaTest.jar", "/opt/cloudera/parcels/CDH/lib/spark/lib/spark-streaming-kafka-assembly_2.10-1.3.1.jar"});添加了jar
在Application UI中的Classpath Entries中看到如下信息
http://192.168.0.53:60922/jars/KafkaTest.jar Added By User
http://192.168.0.53:60922/jars/spark-streaming-kafka-assembly_2.10-1.3.1.jar Added By User
对于上面的问题,您有什么建议吗?谢谢^_^
您好,我使用您文章中提到的“用ByteArrayOutputStream并结合ObjectOutputStream类将对象转换成Byte数组;并用ByteArrayInputStream结合ObjectInputStream类将Byte数组转换回对象”方法替换了SerializationUtils.serialize和SerializationUtils.deserialize,可以正常解码了,可这是为什么呢?
感謝分享!
另外,在文章右上角的twitter連結,都會連結到google+
若能提供正確tiwwter之連結分享,將會更方便!TKS!
hi,非常感谢你的建议,在国内那两个网站是'不存在'的,所以当时就随便添加了。