错误的缓存与pyspark简单RDD而没有缓存的代码工作正常(如何使笔记本一类picklable)

问题描述:

,我有以下简单的代码导致错误关于缓存:错误的缓存与pyspark简单RDD而没有缓存的代码工作正常(如何使笔记本一类picklable)

trips_in = sc.textFile("trip_data.csv") 
trips = trips_in.map(lambda l: l.split(",")).map(lambda x: parseTrip(x)).cache() 

trips.count() 

功能parseTrip()得到一个字符串列表和创建并返回一个类旅行:

class Trip: 
    def __init__(self, id, duration): 
    self.id = id 
    self.duration = duration 

我得到了错误的操作count()之后。但是,如果我在第二行末尾删除cache(),则一切正常。 根据错误的问题是,该类旅行不能腌制:

PicklingError: Can't pickle __main__.Trip: attribute lookup __main__.Trip failed 

所以,我怎样才能使它picklable(如果它是一个实际的话)? 请注意,我正在使用Databricks笔记本,因此无法为类定义制作单独的.py以使其可用。

环境不会影响答案 - 如果要使用自定义类,它必须可以在群集中的每个节点上导入。

  • 对于一个单独的模块,你可以很容易地使用SparkContext.addPyFile与URL到GitHub Gist(或其他支持的格式为: “file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI”)

    • 创建一个要点。
    • 点击原始链接和复制网址。
    • 在你的笔记本电话:

      sc.addPyFile(raw_gist_url) 
      
  • 对于复杂的依赖关系,你分发鸡蛋文件。

    • 创建Python packageusing setuptools

      目录结构:

      . 
      ├── setup.py 
      └── trip 
          └── __init__.py 
      

      示例设置文件:

      #!/usr/bin/env python 
      
      from setuptools import setup 
      
      setup(name='trip', 
           version='0.0.1', 
           description='Trip', 
           author='Jane Doe', 
           author_email='[email protected]', 
           url='https://example.com', 
           packages=['trip'],) 
      
    • 创建egg文件:

      python setup.py bdist_egg 
      

      这将创建dist目录与文件trip-0.0.1-pyX.Y.egg

    • 转到Databricks仪表盘 - >新建 - > Libary和dist目录上传鸡蛋文件:

      enter image description here

    • 连接库,你要使用的集群。

  • 最后,如果你想要的是一个记录类型,你可以使用namedtuple没有任何额外的步骤:

    from collections import namedtuple 
    
    Trip = namedtuple('Trip', ['id', 'duration'])