How to write a custom object for MapReduce

How do I build a custom object from the data below, so I can implement things like partitioning and sorting? I'm new here, please bear with me.

[image: sample data attached to the question]

I'm not quite sure where this custom object is supposed to be used, or whether it goes straight into a project 😂😂😂 If you want to store the data in a database, just create the fields with MyBatis. If you only need to keep it in a collection, add the properties to the class with getters/setters and implement compareTo for sorting; you can also use reflection and serialization.
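For the plain-Java case (no Hadoop involved), a minimal sketch of that "add properties, implement compareTo, then sort" idea could look like this; the Person class and its values are made up for illustration:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class Person implements Comparable<Person> {
    private final String name;
    private final int score;

    Person(String name, int score) {
        this.name = name;
        this.score = score;
    }

    @Override
    public int compareTo(Person o) {
        return Integer.compare(o.score, this.score); // higher score first
    }

    @Override
    public String toString() {
        return name + "=" + score;
    }
}

public class SortDemo {
    public static void main(String[] args) {
        List<Person> people = new ArrayList<>();
        people.add(new Person("a", 70));
        people.add(new Person("b", 95));
        Collections.sort(people);     // uses compareTo
        System.out.println(people);   // [b=95, a=70]
    }
}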

http://www.mark-to-win.com/tutorial/mydb_MapReduce_CustomizedClass.html

Here's a Student example I wrote earlier that's similar to this; creating the custom object is pretty much the same as in plain Java.

Custom Student object
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Student implements WritableComparable<Student> {
    private long stuId;
    private String stuName;
    private int score;

    // No-arg constructor is required: Hadoop instantiates the object reflectively
    // before calling readFields() during deserialization.
    public Student() {
    }

    public Student(long stuId, String stuName, int score) {
        this.stuId = stuId;
        this.stuName = stuName;
        this.score = score;
    }

    @Override
    public String toString() {
        return "Student{" +
                "stuId=" + stuId +
                ", stuName='" + stuName + '\'' +
                ", score=" + score +
                '}';
    }

    public long getStuId() {
        return stuId;
    }

    public void setStuId(long stuId) {
        this.stuId = stuId;
    }

    public String getStuName() {
        return stuName;
    }

    public void setStuName(String stuName) {
        this.stuName = stuName;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }
    @Override
    public int compareTo(Student o) {
        // Only exercised when Student is used as the map output key;
        // ordering by student ID is a reasonable default here.
        return Long.compare(this.stuId, o.stuId);
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Field order here must match readFields() exactly.
        dataOutput.writeLong(stuId);
        dataOutput.writeUTF(stuName);
        dataOutput.writeInt(score);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Read fields in the same order they were written.
        this.stuId = dataInput.readLong();
        this.stuName = dataInput.readUTF();
        this.score = dataInput.readInt();
    }
}
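If you want to sanity-check the write()/readFields() pair locally before running a job, a quick round-trip through Java's data streams works; the sample values here are arbitrary:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StudentWritableCheck {
    public static void main(String[] args) throws IOException {
        Student original = new Student(1L, "Tom", 90);

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize with readFields()
        Student copy = new Student();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // Student{stuId=1, stuName='Tom', score=90}
    }
}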
Mapper stage
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StudentMapper extends Mapper<LongWritable, Text, LongWritable, Student> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        System.out.println(key.get() + "  " + value); // key is the byte offset of the input line

        // One CSV record per line: column 0 is the student ID, column 1 the name,
        // and column 3 the score (per the original data layout).
        String[] stu = value.toString().split(",");
        LongWritable stuId = new LongWritable(Long.parseLong(stu[0]));
        Student student = new Student(stuId.get(), stu[1], Integer.parseInt(stu[3]));
        context.write(stuId, student);

    }
}
Reducer stage
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class StudentReducer extends Reducer<LongWritable, Student, LongWritable, Student> {
    // Reused output object; Hadoop serializes it on each context.write(), so reuse is safe.
    private final Student result = new Student();

    @Override
    protected void reduce(LongWritable key, Iterable<Student> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        String stuName = "";
        for (Student student : values) {
            if (stuName.equals("")) {
                stuName = student.getStuName();
            }
            sum += student.getScore(); // total score for this student ID
        }
        result.setStuId(key.get());
        result.setStuName(stuName);
        result.setScore(sum);
        System.out.println(key.get() + " total score: " + sum);
        context.write(key, result);
    }
}
Driver stage
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StudentScoreDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(StudentScoreDriver.class);

        job.setMapperClass(StudentMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Student.class);

        job.setReducerClass(StudentReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Student.class);

        Path pathIn = new Path("C:\\Users\\Administrator\\IdeaProjects\\hadoopStu\\in\\demo2\\stuScore.csv");
        FileInputFormat.setInputPaths(job, pathIn);
        Path pathOut = new Path("C:\\Users\\Administrator\\IdeaProjects\\hadoopStu\\in\\student");
        FileOutputFormat.setOutputPath(job, pathOut);

        // Delete the output directory if it already exists, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(pathOut.toUri(), conf);
        if (fileSystem.exists(pathOut)) {
            fileSystem.delete(pathOut, true);
        }
        job.waitForCompletion(true);
    }
}
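The question also asks about partitioning and sorting. In MapReduce, sorting happens on the map output key (that is what WritableComparable's compareTo is for when the custom object itself is used as the key), while partitioning is controlled by a Partitioner. A sketch of a custom partitioner for this example, assuming you want to spread records across reducers by student ID, could look like this (StudentPartitioner is a made-up name, not part of the original answer):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class StudentPartitioner extends Partitioner<LongWritable, Student> {
    @Override
    public int getPartition(LongWritable key, Student value, int numPartitions) {
        // Bucket records by student ID; records with the same key always
        // end up in the same reducer.
        return (int) ((key.get() & Long.MAX_VALUE) % numPartitions);
    }
}

Then register it in the driver before submitting the job:

job.setPartitionerClass(StudentPartitioner.class);
job.setNumReduceTasks(3); // a custom partitioner only matters with more than one reducer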