简单的MapReduce应用实现

一个简单的 MapReduce 应用程序的实现
阅读 Filter.java 文件中提供的分析 MapReduce 应用程序
该应用程序的功能相当于以下 SQL 语句的功能:
SELECT key, value
FROM sequence-of-key-value-pairs
WHERE value > given-value;
此任务的目标是使用 Filter.java 文件中提供的 Java 代码来实现 MapReduce 应用程序 Solution2,其功能与以下 SQL 语句的功能等效:
SELECT item-name, price-per-unit * total-units
FROM sales.txt
WHERE price-per-unit * total-units > given-value;
输入数据集 sales.txt 中的单行必须具有以下格式。
item-name price-per-unit total-units
For example:
bolt 2 25
washer 3 8
screw 7 20
nail 5 10
screw 7 2
bolt 2 20
bolt 2 30
drill 10 5
washer 3 8
文件 sales.txt 的内容由您决定,只要它与上面解释的格式一致即可。
给定值的值必须通过程序的参数传递。
将解决方案保存在文件 Solution2.java 中。
准备好在终端窗口中列出 Solution2.java 后,编译、创建 jar 文件并处理应用程序。在终端窗口中列出输入数据集 sales.txt
应用程序创建的结果。完成后,将终端屏幕中的所有消息复制并粘贴到文件solution2.txt中。

Filter.java:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Filter {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("limit", otherArgs[0]);
 Job job = new Job(conf, "Distributed Filter");
 job.setJarByClass(Filter.class);
 job.setMapperClass(FilterMapper.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);
 job.setNumReduceTasks(0); // Set number of reducers to zero
 FileInputFormat.addInputPath(job, new Path(args[1]));
 FileOutputFormat.setOutputPath(job, new Path(args[2]));
 System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

 public static class FilterMapper
     extends Mapper<Object, Text, Text, IntWritable>{

     private final static IntWritable counter = new IntWritable(0);
     private Text word = new Text();
     private Integer total;
     private Integer limit;
     public void map(Object key, Text value, Context context
       ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());

  limit = Integer.parseInt( context.getConfiguration().get("limit") );

  while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
             total = Integer.parseInt(itr.nextToken());
      if ( total > limit )    
      { counter.set( total );
        context.write(word, counter); }
  }
     }
 }

}

【以下回答由 GPT 生成】

对于优化这个问题,我们可以考虑以下几个方面:

  1. 优化MapReduce代码逻辑:在Filter.java文件中,我们可以对代码进行简化和优化,尽量减少不必要的计算和数据传输。

  2. 并行处理数据:我们可以使用多个Map和Reduce任务并行处理数据,提高整体的处理速度。

  3. 使用合适的数据结构和算法:在代码中选择合适的数据结构和算法,可以减少内存和计算资源的使用,提高程序的效率。

以下是一个可能的解决方案:

首先,创建一个Solution2.java文件,将优化后的代码保存在其中。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Solution2 {

    public static class Map extends Mapper<Object, Text, Text, DoubleWritable> {

        private final static DoubleWritable value = new DoubleWritable();

        public void map(Object key, Text line, Context context) throws IOException, InterruptedException {
            // 将每一行数据按空格分割
            String[] fields = line.toString().split(" ");

            // 获取item-name, price-per-unit, total-units
            String itemName = fields[0];
            double pricePerUnit = Double.parseDouble(fields[1]);
            double totalUnits = Double.parseDouble(fields[2]);

            // 计算price-per-unit * total-units
            double totalPrice = pricePerUnit * totalUnits;

            // 输出结果
            context.write(new Text(itemName), new DoubleWritable(totalPrice));
        }
    }

    public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // 遍历输入的values,将大于给定值的结果输出
            for (DoubleWritable value : values) {
                if (value.get() > Double.parseDouble(context.getConfiguration().get("given-value"))) {
                    context.write(key, value);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("given-value", args[2]);

        Job job = Job.getInstance(conf, "Solution2");
        job.setJarByClass(Solution2.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

接下来,使用以下命令编译和创建jar文件:

javac -classpath `hadoop classpath` Solution2.java
jar cf solution2.jar Solution2*.class

接下来,运行MapReduce应用程序:

hadoop jar solution2.jar Solution2 sales.txt output 100

其中,sales.txt为输入数据集,output为输出结果的路径,100为给定值。

最后,查看输出结果并将终端窗口中的所有消息复制到solution2.txt文件中:

hadoop fs -cat output/part-r-00000 > solution2.txt

以上是基于提供的问题和要求的一个解决方案。请注意,具体的实现可能会根据环境和需求而有所不同。


如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^