Spark Streaming + Kafka consumes no data. The requirement is to maintain offsets manually while consuming. Spark version is 2.3.0, Kafka version is Kafka_2.10-0.9.0.2.4.0.0-169. Hoping someone can help.

This is the consumer code before any offset maintenance was added:


import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Consumption seemed to work with this version.
 */
object KafkaA {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val con = new SparkContext(conf)
    val ssc = new StreamingContext(con, Seconds(20))
    ssc.sparkContext.setLogLevel("WARN")

    val brokerList = ""
    val kafkaParam: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "group.id" -> "EventConsumer",
      "auto.offset.reset" -> "smallest"
    )

    // Direct stream over the "DLTE3" topic; each element is a (key, value) pair.
    val value1: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, Set[String]("DLTE3"))

    value1.foreachRDD(rdd => rdd.foreachPartition(p => p.foreach { s =>
      val value: String = s._2
      println(value)
    }))

    ssc.start()
    ssc.awaitTermination()
  }
}
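If the job runs at all, one low-cost sanity check for "nothing is consumed" is to log each batch's offset ranges: if untilOffset never moves past fromOffset, the direct stream is fetching nothing from the brokers. A sketch only, reusing value1 above and the same HasOffsetRanges cast that appears in the offset-maintenance code further down:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Debug sketch: print the per-partition offset range of every batch.
value1.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
}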



This code throws an error.
The offset-maintenance code:


import com.unicom.tools.MyUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaMaintainOffset {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val con = new SparkContext(conf)
    val ssc = new StreamingContext(con, Seconds(5))
    // ssc.checkpoint("D:\\html")

    val brokerList = "master:9092,slave1:9092,slave2:9092"
    val kafkaParam: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false"
    )

    // Starting offsets come from Redis, validated against the brokers' min/max.
    val fromOffset: Map[TopicAndPartition, Long] =
      MyUtils.getOffsets(Set("DLTE1", "DLTE4", "DLTE3", "DLTE2", "DMC", "DLTE5"))
    val messageHandler: MessageAndMetadata[String, String] => String =
      (x: MessageAndMetadata[String, String]) => x.message()
    val value2: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc, kafkaParam, fromOffset, messageHandler)

    value2.foreachRDD { rdd =>
      // When the RDD is non-empty, save its offsets to external storage.
      if (!rdd.isEmpty()) {
        val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        MyUtils.saveOffsetsToRedis(ranges)
      }
      /* rdd.foreachPartition { p =>
        var i = 0  // counter lives outside the per-record closure
        p.foreach { e =>
          if (i % 100 == 0) {  // print every 100th record
            println(e.toString)
          }
          i = i + 1
        }
      } */
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
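One ordering detail worth flagging in the loop above: the offsets are written to Redis as soon as the batch arrives, before any records are processed, so a crash mid-batch would skip those records on restart. A minimal sketch of the safer at-least-once ordering, reusing value2 and MyUtils from the code above (the println is just a stand-in for the real processing):

// Sketch: capture the ranges, process the batch, and only then persist
// the offsets, so a mid-batch crash replays data instead of losing it.
value2.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition(p => p.foreach(println))  // process records first
    MyUtils.saveOffsetsToRedis(ranges)             // then commit offsets to Redis
  }
}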


The utility class that maintains the offsets:


import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{KafkaCluster, OffsetRange}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object MyUtils {

  private val config = new JedisPoolConfig
  // private val redisHost = "192.168.252.99"
  private val redisHost = "132.194.43.199"
  private val redisPort = 6379
  // max total connections
  config.setMaxTotal(30)
  // max idle connections
  config.setMaxIdle(10)
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)

  private val topicPrefix = "kafka:topic"

  private def getKey(topic: String, groupId: String = "", prefix: String = topicPrefix): String =
    s"$prefix:$topic:$groupId"

  private def getRedisConnection: Jedis = pool.getResource

  // Fetch the saved offsets for the given topics from Redis.
  private def getOffsetFromRedis(topics: Set[String], groupId: String = ""): Map[TopicAndPartition, Long] = {
    val jedis = getRedisConnection
    val offsets = for (topic <- topics) yield {
      import scala.collection.JavaConversions._
      jedis.hgetAll(getKey(topic, groupId)).toMap
        .map { case (partition, offset) => TopicAndPartition(topic, partition.toInt) -> offset.toLong }
    }
    jedis.close()
    offsets.flatten.toMap
  }

  // Save offsets to Redis: one hash per topic, field = partition, value = untilOffset.
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String = ""): Unit = {
    val jedis = getRedisConnection
    val offsets = for (range <- ranges) yield {
      (range.topic, range.partition -> range.untilOffset)
    }
    val offsetsMap: Map[String, Map[Int, Long]] =
      offsets.groupBy(_._1).map { case (topic, buffer) => (topic, buffer.map(_._2).toMap) }

    for ((topic, partitionAndOffset) <- offsetsMap) {
      val fields = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
      import scala.collection.JavaConversions._
      jedis.hmset(getKey(topic, groupId), fields)
    }
    // return the connection to the pool
    jedis.close()
  }

  // For the given topics, fetch the largest and smallest offsets from the brokers.
  private def getMaxMinOffsets(topics: Set[String]): (Map[TopicAndPartition, Long], Map[TopicAndPartition, Long]) = {
    // Connect to the cluster using the given broker list.
    val kafkaParams = Map("metadata.broker.list" -> "132.194.94.195:6667,132.194.94.197:6667,132.194.94.198:6667,132.194.94.199:6667,132.194.94.200:6667,132.194.94.201:6667,132.194.94.202:6667,132.194.94.203:6667")
    // val kafkaParams = Map("metadata.broker.list" -> "master:9092,slave1:9092,slave2:9092")
    val kc = new KafkaCluster(kafkaParams)
    // Look up the partitions of the topics.
    val tps = kc.getPartitions(topics).right.get

    // Fetch the latest and earliest leader offsets.
    val maxOffsets: Map[TopicAndPartition, Long] = kc.getLatestLeaderOffsets(tps).right.get.mapValues(x => x.offset)
    val minOffsets: Map[TopicAndPartition, Long] = kc.getEarliestLeaderOffsets(tps).right.get.mapValues(x => x.offset)
    (maxOffsets, minOffsets)
  }

  // Validate the offsets read from Redis against Kafka's max/min offsets.
  def getOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = {
    // Max and min offsets currently on the brokers.
    val (maxOffsets, minOffsets) = getMaxMinOffsets(topics)
    assert(maxOffsets.keys == minOffsets.keys)

    // Offsets previously saved in Redis.
    val offsets: Map[TopicAndPartition, Long] = getOffsetFromRedis(topics)

    // Clamp each saved offset into the [min, max] range.
    maxOffsets.map { case (tp, maxOffset) =>
      val minOffset = minOffsets(tp)
      val curOffset = offsets.getOrElse(tp, 1L)
      val newOffset = if (curOffset > maxOffset) maxOffset else if (curOffset < minOffset) minOffset else curOffset
      (tp, newOffset)
    }
  }
}
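For reference, the layout this class writes is one Redis hash per topic at key "kafka:topic:&lt;topic&gt;:&lt;groupId&gt;" (groupId defaults to empty), with partition numbers as fields and the next offset to read as values. A hypothetical quick check against the Redis host above (sketch only; InspectSavedOffsets is a made-up helper, and DLTE3 is just one of the topics used earlier):

import redis.clients.jedis.Jedis

// Sketch: dump whatever offsets MyUtils has saved for topic DLTE3.
object InspectSavedOffsets {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("132.194.43.199", 6379)
    import scala.collection.JavaConversions._
    // getKey("DLTE3") with the default empty groupId yields "kafka:topic:DLTE3:"
    jedis.hgetAll("kafka:topic:DLTE3:").toMap
      .foreach { case (partition, offset) => println(s"DLTE3-$partition -> $offset") }
    jedis.close()
  }
}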


This one throws an error as well.




The pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>Kafka08</groupId>
    <artifactId>Kafka08</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <spark.version>2.3.0</spark.version>
        <scala.version>2.11</scala.version>
        <jedis.version>3.1.0</jedis.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency> -->

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency> -->

        <!-- Redis cache -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>way</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>







Spark version: 2.3.0

Kafka version: HDP's Kafka_2.10-0.9.0.2.4.0.0-169

The requirement is just to consume the data, with the ability to maintain offsets manually.



The error message seems pretty clear. Try searching Baidu based on what it says.

Does anyone have code for manually maintaining offsets that they could share? I'd like to try it, storing the offsets in Redis.