MapReduce分区输出ip并且统计个数

MapReduce实现ip分类输出并统计个数
在使用mapReduce实现不同类别ip地址输出到不同文件如何统计每个类型ip地址的个数,将mapper结果用bean封装,分区主要靠partitioner实现,但是一直出现java.lang.ClassCastException的问题,按照网上的法子改了很多次都不行

Mapper代码

package com.topview.log;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable,Text,Text,LogBean> {

    Text outK=new Text();
    LogBean outV=new LogBean();
    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {

        //178.199.96.56    2023-03-17 02:29:29    2023-03-17 03:09:25
        //获取一行
        String line=value.toString();
        //日志解析
        LogBean logBean=parseLog(line);
        //合法性
        if(!logBean.isValid()){
            return;
        }
        outK.set(line);

        //写出
        context.write(outK,outV);
    }

    private LogBean parseLog(String line) {
        //切割
        String[] fields = line.split("\t");
        if(fields[0].length()<15) {
            outV.setIp(fields[0]);
            outV.setLogin(fields[1]);
            outV.setLogout(fields[2]);
            outV.setValid(true);
        }else{
            outV.setValid(false);
        }
        //判断是否合法
        return outV;

    }
}

Bean代码

package com.topview.log;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class LogBean implements Writable {
    private String ip;
    private String login;
    private String logout;
    private boolean valid = true;
    public LogBean(){
        super();
    }

    public LogBean(String ip,String login,String logout){
        super();
        this.ip=ip;
        this.login=login;
        this.logout=logout;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        //序列化
        dataOutput.writeUTF(ip);
        dataOutput.writeUTF(login);
        dataOutput.writeUTF(logout);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        //反序列化
        ip=dataInput.readUTF();
        login=dataInput.readUTF();
        logout=dataInput.readUTF();
    }

    public String toString(){
        return ip+"\t"+login+"\t"+logout;
    }

    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public String getLogin() {
        return login;
    }
    public void setLogin(String login) {
        this.login = login;
    }
    public String getLogout() {
        return logout;
    }
    public void setLogout(String logout) {
        this.logout = logout;
    }
    public boolean isValid() {
        return valid;
    }
    public void setValid(boolean valid) {
        this.valid = valid;
    }
}

Partitioner代码

package com.topview.log;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class ProvincePartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        String key=text.toString();
        String[] keys=key.split(".");
        int ip=Integer.parseInt(keys[0]);
        int partition=0;
        if(ip>0&&ip<128){
            partition=0;
        }else if(ip>127&&ip<192){
            partition=1;
        }else if(ip>191&&ip<224){
            partition=2;
        }else if (ip>223&&ip<240){
            partition=3;
        }else if (ip>239&&ip<256){
            partition=4;
        }else{
        }
        return partition;
    }
}

Driver类

package com.topview.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @Title:
 * @Package
 * @Description:
 * @author: Yeeasy
 **/
public class LogDriver {
    public static void main(String[] args) throws Exception{

        //job
        Job job= Job.getInstance(new Configuration());

        //jar
        job.setJarByClass(LogDriver.class);

        //map
        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LogBean.class);
        job.setMapOutputValueClass(IntWritable.class);

        //reduce
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(IntWritable.class);

        //分区
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(5);

        //路径
        FileInputFormat.setInputPaths(job,new Path("D:/IDEA/TVlog/input"));
        FileOutputFormat.setOutputPath(job,new Path("D:/IDEA/TVlog/success"));

        //提交
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}


java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.topview.log.LogBean
看了driver类,应该是没有问题的,不知道如何实现这个ip分类和统计

bean类需要继承WritableComparable这个类,重写compareTo方法,单纯实现Writable,你自定义的对象没有实现自动排序,你要在compareTo中定义先来后到的存入