Problem: count the total number of earthquakes with magnitude > 6.
Requirement: write the Map and Reduce stages as separate classes.
Here is a code solution for your earthquake data; hope it helps.
Map stage code:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EarthquakeMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Parse one line of earthquake data (comma-separated, magnitude in the second field)
        String[] data = value.toString().split(",");
        float magnitude = Float.parseFloat(data[1]);
        // Emit (1, 1) for every earthquake with magnitude greater than 6
        if (magnitude > 6) {
            context.write(ONE, ONE);
        }
    }
}
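Note: the mapper assumes each input line is comma-separated with the magnitude in the second field (index 1), i.e. a line shaped roughly like 2021-05-21,6.4,somewhere (a hypothetical format, not your actual data). Adjust the field index to match your file.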
Reduce stage code:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EarthquakeReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper for this key
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        // Output the total number of earthquakes with magnitude > 6
        context.write(key, new IntWritable(count));
    }
}
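You also need a driver class to submit the job. A minimal sketch is below; the class name EarthquakeDriver and the command-line path arguments are placeholders, adjust them to your setup:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EarthquakeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "EarthquakeCount");
        job.setJarByClass(EarthquakeDriver.class);
        job.setMapperClass(EarthquakeMapper.class);
        job.setReducerClass(EarthquakeReducer.class);
        // Mapper and reducer both emit (IntWritable, IntWritable), so one pair of declarations suffices
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths come from the command line, e.g. hadoop jar ... <in> <out>
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Since the reduce step is a plain sum, the same reducer can also be set as a combiner (job.setCombinerClass(EarthquakeReducer.class)) to cut shuffle traffic.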
Where does the data come from?
Do you have the basics down? I can send you something similar I wrote before, and you can adapt it to your data:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class SumValue {
    public static class SumValueMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Magnitude is the second comma-separated field; parse as float so values like 6.5 work
            String level = value.toString().split(",")[1];
            float magnitude = Float.parseFloat(level);
            if (magnitude > 6) {
                // Magnitude greater than 6
                context.write(new IntWritable(1), value);
            } else {
                // Magnitude 6 or below
                context.write(new IntWritable(0), value);
            }
        }
    }

    public static class SumValueReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Only key 1 carries the records with magnitude > 6; note key.equals(1) would
            // always be false because an IntWritable never equals an Integer
            if (key.get() == 1) {
                // Count by iterating; Arrays.asList(values).size() would always return 1
                int count = 0;
                for (Text ignored : values) {
                    count++;
                }
                context.write(new Text("Total earthquakes with magnitude > 6"), new IntWritable(count));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = "F:\\123\\input\\abc.txt";
        String output = "F:\\123\\output";
        Configuration conf = new Configuration();
        if (System.getProperty("os.name").toLowerCase().contains("win"))
            conf.set("mapreduce.app-submission.cross-platform", "true");
        // Delete a leftover output directory, otherwise the job fails on startup
        Path path = new Path(output);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        Job job = Job.getInstance(conf, "SumValue"); // new Job(conf, ...) is deprecated
        //job.setJar("./out/artifacts/hadoop_test_jar/hadoop-test.jar");
        job.setJarByClass(SumValue.class);
        job.setMapperClass(SumValueMapper.class);
        job.setReducerClass(SumValueReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, path);
        boolean ret = job.waitForCompletion(true);
        System.out.println(job.getJobName() + "-----" + ret);
    }
}
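When the job finishes, the result lands in part-r-00000 under the output directory as a single line, e.g. "Total earthquakes with magnitude > 6" followed by a tab and the count (the number depends on your data). The F:\ paths mean this runs in Hadoop local mode on Windows; swap in HDFS paths when submitting to a real cluster.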