Summing with MapReduce (a working answer will be accepted), urgent!!!

Problem: find the total number of earthquakes with magnitude > 6
Requirement: write the map and reduce code separately

[image: sample of the earthquake data]


The sample data above lists, in order: time, magnitude, longitude, latitude, depth, and region.
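For reference, one line of input in that format would look something like this (the values below are made up; check your actual file for the exact field layout):

2023-01-01 08:30:15,6.5,120.3,23.5,10,RegionA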

For your earthquake data, here is the code solution I wrote; hope you'll accept it:

Map phase code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EarthquakeMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Parse the earthquake record: the magnitude is the second comma-separated field
    String[] data = value.toString().split(",");
    float magnitude = Float.parseFloat(data[1]);
    // If the magnitude is greater than 6, emit the pair (1, 1)
    if (magnitude > 6) {
      context.write(ONE, ONE);
    }
  }
}

Reduce phase code:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EarthquakeReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

  @Override
  public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Initialize the counter to 0
    int count = 0;
    // Iterate over all the (1, 1) pairs, adding each value to the counter
    for (IntWritable value : values) {
      count += value.get();
    }
    // Emit the total number of earthquakes with magnitude greater than 6
    context.write(key, new IntWritable(count));
  }
}
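The question asks for the map and reduce code to be written separately, and the two classes above hold all the logic, but you still need a driver class to wire them into a job. Below is a minimal sketch; the class name EarthquakeDriver and reading the input/output paths from args are my own assumptions, so adjust them to your setup:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EarthquakeDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "EarthquakeCount");
    job.setJarByClass(EarthquakeDriver.class);
    job.setMapperClass(EarthquakeMapper.class);
    // The reducer only sums IntWritables, so it can double as a combiner
    // and pre-aggregate the (1, 1) pairs on the map side.
    job.setCombinerClass(EarthquakeReducer.class);
    job.setReducerClass(EarthquakeReducer.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    // Input and output paths taken from the command line (assumed)
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}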

Where is the data source?

Do you have any experience with this? I can send you something similar I wrote before, and you can follow the same pattern.


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SumValue {

    public static class SumValueMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The magnitude is the second comma-separated field.
            String level = value.toString().split(",")[1];
            // Parse as a double, since magnitudes such as 6.5 are not whole numbers.
            double magnitude = Double.parseDouble(level);
            if (magnitude > 6) {
                // Above magnitude 6: tag the record with key 1.
                context.write(new IntWritable(1), value);
            } else {
                // Magnitude 6 or below: tag the record with key 0.
                context.write(new IntWritable(0), value);
            }
        }
    }


    public static class SumValueReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Only the group with key 1 holds the records above magnitude 6.
            if (key.get() == 1) {
                // Count the records by iterating; an Iterable has no size() method.
                int count = 0;
                for (Text ignored : values) {
                    count++;
                }
                context.write(new Text("Total earthquakes above magnitude 6"), new IntWritable(count));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String input = "F:\\123\\input\\abc.txt";
        String output = "F:\\123\\output";
        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");

        // Delete the output directory if it already exists, otherwise the job refuses to start.
        Path outputPath = new Path(output);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "SumValue");
        //job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(SumValue.class);
        job.setMapperClass(SumValueMapper.class);
        job.setReducerClass(SumValueReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
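One thing to note about this version: records at or below magnitude 6 are still shuffled to the reducer under key 0 and then discarded, so on large inputs it does more work than the first solution, which filters them out in the mapper. With the default text output format the result file is a single tab-separated line, for example (the count of 42 is made up):

Total earthquakes above magnitude 6	42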