我部署flink1.14.4版本,为啥 执行flinksql,读kafka数据会自动停止啊

我部署flink1.14.4版本,为啥 执行flinksql,读kafka数据会自动停止啊,一般情况不都是一直在运行吗

create table kafka1(
  id int,
  data STRING
) with (
  'connector' = 'kafka'
  ,'topic' = 'ylqtest002'
  ,'properties.zookeeper.connect' = '10.0.0.xx:2181'
  ,'properties.bootstrap.servers' = '10.0.0.xxx:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg1'
  ,'scan.startup.mode'='latest-offset'
);
select * from kafka1;

执行

img

  • 关于该问题,我找了一篇非常好的博客,你可以看看是否有帮助,链接:Flink 消费kafka 序列化
  • 除此之外, 这篇博客: Flink1.13中基于flinksql实时数仓简易demo中的 4.3 模拟kafka数据 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:
  • 1)子类数据构建:

    [bigdata_admin@dn5 ~]$ kafka-console-producer --broker-list dn3:9092,dn4:9092,dn5:9092 --topic son
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    22/05/11 23:20:40 INFO producer.ProducerConfig: ProducerConfig values: 
            acks = 1
            batch.size = 16384
            bootstrap.servers = [dn3:9092, dn4:9092, dn5:9092]
            buffer.memory = 33554432
            client.id = console-producer
            compression.type = none
            connections.max.idle.ms = 540000
            enable.idempotence = false
            interceptor.classes = null
            key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
            linger.ms = 1000
            max.block.ms = 60000
            max.in.flight.requests.per.connection = 5
            max.request.size = 1048576
            metadata.max.age.ms = 300000
            metric.reporters = []
            metrics.num.samples = 2
            metrics.recording.level = INFO
            metrics.sample.window.ms = 30000
            partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
            receive.buffer.bytes = 32768
            reconnect.backoff.max.ms = 1000
            reconnect.backoff.ms = 50
            request.timeout.ms = 1500
            retries = 3
            retry.backoff.ms = 100
            sasl.jaas.config = null
            sasl.kerberos.kinit.cmd = /usr/bin/kinit
            sasl.kerberos.min.time.before.relogin = 60000
            sasl.kerberos.service.name = null
            sasl.kerberos.ticket.renew.jitter = 0.05
            sasl.kerberos.ticket.renew.window.factor = 0.8
            sasl.mechanism = GSSAPI
            security.protocol = PLAINTEXT
            send.buffer.bytes = 102400
            ssl.cipher.suites = null
            ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
            ssl.endpoint.identification.algorithm = null
            ssl.key.password = null
            ssl.keymanager.algorithm = SunX509
            ssl.keystore.location = null
            ssl.keystore.password = null
            ssl.keystore.type = JKS
            ssl.protocol = TLS
            ssl.provider = null
            ssl.secure.random.implementation = null
            ssl.trustmanager.algorithm = PKIX
            ssl.truststore.location = null
            ssl.truststore.password = null
            ssl.truststore.type = JKS
            transaction.timeout.ms = 60000
            transactional.id = null
            value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    
    22/05/11 23:20:40 INFO utils.AppInfoParser: Kafka version : 0.11.0-kafka-3.0.0
    22/05/11 23:20:40 INFO utils.AppInfoParser: Kafka commitId : unknown
    
    >1,张三,21,shanghai,1001
    >2,李四,22,beijing,1002
    >3,王五,23,guangzhou,1003
    >4,赵六,24,shenzhen,1004
    

    2)父类数据构建

    [bigdata_admin@dn5 ~]$ kafka-console-producer --broker-list dn3:9092,dn4:9092,dn5:9092 --topic father
    
    >1001,张杰
    >1003,王杰
    >1005,钱杰
    >1007,李杰