在本博客的《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(一)》《Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(二)》两篇文章中我介绍了如何在Hadoop中根据Key或者Value的不同将属于不同的类型记录写到不同的文件中。在里面用到了MultipleOutputFormat这个类。
因为Spark内部写文件方式其实调用的都是Hadoop那一套东西,所以我们也可以通过Spark实现多文件输出。不过遗憾的是,Spark内部没有多文件输出的函数供大家直接调用,值得欣慰的是,我们自己实现这个功能也是很简单的。我们可以通过调用saveAsHadoopFile函数并自定义一个OutputFormat类即可,代码如下:
/** * User: 过往记忆 * Date: 15-03-11 * Time: 上午08:24 * bolg: * 本文地址:/archives/1281 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 * 过往记忆博客微信公共帐号:iteblog_hadoop */ import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SplitTest") val sc = new SparkContext(conf) sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt"))) .map(value => (value._1, value._2 + "Test")) .partitionBy(new HashPartitioner(3)) .saveAsHadoopFile("/iteblog", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) sc.stop() } }
RDDMultipleTextOutputFormat类中的generateFileNameForKeyValue函数有三个参数,key和value就是我们RDD的Key和Value,而name参数是每个Reduce的编号。本例中没有使用该参数,而是直接将同一个Key的数据输出到同一个文件中。执行:
bin/spark-submit --master yarn-cluster --class Split ./iteblog-1.0-SNAPSHOT.jar
然后我们可以看到在HDFS上输出的文件列表如下:
[iteblog@master ]$ bin/hadoop fs -ls /iteblog Found 4 items -rw-r--r-- 3 iteblog hadoop2 0 2015-03-09 11:26 /iteblog/_SUCCESS -rw-r--r-- 3 iteblog hadoop2 11 2015-03-09 11:26 /iteblog/b -rw-r--r-- 3 iteblog hadoop2 10 2015-03-09 11:26 /iteblog/c -rw-r--r-- 3 iteblog hadoop2 19 2015-03-09 11:26 /iteblog/w [iteblog@master ]$ bin/hadoop fs -cat /iteblog/w w btTest w wwwTest
从上面的输出可以看出key为w的记录全部输出到文件名为w的文件中去了。
不过社区已经有人建议开发出saveAsTextFileByKey函数来实现该功能(SPARK-3533,
本博客文章除特别声明,全部都是原创!https://github.com/apache/spark/pull/4895
),很有可能会在Spark 1.4.0版本添加。原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark多文件输出(MultipleOutputFormat)】(https://www.iteblog.com/archives/1281.html)
文件的数量好像不能太多,否则好像会占用很多内存,导致outofmemoryerror
我采用了你的方法去保存数据,saveAsHadoopFile但是每次执行都会覆盖上一次的,我希望把同一天的数据追加到一个文件中怎么做呢?
内置的无法做到文件追加,需要你自己实现。可以做到同一天的数据放到同一个目录下,但是不是追加的形式。
访问日志需要按照/日期/访问IP.log形式汇总,提供给客户下载,不知道怎么办好了
这个也是可以实现的,但是需要你自己编写一个类似于HadoopWriter,然后按天切割。
需要怎么做呢?
@Override
protected RecordWriter<String, String> getBaseRecordWriter(FileSystem fs,
JobConf job, String name, Progressable arg3) throws IOException {
if (theTextOutputFormat == null) {
theTextOutputFormat = new MyTextOutputFormat<String, String>();
}
return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
}
private class MyTextOutputFormat<K,V> extends TextOutputFormat<K,V>{
@Override
public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job, String name, Progressable progress)
throws IOException {
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
"\t");
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file,false,fs.getConf().getInt("io.file.buffer.size", 4096),progress);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
Path file =
FileOutputFormat.getTaskOutputPath(job,
name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file,false,fs.getConf().getInt("io.file.buffer.size", 4096),progress);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}
我覆盖了 MultipleTextOutputFormat的getBaseRecordWriter方法依然不行
😈 xxxasfasdfg 艾丝凡
最后输出我希望是这样的
[iteblog@master ]$ bin/hadoop fs -ls /iteblog
Found 4 items
-rw-r--r-- 3 iteblog hadoop2 0 2015-03-09 11:26 /iteblog/_SUCCESS
-rw-r--r-- 3 iteblog hadoop2 11 2015-03-09 11:26 /iteblog/b
-rw-r--r-- 3 iteblog hadoop2 10 2015-03-09 11:26 /iteblog/c
-rw-r--r-- 3 iteblog hadoop2 19 2015-03-09 11:26 /iteblog/w
[iteblog@master ]$ bin/hadoop fs -cat /iteblog/w
btTest
wwwTest
很好办,把上面的RDDMultipleTextOutputFormat类定义成下面即可:
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: K, value: V): K =
NullWritable.get().asInstanceOf[K]
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]
}
问题解决了 十分感谢博主! 请问如何学习spark和hadoop相关比较好 有什么可以推荐的书么?
博主 按照这篇文章提供的方式处理一个100G的数据文件时,发现输出的多个文件总记录书远远小于原文件,我猜测可能是spark在分块(128M)处理时,默认不是追加写文件的模式,应该是覆盖的方式,请问这个能够在哪里修改?
是因为漏掉了partitionBy这个步骤导致的,添加后没问题了。再次感谢博主:)
为啥漏掉 partitionBy结果不对呢,麻烦解答下哈,谢谢
为啥漏掉 partitionBy结果不对呢,麻烦解答下哈,谢谢
您好! 如果我只想在多输出文件中输入value,不输出key,如何实现呢? 本人刚刚接触spark,请多指教。
这个例子里输出文件的分隔符是\t,我怎么样可以换位:。比如w btTest
唤作w:btTest
如果你是Hadoop 1.x版本,可以设置mapred.textoutputformat.separator;如果你是Hadoop 2.x可以设置mapreduce.output.textoutputformat.separator。这个值的默认值就是制表符(\t)。
conf.set("mapreduce.textoutputformat.separator", ";");在hadoop下可以实现。也可在hdfs的配置文件里配置,但是现在在scala写的spark代码里怎么写呢?我用了sparkconf去set,好像不管用,不知道怎么弄?
直接写到hdfs的配置文件里应该也可以实现吧?