在Flink中我们可以很容易的使用内置的API来读取HDFS上的压缩文件,内置支持的压缩格式包括.deflate,.gz, .gzip,.bz2以及.xz等。
但是如果我们想使用Flink内置sink API将数据以压缩的格式写入到HDFS上,好像并没有找到有API直接支持(如果不是这样的,欢迎留言纠正)。本文将介绍如何将数据以gz压缩格式将处理后的数据写入到HDFS上。主要实现代码如下:
import org.apache.hadoop.fs.Path import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.{Text, LongWritable} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat} imoprt org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat val benv = ExecutionEnvironment.getExecutionEnvironment val input = benv.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) val text = input map { _._2.toString } val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .groupBy(0) .sum(1) val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) } val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable]( new TextOutputFormat[Text, LongWritable], new JobConf) val c = classOf[org.apache.hadoop.io.compress.GzipCodec] hadoopOutputFormat.getJobConf.set("mapred.textoutputformat.separator", " ") hadoopOutputFormat.getJobConf.setCompressMapOutput(true) hadoopOutputFormat.getJobConf.set("mapred.output.compress", "true") hadoopOutputFormat.getJobConf.setMapOutputCompressorClass(c) hadoopOutputFormat.getJobConf.set("mapred.output.compression.codec", c.getCanonicalName) hadoopOutputFormat.getJobConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) FileOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf, new Path("/tmp/iteblog/")) words.output(hadoopOutputFormat) benv.execute("Hadoop Compat WordCount")
关键就是上面高亮的几行代码,然后运行上面的程序,即可以gz压缩方式将最后处理完的数据写入到HDFS上。
本博客文章除特别声明,全部都是原创!原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【通过Flink将数据以压缩的格式写入HDFS】(https://www.iteblog.com/archives/1875.html)