This is part of the data I simulated; each record represents (user 用户, place 地方, weather type 天气类型, temperature 气温, humidity 湿度, air pressure 气压, UV index 紫外线等级, wind speed 风速, date 日期, time 时间).
This code defines a class named WEADataService, which contains a method named dataAnalysis whose input parameter is stream data of type DStream[Answer]. Inside the method there are three functions annotated as requirement one: counting the number of each weather type per city for every user, finding the top-1 temperature per weather type for each city, and finding the top-1 lowest score per weather type per city for every user. However, the code implementing these three functions is missing.
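The original class is Scala; as a rough reference, a minimal PySpark Streaming sketch of the first requirement (counting weather types per user and city) could look like the following. The socket source, the comma-separated line layout and the field order are assumptions, not part of the original code.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="WEADataService")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches

# Hypothetical text source; each line is assumed to be "user,place,weather type,..."
lines = ssc.socketTextStream("localhost", 9999)
records = lines.map(lambda line: line.split(","))

# Requirement one, part 1: count each (user, place, weather type) in every batch
type_counts = (records
               .map(lambda r: ((r[0], r[1], r[2]), 1))
               .reduceByKey(lambda a, b: a + b))
type_counts.pprint()

ssc.start()
ssc.awaitTermination()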
import os

def get_passed_weather(self, province):
    # Skip the crawl if the output file already exists
    weather_passed_file = 'input/passed_weather_' + province + '.csv'
    if os.path.exists(weather_passed_file):
        return
    passed_weather = list()
    count = 0
    if province == 'ALL':
        print("Start crawling past weather data")
        for city in self.get_cities():
            data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
            if data:
                count = count + 1
                for item in data:
                    item['city_code'] = city['code']
                    item['province'] = city['province']
                    item['city_name'] = city['city']
                    item['city_index'] = str(count)
                passed_weather.extend(data)
                # Flush to disk every 50 cities; write the header only for the first chunk
                if count % 50 == 0:
                    if count == 50:
                        self.write_header(weather_passed_file, passed_weather)
                    else:
                        self.write_row(weather_passed_file, passed_weather)
                    passed_weather = list()
        # Write whatever is left over after the loop
        if passed_weather:
            if count <= 50:
                self.write_header(weather_passed_file, passed_weather)
            else:
                self.write_row(weather_passed_file, passed_weather)
        print("Finished crawling past weather data!")
    else:
        print("Start crawling past weather data")
        # Only crawl the cities of the requested province
        select_city = filter(lambda x: x['province'] == province, self.get_cities())
        for city in select_city:
            data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
            if data:
                count = count + 1
                for item in data:
                    item['city_index'] = str(count)
                    item['city_code'] = city['code']
                    item['province'] = city['province']
                    item['city_name'] = city['city']
                passed_weather.extend(data)
        self.write_csv(weather_passed_file, passed_weather)
        print("Finished crawling past weather data!")

def run(self, province='ALL'):
    self.get_passed_weather(province)
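The helper methods get_cities, parse_json, write_header, write_row and write_csv are not shown; get_cities depends on a station list that is also not shown, but a minimal sketch of the other helpers, assuming each record is a dict and standard csv/requests usage, might be:

import csv
import requests

def parse_json(self, url):
    # Fetch a URL and return the decoded JSON payload, or None on failure
    try:
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except (requests.RequestException, ValueError):
        return None

def write_header(self, path, rows):
    # First chunk: write the header row, then the data rows
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)

def write_row(self, path, rows):
    # Later chunks: append data rows without repeating the header
    with open(path, 'a', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writerows(rows)

def write_csv(self, path, rows):
    # Single-shot write: header plus all rows
    self.write_header(path, rows)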
import pandas as pd

# Count how many records each city (城市) has in the crawled file
data = pd.read_csv('weather.csv')
city_count = data['城市'].value_counts()
print(city_count)
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("weather-analysis").getOrCreate()

# Read the data into a DataFrame
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("path/to/data.csv")

# Group by user (用户), place (地方) and weather type (天气类型), count the records, and show the result
result = df.groupBy("用户", "地方", "天气类型").agg(count("*").alias("数量")).orderBy("用户", "地方", "天气类型")
result.show()
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Read the data into a DataFrame
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("path/to/data.csv")

# Rank records within each place (地方) by temperature (气温) in descending order and keep the top 10
w = Window.partitionBy("地方").orderBy(col("气温").desc())
result = df.select("*", row_number().over(w).alias("rank")).filter(col("rank") <= 10).orderBy("地方", col("气温").desc())
result.show()
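If only the single hottest record per place is needed (the "top1" mentioned in requirement one), keep rank 1 instead of the top 10:

top1 = df.select("*", row_number().over(w).alias("rank")).filter(col("rank") == 1)
top1.show()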
Is the data file in txt format? You can consider using Python to communicate with Spark. With pandas it is fairly convenient to run statistical analysis on fields such as weather type, temperature and humidity.
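A minimal pandas sketch of such statistics follows; the file name weather.txt, the comma separator and the Chinese column headers are assumptions based on the schema described above.

import pandas as pd

df = pd.read_csv('weather.txt', sep=',',
                 names=['用户', '地方', '天气类型', '气温', '湿度', '气压',
                        '紫外线等级', '风速', '日期', '时间'])

# Number of records per weather type
print(df['天气类型'].value_counts())

# Mean temperature and humidity per weather type
print(df.groupby('天气类型')[['气温', '湿度']].mean())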