How can MapReduce find the name of the highest-paid employee in each department, and how should the Reduce code be written?
By using the join condition as the key of the map output and tagging every record with the file it comes from, rows of the two tables that satisfy the join condition are sent to the same reduce task, and the actual join is performed in the reduce phase. The example below joins an order table with a product table in this way; the per-department maximum-salary question is then answered afterwards with the same "group by key, resolve inside reduce" pattern.
1) Create the bean class for the merged order and product record
package com.atguigu.mapreduce.table;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class TableBean implements Writable {
    private String order_id; // order id
    private String p_id;     // product id
    private int amount;      // quantity
    private String pname;    // product name
    private String flag;     // tag marking which table the record comes from

    public TableBean() {
        super();
    }

    public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
        super();
        this.order_id = order_id;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.flag = flag;
    }

    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }
    public String getOrder_id() { return order_id; }
    public void setOrder_id(String order_id) { this.order_id = order_id; }
    public String getP_id() { return p_id; }
    public void setP_id(String p_id) { this.p_id = p_id; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(order_id);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readUTF();
        this.p_id = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return order_id + "\t" + pname + "\t" + amount + "\t";
    }
}
2) Write the TableMapper class
package com.atguigu.mapreduce.table;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    TableBean bean = new TableBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1 Determine which input file this split comes from
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        // 2 Read the input line
        String line = value.toString();

        // 3 Handle the two files differently
        if (name.startsWith("order")) { // order table
            // 3.1 Split the line
            String[] fields = line.split("\t");

            // 3.2 Populate the bean
            bean.setOrder_id(fields[0]);
            bean.setP_id(fields[1]);
            bean.setAmount(Integer.parseInt(fields[2]));
            bean.setPname("");
            bean.setFlag("0");

            k.set(fields[1]);
        } else { // product table
            // 3.3 Split the line
            String[] fields = line.split("\t");

            // 3.4 Populate the bean
            bean.setP_id(fields[0]);
            bean.setPname(fields[1]);
            bean.setFlag("1");
            bean.setAmount(0);
            bean.setOrder_id("");

            k.set(fields[0]);
        }

        // 4 Emit the record
        context.write(k, bean);
    }
}
3) Write the TableReducer class
package com.atguigu.mapreduce.table;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {

        // 1 Collection that holds the order records for this key
        ArrayList<TableBean> orderBeans = new ArrayList<>();
        // 2 Bean that holds the product record for this key
        TableBean pdBean = new TableBean();

        for (TableBean bean : values) {
            if ("0".equals(bean.getFlag())) { // order table
                // Copy each incoming order record into the collection
                // (the framework reuses the value object, so a copy is required)
                TableBean orderBean = new TableBean();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else { // product table
                try {
                    // Copy the incoming product record into memory
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // 3 Join the two tables
        for (TableBean bean : orderBeans) {
            bean.setPname(pdBean.getPname());

            // 4 Write out the joined record
            context.write(bean, NullWritable.get());
        }
    }
}
4) Write the TableDriver class
package com.atguigu.mapreduce.table;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get the configuration and the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar that contains this driver
        job.setJarByClass(TableDriver.class);

        // 3 Set the Mapper and Reducer classes used by this job
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);

        // 4 Set the key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);

        // 5 Set the key/value types of the final output
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job, with its configuration and jar, to YARN and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
5) Run the program and check the result
1001	小米	1
1001	小米	1
1002	华为	2
1002	华为	2
1003	格力	3
1003	格力	3
Drawback: with this approach the join is performed in the reduce phase, so the reduce side bears almost all of the processing load while the map nodes do very little work; resource utilization is poor, and data skew is very likely to occur in the reduce phase.
Solution: perform the join on the map side (map join); a sketch follows.
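As a rough illustration only, here is a minimal sketch of a map-side join, assuming the product table is small enough to be loaded into memory and is shipped to every map task through the distributed cache; the class name DistributedCacheMapper, the cached file pd.txt, and its HDFS path are illustrative, not part of the original example:

package com.atguigu.mapreduce.table;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> pdMap = new HashMap<>(); // p_id -> pname
    private Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the small product table once per map task from the distributed cache
        URI[] cacheFiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheFiles[0])), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");
                pdMap.put(fields[0], fields[1]); // p_id -> pname
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The job's regular input is only the order file: order_id \t p_id \t amount
        String[] fields = value.toString().split("\t");
        String pname = pdMap.getOrDefault(fields[1], "");
        k.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }
}

In the driver, the small table would be registered with something like job.addCacheFile(new URI("hdfs://.../pd.txt")) and the reduce phase switched off with job.setNumReduceTasks(0), so the join finishes entirely on the map side and no shuffle-related skew can occur.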
Turning to the original question, here is an attempt at a solution:
First, we need a data structure to hold an employee's information: the name and the salary. Because this object travels from map to reduce as a map output value, it must implement Hadoop's Writable interface. A class like the following can be used:
public class Employee implements Writable {
    private String name;
    private double salary;
    // constructor, getters, and setters
    // write()/readFields() are required because Hadoop serializes map output values
    @Override
    public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeDouble(salary); }
    @Override
    public void readFields(DataInput in) throws IOException { name = in.readUTF(); salary = in.readDouble(); }
}
Next, we write the Map and Reduce functions that find the highest-paid employee in each department. The Map function maps each employee record to a key-value pair whose key is the department name and whose value is an Employee object. The code looks like this:
public static class DeptMaxSalaryMapper extends Mapper<Object, Text, Text, Employee> {
private Text department = new Text();
private Employee employeeRecord = new Employee();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Split the input CSV line into fields
String[] fields = value.toString().split(",");
// Parse the employee name, salary, and department from the input line
String name = fields[0];
double salary = Double.parseDouble(fields[1]);
String dept = fields[2];
// Create an Employee object with the parsed values
employeeRecord.setName(name);
employeeRecord.setSalary(salary);
// Set the department as the key for this record
department.set(dept);
// Emit the department and its associated employee record
context.write(department, employeeRecord);
}
}
In the Map function, we first split the input CSV line into its fields and parse the employee's name, salary, and department. We then populate an Employee object with those values, set the department as the key, and emit the (department, employee) pair through the context object.
Next comes the Reduce function. It receives a department together with the list of its Employee records and outputs the name of the highest-paid employee in that department. The code looks like this:
public static class DeptMaxSalaryReducer extends Reducer<Text, Employee, Text, Text> {
    public void reduce(Text key, Iterable<Employee> values, Context context) throws IOException, InterruptedException {
        // Track the highest salary seen so far for this department.
        // Copy the fields rather than keeping a reference, because Hadoop reuses the value object.
        String highestPaidName = null;
        double highestSalary = Double.NEGATIVE_INFINITY;
        for (Employee emp : values) {
            if (highestPaidName == null || emp.getSalary() > highestSalary) {
                highestPaidName = emp.getName();
                highestSalary = emp.getSalary();
            }
        }
        // Emit the department and the name of its highest-paid employee
        if (highestPaidName != null) {
            context.write(key, new Text(highestPaidName));
        }
    }
}
In the Reduce function, we track the highest salary seen so far for the current department. Because Hadoop reuses the value object across iterations, we copy the name and salary out of each record rather than keeping a reference to it. After all of the department's employees have been scanned, we output the department name together with the name of its highest-paid employee.
Finally, we register the Map and Reduce functions with the MapReduce job. The complete code looks like this:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxSalaryByDept {
public static class DeptMaxSalaryMapper extends Mapper<Object, Text, Text, Employee> {
private Text department = new Text();
private Employee employeeRecord = new Employee();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Split the input CSV line into fields
String[] fields = value.toString().split(",");
// Parse the employee name, salary, and department from the input line
String name = fields[0];
double salary = Double.parseDouble(fields[1]);
String dept = fields[2];
// Create an Employee object with the parsed values
employeeRecord.setName(name);
employeeRecord.setSalary(salary);
// Set the department as the key for this record
department.set(dept);
// Emit the department and its associated employee record
context.write(department, employeeRecord);
}
}
public static class DeptMaxSalaryReducer extends Reducer<Text, Employee, Text, Text> {
    public void reduce(Text key, Iterable<Employee> values, Context context) throws IOException, InterruptedException {
        // Track the highest salary seen so far for this department.
        // Copy the fields rather than keeping a reference, because Hadoop reuses the value object.
        String highestPaidName = null;
        double highestSalary = Double.NEGATIVE_INFINITY;
        for (Employee emp : values) {
            if (highestPaidName == null || emp.getSalary() > highestSalary) {
                highestPaidName = emp.getName();
                highestSalary = emp.getSalary();
            }
        }
        // Emit the department and the name of its highest-paid employee
        if (highestPaidName != null) {
            context.write(key, new Text(highestPaidName));
        }
    }
}
public static void main(String[] args) throws Exception {
// Create a new MapReduce job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Max salary by department");
job.setJarByClass(MaxSalaryByDept.class);
// Set the mapper and reducer classes
job.setMapperClass(DeptMaxSalaryMapper.class);
job.setReducerClass(DeptMaxSalaryReducer.class);
// Set the map output key and value classes (the map output value type differs from the final one)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Employee.class);
// Set the final output key and value classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Set the input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submit the job and wait for it to complete
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
In the main method, we create a new MapReduce job, set the Mapper and Reducer classes, declare the key/value types of both the map output and the final output, set the input and output paths, and submit the job.
That is the code that solves the problem. I hope it helps!
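For illustration only, assuming the name,salary,department CSV layout described above, a hypothetical input file and the output the job would produce for it might look like this (the file name, names, and figures are all made up for the example):

# emp.csv — hypothetical input
Alice,5000,Sales
Bob,7000,Sales
Carol,6500,IT
Dave,6000,IT

# expected output (part-r-00000)
IT	Carol
Sales	Bob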