欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

使用MapReduce读取XML文件

  XML(可扩展标记语言,英语:eXtensible Markup Language,简称: XML)是一种标记语言,也是行业标准数据交换交换格式,它很适合在系统之间进行数据存储和交换(话说Hadoop、Hive等的配置文件就是XML格式的)。本文将介绍如何使用MapReduce来读取XML文件。但是Hadoop内部是无法直接解析XML文件;而且XML格式中没有同步标记,所以并行地处理单个XML文件比较棘手。本文将一一介绍如何使用MapReduce处理XML文件,并且能够对其进行分割,然后并行处理这些分片。

  MapReduce里面读取文件一般都是使用InputFormat类进行的,比如读取文本文件可以使用TextInputFormat类进行(关于InputFormat类的源码分析可以参见《MapReduce数据输入中InputFormat类源码解析》),所以我们在MapReduce中解析XML文件也可以自定义类似的XMLInputFormat。熟悉Mahout的同学应该知道,它里面实现了一个XmlInputFormat(https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java)类,我们可以指定指定的开始和结束标记来分隔XML文件,然后我们就可以像解析Text文件一样来解析Xml文件,如下:

Configuration conf = new Configuration();
conf.set("xmlinput.start", "<property>");
conf.set("xmlinput.end", "</property>");

Job job = new Job(conf);

 job.setInputFormatClass(XmlInputFormat.class);

指定了xmlinput.startxmlinput.end就可以搜索XML文件中的开始和结束标记,从而获取到XML里面的数据。完整的map程序如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.slf4j.*;

import javax.xml.stream.*;
import java.io.*;

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper.Context context)
        throws   IOException, InterruptedException {
      String document = value.toString();
      System.out.println("'" + document + "'");
      try {
        XMLStreamReader reader =
            XMLInputFactory.newInstance().createXMLStreamReader(new
                ByteArrayInputStream(document.getBytes()));
        String propertyName = "";
        String propertyValue = "";
        String currentElement = "";
        while (reader.hasNext()) {
          int code = reader.next();
          switch (code) {
            case START_ELEMENT:
              currentElement = reader.getLocalName();
              break;
            case CHARACTERS:
              if (currentElement.equalsIgnoreCase("name")) {
                propertyName += reader.getText();
              } else if (currentElement.equalsIgnoreCase("value")) {
                propertyValue += reader.getText();
              }
              break;
          }
        }
        reader.close();
        context.write(propertyName.trim(), propertyValue.trim());
      } catch (Exception e) {
        log.error("Error processing '" + document + "'", e);
      }
}

map接收一个Text的值,里面的值包含了开始和结束的标记之间的数据,然后我们就可以使用java内置的XML Streaming API解析器提取每个属性的key和value,最后输出key和value。完整的代码如下:

package com.iteblog.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.slf4j.*;

import javax.xml.stream.*;
import java.io.*;

import static javax.xml.stream.XMLStreamConstants.*;

public final class XML{
  private static final Logger log = LoggerFactory.getLogger
      (HadoopPropertyXMLMapReduce.class);

  public static class Map extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper.Context context)
        throws IOException, InterruptedException {
      String document = value.toString();
      System.out.println("'" + document + "'");
      try {
        XMLStreamReader reader =
            XMLInputFactory.newInstance().createXMLStreamReader(new
                ByteArrayInputStream(document.getBytes()));
        String propertyName = "";
        String propertyValue = "";
        String currentElement = "";
        while (reader.hasNext()) {
          int code = reader.next();
          switch (code) {
            case START_ELEMENT:
              currentElement = reader.getLocalName();
              break;
            case CHARACTERS:
              if (currentElement.equalsIgnoreCase("name")) {
                propertyName += reader.getText();
              } else if (currentElement.equalsIgnoreCase("value")) {
                propertyValue += reader.getText();
              }
              break;
          }
        }
        reader.close();
        context.write(propertyName.trim(), propertyValue.trim());
      } catch (Exception e) {
        log.error("Error processing '" + document + "'", e);
      }
    }
  }

  public static void main(String... args) throws Exception {
    runJob(args[0], args[1]);
  }

  public static void runJob(String input,
                            String output)
      throws Exception {
    Configuration conf = new Configuration();
    conf.set("key.value.separator.in.input.line", " ");
    conf.set("xmlinput.start", "<property>");
    conf.set("xmlinput.end", "</property>");

    Job job = new Job(conf);
    job.setJarByClass(XML.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(Map.class);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(job, outPath);
    outPath.getFileSystem(conf).delete(outPath, true);

    job.waitForCompletion(true);
  }
}

我们可以使用hadoop内部的core-site.xml文件做测试,如下:

$ bin/hadoop jar xml.jar  com.iteblog.hadoop.XML  input/core-site.xml  output/

运行完上面的程序,可以在output目录下产生几个part*的文件,这说明解析XML文件的程序起作用了。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【使用MapReduce读取XML文件】(https://www.iteblog.com/archives/1596.html)
喜欢 (7)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(1)个小伙伴在吐槽
  1. 一直关注博主的这个blog和微信公众号,学习了很多东西,希望能开个复制功能,本人承诺转发时候一定带上链接,并且我也不在about云上发文章,谢谢博主 😀

    朗道二级相变2016-12-29 17:37 回复