Python处理各种Spark数据源
Spark SQL通过DataFrame接口支持对各种数据源进行操作。DataFrame可以使用关系型数据库转换操作,也可以用来创建临时视图。将一个DataFrame注册为临时视图允许您对其数据运行SQL查询。现在本人介绍使用Spark加载和保存数据的基本使用方法,然后介绍内置数据源的读取和保存。
数据的加载和保存
- 普通方式加载和保存spark默认数据类型parquet
1 | df = spark.read.load("examples/src/main/resources/users.parquet") |
- 在加载和保存时,通过参数指定数据源类型
我们可以在保存和读取时指定要使用的数据源以及希望传递给数据源的其他参数。数据源由它们的全限定名,也可以使用它们的名称,比如json、parquet、jdbc、orc、libsvm、csv、text等后面将一一介绍。从任何数据源类型的数据都可以使用这种语法转换为其他类型。
1 | df = spark.read.load("examples/src/main/resources/people.json", format="json") |
- 在读取文件的的时候直接运行SQL
1 | df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") |
- 保存数据的模式
模式类型 | 说明 |
---|---|
error(default) | 如果保存数据的时候,数据已经存在,将会抛出错误说明 |
append | 如果保存数据的时候,数据已经存在,将会把数据追加保存 |
overwrite | 如果保存数据的时候,数据已经存在,将会把数据重新保存 |
ignore | 如果保存数据的时候,数据已经存在,将不会对已有数据进行操作,就像SQL语句CREATE TABLE IF NOT EXISTS |
- 保存到持久表
我们还可以使用saveAsTable命令将DataFrames永久保存到Hive metastore中。大家一想到要保存的Hive中,就要部署一套Hive,其实使用此特性不需要现有的Hive部署。Spark将使用Derby本地自动创建一个Hive metastore。与createOrReplaceTempView
命令不同的是saveAsTable
将实现DataFrame的内容,并创建指向Hive metastore中的数据的指针。即使在Spark程序重新启动之后,持久表仍然存在。可以通过在带有表名的SparkSession
上调用表方法来创建持久表的DataFrame。
对于基于文件的数据源,例如文本、parquet、json等,可以通过path选项指定自定义表路径,例如df.write.option("path"、"/some/path").saveAsTable('t')
。删除表时,自定义表路径不会被删除,表数据仍然存在。如果没有指定自定义表路径,Spark将把数据写到仓库目录下的默认表路径。删除表时,默认的表路径也将被删除。
从Spark 2.1开始,持久数据源表的每个分区元数据存储在’Hive metastore’中。这带来了几个好处:
由于转移点只能返回查询所需的分区,因此不再需要将第一个查询中的所有分区都发现到表中。
Hive DDLs例如
ALTER TABLE PARTITION ... SET LOCATION
现在对使用数据源API创建的表可用。
注意,在创建外部数据源表(带有’path’选项的表)时,默认情况下不会收集分区信息。要在metastore同步分区信息,可以调用MSCK REPAIR TABLE
。
- 嵌套、排序和分区
对于基于文件的数据源,在保存时可以进行嵌套、排序或分区。但是套接和排序只适用于持久表:
1 | df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") |
当使用数据集api时,分区可以与save和saveAsTable一起使用。
1 | df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") |
可以对单表同时使用分区和嵌套:
1 | df = spark.read.parquet("examples/src/main/resources/users.parquet") |
partitionBy 创建一个目录结构目录结构下面的有所介绍。因此,它对具有高基数的列的适用性有限。相比之下,bucketBy 将数据分布在固定数量的bucket上,并且可以在许多惟一值是无界的情况下使用。
parquet文件
parquet是一种列式的格式,它受到许多其他数据处理系统的支持。Spark SQL支持读取和写入parquet文件,这些文件自动保存原始数据的模式。在保存parquet文件时,出于兼容性的原因,所有列都自动转换为可空列。关于parquet更多信息,请查看本人的Parquet简介。
- 使用Python加载数据
1 | peopleDF = spark.read.json("examples/src/main/resources/people.json") |
- 探索分区
表分区是Hive等系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中。所有内置的文件源(包括text、CSV、JSON、ORC、Parquet)都能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将所有以前使用的人口数据存储到分区表中,其中两个额外的列,gender
和country
作为分区列:
1 | path |
通过path/to/table
使用SparkSession.read.parquet`
或SparkSession.read.load`,Spark SQL将自动从路径中提取分区信息。现在返回的DataFrame的模式变成:
1 | root |
注意,分区列的数据类型是自动推断的。目前,支持数字数据类型、日期、时间戳和字符串类型。有时用户可能不希望自动推断分区列的数据类型。对于这些用例,可以通过spark.sql.sources.partitionColumnTypeInference.enabled
自动推断类型,默认为true。当为false时,分区列将使用string类型。
从Spark 1.6.0开始,默认情况下,分区发现只能在给定路径下找到分区。对于上面的例子,如果用户将path/to/table/gender=male传递给任何一个SparkSession.read.parquet
或SparkSession.read.load
时,gender
将不会视为分区列。如果用户需要指定分区发现应该开始的基本路径,他们可以在数据源选项中设置basePath。例如,当path/to/table/gender=male是数据的路径,用户将basePath设置为path/to/table/时,gender将是一个分区列。
模式合并
像ProtocolBuffer、Avro和Thrift一样,Parquet也支持模式演化。用户可以从一个简单的模式开始,然后根据需要逐渐向该模式添加更多的列。这样,用户可能会得到多个具有不同但相互兼容的模式的Parquet文件。Parquet数据源现在能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一个相对昂贵的操作,而且在大多数情况下不是必需的,因此我们从1.5.0开始默认关闭它。你可以通过
读取parquet文件时,将数据源选项
mergeSchema
设置为true
(如下面的示例所示),或设置全局SQL选项
spark.sql.parquet.mergeSchema
为true
。
1 | from pyspark.sql import Row |
Hive metastore转换为parquet表
在从Hive metastore parquet table读取和写入数据时,Spark SQL将尝试使用自己的parquet,而不是Hive SerDe,以获得更好的性能。这个行为由spark.sql.hive.convertMetastoreParquet
配置,并在默认情况下打开。
Hive/Parquet Schema协调
从表模式处理的角度来看,Hive和Parquet有两个关键的区别。
Hive是不区分大小写的,而parquet区分。
Hive认为所有的列都是可空的,而在parquet中的可空性是很重要的
因此,在将Hive metastore parquet table
转换为Spark SQL parquet table
时,我们必须将Hive metastore parquet table
与Parquet
模式进行协调。协调规则如下:
无论是否为空,两个模式中具有相同名称的字段必须具有相同的数据类型。协调字段应该具有parquet的数据类型,以便考虑可空性。
协调模式正好包含在Hive metastore模式中定义的字段。
在parquet模式中的任何字段都将被删除在协调模式中。
只出现在Hive metastore模式中的任何字段都作为可空字段添加到协调模式中。
元数据刷新
Spark SQL缓存parquet元数据以获得更好的性能。当启用Hive metastore parquet table转换时,这些转换表的元数据也会被缓存。如果这些表是由Hive或其他外部工具更新的,则需要手动刷新它们,以确保一致的元数据。
1 | # spark is an existing SparkSession |
配置
可以使用SparkSession上的setConf
方法或使用SQL运行SET key=value
命令来配置parquet。
属性名称 | 默认值 | 描述 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 其他一些生成Parquet的系统,特别是Impala、Hive和Spark SQL的旧版本,在编写Parquet模式时不区分二进制数据和字符串。这个标志告诉Spark SQL将二进制数据解释为字符串,以提供与这些系统的兼容性。 |
spark.sql.parquet.int96AsTimestamp | true | 一些parquet生产系统,特别是Impala和Hive,将时间戳存储到INT96中。这个标志告诉Spark SQL将INT96数据解释为一个时间戳,以提供与这些系统的兼容性。 |
spark.sql.parquet.cacheMetadata | true | 打开parquet模式元数据的缓存。可以加速静态数据的查询。snappy设置编写拼花文件时使用的压缩编解码器。可接受的值包括:uncompression、snappy、gzip、lzo。 |
spark.sql.parquet.filterPushdown | true | 设置为true时,启用parquet过滤下推优化 |
spark.sql.hive.convertMetastoreParquet | true | 当设置为false时,Spark SQL将对拼花表使用Hive SerDe,而不是内置支持。 |
spark.sql.parquet.mergeSchema | false | 如果为真,Parquet数据源将合并从所有数据文件收集的模式,否则将从总结文件中选择模式,如果没有总结文件可用,则从随机数据文件中选择模式。 |
spark.sql.optimizer.metadataOnly | true | 如果为真,则启用仅使用表的元数据来生成分区列而不是表扫描的元数据查询优化。当扫描的所有列都是分区列且查询具有满足不同语义的聚合操作符时,该方法将适用。 |
json数据集
Spark SQL可以自动推断JSON数据集的模式,并将其作为DataFrame加载。可以使用SparkSession.read.json
读取json文件。
注意,作为json文件提供的文件不是典型的json文件。每行必须包含一个单独的、自包含的有效JSON对象。有关更多信息请查看JSON行文本格式,也称为以新行分隔的JSON
对于常规的多行JSON文件,将多行参数设置为True。
1 | # spark is from the previous example. |
Hive表
Spark SQL还支持读取和写入存储在Apache Hive中的数据。然而,由于Hive有大量的依赖项,这些依赖项并不包含在默认的Spark发行版中。如果可以在类路径中找到Hive依赖项,Spark将自动加载它们。请注意,这些Hive依赖项还必须出现在所有工作节点上,因为它们需要访问Hive序列化和反序列化库(SerDes),以便访问存储在Hive中的数据。
Hive的配置是通过放置Hive站点来完成的: 配置在conf
下的hive-site.xml, core-site.xml(用于安全配置)和hdfs-site.conf(用于HDFS配置)文件。
在使用Hive时,必须使用Hive支持实例化SparkSession
,包括连接到持久的Hive转移、支持Hive serdes和Hive user-defined的函数。用户仍然可以启用Hive支持在没有Hive部署的机器上。当没有配置hive-site.xml
时。上下文会在当前目录中自动创建metastore_db,并创建一个由spark.sql.warehouse.dir
。默认为启动Spark应用程序的当前目录中的Spark-warehouse目录。注意hive.metastore.warehouse.dir
从Spark 2.0.0开始,hive-site.xml
就被弃用了。相反,使用spark.sql.warehouse.dir
指定数据库在仓库中的默认位置。您可能需要将写权限授予启动Spark应用程序的用户。
1 |
|
指定Hive表的存储格式
在创建Hive表时,需要定义这个表应该如何向文件系统读写数据,即”输入格式”和”输出格式”。您还需要定义这个表应该如何将数据反序列化为行,或将行序列化为数据,即”serde”。以下选项可用于指定存储格式(“serde”、”input format”、”output format”),例如CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet').
。默认情况下,我们将以纯文本的形式读取表文件。注意,在创建表时还不支持Hive存储处理程序,您可以使用Hive侧的存储处理程序创建表,并使用Spark SQL读取它。
属性名 | 描述 |
---|---|
fileFormat | fileFormat是一种存储格式规范的包,包括”serde”、”input format”和”output format”。目前我们支持6种文件格式:”sequencefile”,”rcfile”,”orc”,”parquet”,”textfile”和”avro”。 |
inputFormat, outputFormat | 这两个选项将对应的”InputFormat”和”OutputFormat”类的名称指定为字符串文本,例如。”org.apache.hadoop.hive.ql.io.orc.OrcInputFormat”。这两个选项必须成对出现,如果已经指定了”fileFormat”选项,则不能指定它们。 |
serde | 此选项指定serde类的名称。指定”fileFormat”选项时,如果给定的”fileFormat”已经包含serde的信息,则不要指定此选项。目前”sequencefile”、”textfile”和”rcfile”不包含serde信息,您可以在这三种文件格式中使用此选项。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim | 此选项只能与”textfile”文件格式一起使用。它们定义如何将带分隔符的文件读入行。 |
使用OPTIONS
定义的所有其他属性将被视为Hive serde属性。
与不同版本的Hive Metastore相互作用
Spark SQL的Hive支持中最重要的部分之一是与Hive metastore的交互,这使得Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,Spark SQL的一个二进制构建可以使用下面描述的配置来查询不同版本的Hive metastores。注意,独立于用于与metastore通信的Hive版本,内部Spark SQL将针对Hive 1.2.1编译,并将这些类用于内部执行(serdes、udf、udf等)。
以下选项可用于配置用于检索元数据的Hive版本:
属性名称 | 默认值 | 描述 |
---|---|---|
spark.sql.hive.metastore.version | 1.2.1 | Hive metastore版本号。 蜂巢转移。可用选项从0.12.0到1.2.1 |
spark.sql.hive.metastore.jars | builtin | 用于实例化HiveMetastoreClient的jar的位置。此属性可以是三个选项之一:builtin: 使用Hive 1.2.1,当启用-Phive时,它与Spark程序集绑定在一起。当选择此选项时,spark.sq .hive.metastore。版本必须是1.2.1或未定义。maven: 用从Maven存储库下载的指定版本的Hive jar。一般不建议将此配置用于生产部署。 JVM标准格式的类路径。这个类路径必须包括所有Hive及其依赖项,包括正确的Hadoop版本。这些jar只需要出现在驱动程序上,但是如果您在yarn集群模式下运行,那么您必须确保它们与您的应用程序一起打包。 |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc, org.postgresql, com.microsoft.sqlserver, oracle.jdbc | 类前缀的逗号分隔列表,应该使用在Spark SQL和特定版本的Hive之间共享的类加载器加载这些前缀。应该共享的类的一个例子是JDBC驱动程序,它需要与转移服务器进行通信。需要共享的其他类是那些与已经共享的类交互的类。例如,log4j使用的自定义appender。 |
spark.sql.hive.metastore.barrierPrefixes | (empty) | Spark SQL正在与之通信的每个Hive版本都应该显式地重新加载类前缀的逗号分隔列表。例如,Hive udf声明在一个通常会被共享的前缀中(即org.apache.spark.*)。 |
通过JDBC来连接其他的关系型数据库
Spark SQL还包括一个数据源,可以使用JDBC从其他数据库读取数据。与使用JdbcRDD相比,这种功能更可取。这是因为结果是以DataFrame的形式返回的,并且它们可以在Spark SQL中轻松处理或与其他数据源连接。JDBC数据源也更容易从Java或Python中使用,因为它不需要用户提供一个ClassTag。(请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。
首先,您需要在spark类路径中包含特定数据库的JDBC驱动程序。例如,要从Spark Shell连接到postgres,对于postgres的按照查看此处,需要运行以下命令:
1 | bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar |
可以使用数据源API将远程数据库中的表加载为DataFrame或Spark SQL临时视图。用户可以在数据源选项中指定JDBC连接属性。用户和密码通常作为登录数据源的连接属性提供。除了连接属性,Spark还支持以下不区分大小写的选项:
属性名 | 描述 |
---|---|
url | 要连接的JDBC URL。特定于源代码的连接属性可以在URL中指定。例如: jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 应该读取的JDBC表。注意,可以使用SQL查询的FROM子句中有效的任何内容。例如,您也可以使用圆括号中的子查询来代替完整的表。 |
driver | 用于连接此URL的JDBC驱动程序的类名。 |
partitionColumn, lowerBound, upperBound | 如果指定了这些选项中的任何一个,则必须指定所有这些选项。此外,必须指定numpartition。它们描述了当从多个worker并行读取数据时如何对表进行分区。partitionColumn必须是表中的数字列。注意,下界和上界仅用于决定分区步幅,而不是用于过滤表中的行。因此,表中的所有行都将被分区并返回。此选项仅适用于阅读。 |
numPartitions | 用于表读写并行性的分区的最大数目。这也决定了并发JDBC连接的最大数量。如果要写入的分区数量超过此限制,则在写入之前调用coalesce(numPartitions)将其减少到此限制。 |
fetchsize | JDBC获取大小,它决定每次往返要获取多少行。这有助于JDBC驱动程序的性能,JDBC驱动程序默认为低获取大小(例如。Oracle有10行)。此选项仅适用于阅读。 |
batchsize | JDBC批处理大小,它决定每个往返要插入多少行。这可以帮助JDBC驱动程序的性能。此选项仅适用于写入。默认值是1000。 |
isolationLevel | 事务隔离级别,适用于当前连接。它可以是NONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ或SERIALIZABLE之一,对应于JDBC的连接对象定义的标准事务隔离级别,默认为READ_UNCOMMITTED。此选项仅适用于写入。请参考java.sql.Connection中的文档。 |
truncate | 这是一个JDBC编写器相关的选项。当SaveMode。启用覆盖后,此选项将导致Spark截断现有表,而不是删除和重新创建该表。这可以更有效地防止表元数据(例如索引)被删除。但是,在某些情况下,例如新数据具有不同的模式时,它将不起作用。它默认为false。此选项仅适用于写入。 |
createTableOptions | 这是一个JDBC编写器相关的选项。如果指定,该选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE table t (name string) ENGINE=InnoDB.)。此选项仅适用于写入。 |
createTableColumnTypes | 创建表时要使用的数据库列数据类型,而不是默认值。数据类型信息应该以与CREATE TABLE columns语法(e。g:”name CHAR(64), comments VARCHAR(1024)”)。指定的类型应该是有效的spark sql数据类型。此选项仅适用于写入。 |
1 | # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods |
在Spark repo的”examples/src/main/python/sql/datasource.py”中找到完整的示例。
故障排除
JDBC驱动程序类必须对客户机会话和所有执行程序上的原始类装入器可见。这是因为Java的DriverManager类做了一个安全检查,当打开一个连接时,它会忽略所有原始类装入器不可见的驱动程序。一种方便的方法是修改compute_classpath。在所有工作节点上使用sh来包含驱动程序jar。
有些数据库,如H2,将所有名称转换为大写。您需要使用大写字母来引用Spark SQL中的这些名称。
要想了解更多关于Spark的使用,请查看本人下面博客: