欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Flink batch模式多路文件输出(MultipleTextOutputFormat)

  昨天我提到了如何在《Flink Streaming中实现多路文件输出(MultipleTextOutputFormat)》,里面我们实现了一个MultipleTextOutputFormatSinkFunction类,其中封装了mutable.Map[String, TextOutputFormat[String]],然后根据key的不一样选择不同的TextOutputFormat从而实现了文件的多路输出。本文将介绍如何在Flink batch模式下实现文件的多路输出,这种模式下比较简单,因为Flink内部提供了相应的API支持。

首先我们先实现一个自定义的IteblogMultipleTextOutputFormat类,具体实现如下:

class IteblogMultipleTextOutputFormat[K, V] 
    extends MultipleTextOutputFormat[K, V] {
  override def generateActualKey(key: K, value: V): K =
    NullWritable.get().asInstanceOf[K]

  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    key.asInstanceOf[String]
}

我们将Key作为文件的名称,然后我们可以将这个类对象封装到HadoopOutputFormat中,如下:

val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
val jc = new JobConf()
val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)

DataSet类中有个output的方法,它可以将数据写入到实现了org.apache.flink.api.common.io.OutputFormat接口的地方,而HadoopOutputFormat类就是实现了这个接口,所以我们可以将HadoopOutputFormat对象传进output方法中,如下:

batch.output(format)

最后我们别忘记指定存放到HDFS的什么路径:

FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))

完整代码:

package com.iteblog

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

/**
  * Created by  on 2016/5/11.
  */
object FlinkBatch {
  class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
    override def generateActualKey(key: K, value: V): K =
      NullWritable.get().asInstanceOf[K]

    override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
      key.asInstanceOf[String]
  }

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
    val jc = new JobConf()
    FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
    val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)
    val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"),
      ("B", "1"), ("B", "2"), ("C", "1"), ("D", "2")))
    batch.output(format)
    env.execute("MultipleTextOutputFormat")
  }
}

运行这个程序,我们可以在hdfs:///user/iteblog/路径下看到如下的输出结果:

[iteblog@www.iteblog.com ~]$ hadoop fs -ls /user/iteblog/
Found 5 items
-rw-r--r--   3 iteblog supergroup          6 2016-05-11 19:05 /user/iteblog/A
-rw-r--r--   3 iteblog supergroup          4 2016-05-11 19:05 /user/iteblog/B
-rw-r--r--   3 iteblog supergroup          2 2016-05-11 19:05 /user/iteblog/C
-rw-r--r--   3 iteblog supergroup          2 2016-05-11 19:05 /user/iteblog/D
-rw-r--r--   3 iteblog supergroup          0 2016-05-11 19:05 /user/iteblog/_SUCCESS

可以看出已经根据key的不一样将数据输入到相应的文件了。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flink batch模式多路文件输出(MultipleTextOutputFormat)】(https://www.iteblog.com/archives/1667.html)
喜欢 (6)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(3)个小伙伴在吐槽
  1. 楼主问下,MultipleTextOutputFormat可以用新版本中的MultipleOutputs去实现吗?能的话怎么做呢?求解谢谢

    :)2019-02-22 17:26 回复
    • 这个不是用新版的 MultipleOutputs 实现的,你可以试试,应该也是可以实现的。
      不过社区好像已经有人要做这个了 :https://issues.apache.org/jira/browse/FLINK-11737

      w3970907702019-02-25 09:51 回复
      • 嗷嗷 好的吧~ 感谢大佬

        :)2019-02-25 10:14 回复