public class ipSort {
    public static class Map extends Mapper {
        // turn each input line into <IntWritable ipNum, Text ipAdd> pairs
        private final static IntWritable ipNum = new IntWritable();
        private Text ipAdd = new Text();

        public void map(LongWritable key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            // turn each line into a string
            String line = value.toString();
            // split the line
            StringTokenizer token = new StringTokenizer(line);
            // process every line
            while (token.hasMoreElements()) {
                // tokens are divided by blanks
                StringTokenizer tokenLine = new StringTokenizer(token.nextToken());
                ipAdd.set(token.nextToken().trim());
                ipNum.set(Integer.valueOf(token.nextToken().trim()));
                context.write(ipNum, new Text(ipAdd));
            }
        }
    }
    public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        // invert the map output: <count, ip> becomes <ip, count>
        private Text result = new Text();

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val.toString());
                context.write(new Text(result), key);
            }
        }
    }
    public static class IntKeyDescComparator extends WritableComparator {
        protected IntKeyDescComparator() {
            super(IntWritable.class, true);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            return super.compare(a, b);
        }
    }
    public static void main(String args[])
            throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "C:\\Users\\lenovo\\Desktop\\hadoop-2.6.0\\hadoop-2.6.0");
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "192.168.142.138");
        Job job = new Job(conf, "ipSort");
        job.setJarByClass(ipSort.class);
        job.setSortComparatorClass(IntKeyDescComparator.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://10.170.54.193:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://10.170.54.193:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
When I run it, the job fails with: Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.io.IntWritable, but I can't find where the bad cast is.
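For context: the map logic reads an IP address token followed by a count token from each line, so the input file is assumed to look something like this (hypothetical sample; the actual input was not posted):

    10.0.0.1 120
    10.0.0.2 87
    10.0.0.3 9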
public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text val : values) {
        result.set(val.toString());
        context.write(new Text(result), key);
    }
}
Reading the code, my first guess was that context.write(new Text(result), key); is where it goes wrong, and that the correct call would be context.write(key, new Text(result));. On closer inspection, though, that write already matches the Reducer's declared output types <Text, IntWritable>; what actually disagrees is the driver, which declares job.setOutputKeyClass(IntWritable.class) and job.setOutputValueClass(Text.class). The fix is to make the driver's declarations match the reducer, and to declare the map output types separately, as in the corrected code below.
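A minimal sketch of consistent driver-side declarations, assuming the map emits <IntWritable, Text> and the reduce emits <Text, IntWritable> (whenever the two differ, the map output types must be declared explicitly via setMapOutputKeyClass/setMapOutputValueClass):

    // map output types: <count, ip>
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    // final (reducer) output types: <ip, count>
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);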
package ipmapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ipSort {
    public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> {
        // turn each input line into <IntWritable ipNum, Text ipAdd> pairs
        private final static IntWritable ipNum = new IntWritable(1);
        private Text ipAdd = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // split the line on whitespace
            StringTokenizer token = new StringTokenizer(line);
            // each line is expected to hold an IP address followed by its count
            while (token.hasMoreElements()) {
                //StringTokenizer tokenLine = new StringTokenizer(token.nextToken());
                ipAdd.set(token.nextToken().trim());
                ipNum.set(Integer.valueOf(token.nextToken().trim()));
                context.write(ipNum, ipAdd);
            }
        }
    }
    public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        // invert the map output: <count, ip> becomes <ip, count>
        private Text result = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val.toString());
                context.write(result, key);
            }
        }
    }
    public static class IntKeyDescComparator extends WritableComparator {
        protected IntKeyDescComparator() {
            super(IntWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // negate the natural order so the IntWritable keys sort descending
            return -super.compare(a, b);
        }
    }
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "C:\\Users\\lenovo\\Desktop\\hadoop-2.6.0\\hadoop-2.6.0");
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "192.168.142.138");
        Job job = Job.getInstance(conf, "ipSort");
        job.setJarByClass(ipSort.class);
        job.setSortComparatorClass(IntKeyDescComparator.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // map output types differ from the final output types, so declare both
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://10.170.54.193:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://10.170.54.193:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
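With the hypothetical three-line sample input shown earlier, the corrected job should emit one tab-separated line per IP (TextOutputFormat's default separator), sorted by count in descending order:

    10.0.0.1	120
    10.0.0.2	87
    10.0.0.3	9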