# OpenMLDB Spark Distribution

## Overview

The OpenMLDB Spark distribution is a high-performance, native Spark distribution optimized for feature engineering. Like standard Spark, it provides Scala, Java, Python, and R programming interfaces, and can be used in the same way as the standard Spark distribution.

GitHub Repo: https://github.com/4paradigm/Spark/

## Download

You can download the OpenMLDB Spark distribution from the Releases page of the repository mentioned above.

**Note**

The pre-compiled OpenMLDB Spark distribution is the AllinOne version, which supports both Linux and macOS. If you have special requirements, you can also download the source code and recompile it.
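
If you do recompile, note that the fork is based on Apache Spark, so the standard distribution build script should apply. Below is a minimal sketch, assuming the fork keeps the upstream `dev/make-distribution.sh` tooling; the build profiles shown are illustrative, not the project's official build command:

```bash
# Clone the OpenMLDB fork of Spark (repository linked in the Overview).
git clone https://github.com/4paradigm/Spark.git
cd Spark

# Build a distributable tarball with the standard Spark build script.
# The profile flags (-Phive, -Pyarn) are illustrative; adjust to your environment.
./dev/make-distribution.sh --name openmldbspark --tgz -Phive -Pyarn
```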

## Configuration

OpenMLDB Spark supports all standard Spark configuration. In addition, it introduces new configuration options that take full advantage of the performance optimizations in the native execution engine.

### New Configuration of the OpenMLDB Spark Distribution

| Configuration | Function | Default Value | Note |
| ------------- | -------- | ------------- | ---- |
| `spark.openmldb.window.parallelization` | Whether to enable window parallelization. | `false` | Window parallelization can improve efficiency when computing resources are sufficient. |
| `spark.openmldb.addIndexColumn.method` | The method used to add an index column. | `monotonicallyIncreasingId` | Options are `zipWithUniqueId`, `zipWithIndex`, and `monotonicallyIncreasingId`. |
| `spark.openmldb.concatjoin.jointype` | The method used to concatenate tables. | `inner` | Options are `inner`, `left`, and `last`. |
| `spark.openmldb.enable.native.last.join` | Whether to enable the native LAST JOIN implementation. | `true` | When `true`, it offers higher performance than the implementation based on LEFT JOIN. |
| `spark.openmldb.enable.unsaferow.optimization` | Whether to enable the UnsafeRow memory optimization. | `false` | When `true`, rows are encoded in the UnsafeRow format to improve performance. There are known issues with complicated expressions. |
| `spark.openmldb.opt.unsaferow.project` | Whether to enable the UnsafeRow memory optimization on PROJECT nodes. | `false` | When `true`, it reduces encoding/decoding overhead on PROJECT nodes, but there are known issues with complicated expressions. |
| `spark.openmldb.opt.unsaferow.window` | Whether to enable the UnsafeRow memory optimization on WINDOW nodes. | `false` | When `true`, it reduces encoding/decoding overhead on WINDOW nodes, but there are known issues with complicated expressions. |
| `spark.openmldb.opt.join.spark_expr` | Whether to use Spark expressions in JOIN clauses. | `true` | When `true`, JOIN conditions are evaluated with the Spark expression engine. There are likewise known issues with complicated expressions. |
| `spark.openmldb.physical.plan.graphviz.path` | The path that the physical plan image is exported to. | `""` | Image files are not exported by default. |

- If there are multiple window computing tasks and sufficient computing resources, it is recommended to set `spark.openmldb.window.parallelization=true` to improve resource utilization and reduce runtime.
- If a JOIN expression is very complicated, execution may fail with the default settings. It is recommended to set `spark.openmldb.opt.join.spark_expr=false` to ensure the program runs successfully.
- If the input or intermediate tables contain a large number of columns, it is recommended to enable all three UnsafeRow-related optimizations to reduce the cost of encoding/decoding and improve efficiency. All of these options can be passed at submission time, as shown in the sketch below.
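
As with any Spark configuration, these options can be supplied to `spark-submit` via `--conf`. The following is a minimal sketch combining the recommendations above; the application class and JAR are placeholders:

```bash
# Placeholder class and JAR; substitute your own application.
${SPARK_HOME}/bin/spark-submit \
    --master local \
    --conf spark.openmldb.window.parallelization=true \
    --conf spark.openmldb.opt.join.spark_expr=false \
    --conf spark.openmldb.enable.unsaferow.optimization=true \
    --conf spark.openmldb.opt.unsaferow.project=true \
    --conf spark.openmldb.opt.unsaferow.window=true \
    --class com.example.YourApp \
    ./your-app.jar
```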

## Usage

### Using Example Jars

The examples in the example JARs can be executed directly once you have installed the OpenMLDB Spark distribution and set the `SPARK_HOME` environment variable.

```bash
export SPARK_HOME=`pwd`/spark-3.2.1-bin-openmldbspark/

$SPARK_HOME/bin/spark-submit \
  --master local \
  --class org.apache.spark.examples.sql.SparkSQLExample \
  $SPARK_HOME/examples/jars/spark-examples*.jar
```

**Note**

- `SparkSQLExample` is an example provided with the standard Spark source code.
- Some SQL examples use the OpenMLDB Spark optimizations for higher performance.
- Some DataFrame examples do not support the OpenMLDB Spark optimizations.

### Using PySpark

After installing the OpenMLDB Spark distribution, you can use standard PySpark for development.

```python
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("demo").getOrCreate()
print(spark.version)

# Define the schema for the demo table.
schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
])

rows = [
    Row("Andy", 20),
    Row("Berta", 30),
    Row("Joe", 40),
]

# Register the rows as a temporary view and query it with SQL.
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema).createOrReplaceTempView("t1")
spark.sql("SELECT name, age + 1 FROM t1").show()
```

After saving the source file as `openmldbspark_demo.py`, you can run the script locally with the following command.

```bash
${SPARK_HOME}/bin/spark-submit \
    --master local \
    ./openmldbspark_demo.py
```
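
The OpenMLDB-specific options from the Configuration section can also be set programmatically when building the session. Below is a minimal sketch; the values chosen are illustrative rather than recommendations for every workload:

```python
from pyspark.sql import SparkSession

# Option names come from the Configuration section above; the values
# shown here are illustrative.
spark = (
    SparkSession.builder
    .appName("demo")
    .config("spark.openmldb.window.parallelization", "true")
    .config("spark.openmldb.addIndexColumn.method", "zipWithUniqueId")
    .getOrCreate()
)

# Verify that the option was picked up by the session.
print(spark.conf.get("spark.openmldb.window.parallelization"))
```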