spark清洗JSON数据

  • img

需求:用spark进行清洗,结果类似下图➕要清洗的代码,能加注释的加上注释。
数据链接:https://share.weiyun.com/BadLnWZX

img

使用pyspark实现参考如下,主要是利用spark-sql的功能来实现统计,一行sql非常简单。

# 读取json数据
yiqing_data=spark.read.json("file:///home/work/yiqing.json")

# 将读取数据转成一个临时表(便于直接使用sql统计)
yiqing_data.createTempView("tmp_yiqing")

# 使用spark-sql进行求和统计(这里只有一天就这样统计,如果数据里面有日期字段可以根据日期进行groupby求和统计)
yiqing_stat=spark.sql("select '2022-08-24' as dt, sum(confirmedCount) as confirmedCount_sum,sum(currentConfirmedCount) as currentConfirmedCount_sum,sum(suspectedCount) as suspectedCount_sum,sum(curedCount) as curedCount_sum,sum(deadCount) as deadCount_sum from tmp_yiqing")

# 打印出来统计结果
yiqing_stat.collect()

img

数据清洗的目的是为了保证数据质量,包括数据的完整性、唯一性、一致性、合法性和
权威性。数据清洗的结果是对各种脏数据进行对应的处理方式,从而得到标准的、干净的、
连续的数据,提供给数据统计和数据挖掘使用。
解决数据的完整性问题:
(1) 通过其他信息不全;
(2) 通过前后数据不全;
(3) 如果实在无法不全,虽然可惜,但是还是要剔除掉进行统计。但是没必要删除,
后续其他分析可能还需要。
解决数据的唯一性问题:
(1) 根据主键进行去除,去除重复数据;
(2) 制定一系列规则,保证根据某种规则下只保存一条数据。
解决数据权威性的问题:
选择最权威的数据作为统计和挖掘。
解决合法性的问题:
设定判定规则,通过特定的规则来判断字段或者值来确定数据是否需要被清洗。


object JsonLog {
  def main(args: Array[String]): Unit = {
      val spark: SparkSession = SparkSession.builder().master("local").appName("readJson").getOrCreate()
      val sc: SparkContext = spark.sparkContext
      import spark.implicits._
      import org.apache.spark.sql.functions._

     val readDF: DataFrame = sc.textFile("in/Text.txt").toDF()

      readDF.printSchema()


    val readDF1: DataFrame = readDF.select(
      get_json_object($"value", "$.name").as("name"),
      get_json_object($"value", "$.url").as("url"),
      get_json_object($"value", "$.address").as("address"),
      get_json_object($"value", "$.domain_list").as("domain_list"))


    val textDF: DataFrame = readDF1.select($"domain_list")

    val arrayType = ArrayType(StructType(StructField("name",StringType) ::StructField("url",StringType)::Nil))
    val readDF2: DataFrame = textDF.select(from_json($"domain_list",arrayType).as("domain_list"))
    val readDF3: DataFrame = readDF2.withColumn("domain_list", explode(col("domain_list")))
      .select($"domain_list.name", $"domain_list.url")

    val readDF5: DataFrame = readDF1.select($"name", $"url", $"city", $"country",
      from_json($"domain_list", ArrayType(StructType(StructField("name", StringType)
        :: StructField("url", StringType) :: Nil))).as("domain_list")
    ).withColumn("domain_list", explode(col("domain_list")))
      .select($"name", $"url", $"city", $"country", $"domain_list.name", $"domain_list.url")

  }
}

这是我之前写的一个