Flink Sum Computation


package org.example.tmp;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCount1 {
    public static void main(String[] args) throws Exception {
        // 1. Create the batch execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Build a bounded source from in-memory elements
        DataSet<String> ds = executionEnvironment.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it");
        // 3. Split each line into individual words
        FlatMapOperator<String, String> flatMapOperator = ds.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(" ");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        // 4. Map each word to a (word, 1) pair
        MapOperator<String, Tuple2<String, Integer>> mapOperator = flatMapOperator.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // 5. Group by the word (tuple field 0) and sum the counts (field 1)
        UnsortedGrouping<Tuple2<String, Integer>> groupBy = mapOperator.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> sum = groupBy.sum(1);
        // Sort by count descending; parallelism 1 yields a single, globally sorted partition
        SortPartitionOperator<Tuple2<String, Integer>> sort = sum.sortPartition(1, Order.DESCENDING).setParallelism(1);
        sort.print();
        // 6. Trigger execution: print() already triggers execution in the DataSet API,
        // so execute() is not needed here; DataStream jobs must call execute()
//        executionEnvironment.execute();
    }
}

The output is:

[image: printed result: (it,4), (hadoop,3), (spark,2), (flink,1)]

package org.example.tmp;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class WordCount2 {
    public static void main(String[] args) throws Exception  {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC selects BATCH mode for bounded sources (such as fromElements), STREAMING otherwise
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. Build a bounded source from in-memory elements
        DataStream<String> dataStream = env.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it");
        // 3. Split each line into individual words
        DataStream<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String s : value.split(" ")) {
                    out.collect(s);
                }
            }
        });
        // 4. Map each word to a (word, 1) pair
        DataStream<Tuple2<String, Integer>> mapDataStream = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return  Tuple2.of(s, 1);
            }
        });
        // 5. Key by the word (tuple field 0) and sum the counts (field 1)
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = mapDataStream.keyBy(0);
        DataStream<Tuple2<String, Integer>> sum = keyBy.sum(1).setParallelism(1);
        sum.print();
        // 6. DataStream jobs must call execute() to run
        env.execute();
    }
}


The output is:

[image: printed result]
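As a side note: keyBy(int) on tuple streams is deprecated in recent Flink releases in favor of a KeySelector, which makes the key type explicit (String rather than the untyped Tuple). A minimal sketch of the equivalent keying, assuming the mapDataStream variable from WordCount2 above and an extra import of org.apache.flink.api.java.functions.KeySelector:

        // Key the (word, 1) pairs by the word itself; the key type is now String
        KeyedStream<Tuple2<String, Integer>, String> keyed =
                mapDataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                });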

Question: when the Flink job is written in the DataStream style above, why does a word that occurs only once not show up in the sum output?

Original:

 DataStream<String> dataStream = env.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it");

Changed to:

 DataStream<String> dataStream = env.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it", "flink", "test");

With this change, "test" is not printed either.
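One way to narrow this down is to pin the runtime mode instead of relying on AUTOMATIC: with a bounded source such as fromElements, AUTOMATIC selects BATCH mode, which emits a single final record per key, while STREAMING emits an incremental update for every input record, so (test,1) should be printed the moment "test" arrives. Below is a minimal, self-contained sketch of that check (the class name WordCount2Debug and the job name are mine, not from the original post):

package org.example.tmp;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount2Debug {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Pin the mode so the output semantics are unambiguous: STREAMING prints
        // every incremental update, BATCH prints only the final count per key
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        env.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it", "flink", "test")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String w : line.split(" ")) {
                        out.collect(Tuple2.of(w, 1));
                    }
                })
                // Lambdas lose generic type information, so declare the result type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        env.execute("WordCount2Debug");
    }
}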

[The following answer was generated by GPT]

First of all, I noticed that your code contains some unnecessary steps and redundant operations. For example, the flatMap and map functions can be merged into a single operation, and sum can then be called directly on the grouped result. In addition, you set the parallelism of the sort operation to 1, which may affect performance. Here is the optimized code:

package org.example.tmp;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount1 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> ds = executionEnvironment.fromElements("it hadoop spark flink", "it hadoop spark", "it hadoop", "it");

        DataSet<Tuple2<String, Integer>> result = ds
                // Split each line and emit (word, 1) pairs in a single flatMap
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] split = value.split(" ");
                        for (String s : split) {
                            out.collect(Tuple2.of(s, 1));
                        }
                    }
                })
                // Note: field-position aggregations such as sum(1) are not supported on
                // KeySelector-based groupings in the DataSet API, so group by tuple position
                .groupBy(0)
                .sum(1);

        result.print();
    }
}

In the new code, the flatMap and map steps are merged into a single operation that creates the Tuple2 records with Tuple2.of. The records are then grouped by the word (tuple field 0) with groupBy, and sum is called directly to add up the counts.
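For reference, with the four input lines the counts work out to it = 4, hadoop = 3, spark = 2 and flink = 1, so the printed result should contain the following tuples (DataSet print() does not guarantee any particular order unless you sort first):

(it,4)
(hadoop,3)
(spark,2)
(flink,1)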

This optimization removes unnecessary intermediate steps and operations and makes the code more efficient to execute. I hope this helps!

