需求:用spark进行清洗,结果类似下图➕要清洗的代码,能加注释的加上注释。
数据链接:https://share.weiyun.com/BadLnWZX
使用pyspark实现参考如下,主要是利用spark-sql的功能来实现统计,一行sql非常简单。
# 读取json数据
yiqing_data=spark.read.json("file:///home/work/yiqing.json")
# 将读取数据转成一个临时表(便于直接使用sql统计)
yiqing_data.createTempView("tmp_yiqing")
# 使用spark-sql进行求和统计(这里只有一天就这样统计,如果数据里面有日期字段可以根据日期进行groupby求和统计)
yiqing_stat=spark.sql("select '2022-08-24' as dt, sum(confirmedCount) as confirmedCount_sum,sum(currentConfirmedCount) as currentConfirmedCount_sum,sum(suspectedCount) as suspectedCount_sum,sum(curedCount) as curedCount_sum,sum(deadCount) as deadCount_sum from tmp_yiqing")
# 打印出来统计结果
yiqing_stat.collect()
数据清洗的目的是为了保证数据质量,包括数据的完整性、唯一性、一致性、合法性和
权威性。数据清洗的结果是对各种脏数据进行对应的处理方式,从而得到标准的、干净的、
连续的数据,提供给数据统计和数据挖掘使用。
解决数据的完整性问题:
(1) 通过其他信息不全;
(2) 通过前后数据不全;
(3) 如果实在无法不全,虽然可惜,但是还是要剔除掉进行统计。但是没必要删除,
后续其他分析可能还需要。
解决数据的唯一性问题:
(1) 根据主键进行去除,去除重复数据;
(2) 制定一系列规则,保证根据某种规则下只保存一条数据。
解决数据权威性的问题:
选择最权威的数据作为统计和挖掘。
解决合法性的问题:
设定判定规则,通过特定的规则来判断字段或者值来确定数据是否需要被清洗。
object JsonLog {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local").appName("readJson").getOrCreate()
val sc: SparkContext = spark.sparkContext
import spark.implicits._
import org.apache.spark.sql.functions._
val readDF: DataFrame = sc.textFile("in/Text.txt").toDF()
readDF.printSchema()
val readDF1: DataFrame = readDF.select(
get_json_object($"value", "$.name").as("name"),
get_json_object($"value", "$.url").as("url"),
get_json_object($"value", "$.address").as("address"),
get_json_object($"value", "$.domain_list").as("domain_list"))
val textDF: DataFrame = readDF1.select($"domain_list")
val arrayType = ArrayType(StructType(StructField("name",StringType) ::StructField("url",StringType)::Nil))
val readDF2: DataFrame = textDF.select(from_json($"domain_list",arrayType).as("domain_list"))
val readDF3: DataFrame = readDF2.withColumn("domain_list", explode(col("domain_list")))
.select($"domain_list.name", $"domain_list.url")
val readDF5: DataFrame = readDF1.select($"name", $"url", $"city", $"country",
from_json($"domain_list", ArrayType(StructType(StructField("name", StringType)
:: StructField("url", StringType) :: Nil))).as("domain_list")
).withColumn("domain_list", explode(col("domain_list")))
.select($"name", $"url", $"city", $"country", $"domain_list.name", $"domain_list.url")
}
}
这是我之前写的一个