在PySpark下使用Pandas和Apache Arrow

在PySpark下使用Pandas和Apache Arrow

在PySpark下的Apache Arrow

Apache Arrow是一种内存中的列式数据格式,在Spark中用于在JVM和Python进程之间高效地传输数据。目前,这对于使用panda/NumPy数据的Python用户最为有利。它的使用不是自动的,可能需要对配置或代码进行一些细微的更改,以充分利用和确保兼容性。本指南将对如何在Spark中使用Apache Arrow进行高级描述,并突出显示在处理启用箭头的数据时的任何差异。

确保PyArrow安装

如果使用pip安装PySpark,那么可以使用命令pip install PySpark将PyArrow作为SQL模块的额外依赖项引入。否则,必须确保在所有集群节点上安装并可用PyArrow。当前支持的版本是0.8.0。您可以从conda-forge通道使用pip或conda进行安装。有关详细信息,请参见PyArrow安装

支持与Pandas的相互转换

在使用toPandas()调用将Spark DataFrame转换为panda DataFrame时,以及在使用createDataFrame(pandas_df)从panda DataFrame创建Spark DataFrame时,可以使用Arrow进行优化。要在执行这些调用时使用Arrow,用户需要首先设置Spark配置的Spark.sql.execute.Arrow.enabledtrue“。这是默认禁用的。

此外,“spark.sql.execute.arrow.enabled”还支持优化。如果在Spark中实际计算之前发生错误,可以自动退回到非箭头优化实现。这可以通过“spark.sql.execute.arrow.fallback.enabled”来控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

使用上述优化将产生与未启用Arrow时相同的结果。注意,即使使用Arrow,toPandas()也会将DataFrame中的所有记录收集到驱动程序中,并且应该对数据的一小部分进行收集。当前不支持所有Spark数据类型,如果列为不受支持的类型,可能会引发错误,请参见受支持的SQL类型。如果在createDataFrame()期间发生错误,Spark将返回来创建没有Arrow的DataFrame。

Pandas UDFs(向量UDFs)

Pandas UDFs是用户定义的函数,通过Spark执行,用Arrow来传输数据,而Pandas用于处理数据。Pandas UDFs是使用关键字pandas_udf作为装饰器或包装函数来定义的,不需要额外的配置。目前,有两种类型的Pandas UDFs:Scalar and Grouped Map。

Scalar

Scalar Pandas UDFs用于向量化标量操作。它们可以与selectwithColumn等函数一起使用。Python函数应该使用pandas.Serise作为输入并返回一个pandas.Series长度相同的级数。在内部,Spark将执行Pandas UDFs方法将列分成批,并将每个批的函数作为数据的子集调用,然后将结果连接在一起。

下面的示例展示如何创建一个标量Pandas UDF,该标量计算两列的乘积

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+

Grouped Map

分组映射Pandas UDFs与groupBy().apply()一起使用实现了“split-apply-combine”模式。拆分-应用-合并包括三个步骤:

  • 使用DataFrame.groupBy将数据分成组。
  • 对每个组应用一个函数。函数的输入和输出都是pandas.DataFrame。输入数据包含每个组的所有行和列。
  • 将结果合并到一个新的DataFrame中。

要使用groupBy().apply(),用户需要定义以下内容:

  • 定义每个组的计算的Python函数。
  • 一个StructType对象或字符串,它定义输出DataFrame的模式。

为了返回pandas.DataFrame的列标签:如果指定为字符串,DataFrame必须匹配已定义输出模式中的字段名,如果不是字符串,则必须按位置匹配字段数据类型,例如整数索引。.关于如何在构造pandas.DataFrame时对列进行标记,请看pandas.DataFrame

请注意,在应用该函数之前,组的所有数据都将加载到内存中。这可能导致内存不足异常,特别是在组大小不正常的情况下。maxRecordsPerBatch的配置并不应用于组,而是由用户来确保分组的数据适合于可用内存。

下面的例子展示了如何使用groupby().apply()从组中的每个值中减去平均值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+

有关详细用法,请参见pyspark.sql.functions.pandas_udfpyspark.sql.GroupedData.apply

分组聚合

分组聚合Pandas UDFs类似于Spark聚合函数。分组的聚合Pandas UDFs与groupBy().agg()pyspark.sql.window一起使用。它定义了来自一个或多个pandas.Series级数到标量值,其中每个pandas.Series表示组或窗口中的一列。

注意,这种类型的UDF不支持部分聚合,组或窗口的所有数据都将加载到内存中。此外,目前只支持分组聚合Pandas UFDs的无界窗口。

下面的例子展示了如何使用这种类型的UDF来计算groupBy和窗口操作的平均值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window

df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
return v.mean()

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+

w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+

有关详细用法,请参见pyspark.sql.functions.pandas_udf

使用笔记

支持的SQL Types

目前,所有Spark SQL数据类型都支持基于行的转换,仅当安装的PyArrow等于或高于0.10.0时,才支持MapTypeArrayTypeTimestampTypeStructType.BinaryType

设置Arrow Batch大小

Spark中的数据分区被转换为Arrow记录批处理,这可能会暂时导致JVM中的高内存使用。为了避免可能的内存不足异常,可以通过设置conf下的“spark.sql.execute.arrow..maxRecordsPerBatch”来调整Arrow记录批的大小。maxRecordsPerBatch”到一个整数,该整数将确定每个批处理的最大行数。默认值是每批10,000条记录。如果列数很大,则应相应地调整该值。使用此限制,每个数据分区将被分成1个或多个记录批处理。

带有时区语义的时间戳

Spark内部将时间戳存储为UTC值,没有指定时区输入的时间戳数据被转换为具有微秒分辨率的本地时间到UTC。在Spark中导出或显示时间戳数据时,会话时区用于本地化时间戳值。会话时区是用配置的spark.sql.session.timeZone设置的。如果没有设置,则默认为JVM系统本地时区。Pandas使用datetime64类型,分辨率为纳秒,datetime64[ns],每个列具有可选的时区。

当时间戳数据从Spark传输到panda时,它将被转换为纳秒,每一列将被转换为Spark会话时区,然后本地化到该时区,这将删除该时区并显示值作为本地时间。当使用时间戳列调用toPandas()pandas_udf时,将发生这种情况。

当时间戳数据从Pandas传输到Spark时,它将被转换为UTC微秒。当使用panda DataFrame调用createDataFrame或从pandas_udf返回时间戳时,会发生这种情况。这些转换是自动完成的,以确保Spark将具有预期格式的数据,因此没有必要自己进行任何这些转换。任何纳秒值都将被截断。

请注意,标准UDF(非Pandas)将以Python datetime对象的形式加载时间戳数据,这与Pandas时间戳不同。建议在使用pandas_udf中的时间戳时使用panda时间序列功能,以获得最佳性能,详细信息请参见这里