 {
}
/**
 * Renders the bean as one tab-separated row in the order
 * phone, upflow, downflow, avg_upflow, avg_downflow, chaflow, sumflow.
 *
 * @return the seven fields joined by tab characters
 */
@Override
public String toString() {
    final char tab = '\t';
    StringBuilder row = new StringBuilder();
    row.append(phone).append(tab);
    row.append(upflow).append(tab);
    row.append(downflow).append(tab);
    row.append(avg_upflow).append(tab);
    row.append(avg_downflow).append(tab);
    row.append(chaflow).append(tab);
    row.append(sumflow);
    return row.toString();
}
/**
 * @return the current value of the {@code phone} field
 */
public String getPhone() {
    return this.phone;
}
/**
 * Replaces the {@code phone} field.
 *
 * @param phone the new value to store
 */
public void setPhone(String phone) {
    this.phone = phone;
}
/**
 * @return the current value of the {@code upflow} field
 */
public long getUpflow() {
    return this.upflow;
}
/**
 * Replaces the {@code upflow} field.
 *
 * @param upflow the new value to store
 */
public void setUpflow(long upflow) {
    this.upflow = upflow;
}
/**
 * @return the current value of the {@code downflow} field
 */
public long getDownflow() {
    return this.downflow;
}
/**
 * Replaces the {@code downflow} field.
 *
 * @param downflow the new value to store
 */
public void setDownflow(long downflow) {
    this.downflow = downflow;
}
/**
 * @return the current value of the {@code avg_upflow} field
 */
public double getAvg_upflow() {
    return this.avg_upflow;
}
/**
 * Replaces the {@code avg_upflow} field.
 *
 * @param avg_upflow the new value to store
 */
public void setAvg_upflow(double avg_upflow) {
    this.avg_upflow = avg_upflow;
}
/**
 * @return the current value of the {@code avg_downflow} field
 */
public double getAvg_downflow() {
    return this.avg_downflow;
}
/**
 * Replaces the {@code avg_downflow} field.
 *
 * @param avg_downflow the new value to store
 */
public void setAvg_downflow(double avg_downflow) {
    this.avg_downflow = avg_downflow;
}
/**
 * @return the current value of the {@code chaflow} field
 */
public long getChaflow() {
    return this.chaflow;
}
/**
 * Replaces the {@code chaflow} field.
 *
 * @param chaflow the new value to store
 */
public void setChaflow(long chaflow) {
    this.chaflow = chaflow;
}
/**
 * @return the current value of the {@code sumflow} field
 */
public long getSumflow() {
    return this.sumflow;
}
/**
 * Replaces the {@code sumflow} field.
 *
 * @param sumflow the new value to store
 */
public void setSumflow(long sumflow) {
    this.sumflow = sumflow;
}
/**
 * Writes the seven fields to {@code dataOutput} in a fixed order:
 * phone (UTF), upflow (long), downflow (long), avg_upflow (double),
 * avg_downflow (double), chaflow (long), sumflow (long).
 *
 * @param dataOutput sink that receives the binary form of this bean
 * @throws IOException if any of the underlying writes fails
 */
@Override
public void write(DataOutput dataOutput) throws IOException {
    // Text field first.
    dataOutput.writeUTF(this.phone);

    // Raw totals.
    dataOutput.writeLong(this.upflow);
    dataOutput.writeLong(this.downflow);

    // Averages are the only floating-point fields.
    dataOutput.writeDouble(this.avg_upflow);
    dataOutput.writeDouble(this.avg_downflow);

    // Derived long values.
    dataOutput.writeLong(this.chaflow);
    dataOutput.writeLong(this.sumflow);
}
/**
 * Restores the seven fields from {@code dataInput}, mirroring the order and
 * types used by {@code write(DataOutput)}: phone (UTF), upflow (long),
 * downflow (long), avg_upflow (double), avg_downflow (double),
 * chaflow (long), sumflow (long).
 *
 * @param dataInput source holding the binary form of a bean
 * @throws IOException if any of the underlying reads fails
 */
