Why do I still get this error even after changing the data type? Hoping someone can help me solve it.
```
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/spark/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1391, in verify
    verify_value(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1372, in verify_struct
    verifier(v)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1391, in verify
    verify_value(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1317, in verify_integer
    verify_acceptable_types(obj)
  File "/usr/local/spark/python/pyspark/sql/types.py", line 1280, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field id: IntegerType can not accept object '3' in type <class 'str'>
```
```python
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# Define the schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
])
# Create the RDD
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26", "4 Tom M 23"]).map(lambda x: x.split(" "))
# Build Row objects with the proper types
rowRDD = employeeRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
# Bind the data to the schema
employeeDF = spark.createDataFrame(employeeRDD, schema)
```
Let me try to answer:

The error occurs because employeeRDD is an RDD whose elements are still lists of raw strings. When createDataFrame() is called with it, Spark cannot reconcile those strings with the IntegerType fields declared in the schema, so it raises the TypeError shown above.

The fix is to pass rowRDD instead of employeeRDD, because the data in rowRDD has already been cast to the types the schema expects.

Change the last line to `employeeDF = spark.createDataFrame(rowRDD, schema)`.
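For reference, the corrected tail of the script would look like this (a minimal sketch; the `show()` call is only added here to confirm the typed rows are accepted):

```python
# Pass the RDD of typed Row objects, not the RDD of raw string lists
employeeDF = spark.createDataFrame(rowRDD, schema)
employeeDF.show()
# Expected output:
# +---+----+------+---+
# | id|name|gender|age|
# +---+----+------+---+
# |  3|Mary|     F| 26|
# |  4| Tom|     M| 23|
# +---+----+------+---+
```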
If my answer helped, please mark it as accepted.
The source file contains the following (fields: id, name, age):

```
1,Ella,36
2,Bob,29
3,Jack,29
```

Copy and save this data on a Linux system as employee.txt, then convert an RDD into a DataFrame and print every row of the DataFrame in the format "id:1,name:Ella,age:36". Write the program code.
```python
# Programmatic schema approach – the fields (id, name, age) are known in advance
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
# Import the Spark packages
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Build the SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# Build the schema: every column is declared as StringType
schemaString = "id name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
schema = StructType(fields)
# Load the data
filename = "employee.txt"
people = spark.sparkContext.textFile(filename)
# print(people.collect())
# Preprocess: split each line on commas
people_data = people.map(lambda x: x.split(","))
# print(people_data.collect())
# Wrap each record in a Row; keep the fields as strings to match the StringType schema
people_rows = people_data.map(lambda attributes: Row(attributes[0], attributes[1], attributes[2]))
# Build the DataFrame
schemapeople = spark.createDataFrame(people_rows, schema)
# Register a temporary view
schemapeople.createOrReplaceTempView("employee")
# SQL query
DF_people = spark.sql("select * from employee")
# DataFrame -> RDD of formatted strings (all fields are already str)
people_rdd = DF_people.rdd.map(lambda p: "id:" + p.id + ",name:" + p.name + ",age:" + p.age)
# print(people_rdd.collect())
for i in people_rdd.collect():
    print(i)
```
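For comparison, the same exercise can also be solved with the reflection approach, where Spark infers the schema from named `Row` fields instead of an explicit `StructType`. A minimal sketch under that assumption (the `int()` casts are safe here, because the inferred schema follows the Python types rather than a declared `StringType`):

```python
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build Rows with named fields; Spark infers the schema from the Python types
people = spark.sparkContext.textFile("employee.txt") \
    .map(lambda line: line.split(",")) \
    .map(lambda a: Row(id=int(a[0]), name=a[1], age=int(a[2])))
df = spark.createDataFrame(people)

# Format each row as required, accessing fields by name
# (so the inferred column order does not matter)
for line in df.rdd.map(lambda p: "id:%d,name:%s,age:%d" % (p.id, p.name, p.age)).collect():
    print(line)
```

With the sample employee.txt, both versions should print:

```
id:1,name:Ella,age:36
id:2,name:Bob,age:29
id:3,name:Jack,age:29
```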