pyspark sc.textFile 转为的RDD 再生成 dataframe 问题

首先 我用的是spark 1.5.1 所以 网上的 spark.read.csv 等方法对我来说并不适用
这是我的配置

图片说明

然后 我不能解决的问题是 在pyspark 中 由 sc.textFile转换的RDD 再生成
dataframe

接下来是我的代码:
(数据有54列)

df=sc.textFile("file:///home/hu/births_train.csv") # 删除了第一行

import pyspark.sql.types as typ

lable=[("INFANT_ALIVE_AT_REPORT",typ.StringType()),("BIRTH_YEAR",typ.IntegerType()),\
("BIRTH_MONTH",typ.IntegerType()),( "BIRTH_PLACE",typ.StringType()),\
("MOTHER_AGE_YEARS",typ.IntegerType()),( "MOTHER_RACE_6CODE",typ.StringType()),\
("MOTHER_EDUCATION",typ.StringType()),( "FATHER_COMBINED_AGE",typ.IntegerType()),\
("FATHER_EDUCATION",typ.StringType()),( "MONTH_PRECARE_RECODE",typ.StringType()),\
("CIG_BEFORE",typ.StringType()),( "CIG_1_TRI",typ.StringType()),\
("CIG_2_TRI",typ.StringType()),( "CIG_3_TRI",typ.StringType()),\
("MOTHER_HEIGHT_IN",typ.StringType()),( "MOTHER_BMI_RECODE",typ.StringType()),\
("MOTHER_PRE_WEIGHT",typ.IntegerType()),( "MOTHER_DELIVERY_WEIGHT",typ.IntegerType()),\
("MOTHER_WEIGHT_GAIN",typ.IntegerType()),( "DIABETES_PRE",typ.StringType()),\
("DIABETES_GEST",typ.StringType()),( "HYP_TENS_PRE",typ.StringType()),\
("HYP_TENS_GEST",typ.StringType()),( "PREV_BIRTH_PRETERM",typ.StringType()),\
("NO_RISK",typ.StringType()),( "NO_INFECTIONS_REPORTED",typ.StringType()),\
("LABOR_IND",typ.StringType()),( "LABOR_AUGM",typ.StringType()),\
( "STEROIDS",typ.StringType()),( "ANTIBIOTICS",typ.StringType()),\
( "ANESTHESIA",typ.StringType()),( "DELIV_METHOD_RECODE_COMB",typ.StringType()),\
( "ATTENDANT_BIRTH",typ.StringType()),( "APGAR_5",typ.StringType()),\
( "APGAR_5_RECODE",typ.StringType()),( "APGAR_10",typ.StringType()),\
( "APGAR_10_RECODE",typ.StringType()),( "INFANT_SEX",typ.StringType()),\
( "OBSTETRIC_GESTATION_WEEKS",typ.StringType()),( "INFANT_WEIGHT_GRAMS",typ.StringType()),\
( "INFANT_ASSIST_VENTI",typ.StringType()),( "INFANT_ASSIST_VENTI_6HRS",typ.StringType()),\
( "INFANT_NICU_ADMISSION",typ.StringType()),( "INFANT_SURFACANT",typ.StringType()),\
( "INFANT_ANTIBIOTICS",typ.StringType()),( "INFANT_SEIZURES",typ.StringType()),\
( "INFANT_NO_ABNORMALITIES",typ.StringType()),( "INFANT_ANCEPHALY",typ.StringType()),\
( "INFANT_MENINGOMYELOCELE",typ.StringType()),( "INFANT_LIMB_REDUCTION",typ.StringType()),\
( "INFANT_DOWN_SYNDROME",typ.StringType()),( "INFANT_SUSPECTED_CHROMOSOMAL_DISORDER",typ.StringType()),\
( "INFANT_NO_CONGENITAL_ANOMALIES_CHECKED",typ.StringType()),( "INFANT_BREASTFED",typ.StringType())]

schema=typ.StructType([typ.StructField(e[0],e[1],False ) for e in lable])

from pyspark.sql import Row

df1= df.map(lambda p : Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21), p(22), p(23), p(24), p(25), p(26), p(27), p(28), p(29), p(30), p(31),p(32), p(33), p(34), p(35), p(36), p(37), p(38), p(39), p(40), p(41),p(42), p(43), p(44), p(45), p(46), p(47), p(48), p(49), p(50),
p(51),p(52), p(53)))

births=sqlContext.createDataFrame(df1,schema)

但是报错了 ,报错的图片如下
图片说明

所以这该怎么解决呢 ??

谢谢

https://blog.csdn.net/coding01/article/details/81512430