昨天我提到了如何在《Flink Streaming中实现多路文件输出(MultipleTextOutputFormat)》,里面我们实现了一个MultipleTextOutputFormatSinkFunction
类,其中封装了mutable.Map[String, TextOutputFormat[String]]
,然后根据key的不一样选择不同的TextOutputFormat
从而实现了文件的多路输出。本文将介绍如何在Flink batch模式下实现文件的多路输出,这种模式下比较简单,因为Flink内部提供了相应的API支持。
首先我们先实现一个自定义的IteblogMultipleTextOutputFormat
类,具体实现如下:
class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] { override def generateActualKey(key: K, value: V): K = NullWritable.get().asInstanceOf[K] override def generateFileNameForKeyValue(key: K, value: V, name: String): String = key.asInstanceOf[String] }
我们将Key作为文件的名称,然后我们可以将这个类对象封装到HadoopOutputFormat
中,如下:
val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]() val jc = new JobConf() val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)
在DataSet
类中有个output
的方法,它可以将数据写入到实现了org.apache.flink.api.common.io.OutputFormat
接口的地方,而HadoopOutputFormat
类就是实现了这个接口,所以我们可以将HadoopOutputFormat
对象传进output方法中,如下:
batch.output(format)
最后我们别忘记指定存放到HDFS的什么路径:
FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
完整代码:
package com.iteblog import org.apache.flink.api.scala._ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} /** * Created by on 2016/5/11. */ object FlinkBatch { class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] { override def generateActualKey(key: K, value: V): K = NullWritable.get().asInstanceOf[K] override def generateFileNameForKeyValue(key: K, value: V, name: String): String = key.asInstanceOf[String] } def main(args: Array[String]) { val env = ExecutionEnvironment.getExecutionEnvironment val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]() val jc = new JobConf() FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/")) val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc) val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"), ("B", "1"), ("B", "2"), ("C", "1"), ("D", "2"))) batch.output(format) env.execute("MultipleTextOutputFormat") } }
运行这个程序,我们可以在hdfs:///user/iteblog/
路径下看到如下的输出结果:
[iteblog@www.iteblog.com ~]$ hadoop fs -ls /user/iteblog/ Found 5 items -rw-r--r-- 3 iteblog supergroup 6 2016-05-11 19:05 /user/iteblog/A -rw-r--r-- 3 iteblog supergroup 4 2016-05-11 19:05 /user/iteblog/B -rw-r--r-- 3 iteblog supergroup 2 2016-05-11 19:05 /user/iteblog/C -rw-r--r-- 3 iteblog supergroup 2 2016-05-11 19:05 /user/iteblog/D -rw-r--r-- 3 iteblog supergroup 0 2016-05-11 19:05 /user/iteblog/_SUCCESS
可以看出已经根据key的不一样将数据输入到相应的文件了。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Flink batch模式多路文件输出(MultipleTextOutputFormat)】(https://www.iteblog.com/archives/1667.html)
楼主问下,MultipleTextOutputFormat可以用新版本中的MultipleOutputs去实现吗?能的话怎么做呢?求解谢谢
这个不是用新版的 MultipleOutputs 实现的,你可以试试,应该也是可以实现的。
不过社区好像已经有人要做这个了 :https://issues.apache.org/jira/browse/FLINK-11737
嗷嗷 好的吧~ 感谢大佬