Flink RocketMQ Connector

We need to ingest RocketMQ data from an external department, but there is no official RocketMQ connector for Flink. To stay compatible with our future big-data platform, both the DataStream and Flink SQL APIs need to be supported.

I found an open-source library, installed it into my local Maven repository, and added it as a dependency.

GitHub address of the Flink RocketMQ Connector: https://github.com/apache/rocketmq-flink.
I installed this artifact into my local repository and reference it through the POM.
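For reference, installing the artifact locally looks roughly like building the repo with `mvn clean install -DskipTests` and then depending on it. The coordinates below are an assumption based on the rocketmq-flink project's POM; match the version to whatever you actually installed:

```xml
<!-- Coordinates are an assumption taken from the rocketmq-flink POM;
     adjust groupId/artifactId/version to the artifact you installed. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-flink</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```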

Code

// Imports added for completeness; connector package paths follow the
// rocketmq-flink repository layout.
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeserializationSchema;

public class RocketMqSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        // enable checkpoint
        env.enableCheckpointing(3000);
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "srvbd59.net.cn:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "group1");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "rocketTopic");
        RocketMQSourceFunction<String> source = new RocketMQSourceFunction<String>(new SimpleStringDeserializationSchema(), consumerProps);
        source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);
        SingleOutputStreamOperator<String> map = env.addSource(source).map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        map.print();
        env.execute("rocketmq-flink-example1");
    }
}

Observed result and error details

After I publish messages to the topic, an ordinary RocketMQ consumer can read them, but the Flink RocketMQ Connector job above prints no data and raises no error.

Troubleshooting so far

The console does print a few errors, but they are all logging-related; there is no application exception, so there is nothing concrete to chase. My hunch is to look at the logging framework and see whether it can be configured to emit useful diagnostic information.
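One hedged guess on the logging errors: if no SLF4J binding is on the classpath, the connector's warnings never reach the console. Assuming a log4j 1.x binding (slf4j-log4j12 plus log4j), a minimal `log4j.properties` on the classpath that surfaces connector and client logs could look like this:

```
# Minimal log4j 1.x config (assumes slf4j-log4j12 + log4j on the classpath).
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d %p %c - %m%n
# Raise the RocketMQ client/connector to DEBUG while troubleshooting.
log4j.logger.org.apache.rocketmq=DEBUG
```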

Desired outcome

My Flink version is 1.14.5, and I would like to integrate the Flink RocketMQ Connector against it:
1. Both the DataStream and Flink SQL APIs must be usable.
2. The artifact must be installable into a local repository and referenced through the POM.
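For the Flink SQL side, the rocketmq-flink project ships a table connector. The DDL below is only a sketch: the option keys are assumptions modeled on that connector, so check the table-factory class in the repository for the exact names:

```sql
-- Sketch only: option keys are assumptions; verify them against the
-- SQL table factory in the rocketmq-flink repository.
CREATE TABLE rocket_source (
  `message` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'rocketTopic',
  'consumerGroup' = 'group1',
  'nameServerAddress' = 'srvbd59.net.cn:9876'
);

SELECT * FROM rocket_source;
```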

My guess is one of the following causes; please check each:

Topic configuration: make sure the topic name in RocketMQ matches the topic configured in consumerProps.

NameServer configuration: make sure the NameServer address in consumerProps is correct.

RocketMQ version: make sure the RocketMQ Connector is compatible with the RocketMQ version you run.

ConsumerGroup configuration: make sure the consumerGroup in consumerProps is correct; if none is configured, make sure the consumer group actually matches the messages the producer sends.
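The property checks above can be scripted before the job is submitted. The helper below is a sketch using plain string keys (the literal key names are stand-ins for illustration; in real code use the `RocketMQConfig` constants) that reports which required consumer properties are missing or blank:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class PropsCheck {
    // Key names are illustrative stand-ins; in real code use the
    // RocketMQConfig.NAME_SERVER_ADDR / CONSUMER_GROUP / CONSUMER_TOPIC constants.
    static final List<String> REQUIRED =
            Arrays.asList("nameserver.address", "consumer.group", "consumer.topic");

    // Returns the required keys that are absent or blank in the given Properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("nameserver.address", "srvbd59.net.cn:9876");
        consumerProps.setProperty("consumer.group", "group1");
        // consumer.topic deliberately omitted: the check should flag it.
        System.out.println("missing: " + missingKeys(consumerProps)); // prints missing: [consumer.topic]
    }
}
```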

Check whether the broker.conf file under rmq/conf is configured correctly, and then re-check the code for omissions. If you have no leads at all, go back to the example or tutorial you based the program on and compare it line by line for anything written wrong or left out.
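For the broker.conf check, these are the fields worth eyeballing; the values below are placeholders for this setup, not your actual configuration:

```
# Example broker.conf fields to verify (values are placeholders).
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# Must be reachable from the Flink job and match consumerProps.
namesrvAddr = srvbd59.net.cn:9876
# If false, the source topic must be created explicitly.
autoCreateTopicEnable = true
# Uncomment if clients sit on another network and the broker
# registers an internal address with the NameServer.
# brokerIP1 = <externally reachable IP>
```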

package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSink;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;

public class RocketMQFlinkExample {

    /**
     * Source Config
     *
     * @return properties
     */
    private static Properties getConsumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(
                RocketMQConfig.NAME_SERVER_ADDR,
                "127.0.0.1:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink_consumer_test");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "SOURCE_TOPIC");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, DEFAULT_CONSUMER_TAG);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
        return consumerProps;
    }

    /**
     * Sink Config
     *
     * @return properties
     */
    private static Properties getProducerProps() {
        Properties producerProps = new Properties();
        producerProps.setProperty(
                RocketMQConfig.NAME_SERVER_ADDR,
                "127.0.0.1:9876");
        producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "flink_produce_test");
        return producerProps;
    }

    public static void main(String[] args) throws Exception {

        //final ParameterTool params = ParameterTool.fromArgs(args);

        // for local
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // for cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //env.getConfig().setGlobalJobParameters(params);
        env.setStateBackend(new MemoryStateBackend());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // start a checkpoint every 10s
        env.enableCheckpointing(10000);
        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties consumerProps = getConsumerProps();
        Properties producerProps = getProducerProps();

        SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();

        DataStreamSource<Tuple2<String, String>> source =
                env.addSource(new RocketMQSourceFunction<>(schema, consumerProps))
                        .setParallelism(2);

        source.print();
        source.process(new SourceMapFunction())
                .process(new SinkMapFunction("SINK_TOPIC", "*"))
                .addSink(
                        new RocketMQSink(producerProps)
                                .withBatchFlushOnCheckpoint(true)
                                .withBatchSize(32)
                                .withAsync(true))
                .setParallelism(2);

        env.execute("rocketmq-connect-flink");
    }
}

For reference:
Producer:

package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);

    private static final int MESSAGE_NUM = 10000;

    private static final String PRODUCER_GROUP = "GID_SIMPLE_PRODUCER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";
    private static final String KEY_PREFIX = "KEY";

    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQProducer producer =
                new DefaultMQProducer(PRODUCER_GROUP, true, null);
        producer.setNamesrvAddr("127.0.0.1:9876");

        // When using aliyun products, you need to set up channels
        //producer.setAccessChannel(AccessChannel.CLOUD);

        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < MESSAGE_NUM; i++) {
            String content = "Test Message " + i;
            Message msg = new Message(TOPIC, TAGS, KEY_PREFIX + i, content.getBytes());
            try {
                SendResult sendResult = producer.send(msg);
                assert sendResult != null;
                System.out.printf(
                        "send result: %s %s\n",
                        sendResult.getMsgId(), sendResult.getMessageQueue().toString());
                Thread.sleep(50);
            } catch (Exception e) {
                LOGGER.info("send message failed. {}", e.toString());
            }
        }
    }
}

Consumer:

package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    private static final String GROUP = "GID_SIMPLE_CONSUMER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";

    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // When using aliyun products, you need to set up channels
        //consumer.setAccessChannel(AccessChannel.CLOUD);

        try {
            consumer.subscribe(TOPIC, TAGS);
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(
                (MessageListenerConcurrently)
                        (msgs, context) -> {
                            for (MessageExt msg : msgs) {
                                System.out.printf(
                                        "%s %s %d %s\n",
                                        msg.getMsgId(),
                                        msg.getBrokerName(),
                                        msg.getQueueId(),
                                        new String(msg.getBody()));
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        });

        try {
            consumer.start();
        } catch (MQClientException e) {
            LOGGER.error("start consumer failed. {}", e.toString());
        }
    }
}

For a Flink job that consumes from MQ and writes back to MQ, see the RocketMQFlinkExample class shown earlier.