Synchronizing information from Kafka to TDengine Databases

This article discusses how to use the TDengine Database Kafka Connector to synchronize data from Kafka to TDengine and provides a sample script to test its performance in a data synchronization scenario.
Background
Kafka is a general-purpose message broker featuring a distributed architecture. You can use the Kafka Connect component to read from and write to Kafka, and its plug-ins can be used to read from and write to a variety of data sources. Kafka Connect supports fault tolerance, restarting, logging, elastic scaling, and serialization and deserialization.

To make integrating Kafka and TDengine a simpler process, the TDengine Team developed the TDengine Kafka Connector as a Kafka Connect plug-in. The TDengine Kafka Connector consists of the TDengine Source Connector and TDengine Sink Connector. In this article, the TDengine Sink Connector is used to integrate Kafka with TDengine.

TDengine Sink Connector implementation
The TDengine Sink Connector synchronizes the data from specified Kafka topics to a TDengine time-series database (TSDB) in batches or in real time.

Before running the TDengine Sink Connector, you must create a properties file. For information about the properties file, see the official documentation.

The implementation of the TDengine Sink Connector is as follows:

The Kafka Connect framework starts a specified number of consumer threads.
These consumers simultaneously subscribe to data and deserialize the data according to the values of key.converter and value.converter set in the configuration file.
The Kafka Connect framework sends the deserialized data to a specified number of SinkTask instances.
SinkTask uses the schemaless write interface provided by TDengine to write data into the database.
In this process, the first three steps are implemented automatically by the Kafka Connect framework, and the TDengine Sink Connector performs the final step on its own.

Supported data formats
The TDengine Sink Connector writes data in schemaless mode. It supports the InfluxDB line format, OpenTSDB telnet format, and OpenTSDB JSON format. You can modify the value of the db.schemaless parameter to choose the format that you want to use. As an example, the following configuration enables InfluxDB line format:

db.schemaless=line
If the data in your Kafka deployment is already in one of the three formats mentioned, set value.converter to the built-in Kafka Connect string converter:

value.converter=org.apache.kafka.connect.storage.StringConverter
Otherwise, you must implement your own converter to process your data into one of the supported formats. For more information, see Converters.
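A real Kafka Connect converter is a Java class, but the formatting it has to perform can be sketched briefly in Python. The following is a minimal sketch, assuming numeric field values and tag values that contain no spaces, commas, or equals signs (so no escaping is applied). The function name to_line_protocol and the sample record are hypothetical and are not part of the connector.

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Format one record as an InfluxDB line protocol string.

    Assumes numeric fields and tag values that need no escaping.
    """
    # Tags are sorted so that the same tag set always yields the same text.
    tag_part = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{k}={v}" for k, v in fields.items())
    head = f"{measurement},{tag_part}" if tag_part else measurement
    return f"{head} {field_part} {timestamp_ns}"


# Hypothetical record shaped like the test data used later in this article.
line = to_line_protocol(
    "meters",
    {"location": "SanDiego", "groupid": 2},
    {"current": 1.5, "voltage": 220, "phase": 0.3},
    1648432611249000000,
)
print(line)
```

Records formatted this way before they reach Kafka can be consumed with the StringConverter shown above, so no custom converter is needed.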

In this implementation, Kafka Connect is acting as the consumer. Consumer behavior can therefore be controlled by modifying the Kafka Connect configuration.

Topic configuration
The topics to which the consumer subscribes are controlled by the topics parameter. To override the default value of any other consumer parameter, add that parameter to the properties file with the consumer.override prefix. For example, the following line changes the maximum records per poll to 3000:

consumer.override.max.poll.records=3000
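If several consumer settings need to be overridden, the prefixed lines can be generated rather than typed by hand. This is a small sketch only. The helper name consumer_overrides is hypothetical, and fetch.min.bytes is used simply as a second example of a standard Kafka consumer setting.

```python
def consumer_overrides(settings):
    """Return properties-file lines that carry the consumer.override prefix."""
    return [f"consumer.override.{name}={value}" for name, value in settings.items()]


# Example values only; choose settings that suit your own deployment.
for prop_line in consumer_overrides({"max.poll.records": 3000, "fetch.min.bytes": 1024}):
    print(prop_line)
```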
Thread configuration
In a Kafka Connect sink connector, each task is a consumer thread that receives data from the partitions of a topic. You can use the tasks.max parameter to control the maximum number of tasks and thereby the maximum number of threads. However, the number of tasks that are actually initiated is related to the number of topic partitions. For example, if you have ten partitions and the value of the tasks.max parameter is 5, each task will receive data from two partitions and keep track of the offsets of two partitions.

Note that if the value of the tasks.max parameter is larger than the number of partitions, the number of tasks that Kafka Connect initiates is equal to the number of partitions. The number of tasks is not related to the number of topics.
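The relationship between tasks.max and the number of partitions described above can be illustrated with a short sketch. It assumes a simple round-robin spread of partitions over tasks. The real assignment is made by Kafka's consumer group protocol and may place specific partitions differently, and the function name assign_partitions is hypothetical.

```python
def assign_partitions(num_partitions, tasks_max):
    """Spread partitions over tasks, following the article's description.

    The number of active tasks is capped by the number of partitions.
    """
    num_tasks = min(tasks_max, num_partitions)
    assignment = {task: [] for task in range(num_tasks)}
    for partition in range(num_partitions):
        # Round-robin is an illustrative choice, not Kafka's actual assignor.
        assignment[partition % num_tasks].append(partition)
    return assignment


# Ten partitions with tasks.max=5: each task handles two partitions.
print(assign_partitions(10, 5))
# Three partitions with tasks.max=8: only three tasks receive data.
print(assign_partitions(3, 8))
```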

Procedure
Install Kafka.
Download and decompress the Kafka installation package.
wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
Edit the .bash_profile file and add the following text:
export KAFKA_HOME=/home/bding/kafka_2.13-3.2.0
export PATH=$PATH:$KAFKA_HOME/bin
Reload the bash profile.
source .bash_profile
Configure Kafka.
Edit the Kafka Connect properties file:
cd kafka_2.13-3.2.0/config/
vi connect-standalone.properties
Add the plug-in directory:
plugin.path=/home/bding/connectors
Edit the log configuration of Kafka Connect:
vi connect-log4j.properties
Set the logging level for the TDengine Sink Connector to DEBUG:
log4j.logger.com.taosdata.kafka.connect.sink=DEBUG
This modification is necessary because these logs are used to calculate the time spent in synchronizing data.
Compile and install the TDengine Sink Connector.
git clone git@github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d ~/connectors target/components/packages/taosdata-kafka-connect-tdengine-*.zip
Start the ZooKeeper and Kafka servers.
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Create a topic.
kafka-topics.sh --create --topic meters --partitions 1 --bootstrap-server localhost:9092
Generate test data.
Save the following script as gen-data.py:
#!/usr/bin/python3
import random
import sys
topic = sys.argv[1]
count = int(sys.argv[2])
start_ts = 1648432611249000000
location = ["SanFrancisco", "LosAngeles", "SanDiego"]
for i in range(count):
    ts = start_ts + i
    row = f"{topic},location={location[i % 3]},groupid=2 current={random.random() * 10},voltage={random.randint(100, 300)},phase={random.random()} {ts}"
    print(row)
Run gen-data.py:
python3 gen-data.py meters 10000 | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
The script generates 10,000 data points in InfluxDB line format and adds them to the meters topic. Each data point has two label fields and three data fields.
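Before piping a large data set into Kafka, it can help to check that a generated line has the expected shape. The following sketch parses one line in the format that gen-data.py is intended to produce and confirms the two tags and three fields. The function name parse_line and the sample line are hypothetical, and the parser handles only this simple unescaped, numeric-field case.

```python
def parse_line(line):
    """Split a simple InfluxDB line protocol string into its parts."""
    head, field_part, ts = line.strip().split(" ")
    measurement, *tag_items = head.split(",")
    tags = dict(item.split("=", 1) for item in tag_items)
    fields = {}
    for item in field_part.split(","):
        name, value = item.split("=", 1)
        fields[name] = float(value)
    return measurement, tags, fields, int(ts)


# Sample line with made-up values in the same shape as the generated data.
sample = "meters,location=SanFrancisco,groupid=2 current=4.2,voltage=220,phase=0.31 1648432611249000000"
measurement, tags, fields, ts = parse_line(sample)
print(measurement, len(tags), len(fields), ts)
```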

Start Kafka Connect.
Save the following configuration as sink-test.properties:
name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
Start Kafka Connect:
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
Use the TDengine CLI to query the meters table in the power database to verify that there are 10,000 data points.
[bding@vm95 test]$ taos

Welcome to the TDengine shell from Linux, Client Version:2.6.0.4
Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
taos> select count(*) from power.meters;
count(*) |
========================
10000 |
TDengine Sink Connector performance testing
This performance test is similar to steps 4 through 7 in the previous section. Note the following:

The performance test script takes two arguments. The first is the number of partitions, and the second is the number of data points to generate.
The tasks.max parameter is set to the number of partitions. This controls the number of threads in each test.
The test database must be empty before the test begins.
After the test is complete, stop Kafka, ZooKeeper, and Kafka Connect manually.
In each test, data is first written to Kafka and then synchronized by Kafka Connect to TDengine. This ensures that the sink connector bears the full load of the synchronization task. The total time required for synchronization is measured from when the sink connector receives the first batch of data to when it receives the last batch of data.

To run the performance test, save the following script as run-test.sh:

#!/bin/bash
if [ $# -lt 2 ]; then
    echo "Usage: ./run-test.sh <number of partitions> <number of records>"
    exit 1
fi
echo "---------------------------TEST STARTED---------------------------------------"
echo clean data and logs
taos -s "DROP DATABASE IF EXISTS power"
rm -rf /tmp/kafka-logs /tmp/zookeeper
rm -f $KAFKA_HOME/logs/connect.log
np=$1 # number of partitions
total=$2 # number of records
echo number of partitions is $np, number of records is $total.
echo start zookeeper
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
echo start kafka
sleep 3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
sleep 5
echo create topic
kafka-topics.sh --create --topic meters --partitions $np --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic meters --bootstrap-server localhost:9092
echo generate test data
python3 gen-data.py meters $total | kafka-console-producer.sh --broker-list localhost:9092 --topic meters

echo alter connector configuration setting tasks.max=$np
sed -i "s/tasks.max=.*/tasks.max=$np/" sink-test.properties
echo start kafka connect
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties

echo -e "\e[1;31m open another console to monitor connect.log. press enter when no more data received.\e[0m"
read

echo stop connect
jps | grep ConnectStandalone | awk '{print $1}' | xargs kill
echo stop kafka server
kafka-server-stop.sh
echo stop zookeeper
zookeeper-server-stop.sh
# extract timestamps of receiving the first batch of data and the last batch of data
grep "records" $KAFKA_HOME/logs/connect.log | grep meters- > tmp.log
start_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | head -1`
stop_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | tail -1`

echo "--------------------------TEST FINISHED------------------------------------"
echo "| records | partitions | start time | stop time |"
echo "|---------|------------|------------|-----------|"
echo "| $total | $np | $start_time | $stop_time |"
You can then run the script and specify a number of partitions and data points. As an example, run the following command for a performance test with one partition and 1 million data points:

./run-test.sh 1 1000000
The test process is shown in the following figure.


Note that you must monitor the connect.log file in a separate console and press Enter once all data has been consumed. You can use the tail -f connect.log command to monitor progress:

[bding@vm95 ~]$ cd kafka_2.13-3.2.0/logs/
[bding@vm95 logs]$ tail -f connect.log
[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
When new entries are no longer being written to the log file, this indicates that the data consumption has been completed.
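The elapsed time between the first and last batch can also be computed directly from the log lines instead of being read off by eye. The following sketch assumes the log line layout shown in the excerpt above. The function name summarize is hypothetical, and the two sample lines are copied from that excerpt.

```python
import re
from datetime import datetime

# Matches the leading timestamp and the batch size of a "Received N records" line.
LOG_PATTERN = re.compile(
    r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\].*?Received (\d+) records"
)


def summarize(log_lines):
    """Return (total records, seconds between the first and last batch)."""
    stamps = []
    total = 0
    for log_line in log_lines:
        match = LOG_PATTERN.search(log_line)
        if match:
            stamps.append(datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f"))
            total += int(match.group(2))
    if not stamps:
        return 0, 0.0
    return total, (max(stamps) - min(stamps)).total_seconds()


sample_log = [
    "[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)",
    "[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)",
]
total_records, elapsed_seconds = summarize(sample_log)
print(total_records, elapsed_seconds)
if elapsed_seconds > 0:
    print(f"{total_records / elapsed_seconds:,.0f} records per second")
```

On a full connect.log, dividing the total number of records by the elapsed seconds gives the average write rate for the run.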

The following table shows the data write performance determined by this test.

Data points   1 thread   3 threads   5 threads   10 threads
1 million     105,219    232,937     333,000     489,956
3 million     107,650    239,330     363,240     573,175
5 million     108,321    246,573     364,087     580,720
10 million    107,803    248,855     372,912     562,936
15 million    106,651    249,671     377,283     541,927
20 million    103,626    244,921     371,402     557,460

The data points indicate the average number of entries written per second for configurations with one, three, five, and ten threads.

Conclusion
From the preceding results, we can see that given the same size data set, write speed increases with the number of threads.

With a single thread, approximately 100,000 data points are written per second.
With five threads, the speed increases to approximately 350,000 data points per second.
With ten threads, the speed increases to approximately 550,000 data points per second.
The write speed is relatively stable and is not clearly associated with the total size of the data set.

However, the performance gain per thread declines as the number of threads increases: going from one thread to ten increases write speed only by a factor of about five. This may be caused by uneven distribution of data among partitions. Some tasks take longer to complete than others, and this skew grows with the size of the data set.
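The diminishing return can be quantified from the measured rates. The following sketch uses the figures reported for the 10 million data point test and computes the speedup relative to one thread and the efficiency per thread. The function name scaling is hypothetical.

```python
# Records per second reported for the 10 million data point test.
rates = {1: 107_803, 3: 248_855, 5: 372_912, 10: 562_936}


def scaling(measured):
    """Map thread count to (speedup over one thread, efficiency per thread)."""
    base = measured[1]
    return {n: (rate / base, rate / base / n) for n, rate in measured.items()}


for threads, (speedup, efficiency) in scaling(rates).items():
    print(f"{threads:>2} threads: speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

An efficiency of 100% would mean that write speed grows in direct proportion to the number of threads.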

As an example, if the data set is created with 10 million data points spread across 10 partitions, the distribution of data points per partition is as follows:

[bding@vm95 kafka-logs]$ du -h ./ -d 1
125M ./meters-8
149M ./meters-7
119M ./meters-9
138M ./meters-4
110M ./meters-3
158M ./meters-6
131M ./meters-5
105M ./meters-0
113M ./meters-2
99M ./meters-1
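The skew in this listing can be summarized with a little arithmetic. The following sketch uses the sizes from the du output above (in MB) as a rough proxy for the number of records per partition, which is an assumption, and compares the largest partition with the mean.

```python
# Partition sizes in MB, taken from the du output above (meters-0 to meters-9).
sizes_mb = {0: 105, 1: 99, 2: 113, 3: 110, 4: 138, 5: 131, 6: 158, 7: 149, 8: 125, 9: 119}

mean_mb = sum(sizes_mb.values()) / len(sizes_mb)
largest = max(sizes_mb, key=sizes_mb.get)
smallest = min(sizes_mb, key=sizes_mb.get)
imbalance = sizes_mb[largest] / mean_mb

print(f"mean size: {mean_mb:.1f} MB")
print(f"largest: meters-{largest} ({sizes_mb[largest]} MB)")
print(f"smallest: meters-{smallest} ({sizes_mb[smallest]} MB)")
print(f"largest / mean: {imbalance:.2f}")
```

With one task per partition, the task that reads the largest partition has the most data to write, so it is likely to finish last.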
Another factor influencing multithreaded write speed is out-of-order data. Because this test allocates data randomly across partitions, only a single-partition configuration can result in strictly ordered data, which provides the highest performance. As the number of threads is raised, the degree to which the data is out of order increases.

For this reason, it is recommended that all data contained within a subtable be stored in the same Kafka partition.
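One way to follow this recommendation is to give every message a key derived from its subtable, because Kafka sends messages with the same key to the same partition. The following is a minimal sketch, assuming that a subtable is identified by the measurement name plus the tag set. The function names are hypothetical, and zlib.crc32 is only a stand-in for the hash used by Kafka's own partitioner.

```python
import zlib


def subtable_key(line):
    """Derive a message key from the measurement and tags of a line."""
    head = line.split(" ", 1)[0]
    measurement, *tag_items = head.split(",")
    # Sort tags so that tag order does not change the key.
    return ",".join([measurement] + sorted(tag_items))


def choose_partition(key, num_partitions):
    """Pick a partition from a stable hash of the key (illustrative only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Two made-up data points that belong to the same subtable.
first = "meters,location=SanDiego,groupid=2 current=1.5,voltage=220,phase=0.3 1648432611249000000"
second = "meters,groupid=2,location=SanDiego current=2.5,voltage=221,phase=0.4 1648432611249000001"
print(choose_partition(subtable_key(first), 10))
print(choose_partition(subtable_key(second), 10))
```

In the tests in this article the console producer sends messages without keys, which is why data for each subtable is spread across partitions.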

Appendix
All source code and test results for this article have been uploaded to GitHub.
The test environment used in this article is described as follows:
OpenJDK 1.8.0
CentOS 7.9
64 GB of memory
16-core i7-10700 (x86-64, 2.90 GHz)
HDD storage
TDengine 2.6.0.4
Kafka 3.2
