// 使用 Kafka 生产消息，利用 Spark Streaming 进行实时数据统计并保存到 Redis。
package org.lanqiao.BigData.chapter16
import java.util.HashSet
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import com.alibaba.fastjson.JSON
import redis.clients.jedis.{HostAndPort, JedisCluster}
/**
 * Spark Streaming job that consumes order records from the Kafka topic "order",
 * adds up the per-item "sum" fields of each order, and accumulates the running
 * order count ("orderNum") and sales amount ("orderSum") in a Redis cluster.
 *
 * Sample record (as used by the original author):
 * {'id':'101','info':[{'name':'可乐','price':2.00,'num':3,'sum':6.00},{'name':'辣条','price':1.00,'num':5,'sum':5.00}]}
 */
object SparkOrder extends Serializable {

  import scala.util.{Failure, Success, Try}

  /** Kafka bootstrap servers (broker list, not ZooKeeper). */
  private val KafkaBrokers =
    "192.168.159.120:9092,192.168.159.122:9092,192.168.159.123:9092,192.168.159.124:9092"

  /** Seed nodes of the Redis cluster as (host, port) pairs. */
  private val RedisNodes: Seq[(String, Int)] = Seq(
    "192.168.159.120" -> 7001,
    "192.168.159.122" -> 7003,
    "192.168.159.123" -> 7005,
    "192.168.159.124" -> 7007
  )

  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setAppName("sparkorder")
    // DStream batch interval: 5 seconds.
    val ssc = new StreamingContext(sparkconf, Seconds(5))

    // Kafka consumer settings: broker list, consumer group, key/value deserializers.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> KafkaBrokers,
      ConsumerConfig.GROUP_ID_CONFIG -> "mygroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val topics = Array("order")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Raw JSON payload of each Kafka record.
    val line = stream.map(_.value)
    line.print()

    // An explicit output operation is required: a bare `map` is lazy and would
    // never execute the Redis updates.
    line.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // JedisCluster is not serializable, so it is created here on the executor
        // (once per partition) instead of being captured from the driver.
        val jedis = newJedisCluster()
        try {
          records.foreach { record =>
            orderTotal(record) match {
              case Success(sum) =>
                // NOTE: INCR/INCRBYFLOAT are not idempotent; if a batch is replayed
                // the totals will be counted again (at-least-once semantics).
                val orderNum = jedis.incr("orderNum")
                val orderSum = jedis.incrByFloat("orderSum", sum)
                println("累计订单数:" + orderNum)
                println("累计销售金额:" + orderSum)
              case Failure(e) =>
                // Skip bad records instead of failing the whole batch.
                System.err.println(s"Skipping malformed order record: $record ($e)")
            }
          }
        } finally {
          jedis.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Opens a new JedisCluster client against [[RedisNodes]]; the caller must close it. */
  private def newJedisCluster(): JedisCluster = {
    val nodes = new HashSet[HostAndPort]
    RedisNodes.foreach { case (host, port) => nodes.add(new HostAndPort(host, port)) }
    new JedisCluster(nodes)
  }

  /**
   * Sums the "sum" field of every element of the record's "info" array.
   *
   * @param record one order as a JSON string
   * @return the order total, or a Failure if the JSON is unparseable or the
   *         "info" array / any "sum" field is missing
   */
  private def orderTotal(record: String): Try[Double] = Try {
    val info = JSON.parseObject(record).getJSONArray("info")
    (0 until info.size()).map(i => info.getJSONObject(i).getDouble("sum").doubleValue()).sum
  }
}