We need to ingest RocketMQ data from an external department, but Flink currently has no official RocketMQ connector. To stay compatible with our future big-data platform, both the DataStream API and FlinkSQL must be supported.
The Flink RocketMQ Connector is on GitHub: https://github.com/apache/rocketmq-flink.
For now I have installed this artifact into my local Maven repository and reference it via the POM.
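For reference, installing the checkout into the local repository and declaring the dependency might look like the following. The groupId/artifactId match the project's POM; the version is an assumption and should be checked against the `pom.xml` of your checkout:

```xml
<!-- After cloning https://github.com/apache/rocketmq-flink and running
     `mvn clean install -DskipTests`, reference the locally installed artifact.
     The version below is an assumption; verify it in your checkout's pom.xml. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-flink</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```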
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.config.OffsetResetStrategy;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleStringDeserializationSchema;

public class RocketMqSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);

        // Enable checkpointing; the legacy source commits consumer offsets on checkpoint.
        env.enableCheckpointing(3000);

        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "srvbd59.net.cn:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "group1");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "rocketTopic");

        RocketMQSourceFunction<String> source =
                new RocketMQSourceFunction<>(new SimpleStringDeserializationSchema(), consumerProps);
        source.setStartFromGroupOffsets(OffsetResetStrategy.EARLIEST);

        SingleOutputStreamOperator<String> map =
                env.addSource(source)
                        .map(new MapFunction<String, String>() {
                            @Override
                            public String map(String value) throws Exception {
                                return value;
                            }
                        });
        map.print();

        env.execute("rocketmq-flink-example1");
    }
}
After I publish messages to the topic, a plain RocketMQ consumer receives them. But through the Flink RocketMQ Connector (i.e. the code above) no data is consumed or printed, and no error is thrown.
The console does print some errors, but they are logging-related; there is no exception from the application code itself, so I have nothing to go on. My feeling is the logging framework is worth a look: if it can be made to emit diagnostic output, there may be something to trace.
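On the logging point: the RocketMQ client writes its own log files (by default under `~/logs/rocketmqlogs/`), and the connector itself logs through SLF4J, so raising the log level for the connector package may surface why pulls return nothing. A sketch of a logback configuration, assuming logback is the SLF4J backend on your classpath:

```xml
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <!-- Surface connector-level diagnostics -->
  <logger name="org.apache.rocketmq.flink" level="DEBUG"/>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```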
My Flink version is 1.14.5, and I would like to integrate the Flink RocketMQ Connector against that version.
1: Both the DataStream API and FlinkSQL must be supported.
2: The artifact can be installed into the local repository and referenced via the POM.
My guess is one of the following; please check each:
Topic mismatch: make sure the topic name in RocketMQ matches the topic configured in consumerProps.
NameServer address: make sure the NameServer address in consumerProps is correct.
Version compatibility: make sure the RocketMQ Connector is compatible with the RocketMQ version you are running.
Consumer group: make sure the consumerGroup in consumerProps is correct, and that the consumer group actually corresponds to the messages the producer sends.
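Before digging into the connector itself, it can also help to rule out basic network reachability of the NameServer. A small stdlib-only sketch (the host and port mirror the question's consumerProps and are otherwise arbitrary):

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class NameServerCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (Exception e) {
            // Covers unknown host, refused connection, and connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // NameServer address from the question's consumerProps
        System.out.println(reachable("srvbd59.net.cn", 9876, 3000));
    }
}
```

If this prints false from the machine running the Flink job, the problem is connectivity, not the connector.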
Check whether the broker.conf file under rmq/conf is configured correctly, then check the code for omissions. If you have no lead to follow, go back to the example or tutorial you based the program on and compare it line by line to see whether something was miswritten or left out.
package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSink;
import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

import static org.apache.rocketmq.flink.legacy.RocketMQConfig.CONSUMER_OFFSET_LATEST;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_CONSUMER_TAG;

public class RocketMQFlinkExample {

    /**
     * Source Config
     *
     * @return properties
     */
    private static Properties getConsumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "flink_consumer_test");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "SOURCE_TOPIC");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, DEFAULT_CONSUMER_TAG);
        consumerProps.setProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
        return consumerProps;
    }

    /**
     * Sink Config
     *
     * @return properties
     */
    private static Properties getProducerProps() {
        Properties producerProps = new Properties();
        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "127.0.0.1:9876");
        producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, "flink_produce_test");
        return producerProps;
    }

    public static void main(String[] args) throws Exception {
        // final ParameterTool params = ParameterTool.fromArgs(args);

        // for local
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // for cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // env.getConfig().setGlobalJobParameters(params);
        env.setStateBackend(new MemoryStateBackend());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // start a checkpoint every 10s
        env.enableCheckpointing(10000);

        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties consumerProps = getConsumerProps();
        Properties producerProps = getProducerProps();

        SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();

        DataStreamSource<Tuple2<String, String>> source =
                env.addSource(new RocketMQSourceFunction<>(schema, consumerProps)).setParallelism(2);
        source.print();

        source.process(new SourceMapFunction())
                .process(new SinkMapFunction("SINK_TOPIC", "*"))
                .addSink(
                        new RocketMQSink(producerProps)
                                .withBatchFlushOnCheckpoint(true)
                                .withBatchSize(32)
                                .withAsync(true))
                .setParallelism(2);

        env.execute("rocketmq-connect-flink");
    }
}
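The job above covers the DataStream requirement. For the FlinkSQL side, the same repository also ships a table connector; a sketch of the DDL, where the column names are placeholders and the connector option names are assumptions based on the connector's table factory, to be checked against your checkout's README:

```sql
CREATE TABLE rocketmq_source (
  `user_id` VARCHAR,
  `behavior` VARCHAR
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'SOURCE_TOPIC',
  'consumerGroup' = 'flink_sql_consumer_test',
  'nameServerAddress' = '127.0.0.1:9876'
);
```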
You can use the following for reference.
Producer:
package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleProducer.class);

    private static final int MESSAGE_NUM = 10000;
    private static final String PRODUCER_GROUP = "GID_SIMPLE_PRODUCER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";
    private static final String KEY_PREFIX = "KEY";

    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true, null);
        producer.setNamesrvAddr("127.0.0.1:9876");
        // When using aliyun products, you need to set up channels
        // producer.setAccessChannel(AccessChannel.CLOUD);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < MESSAGE_NUM; i++) {
            String content = "Test Message " + i;
            Message msg = new Message(TOPIC, TAGS, KEY_PREFIX + i, content.getBytes());
            try {
                SendResult sendResult = producer.send(msg);
                assert sendResult != null;
                System.out.printf(
                        "send result: %s %s\n",
                        sendResult.getMsgId(), sendResult.getMessageQueue().toString());
                Thread.sleep(50);
            } catch (Exception e) {
                LOGGER.info("send message failed. {}", e.toString());
            }
        }
    }
}
Consumer:
package org.apache.rocketmq.flink.legacy.example;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumer.class);

    private static final String GROUP = "GID_SIMPLE_CONSUMER";
    private static final String TOPIC = "SOURCE_TOPIC";
    private static final String TAGS = "*";

    private static RPCHook getAclRPCHook() {
        final String accessKey = "${AccessKey}";
        final String secretKey = "${SecretKey}";
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // When using aliyun products, you need to set up channels
        // consumer.setAccessChannel(AccessChannel.CLOUD);
        try {
            consumer.subscribe(TOPIC, TAGS);
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(
                (MessageListenerConcurrently)
                        (msgs, context) -> {
                            for (MessageExt msg : msgs) {
                                System.out.printf(
                                        "%s %s %d %s\n",
                                        msg.getMsgId(),
                                        msg.getBrokerName(),
                                        msg.getQueueId(),
                                        new String(msg.getBody()));
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        });
        try {
            consumer.start();
        } catch (MQClientException e) {
            LOGGER.info("start consumer failed. {}", e.toString());
        }
    }
}