spark!!写出4个实时分析需求。求!结算快!

这是我模拟出来的部分数据,代表(用户,地方,天气类型,气温,湿度,气压,紫外线等级,风速,日期,时间)

img


然后按照下面方法帮我改一下我的需求,要四个,我的太low了,其他的准备工作已经搞好。

img


还需要什么时候数据可以说,本人一直在。

这段代码定义了一个名为WEADataService的类,该类包含一个名为dataAnalysis的方法,该方法的输入参数为类型为DStream[Answer]的流数据。该方法内有三个注释为需求一的函数,分别是统计每个用户各城市天气类型的数量、统计每个城市天气类型top1的温度和统计每个用户各城市天气类型top1的最低得分。但是这段代码缺少实现这些函数的代码。


def get_passed_weather(self,province):
        weather_passed_file = 'input/passed_weather_' + province + '.csv'
        if os.path.exists(weather_passed_file):
            return
        passed_weather = list()
        count = 0
        if province == 'ALL':
            print ("开始爬取过去的天气状况")
            for city in self.get_cities():
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/'+city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                        item['city_index'] = str(count)
                    passed_weather.extend(data)
                if count % 50 == 0:
                    if count == 50:
                        self.write_header(weather_passed_file,passed_weather)
                    else:
                        self.write_row(weather_passed_file,passed_weather)
                    passed_weather = list()
            if passed_weather:
                if count <= 50:
                    self.write_header(weather_passed_file,passed_weather)
                else:
                    self.write_row(weather_passed_file,passed_weather)
            print ("爬取过去的天气状况完毕!")
        else:
            print ("开始爬取过去的天气状况")
            select_city = filter(lambda x:x['province']==province,self.get_cities())
            for city in select_city:
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/'+city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_index'] = str(count)
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                    passed_weather.extend(data)
            self.write_csv(weather_passed_file,passed_weather)
            print ("爬取过去的天气状况完毕!")
 
    def run(self,range = 'ALL'):
        self.get_passed_weather(range)

import pandas as pd

读取数据

data = pd.read_csv('weather.csv')

统计每个城市的天气数量

city_count = data['城市'].value_counts()

输出结果

print(city_count)

  1. 每个用户各城市天气类型的数量
from pyspark.sql.functions import count, col

# 读取数据并创建DataFrame
df = spark.read.format("csv").option("header", True).load("path/to/data.csv")

# 对用户、城市和天气类型分组,统计数量并显示结果
result = df.groupBy("用户", "地方", "天气类型").agg(count("*").alias("数量")).orderBy("用户", "地方", "天气类型")
result.show()
  1. 统计每个地方前10的温度
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# 读取数据并创建DataFrame
df = spark.read.format("csv").option("header", True).load("path/to/data.csv")

# 对地方和气温进行降序排列,选取前10条记录
w = Window.partitionBy("地方").orderBy(col("气温").desc())
result = df.select("*", row_number().over(w).alias("rank")).filter(col("rank") <= 10).orderBy("地方", col("气温").desc())
result.show()

数据文件的格式是txt吗,可以考虑使用python语言来与spark通信。python使用pandas可以比较方便对,天气类型,气温,湿度等这些数据做统计分析。