PyCharm + PySpark: calling the map operator keeps throwing an error

When I call the map operator with PySpark in PyCharm, I always get the error below. Here is my code:
from pyspark import SparkConf, SparkContext
import os

os.environ['PYSPARK_PYTHON'] = 'D:/myApps/python/python.exe'  # tell Spark which Python to launch for workers

conf = SparkConf().setMaster("local[*]").setAppName("test_app_name")
sc = SparkContext(conf=conf)
wyyList = {"name": "刘德华", 'age': 18}
rdd1 = sc.parallelize([1, 2, 3, 4])


def func(data):
    return data * 2


rdd2 = rdd1.map(func)
print("rdd1rdd1", rdd1.collect())  # 这行打印正常
print("rdd2rdd2", rdd2.collect())

sc.stop()

Traceback (most recent call last):
  File "D:\myApps\python\Lib\site-packages\pyspark\serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 546, in _dynamic_function_reduce
    state = _function_getstate(func)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 157, in _function_getstate
    f_globals_ref = _extract_code_globals(func.__code__)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle.py", line 334, in _extract_code_globals
    out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\cloudpickle\cloudpickle.py", line 334, in <dictcomp>
    out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
                 ~~~~~^^^^^^^
IndexError: tuple index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "D:\wyyStudyDocument\python\python-learn\pyspark\pyspark_02.py", line 23, in <module>
    print("rdd2rdd2", rdd2.collect())
                      ^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\rdd.py", line 1194, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
                                                        ^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\rdd.py", line 3500, in _jrdd
    wrapped_func = _wrap_function(
                   ^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\rdd.py", line 3359, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
                                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\rdd.py", line 3342, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
                      ^^^^^^^^^^^^^^^^^^
  File "D:\myApps\python\Lib\site-packages\pyspark\serializers.py", line 468, in dumps
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range

Check whether an incorrect environment setup is causing this: verify that the SparkContext is configured correctly and that your Python version is compatible with your Spark version.
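A minimal sketch for checking the version pairing (the exact supported combinations are in the PySpark release notes; as far as I know, PySpark releases before 3.4 do not support Python 3.11 or newer, and that mismatch produces exactly this IndexError inside cloudpickle):

import sys
import pyspark

# Compare these two against the supported-versions matrix in the
# PySpark docs; a too-new interpreter paired with an older PySpark
# is a common cause of this cloudpickle failure.
print(sys.version)          # the interpreter PyCharm is running
print(pyspark.__version__)  # the installed PySpark release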

This error means the cloudpickle library failed while serializing your function. In PySpark, RDD operators such as map have to serialize the function and ship it to the worker processes, and cloudpickle is the library PySpark bundles for serializing Python objects; if it fails on the function, you get exactly this error.
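Because the crash happens before anything is sent to the workers, you can reproduce it without running any Spark job. A minimal sketch that calls the bundled cloudpickle directly, the same call serializers.py makes in the traceback above:

from pyspark import cloudpickle

def func(data):
    return data * 2

# This mirrors the failing call in pyspark/serializers.py; if it raises
# the same IndexError here, the problem is cloudpickle's bytecode
# handling (a version mismatch), not the RDD code itself.
payload = cloudpickle.dumps(func)
print(len(payload), "bytes")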

You could try moving the function into a separate importable module, serializing it with pickle yourself, or replacing the custom function with a built-in one, as in the sketch below.
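A minimal sketch of the built-in approach, reusing rdd1 from the question: operator.mul is implemented in C, so pickle stores only a reference to it instead of cloudpickle extracting its bytecode, which sidesteps the failing _extract_code_globals path entirely.

from functools import partial
import operator

# partial(operator.mul, 2) is a picklable callable equivalent to
# lambda data: data * 2, but with no Python bytecode to serialize.
rdd2 = rdd1.map(partial(operator.mul, 2))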
For reference only; hope it helps.

raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: IndexError: tuple index out of range
Judging from the traceback, there are two linked failures here: the IndexError (tuple index out of range) raised inside cloudpickle is the root cause, and the PicklingError (could not serialize the object) is how PySpark reports it.

This error occurs when using PySpark's map operator. PySpark serializes the function with pickle (via the bundled cloudpickle), and serialization can fail for dynamically defined functions such as closures. One workaround is to turn the function into an object that pickle can handle, for example by making it a method on a class instance, or by wrapping it in another picklable object.
For example, the function can be turned into a method of a class, so that the callable passed to map is a bound method:

class MyFunc:
    # The transformation lives on a class, so the callable passed to
    # map() is a bound method rather than a bare dynamic function.
    def func(self, data):
        return data * 2

my_func = MyFunc()
rdd2 = rdd1.map(my_func.func)


Alternatively, use functools.partial to wrap the function in a picklable callable object:

from functools import partial

# partial(func) wraps func in a functools.partial object, which is
# then handed to map() as the callable.
rdd2 = rdd1.map(partial(func))


Both of these approaches can work around the problem.