Original dataset
Database table created to store the results
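For reference, the target table needs columns matching the DBOutputFormat.setOutput(...) call in the driver further down. A minimal sketch of creating it over JDBC, reusing the URL and credentials from the driver; the class name and column types here are my assumptions, adjust them to your data:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateUsaTable {
    public static void main(String[] args) throws Exception {
        // Column names must match the setOutput(...) call; the types are assumptions
        String ddl = "CREATE TABLE IF NOT EXISTS usa ("
                + "date VARCHAR(20), country VARCHAR(100), state VARCHAR(100), "
                + "fips VARCHAR(20), cases INT, deaths INT)";
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mr", "root", "123456");
             Statement st = conn.createStatement()) {
            st.execute(ddl);
        }
    }
}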
Please paste your code in a code block so people can copy and run it. Otherwise, reading it purely by eye is too much effort, people will be less willing to answer your question, and you may end up spending a long time working it out on your own.
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class BeanSQL implements Writable, DBWritable {
    private String date;
    private String country;
    private String state;
    private String fips;
    private int cases;
    private int deaths;

    @Override
    public String toString() {
        return "BeanSQL{" +
                "date='" + date + '\'' +
                ", country='" + country + '\'' +
                ", state='" + state + '\'' +
                ", fips='" + fips + '\'' +
                ", cases=" + cases +
                ", deaths=" + deaths +
                '}';
    }

    public void set(String date, String country, String state, String fips, int cases, int deaths) {
        this.date = date;
        this.country = country;
        this.state = state;
        this.fips = fips;
        this.cases = cases;
        this.deaths = deaths;
    }

    // Writable: serialization of the bean between map and reduce
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(country);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeInt(cases);
        out.writeInt(deaths);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.country = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readInt();
        this.deaths = in.readInt();
    }

    // DBWritable: maps the bean onto the columns passed to DBOutputFormat.setOutput
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.date);
        statement.setString(2, this.country);
        statement.setString(3, this.state);
        statement.setString(4, this.fips);
        statement.setInt(5, this.cases);
        statement.setInt(6, this.deaths);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString(1);
        this.country = resultSet.getString(2);
        this.state = resultSet.getString(3);
        this.fips = resultSet.getString(4);
        this.cases = resultSet.getInt(5);
        this.deaths = resultSet.getInt(6);
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getFips() {
        return fips;
    }

    public void setFips(String fips) {
        this.fips = fips;
    }

    public int getCases() {
        return cases;
    }

    public void setCases(int cases) {
        this.cases = cases;
    }

    public int getDeaths() {
        return deaths;
    }

    public void setDeaths(int deaths) {
        this.deaths = deaths;
    }
}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MapSQL extends Mapper<LongWritable, Text, NullWritable, BeanSQL> {
    BeanSQL v = new BeanSQL();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split each input line into fields
        String[] fields = value.toString().split(",");
        // Skip the header row and any malformed (truncated) rows
        if (fields[0].equals("date") || fields.length < 6) return;
        // Populate the bean; fields[1] is the CSV's "county" column, stored under the bean's
        // "country" field to match the table column. cases/deaths can appear as floats in the
        // CSV (e.g. "0.0"), so parse as double first and cast to int.
        v.set(
                fields[0],
                fields[1],
                fields[2],
                fields[3],
                (int) Double.parseDouble(fields[4]),
                (int) Double.parseDouble(fields[5])
        );
        // Emit with a NullWritable key; the bean is the value
        context.write(NullWritable.get(), v);
        System.out.println(v.toString());
    }
}
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ReduceSQL extends Reducer<NullWritable, BeanSQL, BeanSQL, NullWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<BeanSQL> values, Context context) throws IOException, InterruptedException {
        // Write each bean out; with DBOutputFormat the key (the bean) becomes one table row
        for (BeanSQL bean : values) {
            context.write(bean, key);
            System.out.println(bean);
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class DriverSQL {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Configure the database connection; this must happen before Job.getInstance(conf)
            // so the connection settings are copied into the job's configuration
            DBConfiguration.configureDB(
                    conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://localhost:3306/mr",
                    "root", "123456"
            );
            // Create the job
            Job job = Job.getInstance(conf);
            // Configure mapper, reducer, and driver classes
            job.setMapperClass(MapSQL.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(BeanSQL.class);
            job.setReducerClass(ReduceSQL.class);
            job.setOutputKeyClass(BeanSQL.class);
            job.setOutputValueClass(NullWritable.class);
            job.setJarByClass(DriverSQL.class);
            // Input file path
            Path input = new Path(
                    "G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\美国疫情\\data\\us_counties_covid19_daily.csv"
            );
            FileInputFormat.setInputPaths(job, input);
            // Send the output to the database: table "usa", one column per bean field
            job.setOutputFormatClass(DBOutputFormat.class);
            // String[] fields = {"date","country","state","fips","cases","deaths"};
            DBOutputFormat.setOutput(job, "usa", "date", "country", "state", "fips", "cases", "deaths");
            // Submit the job
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
date,county,state,fips,cases,deaths
2020-01-21,Snohomish,Washington,53061.0,1,0.0
2020-01-22,Snohomish,Washington,53061.0,1,0.0
2020-01-23,Snohomish,Washington,53061.0,1,0.0
2020-01-24,Cook,Illinois,17031.0,1,0.0
2020-01-24,Snohomish,Washington,53061.0,1,0.0
2020-01-25,Orange,California,6059.0,1,0.0
2020-01-25,Cook,Illinois,17031.0,1,0.0
2020-01-25,Snohomish,Washington,53061.0,1,0.0
2020-01-26,Maricopa,Arizona,4013.0,1,0.0
2020-01-26,Los Angeles,California,6037.0,1,0.0
2020-01-26,Orange,California,6059.0,1,0.0
2020-01-26,Cook,Illinois,17031.0,1,0.0
2020-04-03,Nash,North Carolina,37127.0,14,0.0
2020-04-03,New Hano
The original dataset is fairly large; only an excerpt is shown here.
I made one change here: the database connection is now configured before the Job is created. After running it I still got an error, but with a bit more detail than the earlier java.io.IOException: it said the database service could not be connected to, even though Navicat could connect. I tried the approach from https://blog.csdn.net/qq_27387133/article/details/106277653, but it still didn't work (it may work for some people). So I decided to try a different route and see whether the data could be written to the database on the virtual machine instead.
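One way to narrow down this kind of "cannot connect" error is to hit the same URL with the same connector jar outside of Hadoop, using plain JDBC. If this small test fails with the same message, the problem is the connector/server combination rather than the MapReduce job. A sketch, reusing the driver class, URL, and credentials from DriverSQL above:

import java.sql.Connection;
import java.sql.DriverManager;

public class ConnTest {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mr", "root", "123456")) {
            // Prints the server version if the connection succeeds
            System.out.println("Connected: " + conn.getMetaData().getDatabaseProductVersion());
        }
    }
}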
After creating the table there and updating the database connection URL, the job ran successfully!
It turns out the problem was most likely a version mismatch: MySQL 8.0 is installed locally, MySQL 5.7 on the virtual machine, and the project's mysql-connector is 5.1.40. The 5.1.40 connector predates MySQL 8.0 and its default caching_sha2_password authentication plugin, so it can fail to connect to the local 8.0 server even though it works fine against 5.7.
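If you would rather keep writing to the local MySQL 8.0 server, another option (untested here, just a sketch) is to upgrade mysql-connector-java to an 8.x version and adjust the configureDB call in DriverSQL along these lines:

// Connector/J 8.x renames the driver class to com.mysql.cj.jdbc.Driver;
// serverTimezone (and often useSSL / allowPublicKeyRetrieval) is usually needed for MySQL 8.0
DBConfiguration.configureDB(
        conf,
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql://localhost:3306/mr?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true",
        "root", "123456"
);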