看看,耽误一分钟帮我看一下啦

耽误一分钟帮我看一下感谢啦
报错 For input string: ]

img

测试数据

img

package com.qingtai.graph;

import java.io.IOException;

import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;

/*** @author * @date * @version 1.0* @parameter* @desc* @since* @return*/
public class SSSP {
    public static final String START_VERTEX = "sssp.start.vertex.id";

    /*** 定义SSSPVertex ,其中:
     * 点值表示该点到源点startVertexId 的当前最短距离。
     * compute() 方法使用迭代公式:d[v]=min(d[v], d[u]+weight(u, v)) 更新点值。
     * cleanup() 方法把点及其到源点的最短距离写到结果表中。*/


    public static class SSSPVertex extends Vertex {
        private static long startVertexId = -1;

        /***定义SSSPVertex ,其中:点值表示该点到源点startVertexId 的当前最短距离。
         * compute() 方法使用迭代公式:d[v]=min(d[v], d[u]+weight(u, v)) 更新点值。
         * cleanup() 方法把点及其到源点的最短距离写到结果表中。*/

        public SSSPVertex() {
            this.setValue(new LongWritable(Long.MAX_VALUE));
        }

        public boolean isStartVertex(ComputeContext context) {
            if (startVertexId == -1) {
                String s = context.getConfiguration().get(START_VERTEX);
                startVertexId = Long.parseLong(s);
            }
            return getId().get() == startVertexId;
        }

        @Override
        public void compute(ComputeContext context, Iterable messages) throws IOException {
            long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
            for (LongWritable msg : messages) {
                if (msg.get() < minDist) {
                    minDist = msg.get();
                }
            }
            if (minDist < this.getValue().get()) {
                this.setValue(new LongWritable(minDist));
                if (hasEdges()) {
                    for (Edge e : this.getEdges()) {
                        context.sendMessage(e.getDestVertexId(), new LongWritable(minDist + e.getValue().get()));
                    }
                }
            } else {
                // 当点值没发生变化时,调用voteToHalt() 告诉框架该点进入halt 状态,当所有点都进入halt 状态时,计算结束。
                voteToHalt();
            }
        }

        @Override
        public void cleanup(WorkerContext context) throws IOException {
            context.write(getId(), getValue());
        }
    }

    /**
     * 定义MinLongCombiner,对发送给同一个点的消息进行合并,优化性能,减少内存占用。
     **/
    public static class MinLongCombiner extends Combiner {
        @Override
        public void combine(LongWritable vertexId, LongWritable combinedMessage, LongWritable messageToCombine) throws IOException {
            if (combinedMessage.get() > messageToCombine.get()) {
                combinedMessage.set(messageToCombine.get());
            }
        }
    }

    /*** 定义SSSPVertexReader 类,加载图,将表中每一条记录解析为一个点,记录的第一列是点标识,第二列存储该点起始的所有的边集,内容如:2:2,3:1,4:4。**/
    public static class SSSPVertexReader extends GraphLoader {
        @Override
        public void load(LongWritable recordNum, WritableRecord record, MutationContext context) throws IOException {
            SSSPVertex vertex = new SSSPVertex();
            vertex.setId((LongWritable) record.get(0));
            String[] edges = record.get(1).toString().split(",");
            for (int i = 0;
                 i < edges.length;
                 i++) {
                String[] ss = edges[i].split(":");
                vertex.addEdge(new LongWritable(Long.parseLong(ss[0])), new LongWritable(Long.parseLong(ss[1])));
            }
            context.addVertexRequest(vertex);
        }
    }

    /*** 主程序(main 函数),定义GraphJob,指定Vertex/GraphLoader/Combiner 等的实现,指定输入输出表。*/

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("Usage: < >  ");
            System.exit(-1);
        }
        GraphJob job = new GraphJob();
        job.setGraphLoaderClass(SSSPVertexReader.class);
        job.setVertexClass(SSSPVertex.class);
        job.setCombinerClass(MinLongCombiner.class);
        job.set(START_VERTEX, args[0]);
        job.addInput(TableInfo.builder().tableName(args[1]).build());
        job.addOutput(TableInfo.builder().tableName(args[2]).build());
        long startTime = System.currentTimeMillis();
        job.run();
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
    }
}

img

这是因为格式转换时,有乱码导致异常发生。