@Override
public void readFields(DataInput dataInput) throws IOException {
    phone = dataInput.readUTF();
    upflow = dataInput.readLong();
    downflow = dataInput.readLong();
    // The averages are emitted with writeDouble, so they must be read back with
    // readDouble. readLong would reinterpret the double's bit pattern as an
    // integer and yield a meaningless value.
    avg_upflow = dataInput.readDouble();
    avg_downflow = dataInput.readDouble();
    chaflow = dataInput.readLong();
    sumflow = dataInput.readLong();
}
/**
 * Binds the seven fields to the statement's positional parameters in the
 * order phone, upflow, downflow, avg_upflow, avg_downflow, chaflow, sumflow.
 * This is the same order that {@code readFields(ResultSet)} reads columns
 * 1 through 7.
 *
 * @param preparedStatement statement whose parameters 1..7 are populated
 * @throws SQLException if a parameter cannot be set
 */
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
    preparedStatement.setString(1, this.phone);
    // Parameters 2 and 3 carry the raw totals. They were previously bound to
    // chaflow and sumflow, which duplicated parameters 6 and 7 and never
    // stored upflow/downflow at all.
    preparedStatement.setLong(2, this.upflow);
    preparedStatement.setLong(3, this.downflow);
    preparedStatement.setDouble(4, this.avg_upflow);
    preparedStatement.setDouble(5, this.avg_downflow);
    preparedStatement.setLong(6, this.chaflow);
    preparedStatement.setLong(7, this.sumflow);
}
/**
 * Loads the seven fields from columns 1 through 7 of the current row, in the
 * order phone, upflow, downflow, avg_upflow, avg_downflow, chaflow, sumflow.
 *
 * @param resultSet row source positioned on the row to read
 * @throws SQLException if a column cannot be read
 */
@Override
public void readFields(ResultSet resultSet) throws SQLException {
    // JDBC columns are 1-based; pre-increment walks 1, 2, ... 7.
    int column = 0;
    this.phone = resultSet.getString(++column);
    this.upflow = resultSet.getLong(++column);
    this.downflow = resultSet.getLong(++column);
    this.avg_upflow = resultSet.getDouble(++column);
    this.avg_downflow = resultSet.getDouble(++column);
    this.chaflow = resultSet.getLong(++column);
    this.sumflow = resultSet.getLong(++column);
}
}
package FlowWrite;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that turns one tab-separated input line into a
 * (phone, {@link FlowBean}) pair.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /** Fewest fields a line needs so that every index used below exists. */
    private static final int MIN_FIELDS = 3;

    /**
     * Parses the phone and the two flow values from {@code value} and emits a
     * bean carrying them plus their sum. Lines with fewer than
     * {@link #MIN_FIELDS} fields are counted and skipped.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one line of input text
     * @param context used to emit the output pair and to count skipped lines
     * @throws IOException           if emitting the pair fails
     * @throws InterruptedException  if emitting the pair is interrupted
     * @throws NumberFormatException if a flow field is not a valid long
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Fields on each line are separated by tab characters.
        String[] strs = value.toString().split("\t");
        if (strs.length < MIN_FIELDS) {
            // A blank or truncated line used to abort the task with
            // ArrayIndexOutOfBoundsException; count it and move on instead.
            context.getCounter("FlowMapper", "MALFORMED_LINES").increment(1L);
            return;
        }

        // The phone is the second field, hence index 1.
        String phone = strs[1];
        // NOTE(review): upflow is read from the second-to-last field and
        // downflow from the third-to-last. The original comments labelled both
        // "second to last" -- confirm the column order against the input data.
        long upflow = Long.parseLong(strs[strs.length - 2]);
        long downflow = Long.parseLong(strs[strs.length - 3]);

        FlowBean mapOutBean = new FlowBean();
        mapOutBean.setPhone(phone);
        mapOutBean.setUpflow(upflow);
        mapOutBean.setDownflow(downflow);
        // avg_upflow, avg_downflow and chaflow are left untouched here: the
        // previous code only assigned the bean's own values back to itself.
        mapOutBean.setSumflow(upflow + downflow);

        context.write(new Text(phone), mapOutBean);
    }
}
package FlowWrite;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.sql.ResultSet;
import java.util.ArrayList;
/**
 * Reducer that aggregates all {@link FlowBean} values of one key into a
 * single output bean holding totals, averages and their difference.
 */
