How do I find the name of the highest-paid employee in each department with MapReduce?

How do I use MapReduce to get, for every department, the name of the employee with the highest salary? How should the Reduce code be written?


  • Here is a similar question you can look at: https://ask.csdn.net/questions/7605785
  • I also found a good blog post that may help: Mapreduce执行机制之Map和Reduce (on the execution mechanism of Map and Reduce).
  • In addition, the 需求1:Reduce端表合并(数据倾斜) (Requirement 1: reduce-side table join, prone to data skew) section of the blog post MapReduce实战之 MapReduce中多表合并案例 (a multi-table join case study) may solve your problem; you can read the excerpt below or jump to the source post:
  • The join key is used as the map output key; each record that satisfies the join condition is tagged with the file it came from and sent to the same reduce task, and the two tables are stitched together in the reduce phase.

    1) Create the bean that holds the merged order and product fields

    package com.atguigu.mapreduce.table;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class TableBean implements Writable {
        private String order_id; // order id
        private String p_id;     // product id
        private int amount;      // product quantity
        private String pname;    // product name
        private String flag;     // marks which table the record came from

        public TableBean() {
            super();
        }

        public TableBean(String order_id, String p_id, int amount, String pname, String flag) {
            super();
            this.order_id = order_id;
            this.p_id = p_id;
            this.amount = amount;
            this.pname = pname;
            this.flag = flag;
        }

        public String getFlag() {
            return flag;
        }

        public void setFlag(String flag) {
            this.flag = flag;
        }

        public String getOrder_id() {
            return order_id;
        }

        public void setOrder_id(String order_id) {
            this.order_id = order_id;
        }

        public String getP_id() {
            return p_id;
        }

        public void setP_id(String p_id) {
            this.p_id = p_id;
        }

        public int getAmount() {
            return amount;
        }

        public void setAmount(int amount) {
            this.amount = amount;
        }

        public String getPname() {
            return pname;
        }

        public void setPname(String pname) {
            this.pname = pname;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(order_id);
            out.writeUTF(p_id);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeUTF(flag);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.order_id = in.readUTF();
            this.p_id = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
            this.flag = in.readUTF();
        }

        @Override
        public String toString() {
            return order_id + "\t" + pname + "\t" + amount + "\t";
        }
    }

    2) Write the TableMapper class

    package com.atguigu.mapreduce.table;

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
        TableBean bean = new TableBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // 1 Determine which input file this record came from
            FileSplit split = (FileSplit) context.getInputSplit();
            String name = split.getPath().getName();

            // 2 Read the input line
            String line = value.toString();

            // 3 Handle the two files differently
            if (name.startsWith("order")) { // order table
                // 3.1 Split the line
                String[] fields = line.split("\t");

                // 3.2 Populate the bean
                bean.setOrder_id(fields[0]);
                bean.setP_id(fields[1]);
                bean.setAmount(Integer.parseInt(fields[2]));
                bean.setPname("");
                bean.setFlag("0");

                k.set(fields[1]);
            } else { // product table
                // 3.3 Split the line
                String[] fields = line.split("\t");

                // 3.4 Populate the bean
                bean.setP_id(fields[0]);
                bean.setPname(fields[1]);
                bean.setFlag("1");
                bean.setAmount(0);
                bean.setOrder_id("");

                k.set(fields[0]);
            }
            // 4 Emit the record, keyed by product id
            context.write(k, bean);
        }
    }

    3) Write the TableReducer class

    package com.atguigu.mapreduce.table;

    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<TableBean> values, Context context)
                throws IOException, InterruptedException {

            // 1 Collection that stores the order records for this product id
            ArrayList<TableBean> orderBeans = new ArrayList<>();
            // 2 Bean that holds the matching product record
            TableBean pdBean = new TableBean();

            for (TableBean bean : values) {

                if ("0".equals(bean.getFlag())) { // order table
                    // Copy each incoming order record into the collection
                    // (Hadoop reuses the 'bean' object, so a copy is required)
                    TableBean orderBean = new TableBean();
                    try {
                        BeanUtils.copyProperties(orderBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    orderBeans.add(orderBean);
                } else { // product table
                    try {
                        // Copy the incoming product record into memory
                        BeanUtils.copyProperties(pdBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // 3 Join the two tables
            for (TableBean bean : orderBeans) {
                bean.setPname(pdBean.getPname());

                // 4 Write out the joined record
                context.write(bean, NullWritable.get());
            }
        }
    }

    4) Write the TableDriver class

    package com.atguigu.mapreduce.table;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TableDriver {

        public static void main(String[] args) throws Exception {
            // 1 Get the configuration and a job instance
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);

            // 2 Set the jar that contains this driver class
            job.setJarByClass(TableDriver.class);

            // 3 Set the mapper and reducer classes used by this job
            job.setMapperClass(TableMapper.class);
            job.setReducerClass(TableReducer.class);

            // 4 Set the mapper output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TableBean.class);

            // 5 Set the final output key/value types
            job.setOutputKeyClass(TableBean.class);
            job.setOutputValueClass(NullWritable.class);

            // 6 Set the input and output directories
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7 Submit the job configuration and the jar containing its classes to YARN and wait for completion
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

    5) Run the program and check the output

    1001       小米       1    

    1001       小米       1    

    1002       华为       2    

    1002       华为       2    

    1003       格力       3    

    1003       格力       3    

    Drawback: with this approach the join is done entirely in the reduce phase, so the reduce side carries a heavy processing load while the map nodes do very little work; resource utilization is poor, and the reduce phase is very prone to data skew.

    Solution: do the join on the map side instead (a minimal sketch follows below).
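
    A minimal sketch of that map-side join idea (my addition, not from the original blog; the class name, file layout, and paths are assumptions): the small product table is shipped to every map task through the distributed cache and loaded into a HashMap in setup(), so each order line can be joined inside map() and the job needs no reduce phase at all.

    package com.atguigu.mapreduce.table; // hypothetical package, for illustration only

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private final Map<String, String> pdMap = new HashMap<>(); // product id -> product name
        private final Text k = new Text();

        @Override
        protected void setup(Context context) throws IOException {
            // Load the small product table, distributed via the cache, into memory once per map task
            URI cacheFile = context.getCacheFiles()[0];
            FileSystem fs = FileSystem.get(cacheFile, context.getConfiguration());
            try (FSDataInputStream in = fs.open(new Path(cacheFile));
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] fields = line.split("\t");
                    pdMap.put(fields[0], fields[1]);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Only the order file is fed to the job as input; each order line is joined
            // against the in-memory product table, so no reducer is needed.
            String[] fields = value.toString().split("\t");
            String pname = pdMap.getOrDefault(fields[1], "");
            k.set(fields[0] + "\t" + pname + "\t" + fields[2]);
            context.write(k, NullWritable.get());
        }
    }

    In the driver, the product file would be attached with job.addCacheFile(new URI("hdfs://.../pd.txt")) (path hypothetical), the order file alone would be used as the input path, and job.setNumReduceTasks(0) would switch the reduce phase off.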

     

  • You can also review the MapReduce sorting section of 尚硅谷IT教育's big data course to consolidate the related concepts.
  • The following answer was generated by ChatGPT, summarized from related blog posts:

    I will try to solve this problem:

    First, we need a data structure that stores each employee's information: the name and the salary. We can define a class for this; because these objects travel from map to reduce, the class must implement Hadoop's Writable interface (a completed sketch follows the snippet). A first cut looks like this:

    public class Employee implements Writable {
        private String name;
        private double salary;
    
        // constructor, getters, setters, and Writable's write()/readFields() omitted here
    }
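
    A possible completion of that class (a sketch I am adding, assuming the field names above and a fixed serialization order) could look like this:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class Employee implements Writable {
        // the implicit no-arg constructor is required by Hadoop for deserialization
        private String name;
        private double salary;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public double getSalary() { return salary; }
        public void setSalary(double salary) { this.salary = salary; }

        @Override
        public void write(DataOutput out) throws IOException {
            // serialize the fields in a fixed order
            out.writeUTF(name);
            out.writeDouble(salary);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // deserialize in exactly the same order as write()
            name = in.readUTF();
            salary = in.readDouble();
        }
    }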
    

    Next, we write the Map and Reduce functions that find, for each department, the name of the highest-paid employee. The Map function turns every employee record into a key-value pair whose key is the department name and whose value is an Employee object. The code is shown below:

    public static class DeptMaxSalaryMapper extends Mapper<Object, Text, Text, Employee> {
    
        private Text department = new Text();
        private Employee employeeRecord = new Employee();
    
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the input CSV line into fields
            String[] fields = value.toString().split(",");
            // Parse the employee name, salary, and department from the input line
            String name = fields[0];
            double salary = Double.parseDouble(fields[1]);
            String dept = fields[2];
            // Create an Employee object with the parsed values
            employeeRecord.setName(name);
            employeeRecord.setSalary(salary);
            // Set the department as the key for this record
            department.set(dept);
            // Emit the department and its associated employee record
            context.write(department, employeeRecord);
        }
    }
    

    In the Map function we first split the input CSV line into fields and parse out the employee's name, salary, and department. We then fill an Employee object with those values and, through the context object, emit the department as the key and the employee record as the value.

    Next is the Reduce function. It receives a department and the list of Employee records for that department, and outputs the name of the employee with the highest salary. The code is shown below:

    public static class DeptMaxSalaryReducer extends Reducer<Text, Employee, Text, Text> {
    
        public void reduce(Text key, Iterable<Employee> values, Context context) throws IOException, InterruptedException {
            // Track the highest-paid employee seen so far for this department.
            // The name and salary are copied out of 'emp' because Hadoop reuses
            // the same Employee instance on every iteration of the values iterator.
            String highestPaidName = null;
            double highestSalary = 0;
            for (Employee emp : values) {
                if (highestPaidName == null || emp.getSalary() > highestSalary) {
                    highestPaidName = emp.getName();
                    highestSalary = emp.getSalary();
                }
            }
            // Emit the department and the name of the employee with the highest salary
            if (highestPaidName != null) {
                context.write(key, new Text(highestPaidName));
            }
        }
    }
    

    In the Reduce function we keep two running variables: the highest salary seen so far in the department and the name of the employee who earns it. As we iterate over the Employee values we update them whenever a larger salary appears; the values are copied out of the iterated object because Hadoop reuses the same instance across iterations. Finally, we write out the department name together with the name of its highest-paid employee through the context object.

    Finally, we wire the Map and Reduce functions into a MapReduce job. The complete program is shown below:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    // The Employee class shown above (implementing Writable) is assumed to be on the classpath.
    public class MaxSalaryByDept {
    
        public static class DeptMaxSalaryMapper extends Mapper<Object, Text, Text, Employee> {
    
            private Text department = new Text();
            private Employee employeeRecord = new Employee();
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                // Split the input CSV line into fields
                String[] fields = value.toString().split(",");
                // Parse the employee name, salary, and department from the input line
                String name = fields[0];
                double salary = Double.parseDouble(fields[1]);
                String dept = fields[2];
                // Create an Employee object with the parsed values
                employeeRecord.setName(name);
                employeeRecord.setSalary(salary);
                // Set the department as the key for this record
                department.set(dept);
                // Emit the department and its associated employee record
                context.write(department, employeeRecord);
            }
        }
    
        public static class DeptMaxSalaryReducer extends Reducer<Text, Employee, Text, Text> {
    
            public void reduce(Text key, Iterable<Employee> values, Context context) throws IOException, InterruptedException {
                // Track the highest-paid employee seen so far for this department; copy the
                // values out of 'emp' because Hadoop reuses the same object on every iteration.
                String highestPaidName = null;
                double highestSalary = 0;
                for (Employee emp : values) {
                    if (highestPaidName == null || emp.getSalary() > highestSalary) {
                        highestPaidName = emp.getName();
                        highestSalary = emp.getSalary();
                    }
                }
                // Emit the department and the name of the employee with the highest salary
                if (highestPaidName != null) {
                    context.write(key, new Text(highestPaidName));
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            // Create a new MapReduce job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Max salary by department");
            job.setJarByClass(MaxSalaryByDept.class);
    
            // Set the mapper and reducer classes
            job.setMapperClass(DeptMaxSalaryMapper.class);
            job.setReducerClass(DeptMaxSalaryReducer.class);
    
            // The map output value type (Employee) differs from the final output value type (Text),
            // so the map output key/value classes must be set explicitly
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Employee.class);
    
            // Set the final output key and value classes
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            // Set the input and output paths
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // Submit the job and wait for it to complete
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

    In the main method we create a new MapReduce job, set the Mapper and Reducer classes, and declare the key/value types; because the map output value type (Employee) differs from the final output value type (Text), the map output key/value classes are set explicitly in addition to the final ones. We then configure the input and output paths and submit the job.

    That is the code that solves this problem; I hope it helps!