OpenMLDB Spark 发行版#

简介#

OpenMLDB Spark发行版是面向特征工程优化后的高性能原生Spark版本。OpenMLDB Spark和标准Spark发行版一样提供Scala、Java、Python和R编程接口,用户使用OpenMLDB Spark发行版的方法与标准版一致。

GitHub Repo: https://github.com/4paradigm/Spark/

下载OpenMLDB Spark发行版#

在上述Github仓库的Releases页面提供了OpenMLDB Spark发行版的下载地址,用户可以直接下载到本地使用。

注意,预编译的OpenMLDB Spark发行版为allinone版本,支持Linux和MacOS操作系统,如有特殊需求也可以下载源码重新编译。

OpenMLDB Spark配置#

OpenMLDB Spark兼容标准的Spark配置,除此之外,还支持新增的配置项,可以更好地利用原生执行引擎的性能优化。

新增配置#

配置项

说明

默认值

备注

spark.openmldb.window.parallelization

是否启动窗口并行计算优化

false

窗口并行计算可提高集群利用率但会增加计算节点

spark.openmldb.addIndexColumn.method

添加索引列方法

monotonicallyIncreasingId

可选方法有zipWithUniqueId, zipWithIndex, monotonicallyIncreasingId

spark.openmldb.concatjoin.jointype

拼接拼表方法

inner

可选方法有inner, left, last

spark.openmldb.enable.native.last.join

是否开启NativeLastJoin优化

true

相比基于LeftJoin的实现,具有更高性能

spark.openmldb.enable.unsaferow.optimization

是否开启UnsafeRow内存优化

false

开启后使用UnsafeRow编码格式,目前部分复杂类型不支持

spark.openmldb.opt.unsaferow.project

Project节点是否开启UnsafeRow内存优化

false

开启后降低Project节点编解码开销,目前部分复杂类型不支持

spark.openmldb.opt.unsaferow.window

Window节点是否开启UnsafeRow内存优化

false

开启后降低Window节点编解码开销,目前部分复杂类型不支持

spark.openmldb.opt.join.spark_expr

Join条件是否开启Spark表达式优化

true

开启后Join条件计算使用Spark表达式,减少编解码开销,目前部分复杂表达式不支持

spark.openmldb.physical.plan.graphviz.path

导出物理计划图片的路径

“”

默认不导出图片文件

  • 如果SQL任务有多个窗口计算并且计算资源足够,推荐开启窗口并行计算优化,提高资源利用率和降低任务运行时间。

  • 如果SQL任务中Join条件表达式比较复杂,默认运行失败,推荐关闭Join条件Spark表达式优化,提高任务运行成功率。

  • 如果SQL任务中输入表或中间表列数较大,推荐同时开启上表的三个UnsafeRow优化,减少编解码开销和降低任务运行时间。

使用#

使用Example Jars#

下载解压后,设置SPARK_HOME环境变量,可以直接执行Example Jars中的例子。

export SPARK_HOME=`pwd`/spark-3.2.1-bin-openmldbspark/

$SPARK_HOME/bin/spark-submit \
  --master local \
  --class org.apache.spark.examples.sql.SparkSQLExample \
  $SPARK_HOME/examples/jars/spark-examples*.jar

注意,SparkSQLExample为标准Spark源码自带的例子,部分SQL例子使用了OpenMLDB Spark优化进行加速,部分DataFrame例子不支持OpenMLDB Spark优化。

使用PySpark#

下载OpenMLDB Spark发行版后,也可以使用标准的PySpark编写应用,示例代码如下。

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
 
spark = SparkSession.builder.appName("demo").getOrCreate()
print(spark.version)

schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
])

rows = [
    Row("Andy", 20),
    Row("Berta", 30),
    Row("Joe", 40)
]

spark.createDataFrame(spark.sparkContext.parallelize(rows), schema).createOrReplaceTempView("t1")
spark.sql("SELECT name, age + 1 FROM t1").show()

保存源码文件为openmldbspark_demo.py后,使用下面命令提交本地运行。

${SPARK_HOME}/bin/spark-submit \
    --master=local \
    ./openmldbspark_demo.py