OpenMLDB Spark Distribution
Overview#
The OpenMLDB Spark distribution is a high-performance native Spark version optimized for feature engineering. Like the standard Spark distribution, it provides Scala, Java, Python, and R programming interfaces, and it can be used in the same way as standard Spark.
GitHub Repo: https://github.com/4paradigm/Spark/
Download#
You can download the OpenMLDB Spark distribution from the Releases page of the repository mentioned above.
Note
The pre-compiled OpenMLDB Spark distribution is the AllinOne version, which supports Linux and macOS. If you have special requirements, you can also download the source code and recompile it.
Configuration#
OpenMLDB Spark supports the standard Spark configuration. In addition, it adds new configuration options that take full advantage of the performance optimizations in its native execution engine.
New Configuration of the OpenMLDB Spark Distribution#
Configuration | Function | Default Value | Note
---|---|---|---
spark.openmldb.window.parallelization | It defines whether to enable the window parallelization. | false | Window parallelization can improve the efficiency when there is sufficient computing resource.
spark.openmldb.addIndexColumn.method | It defines the method of adding indexes on columns. | monotonicallyIncreasingId | Options are
spark.openmldb.concatjoin.jointype | It defines the method of concatenating tables. | inner | Options are
spark.openmldb.enable.native.last.join | It defines whether to enable the native last join implementation. | true | When the value is
spark.openmldb.enable.unsaferow.optimization | It defines whether to enable the UnsafeRow memory optimization. | false | When the value is
spark.openmldb.opt.unsaferow.project | It defines whether to enable the UnsafeRow memory optimization on PROJECT nodes. | false | When the value is
spark.openmldb.opt.unsaferow.window | It defines whether to enable the UnsafeRow memory optimization on WINDOW nodes. | false | When the value is
spark.openmldb.opt.join.spark_expr | It defines whether to use the Spark expression on JOIN clause. | true | When the value is
spark.openmldb.physical.plan.graphviz.path | It is the path that the physical plan image will be exported to. | "" | Image files are not exported by default.
If there are multiple window computing tasks and enough resources, it is recommended to set spark.openmldb.window.parallelization=true to improve resource utilization and reduce runtime.

If the JOIN expression is too complicated, the execution may fail under the default settings. In that case, it is recommended to set spark.openmldb.opt.join.spark_expr=false so that the program can run successfully.

If the input tables or intermediate tables have too many columns, it is recommended to enable all three UnsafeRow-related optimizations to reduce the cost of encoding/decoding and improve efficiency.
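The recommendations above can be sketched as a small helper that turns workload characteristics into `--conf` arguments for `spark-submit`. This is only an illustrative sketch: the function names and the three boolean flags are hypothetical, and it assumes that "all three" UnsafeRow optimizations refers to the three UnsafeRow-related keys in the configuration table.

```python
import shlex

# Assumption: these are the three UnsafeRow-related keys from the table above.
UNSAFEROW_KEYS = (
    "spark.openmldb.enable.unsaferow.optimization",
    "spark.openmldb.opt.unsaferow.project",
    "spark.openmldb.opt.unsaferow.window",
)


def recommended_conf(many_windows=False, complex_join=False, wide_tables=False):
    """Map workload traits (hypothetical flags) to OpenMLDB Spark settings."""
    conf = {}
    if many_windows:
        # Multiple window tasks and spare resources: run windows in parallel.
        conf["spark.openmldb.window.parallelization"] = "true"
    if complex_join:
        # Complicated JOIN expressions: turn off the Spark-expression path.
        conf["spark.openmldb.opt.join.spark_expr"] = "false"
    if wide_tables:
        # Many columns: enable every UnsafeRow optimization.
        for key in UNSAFEROW_KEYS:
            conf[key] = "true"
    return conf


def to_submit_args(conf):
    """Render a settings dict as a list of spark-submit --conf arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Example: a job with several windows over wide tables.
# "./my_job.py" is a placeholder script name.
args = to_submit_args(recommended_conf(many_windows=True, wide_tables=True))
print(shlex.join(["spark-submit", "--master", "local", *args, "./my_job.py"]))
```

The printed command line can be pasted into a shell. Each setting is situational, so it is worth enabling one at a time and comparing runtimes.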
Usage#
Using Example Jars#
The examples in the Example Jars can be executed directly after you install the OpenMLDB Spark distribution and set SPARK_HOME.
export SPARK_HOME=`pwd`/spark-3.2.1-bin-openmldbspark/

$SPARK_HOME/bin/spark-submit \
  --master local \
  --class org.apache.spark.examples.sql.SparkSQLExample \
  $SPARK_HOME/examples/jars/spark-examples*.jar
Note
SparkSQLExample is an example provided with the standard Spark source code.
Some SQL examples use OpenMLDB Spark optimization for higher performance.
Some DataFrame examples do not support OpenMLDB Spark optimization.
Using PySpark#
After installing the OpenMLDB Spark distribution, you can use the standard PySpark for development.
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Start (or reuse) a session and print the Spark version in use.
spark = SparkSession.builder.appName("demo").getOrCreate()
print(spark.version)

# Schema of the demo table.
schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
])

rows = [
    Row("Andy", 20),
    Row("Berta", 30),
    Row("Joe", 40),
]

# Register the rows as temporary view t1 and query it with SQL.
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema).createOrReplaceTempView("t1")
spark.sql("SELECT name, age + 1 FROM t1").show()
After saving the source file as openmldbspark_demo.py, you can use the following command to run the script locally.

${SPARK_HOME}/bin/spark-submit \
  --master=local \
  ./openmldbspark_demo.py
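The OpenMLDB-specific options from the Configuration section can also be set inside a PySpark script instead of on the command line. The following is a minimal sketch that uses the standard `SparkSession.builder.config` call; the helper name `create_session` and the selection of settings in `OPENMLDB_CONF` are illustrative assumptions, not part of the distribution.

```python
# Hypothetical selection of settings; the key comes from the configuration table.
OPENMLDB_CONF = {
    "spark.openmldb.window.parallelization": "true",
}


def create_session(app_name, conf):
    """Build a SparkSession with each entry of conf applied as a Spark setting."""
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

In a script, `spark = create_session("demo", OPENMLDB_CONF)` would replace the plain `SparkSession.builder...getOrCreate()` line, and `spark.conf.get("spark.openmldb.window.parallelization")` can be used to confirm that the value was applied.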