Here is the consumer code from before I added manual offset maintenance:
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * This version seems to consume successfully.
 */
object KafkaA {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val con = new SparkContext(conf)
    val ssc = new StreamingContext(con, Seconds(20))
    ssc.sparkContext.setLogLevel("WARN")

    val brokerList = ""
    val kafkaParam: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "group.id" -> "EventConsumer",
      "auto.offset.reset" -> "smallest"
    )

    val value1: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, Set[String]("DLTE3"))

    // Print the message value of every record.
    value1.foreachRDD(rdd => rdd.foreachPartition(p => p.foreach { s =>
      val value: String = s._2
      println(value)
    }))

    ssc.start()
    ssc.awaitTermination()
  }
}
This code throws an error.
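The exact error isn't shown above, but one thing that stands out in this first version is that brokerList is left empty, so the direct stream has nothing to connect to. A minimal sketch of the same kafkaParam map with broker addresses filled in (the host names below are taken from the second snippet; use whatever your cluster actually exposes):

val brokerList = "master:9092,slave1:9092,slave2:9092" // replace with the real broker host:port list
val kafkaParam: Map[String, String] = Map[String, String](
  "metadata.broker.list" -> brokerList,
  "group.id" -> "EventConsumer",
  "auto.offset.reset" -> "smallest"
)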
The code that maintains offsets:
import com.unicom.tools.MyUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaMaintainOffset {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val con = new SparkContext(conf)
    val ssc = new StreamingContext(con, Seconds(5))
    // ssc.checkpoint("D:\\html")

    val brokerList = "master:9092,slave1:9092,slave2:9092"
    val kafkaParam: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokerList,
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false"
    )

    // Starting offsets read back from Redis (validated against Kafka's min/max offsets).
    val fromOffset: Map[TopicAndPartition, Long] =
      MyUtils.getOffsets(Set("DLTE1", "DLTE4", "DLTE3", "DLTE2", "DMC", "DLTE5"))
    // Only keep the message value from each record.
    val messageHandler: MessageAndMetadata[String, String] => String =
      (x: MessageAndMetadata[String, String]) => x.message()

    val value2: InputDStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc, kafkaParam, fromOffset, messageHandler)

    value2.foreachRDD { rdd =>
      // When the RDD is not empty, save its offsets to external storage (Redis).
      if (!rdd.isEmpty()) {
        val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        MyUtils.saveOffsetsToRedis(ranges)
      }
      /* rdd.foreachPartition(p => p.foreach { e =>
        var i = 0
        if (100 % i == 0) {
          val string: String = e.toString
          println(string)
        }
        i = i + 1
      }) */
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
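In KafkaMaintainOffset the offsets are written to Redis before the batch is actually processed (the processing part is commented out). If the goal is at-least-once delivery, a common pattern is to capture the offset ranges, run the output action, and only then persist the offsets. A minimal sketch of that ordering, reusing the same MyUtils helper:

value2.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Capture the offset ranges up front: the cast is only valid on the RDD
    // handed to foreachRDD directly by the direct stream.
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Do the real work for this batch first.
    rdd.foreachPartition(p => p.foreach(println))
    // Persist the offsets only after the output action has finished.
    MyUtils.saveOffsetsToRedis(ranges)
  }
}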
The utility class that maintains the offsets:
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.{KafkaCluster, OffsetRange}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object MyUtils {
  private val config = new JedisPoolConfig
  //private val redisHost = "192.168.252.99"
  private val redisHost = "132.194.43.199"
  private val redisPort = 6379
  // Maximum number of connections
  config.setMaxTotal(30)
  // Maximum number of idle connections
  config.setMaxIdle(10)
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)

  private val topicPrefix = "kafka:topic"

  private def getKey(topic: String, groupId: String = "", prefix: String = topicPrefix): String =
    s"$prefix:$topic:$groupId"

  private def getRedisConnection: Jedis = pool.getResource

  // Read the offsets for the given topics from Redis.
  private def getOffsetFromRedis(topics: Set[String], groupId: String = ""): Map[TopicAndPartition, Long] = {
    val jedis = getRedisConnection
    val offsets = for (topic <- topics) yield {
      import scala.collection.JavaConversions._
      jedis.hgetAll(getKey(topic, groupId)).toMap
        .map { case (partition, offset) => TopicAndPartition(topic, partition.toInt) -> offset.toLong }
    }
    jedis.close()
    offsets.flatten.toMap
  }

  // Save the offsets to Redis: one hash per topic, keyed by partition id.
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String = ""): Unit = {
    val jedis = getRedisConnection
    val offsets = for (range <- ranges) yield {
      (range.topic, range.partition -> range.untilOffset)
    }
    val offsetsMap: Map[String, Map[Int, Long]] =
      offsets.groupBy(_._1).map { case (topic, buffer) => (topic, buffer.map(_._2).toMap) }
    for ((topic, partitionAndOffset) <- offsetsMap) {
      val fields = partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
      import scala.collection.JavaConversions._
      jedis.hmset(getKey(topic, groupId), fields)
    }
    // Return the connection to the pool.
    jedis.close()
  }

  // For the given topics, get the largest and smallest offsets currently in Kafka.
  private def getMaxMinOffsets(topics: Set[String]): (Map[TopicAndPartition, Long], Map[TopicAndPartition, Long]) = {
    // Broker list used to build the Kafka connection.
    val kafkaParams = Map("metadata.broker.list" -> "132.194.94.195:6667,132.194.94.197:6667,132.194.94.198:6667,132.194.94.199:6667,132.194.94.200:6667,132.194.94.201:6667,132.194.94.202:6667,132.194.94.203:6667")
    // val kafkaParams = Map("metadata.broker.list" -> "master:9092,slave1:9092,slave2:9092")
    val kc = new KafkaCluster(kafkaParams)
    // Partition information for the topics
    val tps = kc.getPartitions(topics).right.get
    // Latest and earliest leader offsets
    val maxOffsets: Map[TopicAndPartition, Long] = kc.getLatestLeaderOffsets(tps).right.get.mapValues(x => x.offset)
    val minOffsets: Map[TopicAndPartition, Long] = kc.getEarliestLeaderOffsets(tps).right.get.mapValues(x => x.offset)
    (maxOffsets, minOffsets)
  }

  // Validate the offsets read from Redis against Kafka's current min/max offsets.
  def getOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = {
    // Min/max offsets from Kafka
    val (maxOffsets, minOffsets) = getMaxMinOffsets(topics)
    assert(maxOffsets.keys == minOffsets.keys)
    // Offsets previously saved in Redis
    val offsets: Map[TopicAndPartition, Long] = getOffsetFromRedis(topics)
    // Clamp each saved offset into the [min, max] range.
    maxOffsets.map { case (tp, maxOffset) =>
      val minOffset = minOffsets(tp)
      val curOffset = offsets.getOrElse(tp, 1L)
      val newOffset = if (curOffset > maxOffset) maxOffset else if (curOffset < minOffset) minOffset else curOffset
      (tp, newOffset)
    }
  }
}
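To sanity-check what actually lands in Redis: with the default empty groupId, getKey produces keys like "kafka:topic:DLTE3:" (note the trailing colon), each holding a hash of partition id to untilOffset. A quick standalone sketch (CheckOffsets is just an illustrative name) for reading one of those hashes back with Jedis, assuming the same Redis host and port as above:

import redis.clients.jedis.Jedis
import scala.collection.JavaConversions._

object CheckOffsets {
  def main(args: Array[String]): Unit = {
    // Same Redis instance as MyUtils above; adjust host/port if yours differ.
    val jedis = new Jedis("132.194.43.199", 6379)
    // Each hash field is a partition id, each value the saved untilOffset.
    jedis.hgetAll("kafka:topic:DLTE3:").foreach { case (partition, offset) =>
      println(s"partition=$partition offset=$offset")
    }
    jedis.close()
  }
}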
This one throws an error as well.
The pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Kafka08</groupId>
    <artifactId>Kafka08</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>
        <spark.version>2.3.0</spark.version>
        <scala.version>2.11</scala.version>
        <jedis.version>3.1.0</jedis.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency> -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
        </dependency> -->
        <!-- Redis cache -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${jedis.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>way</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The Spark version is 2.3.0.
The Kafka version is HDP's Kafka_2.10-0.9.0.2.4.0.0-169.
The requirement is simply to consume the data and be able to maintain the offsets manually.
The error message is pretty clear. Try searching Baidu for what it says.
Does anyone have code for maintaining offsets manually and storing them in Redis? Share it and I'll give it a try.