JohnShen's Blog.

Greenplum-Spark 数据传输工具: Greenplum-Spark Connector

字数统计: 668阅读时长: 3 min
2020/12/17 Share

Greenplum-Spark Connector 简称 GSC ,目前使用的版本为v1.6: https://greenplum-spark.docs.pivotal.io/1-6/index.html

Greenplum-Spark Connector Version Greenplum Version Spark Version Scala Version
1.6.1, 1.6.2 4.3.x, 5.x, 6.x 2.3.1 and above 2.11
1.6.0 4.3.x, 5.x 2.1.2 and above 2.11

概述

Spark application 由 Driver 和 Executor 节点构成,当使用 GSC 加载GP数据至Spark时,Driver 会通过JDBC的方式请求 Greenplum 的 master 节点获取相关的元数据信息,这些信息帮助GSC获取到表数据存储在GP的位置,以及如何在可用的Spark工作节点切分数据或工作。

GSC 在加载Greenplum数据时,需要指定 GP 表的一个字段作为 Spark 的 partition 字段,Connector 会使用这个字段的值来计算,该 Greenplum 表的某个 segment 该被哪一个或多个Spark partition读取。

读取过程如下:

  1. Spark Driver通过Jdbc的方式连接Greenplum master,并读取指定表的相关元数据信息。然后根据指定的分区字段以及分区个数去决定segment怎么分配。
  2. Spark Executor端会通过Jdbc的方式连接Greenplum master,创建Greenplum外部表。
  3. 然后Spark Executor通过Http方式连接Greenplum的数据节点,获取指定的segment的数据。该获取数据的操作在Spark Executor并行执行。

网络与端口需求

GP Master 默认端口为 5432,Spark 的驱动和工作节点与该端口进行通信。

GSC 使用gpfdist来从 GP Segment 和 Spark 工作节点间传输数据。默认情况下,连接器使用Spark 工作节点的IP地址启动gpfdist服务进程,并暴露端口。gpfdist 的地址与端口可配。

基础使用

读取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark GP Example")
.getOrCreate()

val gscReadOptionMap = Map(
"url" -> "jdbc:postgresql://10.219.184.190:5432/youdata",
"user" -> "xxx",
"password" -> "xxx",
"dbschema" -> "test",
"dbtable" -> "hivespark1",
"partitionColumn" -> "x"
)

val gpdf = spark.read.format("greenplum")
.options(gscReadOptionMap)
.load()

gpdf.show()
spark.close()
写入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark GP Write Example")
.config("hive.metastore.uris","thrift://xxx:9083")
.enableHiveSupport()
.getOrCreate()

val gscWriteOptionMap = Map(
"url" -> "jdbc:postgresql://xxx:5432/xxx",
"user" -> "xxx",
"password" -> "xxx",
"dbschema" -> "test",
"dbtable" -> "hivespark_write",
"partitionColumn" -> "x"
)

spark.table("xx.xxx")
.write
.format("greenplum")
.options(gscWriteOptionMap)
.mode(SaveMode.Overwrite)

spark.close()

在将spark/conf/下的log4j配置加上log4j.logger.io.pivotal.greenplum.spark=DEBUG后启动:

1
./spark-submit --master local[*] --class sparksql.GpWriteApp --jars ./temp/greenplum-spark_2.11-1.6.2.jar  ./spark-learn-1.0-SNAPSHOT-jar-with-dependencies.jar

可以发现整个SQL执行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE "test"."hivespark_write" ("a" TEXT, "b" TEXT, "c" TEXT, "d" TEXT, "e" TEXT, "f" TEXT, "n" TEXT, "na" TEXT, "nb" TEXT, "t" TIMESTAMP, "sa" BIGINT, "sb" BIGINT, "sc" BIGINT, "sd" BIGINT, "se" BIGINT, "sf" BIGINT, "x" BIGINT, "y" BIGINT);

CREATE READABLE EXTERNAL TABLE
"test"."spark_216956b47a67008c_3d9d854163f8f07a_driver_61" (LIKE "test"."hivespark_write")
LOCATION ('gpfdist://10.219.185.6:39825/spark_216956b47a67008c_3d9d854163f8f07a_driver_61')
FORMAT 'CSV'
(DELIMITER AS '|'
NULL AS '')
ENCODING 'UTF-8'

INSERT INTO "test"."hivespark_write"
SELECT *
FROM "test"."spark_216956b47a67008c_3d9d854163f8f07a_driver_61"
CATALOG
  1. 1. 概述
  2. 2. 网络与端口需求
    1. 2.1. 基础使用
      1. 2.1.1. 读取
      2. 2.1.2. 写入