一个简单的 MapReduce 应用程序的实现
阅读 Filter.java 文件中提供的分析 MapReduce 应用程序
该应用程序的功能相当于以下 SQL 语句的功能:
SELECT key, value
FROM sequence-of-key-value-pairs
WHERE value > given-value;
此任务的目标是使用 Filter.java 文件中提供的 Java 代码来实现 MapReduce 应用程序 Solution2,其功能与以下 SQL 语句的功能等效:
SELECT item-name, price-per-unit * total-units
FROM sales.txt
WHERE price-per-unit * total-units > given-value;
输入数据集 sales.txt 中的单行必须具有以下格式。
item-name price-per-unit total-units
For example:
bolt 2 25
washer 3 8
screw 7 20
nail 5 10
screw 7 2
bolt 2 20
bolt 2 30
drill 10 5
washer 3 8
文件 sales.txt 的内容由您决定,只要它与上面解释的格式一致即可。
给定值的值必须通过程序的参数传递。
将解决方案保存在文件 Solution2.java 中。
准备好在终端窗口中列出 Solution2.java 后,编译、创建 jar 文件并处理应用程序。在终端窗口中列出输入数据集 sales.txt
应用程序创建的结果。完成后,将终端屏幕中的所有消息复制并粘贴到文件solution2.txt中。
Filter.java:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Filter {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
conf.set("limit", otherArgs[0]);
Job job = new Job(conf, "Distributed Filter");
job.setJarByClass(Filter.class);
job.setMapperClass(FilterMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(0); // Set number of reducers to zero
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class FilterMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable counter = new IntWritable(0);
private Text word = new Text();
private Integer total;
private Integer limit;
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
limit = Integer.parseInt( context.getConfiguration().get("limit") );
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
total = Integer.parseInt(itr.nextToken());
if ( total > limit )
{ counter.set( total );
context.write(word, counter); }
}
}
}
}
【以下回答由 GPT 生成】
我可以帮你实现这个MapReduce应用程序。下面是Solution2.java
的代码实现。
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Solution2 {
public static class Map extends Mapper<Object, Text, Text, Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
// 解析每一行的数据
String itemName = tokenizer.nextToken();
int pricePerUnit = Integer.parseInt(tokenizer.nextToken());
int totalUnits = Integer.parseInt(tokenizer.nextToken());
int totalPrice = pricePerUnit * totalUnits;
// 根据条件过滤数据,只输出满足条件的结果
int givenValue = context.getConfiguration().getInt("given_value", 0);
if (totalPrice > givenValue) {
outputKey.set(itemName);
outputValue.set(String.valueOf(totalPrice));
context.write(outputKey, outputValue);
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 计算每个item的总价值
int totalPrice = 0;
for (Text value : values) {
totalPrice += Integer.parseInt(value.toString());
}
context.write(key, new Text(String.valueOf(totalPrice)));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
conf.setInt("given_value", Integer.parseInt(otherArgs[0]));
Job job = new Job(conf, "MapReduce Solution");
job.setJarByClass(Solution2.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
你可以将上面的代码拷贝到Solution2.java
文件中,并使用以下命令编译和运行MapReduce应用程序:
hadoop com.sun.tools.javac.Main Solution2.java
jar cf solution2.jar Solution2*.class
hadoop jar solution2.jar Solution2 <given_value> <input_path> <output_path>
其中,<given_value>
是给定的值,<input_path>
是输入数据集的路径,<output_path>
是输出结果的路径。
请注意,在执行上述命令之前,请确保Hadoop环境已正确设置,并且你已经将Hadoop的相关库文件(例如hadoop-common-x.x.x.jar
和hadoop-mapreduce-client-core-x.x.x.jar
)添加到编译和运行命令中。
最后,程序将结果保存在文件solution2.txt
中。