Importing Real-Time Data Streams from Pulsar#
Introduction#
Apache Pulsar is a cloud-native, distributed messaging and streaming platform. It can be used as an online data source for OpenMLDB, importing real-time data streams. You can learn more about Pulsar on the project website https://pulsar.apache.org/. We have developed an OpenMLDB JDBC Connector that works seamlessly with Pulsar. In this document, you will learn the concepts and usage of this connector.
Note that, for simplicity, this document uses Pulsar Standalone, an OpenMLDB cluster, and a simple JSON message producer to show how the OpenMLDB JDBC Connector works. The connector also works well with a Pulsar cluster.
Overview#
Download#
You can download the entire demo package here, which contains everything this demo needs: the connector nar, schema files, and config files.
If you only need the connector, you can download it from the OpenMLDB release.
Workflow#
The figure below summarizes the workflow of using this connector; we will explain the details later. Moreover, we have recorded the steps on a terminalizer page for easy reference, or you can download the demo script demo.yml.
Step 1#
Create OpenMLDB Cluster#
The simplest way is to start the OpenMLDB cluster with docker. We also need to create a test table; see Get started with cluster version of OpenMLDB for details.
Caution
Only the OpenMLDB cluster mode can be the sink destination, and the sink only writes to the online storage.
We recommend running docker with the ‘host’ network, and binding the ‘files’ directory as a volume too; the SQL scripts are in it.
docker run -dit --network host -v `pwd`/files:/work/pulsar_files --name openmldb 4pdosc/openmldb:0.7.3 bash
docker exec -it openmldb bash
Note
Even with the host network, docker on macOS does not support connecting to the container from the host. You can only connect to the OpenMLDB cluster from other containers, such as the Pulsar container.
In the OpenMLDB container, start the cluster:
./init.sh
Create table#
We use a script to create the table. The content of create.sql:
create database pulsar_test;
use pulsar_test;
create table connector_test(id string, vendor_id int, pickup_datetime bigint, dropoff_datetime bigint, passenger_count int, pickup_longitude double, pickup_latitude double, dropoff_longitude double, dropoff_latitude double, store_and_fwd_flag string, trip_duration int);
desc connector_test;
Run the script:
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /work/pulsar_files/create.sql
Note
The JSONSchema and the JDBC base connector do not support ‘java.sql.Timestamp’ yet, so we use ‘bigint’ as the timestamp column type (bigint works as a timestamp in OpenMLDB).
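Because the column type is ‘bigint’, the producer has to send timestamps as plain integers. A minimal sketch of the conversion, assuming epoch milliseconds as the unit (the select.sql script later in this document converts the values back with timestamp() for display):

```python
from datetime import datetime, timezone

def to_epoch_millis(dt: datetime) -> int:
    """Convert a datetime to epoch milliseconds for a bigint timestamp column."""
    return int(dt.timestamp() * 1000)

# Example: a fixed UTC pickup time becomes a bigint value for the message.
pickup = datetime(2016, 1, 1, 12, 30, 0, tzinfo=timezone.utc)
print(to_epoch_millis(pickup))  # 1451651400000
```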
Step 2#
Start Pulsar Standalone#
It’s simpler and quicker to run Pulsar in docker.
We recommend running docker with the ‘host’ network to avoid networking problems between docker containers.
We also need pulsar-admin, which ships inside the docker container, to create a sink. So we will start the container with bash first and run the commands inside it.
Don’t forget to bind the ‘files’ directory.
docker run -dit --network host -v `pwd`/files:/pulsar/files --name pulsar apachepulsar/pulsar:2.9.1 bash
docker exec -it pulsar bash
In the Pulsar container, start the Pulsar standalone server.
bin/pulsar-daemon start standalone --zookeeper-port 5181
Note
OpenMLDB already uses port 2181 for its ZooKeeper, so we change the ZooKeeper port of Pulsar standalone here. We will connect to OpenMLDB through ZooKeeper port 2181; the ZooKeeper port used by Pulsar standalone does not affect anything else.
You can use ps to check whether Pulsar is running well. If it failed to start, check the standalone server log logs/pulsar-standalone-....log.
ps axu|grep pulsar
When you start a local standalone cluster, a public/default namespace is created automatically; it is intended for development purposes (see the Pulsar documentation). We will create the sink in this namespace.
See also
If you prefer to run Pulsar locally without docker, see Set up a standalone Pulsar locally.
Q&A#
Q: The standalone server fails to start with:
2022-04-07T03:15:59,289+0000 [main] INFO org.apache.zookeeper.server.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:5181
2022-04-07T03:15:59,289+0000 [main] ERROR org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Exception while instantiating ZooKeeper
java.net.BindException: Address already in use
How to fix it?
A: Pulsar needs an unused port to start its ZooKeeper server, and 5181 is already in use here. Pass a different port to ‘--zookeeper-port’.
Q: Port 8080 is already in use?
A: Change ‘webServicePort’ in conf/standalone.conf. Don’t forget ‘webServiceUrl’ in conf/client.conf; pulsar-admin needs that config.
Q: Port 6650 is already in use?
A: Change ‘brokerServicePort’ in conf/standalone.conf and ‘brokerServiceUrl’ in conf/client.conf.
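Before starting Pulsar standalone, you can check whether the ports it needs are free. A small sketch using only the Python standard library (the port numbers are the defaults discussed above):

```python
import socket

def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something is already listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(0.5)
        # connect_ex returns 0 on a successful connection, i.e. port occupied.
        return s.connect_ex((host, port)) == 0

# ZooKeeper, web service, and broker service ports of Pulsar standalone.
for port in (5181, 8080, 6650):
    print(port, "in use" if port_in_use(port) else "free")
```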
Connector installation (Optional)#
In the previous step, we bind-mounted the ‘files’ directory, and the connector nar is in it. We’ll set up the connector in ‘non built-in connector’ mode (using ‘archive’ in the sink config).
If you prefer to use the connector as a built-in connector, copy it to the ‘connectors’ directory.
mkdir connectors
cp files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar connectors/
If you want to change or add connectors while Pulsar standalone is running, you can reload them:
bin/pulsar-admin sinks reload
The built-in OpenMLDB connector’s sink type is ‘jdbc-openmldb’.
Create sink#
We create the sink in the ‘public/default’ namespace, which requires a sink config file, files/pulsar-openmldb-jdbc-sink.yaml, with content:
tenant: "public"
namespace: "default"
name: "openmldb-test-sink"
archive: "files/pulsar-io-jdbc-openmldb-2.11.0-SNAPSHOT.nar"
inputs: ["test_openmldb"]
configs:
jdbcUrl: "jdbc:openmldb:///pulsar_test?zk=localhost:2181&zkPath=/openmldb"
tableName: "connector_test"
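The jdbcUrl packs the database name and the ZooKeeper settings of the OpenMLDB cluster into a single string. The sketch below decomposes it with the standard library to show the parts; parse_openmldb_jdbc_url is an illustrative helper, not part of the connector:

```python
from urllib.parse import urlparse, parse_qs

def parse_openmldb_jdbc_url(jdbc_url: str) -> dict:
    """Split an OpenMLDB JDBC URL into database name and ZooKeeper settings.

    Illustrative helper only; the URL format is assumed from the example
    in the sink config above.
    """
    # Strip the "jdbc:" prefix so urlparse sees a normal "openmldb://..." URL.
    parsed = urlparse(jdbc_url[len("jdbc:"):])
    query = parse_qs(parsed.query)
    return {
        "database": parsed.path.lstrip("/"),  # database created by create.sql
        "zk": query["zk"][0],                 # OpenMLDB ZooKeeper address
        "zkPath": query["zkPath"][0],         # OpenMLDB ZooKeeper root path
    }

print(parse_openmldb_jdbc_url(
    "jdbc:openmldb:///pulsar_test?zk=localhost:2181&zkPath=/openmldb"
))
```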
Then create the sink and check its status. Notice that the input topic is ‘test_openmldb’.
./bin/pulsar-admin sinks create --sink-config-file files/pulsar-openmldb-jdbc-sink.yaml
./bin/pulsar-admin sinks status --name openmldb-test-sink
Create Schema#
Upload the schema to topic ‘test_openmldb’; the schema type is JSON. We’ll produce JSON messages in the same schema later. The schema file is ‘files/openmldb-table-schema’, with content:
{
"type": "JSON",
"schema":"{\"type\":\"record\",\"name\":\"OpenMLDBSchema\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"vendor_id\",\"type\":\"int\"},{\"name\":\"pickup_datetime\",\"type\":\"long\"},{\"name\":\"dropoff_datetime\",\"type\":\"long\"},{\"name\":\"passenger_count\",\"type\":\"int\"},{\"name\":\"pickup_longitude\",\"type\":\"double\"},{\"name\":\"pickup_latitude\",\"type\":\"double\"},{\"name\":\"dropoff_longitude\",\"type\":\"double\"},{\"name\":\"dropoff_latitude\",\"type\":\"double\"},{\"name\":\"store_and_fwd_flag\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"trip_duration\",\"type\":\"int\"}]}",
"properties": {}
}
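Note that the “schema” property is itself a JSON string containing an Avro-style record definition. A short sketch for sanity-checking that its field names line up with the create.sql columns (schema_field_names is an illustrative helper; the example below uses a shortened three-field version of the schema above):

```python
import json

def schema_field_names(schema_file_content: str) -> list:
    """Extract column names from an uploaded Pulsar JSON schema file.

    The outer document has a "schema" property whose value is itself a JSON
    string holding an Avro-style record definition.
    """
    wrapper = json.loads(schema_file_content)
    record = json.loads(wrapper["schema"])
    return [field["name"] for field in record["fields"]]

# Shortened example: only the first three fields of the schema above.
example = json.dumps({
    "type": "JSON",
    "schema": json.dumps({
        "type": "record",
        "name": "OpenMLDBSchema",
        "namespace": "com.foo",
        "fields": [
            {"name": "id", "type": ["null", "string"], "default": None},
            {"name": "vendor_id", "type": "int"},
            {"name": "pickup_datetime", "type": "long"},
        ],
    }),
    "properties": {},
})
# The names should match the create.sql columns, in order.
print(schema_field_names(example))
```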
Upload schema and check it, commands:
./bin/pulsar-admin schemas upload test_openmldb -f ./files/openmldb-table-schema
./bin/pulsar-admin schemas get test_openmldb
For demonstration purposes, we omit the fields part of the result.
Test#
Send messages#
We use the first 2 rows of the sample data (in the openmldb docker image, data/taxi_tour_table_train_simple.csv) as the test messages, as follows.
Java Producer#
The producer Java code is in the demo producer; see that project for the essential code.
The producer sends the 2 messages to topic ‘test_openmldb’. Pulsar then reads the messages and writes them to the online storage of the OpenMLDB cluster.
The packaged jar is in ‘files’, so you can run it directly:
java -cp files/pulsar-client-java-1.0-SNAPSHOT-jar-with-dependencies.jar org.example.Client
Python Producer#
You can also write the producer in Python; see the code in files/pulsar_client.py.
Before running it, install the Pulsar Python client:
pip3 install pulsar-client==2.9.1
Then run the producer:
python3 files/pulsar_client.py
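The producer logic can be sketched as follows. This is an illustrative sketch, not the exact content of files/pulsar_client.py; it assumes the broker listens on the default pulsar://localhost:6650 and that the sink accepts JSON-encoded message bytes, and the row values are placeholders in the shape of the taxi schema:

```python
import json

def build_message(row: dict) -> bytes:
    """Serialize one record as the JSON bytes the sink expects."""
    return json.dumps(row).encode("utf-8")

def send_rows(rows, service_url="pulsar://localhost:6650", topic="test_openmldb"):
    """Send each row to the topic; requires: pip3 install pulsar-client==2.9.1"""
    import pulsar  # imported here so build_message works without the client

    client = pulsar.Client(service_url)
    producer = client.create_producer(topic)
    for row in rows:
        producer.send(build_message(row))
    client.close()

# Placeholder row in the shape of the taxi schema; real values come from
# data/taxi_tour_table_train_simple.csv in the openmldb docker image.
row = {
    "id": "id_placeholder",
    "vendor_id": 1,
    "pickup_datetime": 1451651400000,
    "dropoff_datetime": 1451652000000,
    "passenger_count": 1,
    "pickup_longitude": -73.99,
    "pickup_latitude": 40.73,
    "dropoff_longitude": -73.98,
    "dropoff_latitude": 40.75,
    "store_and_fwd_flag": "N",
    "trip_duration": 600,
}
print(build_message(row)[:40])
# send_rows([row])  # uncomment to actually send to a running Pulsar
```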
Check#
Check in Pulsar#
We can check the sink status:
./bin/pulsar-admin sinks status --name openmldb-test-sink
Note
“numReadFromPulsar”: Pulsar sent 2 messages to the sink instance.
“numWrittenToSink”: the sink instance wrote 2 messages to OpenMLDB.
Check in OpenMLDB#
Now the message data is in the online storage of the OpenMLDB table. The content of the script select.sql:
set @@execute_mode='online';
use pulsar_test;
select *, string(timestamp(pickup_datetime)), string(timestamp(dropoff_datetime)) from connector_test;
In the OpenMLDB container, run:
/work/openmldb/bin/openmldb --zk_cluster=127.0.0.1:2181 --zk_root_path=/openmldb --role=sql_client < /work/pulsar_files/select.sql
Debug#
If the OpenMLDB table doesn’t have the data but the sink status shows messages were written to OpenMLDB, the sink instance may have problems. Check the sink log at logs/functions/public/default/openmldb-test-sink/openmldb-test-sink-0.log (the path changes if you use another sink name).
Pulsar retries writing failed messages. So if you send a bad message 1 and then a good message 2, even after message 2 has been written to OpenMLDB, the bad message 1 will keep being retried and logging errors, which is confusing. We recommend truncating the topic before testing again.
./bin/pulsar-admin topics truncate persistent://public/default/test_openmldb
If you use another topic, you can find its name with ./bin/pulsar-admin topics list public/default.
Debug log#
If the sink instance log is not detailed enough, you can enable debug-level logging: modify the log config and restart the sink instance.
vim conf/functions_log4j2.xml
and modify it:
<Property>
<name>pulsar.log.level</name>
<value>debug</value> <!-- set to debug level -->
</Property>
<Root>
<level>${sys:pulsar.log.level}</level> <!--change from info to ${sys:pulsar.log.level} or debug -->
<AppenderRef>
<ref>${sys:pulsar.log.appender}</ref>
<level>${sys:pulsar.log.level}</level>
</AppenderRef>
</Root>
Then restart the sink instance:
./bin/pulsar-admin sinks restart --name openmldb-test-sink
Reinitialize Pulsar#
If you want to reset Pulsar standalone completely, stop it and remove the data and logs:
bin/pulsar-daemon stop standalone --zookeeper-port 5181
rm -r data logs
bin/pulsar-daemon start standalone --zookeeper-port 5181