03_MapReduce框架原理_3.10 OutputFormat 数据输出类

发布时间:2022-06-27 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了03_MapReduce框架原理_3.10 OutputFormat 数据输出类脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

1. 说明

03_MapReduce框架原理_3.10 OutputFormat 数据输出类

 

 

 2. 常用实现类

03_MapReduce框架原理_3.10 OutputFormat 数据输出类

 3. 使用 自定义 OutputFormat类 

   步骤1 自定义 FileOutputFormat类 继承 FileOutputFormat

      重写 getRecordWrITer方法

         步骤2 自定义 RecordWriter类 继承 RecordWriter

      1. 创建 FileSystem对象

      2. 获取 输出流对象

      3. 写出数据

      4. 关闭流

         步骤3 驱动类中 指定OutputFormat 实现类

      1. job.setOutputFormatClass(classOf[CustOutputFormat])

      2. 指定 _SUCCESS 存放目录

          FileOutputFormat.setOutputPath(job, new Path("/output1"))

  4. 案例

package OutputFormatPk {

  import java.lang

  import org.@R_360_1057@.hadoop.conf.configuration
  import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
  import org.apache.hadoop.io.{IOUtils, IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileinputFormat
  import org.apache.hadoop.maPReduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce._

  import scala.util.matching.Regex

  // MapPEr 类
  class WCComMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    VAR text = new Text
    var intWritable = new IntWritable(1)

    // 每行记录调用一次map方法
    override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      println("map enter .....")
      //1. 获取一行记录
      val line = value.toString

      //2. 切割
      val words = line.split(" ")

      //3. 输出到缓冲区
      words.foreach(
        key1 => {
          text.set(key1);
          context.write(text, intWritable)
        }
      )

    }
  }

  // Reducer 类
  class WCComReducer extends Reducer[Text, IntWritable, Text, IntWritable] {

    private val intWritable = new IntWritable

    // 每个key调用一次
    override def reduce(key: Text, values: lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      var sum: Int = 0
      // 1. 对词频数 求sum
      values.forEach(sum += _.get)

      // 2. 输出结果
      intWritable.set(sum)
      context.write(key, intWritable)

    }
  }


  // Driver
  object Driver {
    def main(args: Array[String]): Unit = {
      //1. 获取配置信息以及 获取job对象
      //读取配置文件  Configuration: core-default.XMl, core-site.xML
      var configuration = new Configuration
      var job: Job = Job.getInstance(configuration)

      //2. 注册本Driver程序的jar
      job.setJarByClass(this.getClass)

      job.setJobName("scala mr")

      //3. 注册 Mapper 和 Reducer的jar
      job.setMapperClass(classOf[WCComMapper])
      job.setReducerClass(classOf[WCComReducer])

      //4. 设置Mapper 类输出key-value 数据类型
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      //5. 设置最终输出key-value 数据类型
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      //6. 设置输入输出路
      FileInputFormat.setInputPaths(job, "src/main/data/input/1.txt")
      // 指定成功标识文件 _SUCCESS 存储目录
      //FileOutputFormat.setOutputPath(job, new Path("src/main/data/output1"))

      //7. 指定自定义的OutputFormat类
      job.setOutputFormatClass(classOf[CustOutputFormat])

      //8. 提交job
      val bool: Boolean = job.waitForCompletion(true)
      System.exit(bool match {
        case true => "0".toInt
        case false => "1".toInt
      })

    }


  }

  // 自定义 输出器
  // 泛型为 reduce输出的 key,value 类型
  class CustOutputFormat extends FileOutputFormat[Text, IntWritable] {
    // 获取 RecordWriter 对象
    override def getRecordWriter(job: TaskAttemptContext): RecordWriter[Text, IntWritable] = {
      new CustRecordWriter(job)
    }
  }

  // 自定义 RecordWriter类
  // 泛型为 reduce输出的 key,value 类型
  // 需求
  //   将 汉字存储到 ch.txt  将数字存储到 num.txt
  class CustRecordWriter(val job: TaskAttemptContext) extends RecordWriter[Text, IntWritable] {
    // 获取 FileSystem 对象(通过配置文件)
    val fs: FileSystem = FileSystem.get(job.getConfiguration)

    // 通过 FileSystem对象创建 输出流对象
    private val choutputStream: FSDataOutputStream = fs.create(new Path("src/main/data/output/ch.txt"))
    private val numoutputStream: FSDataOutputStream = fs.create(new Path("src/main/data/output/num.txt"))

    // 写出操作
    // 输入为 reduce的输出
    override def write(key: Text, value: IntWritable): Unit = {
      val regex ="""^d+$""".r

      if (regex.findFirstMatchIn(key.toString) != None) {
        numoutputStream.writeBytes(key.toString + "n")
      } else {
        choutputStream.writeUTF(key.toString)
      }

    }

    // 关流操作
    override def close(context: TaskAttemptContext): Unit = {
      IOUtils.closeStream(choutputStream)
      IOUtils.closeStream(numoutputStream)

    }
  }


}

 

脚本宝典总结

以上是脚本宝典为你收集整理的03_MapReduce框架原理_3.10 OutputFormat 数据输出类全部内容,希望文章能够帮你解决03_MapReduce框架原理_3.10 OutputFormat 数据输出类所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。