简单的MapReduce应用实现

一个简单的 MapReduce 应用程序的实现
阅读 Filter.java 文件中提供的分析 MapReduce 应用程序
该应用程序的功能相当于以下 SQL 语句的功能:
SELECT key, value
FROM sequence-of-key-value-pairs
WHERE value > given-value;
此任务的目标是使用 Filter.java 文件中提供的 Java 代码来实现 MapReduce 应用程序 Solution2,其功能与以下 SQL 语句的功能等效:
SELECT item-name, price-per-unit * total-units
FROM sales.txt
WHERE price-per-unit * total-units > given-value;
输入数据集 sales.txt 中的单行必须具有以下格式。
item-name price-per-unit total-units
For example:
bolt 2 25
washer 3 8
screw 7 20
nail 5 10
screw 7 2
bolt 2 20
bolt 2 30
drill 10 5
washer 3 8
文件 sales.txt 的内容由您决定,只要它与上面解释的格式一致即可。
给定值的值必须通过程序的参数传递。
将解决方案保存在文件 Solution2.java 中。
准备好在终端窗口中列出 Solution2.java 后,编译、创建 jar 文件并处理应用程序。在终端窗口中列出输入数据集 sales.txt
应用程序创建的结果。完成后,将终端屏幕中的所有消息复制并粘贴到文件solution2.txt中。

Filter.java:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Filter {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("limit", otherArgs[0]);
 Job job = new Job(conf, "Distributed Filter");
 job.setJarByClass(Filter.class);
 job.setMapperClass(FilterMapper.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);
 job.setNumReduceTasks(0); // Set number of reducers to zero
 FileInputFormat.addInputPath(job, new Path(args[1]));
 FileOutputFormat.setOutputPath(job, new Path(args[2]));
 System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

 public static class FilterMapper
     extends Mapper<Object, Text, Text, IntWritable>{

     private final static IntWritable counter = new IntWritable(0);
     private Text word = new Text();
     private Integer total;
     private Integer limit;
     public void map(Object key, Text value, Context context
       ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());

  limit = Integer.parseInt( context.getConfiguration().get("limit") );

  while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
             total = Integer.parseInt(itr.nextToken());
      if ( total > limit )    
      { counter.set( total );
        context.write(word, counter); }
  }
     }
 }

}

【以下回答由 GPT 生成】

我可以帮你实现这个MapReduce应用程序。下面是Solution2.java的代码实现。

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Solution2 {

  public static class Map extends Mapper<Object, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);

      // 解析每一行的数据
      String itemName = tokenizer.nextToken();
      int pricePerUnit = Integer.parseInt(tokenizer.nextToken());
      int totalUnits = Integer.parseInt(tokenizer.nextToken());

      int totalPrice = pricePerUnit * totalUnits;

      // 根据条件过滤数据,只输出满足条件的结果
      int givenValue = context.getConfiguration().getInt("given_value", 0);
      if (totalPrice > givenValue) {
        outputKey.set(itemName);
        outputValue.set(String.valueOf(totalPrice));
        context.write(outputKey, outputValue);
      }
    }
  }

  public static class Reduce extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

      // 计算每个item的总价值
      int totalPrice = 0;
      for (Text value : values) {
        totalPrice += Integer.parseInt(value.toString());
      }

      context.write(key, new Text(String.valueOf(totalPrice)));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    conf.setInt("given_value", Integer.parseInt(otherArgs[0]));

    Job job = new Job(conf, "MapReduce Solution");
    job.setJarByClass(Solution2.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

你可以将上面的代码拷贝到Solution2.java文件中,并使用以下命令编译和运行MapReduce应用程序:

hadoop com.sun.tools.javac.Main Solution2.java
jar cf solution2.jar Solution2*.class
hadoop jar solution2.jar Solution2 <given_value> <input_path> <output_path>

其中,<given_value>是给定的值,<input_path>是输入数据集的路径,<output_path>是输出结果的路径。

请注意,在执行上述命令之前,请确保Hadoop环境已正确设置,并且你已经将Hadoop的相关库文件(例如hadoop-common-x.x.x.jarhadoop-mapreduce-client-core-x.x.x.jar)添加到编译和运行命令中。

最后,程序将结果保存在文件solution2.txt中。


如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^