我部署flink1.14.4版本,为啥 执行flinksql,读kafka数据会自动停止啊,一般情况不都是一直在运行吗
create table kafka1(
id int,
data STRING
) with (
'connector' = 'kafka'
,'topic' = 'ylqtest002'
,'properties.zookeeper.connect' = '10.0.0.xx:2181'
,'properties.bootstrap.servers' = '10.0.0.xxx:9092'
,'format' = 'json'
,'properties.group.id'='iceberg1'
,'scan.startup.mode'='latest-offset'
);
select * from kafka1;
执行
1)子类数据构建:
[bigdata_admin@dn5 ~]$ kafka-console-producer --broker-list dn3:9092,dn4:9092,dn5:9092 --topic son
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
22/05/11 23:20:40 INFO producer.ProducerConfig: ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [dn3:9092, dn4:9092, dn5:9092]
buffer.memory = 33554432
client.id = console-producer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 1000
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 1500
retries = 3
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
22/05/11 23:20:40 INFO utils.AppInfoParser: Kafka version : 0.11.0-kafka-3.0.0
22/05/11 23:20:40 INFO utils.AppInfoParser: Kafka commitId : unknown
>1,张三,21,shanghai,1001
>2,李四,22,beijing,1002
>3,王五,23,guangzhou,1003
>4,赵六,24,shenzhen,1004
2)父类数据构建
[bigdata_admin@dn5 ~]$ kafka-console-producer --broker-list dn3:9092,dn4:9092,dn5:9092 --topic father
>1001,张杰
>1003,王杰
>1005,钱杰
>1007,李杰