public class FlowReduce extends Reducer<Text, FlowBean, FlowBean, NullWritable> {

    /**
     * Sums upflow and downflow over {@code values}, derives the per-record
     * averages, and writes one bean keyed by nothing ({@link NullWritable}).
     *
     * @param key     the grouping key; its string form becomes the bean's phone
     * @param values  all beans emitted for {@code key}
     * @param context used to emit the aggregated bean
     * @throws IOException          if emitting the bean fails
     * @throws InterruptedException if emitting the bean is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upsum = 0L;
        long downsum = 0L;
        long count = 0L;

        // Only the running totals are needed, so the values are consumed
        // directly. The earlier reflective copy of every bean into a list that
        // was never read has been dropped.
        for (FlowBean bea : values) {
            upsum += bea.getUpflow();
            downsum += bea.getDownflow();
            count++;
        }

        if (count == 0L) {
            // Defensive: an empty group would otherwise divide by zero below.
            return;
        }

        // The bean stores averages as double, so keep the fractional part
        // instead of truncating with integer division.
        double avgUpflow = (double) upsum / count;
        double avgDownflow = (double) downsum / count;
        // chaflow is a long field; it remains the difference of the
        // whole-number averages, computed exactly as before.
        long chaflow = upsum / count - downsum / count;
        long sum = upsum + downsum;

        FlowBean outValue = new FlowBean();
        outValue.setPhone(key.toString());
        outValue.setUpflow(upsum);
        outValue.setDownflow(downsum);
        outValue.setAvg_upflow(avgUpflow);
        // Was setDownflow(avg_downflow), which overwrote the downflow total and
        // left avg_downflow at its initial value.
        outValue.setAvg_downflow(avgDownflow);
        outValue.setChaflow(chaflow);
        outValue.setSumflow(sum);

        System.out.println(outValue.toString());
        context.write(outValue, NullWritable.get());
    }
}
package FlowWrite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Entry point that wires {@link FlowMapper} and {@link FlowReduce} into a job
 * reading from {@code ./inputdir} and writing rows through
 * {@link DBOutputFormat}.
 */
public class FlowDriver {

    /**
     * Configures the job, runs it to completion, and exits the JVM with
     * status 0 on success or 1 on failure.
     *
     * @param args command-line arguments; not used
     * @throws IOException            if job setup or execution fails on I/O
     * @throws ClassNotFoundException if a configured class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Database connection settings go into the configuration before the job is created from it.
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(
                conf,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test",
                "root",
                "123456");

        // Job.getInstance is a static method, so it is called on the class rather than on an instance.
        Job job = Job.getInstance(conf);
        job.getConfiguration().setStrings("mapreduce.reduce.shuffle.memory.limit.percent", "0.1");

        // Classes that make up the job.
        job.setJarByClass(FlowDriver.class);
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);

        // Key/value types between map and reduce, then for the final output.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input comes from files; output goes to the database table.
        FileInputFormat.addInputPath(job, new Path("./inputdir"));
        DBOutputFormat.setOutput(
                job,
                "goods_table",
                "phone", "upflow", "downflow", "avg_up", "avg_down", "chaflow", "sumflow");
        job.setOutputFormatClass(DBOutputFormat.class);

        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        System.exit(exitCode);
    }
}
贴下你的sql