java.io.IOException: Unknown column 'phone' in 'field list'



package FlowWrite;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Traffic record for one phone number. Implements Writable for the map -> reduce
// shuffle and DBWritable so DBOutputFormat can write each record as a table row.
public class FlowBean implements DBWritable, Writable {
    private String phone;
    private long upflow;
    private long downflow;
    private double avg_upflow;
    private double avg_downflow;
    private long chaflow;
    private long sumflow;

    // Not used: the reducer copies fields with Commons BeanUtils.copyProperties instead.
    public static void copyProperties(FlowBean b1, FlowBean bea) {
    }


    @Override
    public String toString() {
        return phone + "\t" + upflow + "\t" + downflow + "\t" + avg_upflow + "\t"
                + avg_downflow + "\t" + chaflow + "\t" + sumflow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public double getAvg_upflow() {
        return avg_upflow;
    }

    public void setAvg_upflow(double avg_upflow) {
        this.avg_upflow = avg_upflow;
    }

    public double getAvg_downflow() {
        return avg_downflow;
    }

    public void setAvg_downflow(double avg_downflow) {
        this.avg_downflow = avg_downflow;
    }

    public long getChaflow() {
        return chaflow;
    }

    public void setChaflow(long chaflow) {
        this.chaflow = chaflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(phone);
        dataOutput.writeLong(upflow);
        dataOutput.writeLong(downflow);
        dataOutput.writeDouble(avg_upflow);
        dataOutput.writeDouble(avg_downflow);
        dataOutput.writeLong(chaflow);
        dataOutput.writeLong(sumflow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        phone=dataInput.readUTF();
        upflow = dataInput.readLong();
        downflow = dataInput.readLong();
        // these two fields are written with writeDouble in write(), so read them as double
        avg_upflow = dataInput.readDouble();
        avg_downflow = dataInput.readDouble();
        chaflow=dataInput.readLong();
        sumflow = dataInput.readLong();
    }

    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        // The parameter order must match the column list given to DBOutputFormat.setOutput(...):
        // phone, upflow, downflow, avg_up, avg_down, chaflow, sumflow
        preparedStatement.setString(1, this.phone);
        preparedStatement.setLong(2, this.upflow);
        preparedStatement.setLong(3, this.downflow);
        preparedStatement.setDouble(4, this.avg_upflow);
        preparedStatement.setDouble(5, this.avg_downflow);
        preparedStatement.setLong(6, this.chaflow);
        preparedStatement.setLong(7, this.sumflow);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        phone=resultSet.getString(1);
        upflow=resultSet.getLong(2);
        downflow=resultSet.getLong(3);
        avg_upflow=resultSet.getDouble(4);
        avg_downflow=resultSet.getDouble(5);
        chaflow=resultSet.getLong(6);
        sumflow=resultSet.getLong(7);
//        avg_upflow=resultSet.getLong(1);
//        avg_downflow=resultSet.getLong(2);
//        chaflow=resultSet.getLong(3);
//        sumflow=resultSet.getLong(4);
    }
}

package FlowWrite;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Mapper: parses each tab-separated input line and emits (phone number, FlowBean).
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] strs = line.split("\t"); // each line is tab-separated
        String phone = strs[1];           // second field in the file, so index 1
        String upflow = strs[strs.length - 2];   // second field from the end
        String downflow = strs[strs.length - 3]; // third field from the end
        FlowBean mapOutBean = new FlowBean();
        mapOutBean.setPhone(phone);
        mapOutBean.setUpflow(Long.parseLong(upflow));     // convert the String to long
        mapOutBean.setDownflow(Long.parseLong(downflow)); // convert the String to long

        // The averages, difference and total are recalculated in the reducer;
        // these calls just leave the remaining fields at their defaults so every field is initialised.
        mapOutBean.setAvg_upflow(mapOutBean.getAvg_upflow());
        mapOutBean.setAvg_downflow(mapOutBean.getAvg_downflow());
        mapOutBean.setChaflow(mapOutBean.getChaflow());
        mapOutBean.setSumflow(Long.parseLong(upflow) + Long.parseLong(downflow)); // provisional total; aggregated properly in the reducer
        context.write(new Text(phone),mapOutBean);
    }
}


package FlowWrite;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.sql.ResultSet;
import java.util.ArrayList;

// Reducer: for each phone number, sums the uplink/downlink traffic, computes the
// per-record averages, their difference and the overall total, then emits the bean.
public class FlowReduce extends Reducer<Text, FlowBean, FlowBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        double avg_upflow;
        double avg_downflow;
        long upsum=0L;
        long downsum=0L;
        int count = 0;
//
        ArrayList<FlowBean> b = new ArrayList<>();
        for (FlowBean bea : values) {
            FlowBean b1 = new FlowBean();
//            System.out.println(key.toString() + "---" + bea.toString());
            try {
                BeanUtils.copyProperties(b1, bea);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
            b.add(b1);
            upsum += bea.getUpflow();     // accumulate the uplink traffic
            downsum += bea.getDownflow(); // accumulate the downlink traffic
//            avg_upflow=upsum +bea.getAvg_upflow();
//            avg_downflow=downsum +bea.getAvg_downflow();
            count+= 1;
//            System.out.println(avg_upflow + "\t" + avg_downflow+upsum+downsum);
        }
        long sum = upsum + downsum;
        avg_upflow = (double) upsum / count;     // average uplink traffic per record
        avg_downflow = (double) downsum / count; // average downlink traffic per record
        long chaflow = (long) (avg_upflow - avg_downflow); // difference between the two averages
//        System.out.println(avg_upflow + "\t" + avg_downflow+ "\t"+chaflow+ "\t"+sum);
        FlowBean outValue=new FlowBean();
        outValue.setPhone(key.toString());
        outValue.setUpflow(upsum);
        outValue.setDownflow(downsum);
        outValue.setAvg_upflow(avg_upflow);
        outValue.setAvg_downflow(avg_downflow);
        outValue.setChaflow(chaflow);
        outValue.setSumflow(sum);
        System.out.println(outValue.toString());
        context.write(outValue,NullWritable.get());

    }
}

package FlowWrite;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Driver: reads the text input from ./inputdir and writes each aggregated record
// into the MySQL table goods_table through DBOutputFormat.
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf =new Configuration();
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/test","root","123456");
        Job job = Job.getInstance(conf); // static factory method on the Job class, so no existing instance is needed
        job.getConfiguration().setStrings("mapreduce.reduce.shuffle.memory.limit.percent", "0.1");
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        Path input=new Path("./inputdir");
        FileInputFormat.addInputPath(job,input);
        DBOutputFormat.setOutput(job,"goods_table","phone","upflow","downflow","avg_up","avg_down","chaflow","sumflow");
//        DBOutputFormat.setOutput(job,"goods_table","avg_up","avg_down","chaflow","sumflow");
        job.setOutputFormatClass(DBOutputFormat.class);

        boolean flag= job.waitForCompletion(true);
        System.exit(flag?0:1);
    }
}


Post your SQL (the CREATE TABLE statement for goods_table).
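
The "Unknown column 'phone' in 'field list'" error means MySQL cannot find a phone column in goods_table: DBOutputFormat builds its INSERT statement directly from the column names passed to DBOutputFormat.setOutput(...), so every name in that list has to exist in the table. Judging from the commented-out four-column setOutput call, the table was probably created with only avg_up, avg_down, chaflow and sumflow. As a rough sketch, with the column names taken from the setOutput call and assumed types, a table that matches the current seven-column call would look something like this:

CREATE TABLE goods_table (
    phone    VARCHAR(20),
    upflow   BIGINT,
    downflow BIGINT,
    avg_up   DOUBLE,
    avg_down DOUBLE,
    chaflow  BIGINT,
    sumflow  BIGINT
);

Whatever the real schema is, the column names and their order must line up with both the setOutput call and the parameter indexes used in FlowBean.write(PreparedStatement).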