使用 Spark Streaming 进行实时数据统计并保存到 Redis。

问题遇到的现象和发生背景

使用 Kafka 生产消息,利用 Spark Streaming 进行实时数据统计并保存到 Redis。

问题相关代码,请勿粘贴截图

package org.lanqiao.BigData.chapter16
import java.util.HashSet
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import com.alibaba.fastjson.JSON
import redis.clients.jedis.{HostAndPort, JedisCluster}

/**
 * Spark Streaming job that consumes order messages from the Kafka topic
 * `order`, sums the `sum` field of every entry in each order's `info` array,
 * and accumulates two running totals in a Redis cluster:
 *
 *  - `orderNum`: incremented by 1 for every successfully parsed order
 *  - `orderSum`: incremented by the order's total amount
 *
 * The Redis client is created inside `foreachPartition`, i.e. on the
 * executors. `JedisCluster` is not serializable, so building it on the driver
 * and capturing it in a DStream closure fails with "Task not serializable".
 * The write is also performed in an output operation (`foreachRDD`); a bare
 * `map` is lazy and would never be executed without one.
 */
object SparkOrder extends Serializable {
  import scala.util.control.NonFatal

  // Redis keys holding the running totals.
  private val OrderCountKey = "orderNum"
  private val OrderSumKey = "orderSum"

  /**
   * Opens a new connection to the Redis cluster.
   *
   * Called from executor-side code only, so the client itself never has to be
   * serialized. The caller is responsible for closing the returned client.
   */
  private def newRedisCluster(): JedisCluster = {
    val nodes = new HashSet[HostAndPort]
    nodes.add(new HostAndPort("192.168.159.120", 7001))
    nodes.add(new HostAndPort("192.168.159.122", 7003))
    nodes.add(new HostAndPort("192.168.159.123", 7005))
    nodes.add(new HostAndPort("192.168.159.124", 7007))
    new JedisCluster(nodes)
  }

  /**
   * Parses one order message and returns the total of all `sum` fields in its
   * `info` array.
   *
   * Expected message shape, e.g.:
   * {'id':'101','info':[{'name':'cola','price':2.00,'num':3,'sum':6.00},{'name':'snack','price':1.00,'num':5,'sum':5.00}]}
   *
   * @param record raw JSON text of a single order
   * @return `Some(total)` on success; `None` when the record is empty, is not
   *         valid JSON, or lacks the `info` array or a numeric `sum` entry.
   *         Failures are reported on stderr so one bad record does not fail
   *         the whole batch.
   */
  private def orderTotal(record: String): Option[Double] =
    try {
      for {
        json <- Option(JSON.parseObject(record))
        info <- Option(json.getJSONArray("info"))
      } yield {
        (0 until info.size())
          .map(i => info.getJSONObject(i).getDouble("sum").doubleValue)
          .sum
      }
    } catch {
      case NonFatal(e) =>
        System.err.println(s"Skipping malformed order record: $record (${e.getMessage})")
        None
    }

  /**
   * Entry point: wires Kafka -> Spark Streaming -> Redis and blocks until the
   * streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setAppName("sparkorder")
    // DStream batch interval: 5 seconds.
    val ssc = new StreamingContext(sparkconf, Seconds(5))
    // Kafka consumer settings: bootstrap brokers, consumer group, deserializers.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.159.120:9092,192.168.159.122:9092,192.168.159.123:9092,192.168.159.124:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "mygroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    // Topic to consume.
    val topics = Array("order")
    // Direct stream of Kafka records.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    // Keep only the message payload of every record.
    val line = stream.map(_.value)
    line.print()

    // Output operation: runs once per batch and pushes the totals to Redis.
    line.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Skip empty partitions so no connection is opened for nothing.
        if (records.nonEmpty) {
          // One client per partition, created on the executor.
          val jedis = newRedisCluster()
          try {
            records.flatMap(r => orderTotal(r)).foreach { sum =>
              // Accumulate in Redis; each call returns the new running total.
              val orderNum = jedis.incrByFloat(OrderCountKey, 1.0)
              val orderSum = jedis.incrByFloat(OrderSumKey, sum)
              // Printed on the executor's stdout (the driver console only in local mode).
              println("累计订单数:" + orderNum)
              println("累计销售金额:" + orderSum)
            }
          } finally {
            // Always release the connection, even if a Redis call throws.
            jedis.close()
          }
        }
      }
    }

    // Start the computation.
    ssc.start()
    // Block until the streaming context is stopped.
    ssc.awaitTermination()
  }
}

运行结果及报错内容

img

img

我的解答思路和尝试过的方法
我想要达到的结果