Kafka data synchronization in Unix/Linux

I have the following problem:

I need to synchronize data between AWS MSK and on-premises Kafka, or vice versa. I found several solutions online. Let's talk about them:

  • The use of Kafka MirrorMaker for synchronization.
  • The use of a programming language (e.g. Python or Java) to write code for the sync task. I might go with Python here.
  • Other options that I can find or come up with (these will come later).

Let's proceed to the solutions.

The use of Kafka MirrorMaker for synchronization

Kafka's mirroring feature allows you to maintain a copy (replica) of an existing Kafka cluster. The figure below shows how MirrorMaker is used to mirror a Kafka cluster:

The tool uses a Kafka consumer to receive messages from the source cluster and republishes them to the target cluster using an embedded Kafka producer.

How to set up mirroring?

The first thing you need to do is download the archive with the required scripts; I described how to do that in my article:

Work with Kafka in Unix/Linux

It may be useful to read:

Install Kafka on Unix/Linux

Configuring replication is easy: just start the process (or processes) with MirrorMaker.

Data sync from on-premise to AWS MSK in Unix/Linux

For example, if you need to synchronize data from on-premise to AWS MSK, run MM like this:

$ ENV="nonprod" && \
kafka-mirror-maker.sh \
 --consumer.config /opt/kafka_2.12-2.2.1/kafka-onpremise_${ENV}_consumer.properties \
 --num.streams 3 \
 --producer.config /opt/kafka_2.12-2.2.1/mskkafkatest-cluster-${ENV}_producer.properties \
 --whitelist="topic_1,topic_2"

Data sync from AWS MSK to on-premise Kafka in Unix/Linux

If you want to synchronize data from AWS MSK to on-premise, run MM like this:

$ ENV="nonprod" && \
kafka-mirror-maker.sh \
 --consumer.config /opt/kafka_2.12-2.2.1/mskkafkatest-cluster-${ENV}_consumer.properties \
 --num.streams 3 \
 --producer.config /opt/kafka_2.12-2.2.1/kafka-onpremise_${ENV}_producer.properties \
 --whitelist="topic_1,topic_2"

Where:

  • ENV - the environment name to use.
  • --consumer.config /opt/kafka_2.12-2.2.1/mskkafkatest-cluster-${ENV}_consumer.properties - the consumer configuration file for AWS MSK.
  • --consumer.config /opt/kafka_2.12-2.2.1/kafka-onpremise_${ENV}_consumer.properties - the consumer configuration file for on-premise Kafka.
  • --num.streams 3 - the number of consumer threads to use.
  • --producer.config /opt/kafka_2.12-2.2.1/mskkafkatest-cluster-${ENV}_producer.properties - the producer configuration file for AWS MSK.
  • --producer.config /opt/kafka_2.12-2.2.1/kafka-onpremise_${ENV}_producer.properties - the producer configuration file for on-premise Kafka.
  • --whitelist="topic_1,topic_2" - the list of topics to replicate.

Now let's look at the configs for MM. The mskkafkatest-cluster-${ENV}_consumer.properties file looks as follows (here and below, ${KAFKA_PATH}, ${AWS_MSK_BROKERS}, ${KAFKA_ONPREMISE_BROKERS}, ${STOREPASS} and ${KEYPASS} are assumed to be already set in your shell):

# cat << EOF > ${KAFKA_PATH}/mskkafkatest-cluster-${ENV}_consumer.properties
bootstrap.servers=${AWS_MSK_BROKERS}
exclude.internal.topics=true
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
auto.offset.reset=earliest
enable.auto.commit=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SSL
ssl.truststore.location=${KAFKA_PATH}/mskkafkatest-cluster-${ENV}.client.truststore.jks
ssl.keystore.location=${KAFKA_PATH}/mskkafkatest-cluster-${ENV}.client.keystore.jks
ssl.keystore.password=${STOREPASS}
ssl.key.password=${KEYPASS}
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
EOF

The kafka-onpremise_${ENV}_consumer.properties file looks as follows:

# cat << EOF > ${KAFKA_PATH}/kafka-onpremise_nonprod_consumer.properties
bootstrap.servers=${KAFKA_ONPREMISE_BROKERS}
exclude.internal.topics=true
client.id=mirror_maker_consumer
group.id=mirror_maker_consumer
auto.offset.reset=earliest
enable.auto.commit=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
EOF

The mskkafkatest-cluster-${ENV}_producer.properties file looks as follows:

# cat << EOF > ${KAFKA_PATH}/mskkafkatest-cluster-${ENV}_producer.properties
bootstrap.servers=${AWS_MSK_BROKERS}
acks=-1
batch.size=8196
client.id=mirror_maker_producer
retries=2147483647
block.on.buffer.full=true
max.in.flight.requests.per.connection=1
security.protocol=SSL
ssl.truststore.location=${KAFKA_PATH}/mskkafkatest-cluster-${ENV}.client.truststore.jks
ssl.keystore.location=${KAFKA_PATH}/mskkafkatest-cluster-${ENV}.client.keystore.jks
ssl.keystore.password=${STOREPASS}
ssl.key.password=${KEYPASS}
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
EOF

The kafka-onpremise_${ENV}_producer.properties file looks as follows:

# cat << EOF > ${KAFKA_PATH}/kafka-onpremise_nonprod_producer.properties
bootstrap.servers=${KAFKA_ONPREMISE_BROKERS}
acks=-1
batch.size=8196
client.id=mirror_maker_producer
retries=2147483647
block.on.buffer.full=true
max.in.flight.requests.per.connection=1
EOF

Starting it is not very difficult. The developers recommend running MirrorMaker on the target cluster, since the producer is more sensitive to latency than the consumer.

You can use other startup parameters too, for example:

  • --blacklist - a list of topics to exclude from replication (not to be synced).

I will describe other parameters later, if they are needed. I used an AWS EC2 instance of type t2.xlarge. Ideally, one would make a Helm chart with everything needed to deploy this solution in Kubernetes, but that will come a bit later.

Checking that the mirror works

For example, run the following command:

# kafka-consumer-offset-checker.sh \
 --group KafkaMirror \
 --zkconnect dc1-zookeeper:2181 \
 --topic test-topic
Group       Topic      Pid Offset logSize Lag Owner
KafkaMirror test-topic 0   5      5       0   none
KafkaMirror test-topic 1   3      4       1   none
KafkaMirror test-topic 2   6      9       3   none

This check is useful for evaluating how well the mirroring between clusters is keeping up: the Lag column shows how far the mirror consumer is behind. If no topic is specified, it displays information for all topics in the given consumer group. Note that kafka-consumer-offset-checker.sh was removed in newer Kafka releases; there, kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group KafkaMirror reports the same information. You can also compute the lag yourself, as in the sketch below.
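
If you prefer to script this check, here is a minimal sketch using the kafka-python library (pip install kafka-python); the broker address, group, and topic names are placeholder assumptions:

# lag_check.py - rough consumer-lag report, analogous to the offset checker above
from kafka import KafkaConsumer, TopicPartition

BROKERS = "dc1-kafka:9092"   # placeholder broker address
GROUP = "KafkaMirror"
TOPIC = "test-topic"

consumer = KafkaConsumer(bootstrap_servers=BROKERS, group_id=GROUP)
partitions = [TopicPartition(TOPIC, p) for p in consumer.partitions_for_topic(TOPIC)]
end_offsets = consumer.end_offsets(partitions)   # log end offset per partition

print("Group Topic Pid Offset logSize Lag")
for tp in partitions:
    committed = consumer.committed(tp) or 0      # last committed offset of the group
    print(GROUP, TOPIC, tp.partition, committed, end_offsets[tp],
          end_offsets[tp] - committed)

consumer.close()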

I also found online another way to start MM:

# kafka-run-class.sh kafka.tools.MirrorMaker \
 --consumer.config sourceCluster1Consumer.config \
 --consumer.config sourceCluster2Consumer.config \
 --num.streams 2 \
 --producer.config targetClusterProducer.config \
 --whitelist=".*"

I tested it; it works fine.

Performance tuning MirrorMaker

What can be tuned?

Increasing throughput over high-latency links

I ran into three main problems affecting throughput: poor compression, insufficient batching, and low concurrency.

The first fix is to let the producer wait and accumulate fuller batches before sending, trading a little latency for throughput. Change this parameter:

linger.ms = 15000

Enabling compression

Smaller messages are sent faster than large ones, so shrinking large messages generally speeds up their delivery; it also saves bandwidth. It is important to note that compression is more effective on large messages, because there are more opportunities to find duplicate chunks. Change this parameter:

compression.type = gzip

The default value is none. The result was a significant decrease in the number of bytes transferred from MirrorMaker to the other Kafka cluster. It also reduced the amount of disk space required to store the messages, because Kafka keeps them in their compressed batches.
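
A quick way to convince yourself why payload size matters for compression is a sketch like this (standard library only; the payload contents are made up):

# compression_demo.py - repeated chunks compress far better in large payloads
import gzip

small = b'{"event": "click", "user": 42}'
large = small * 1000   # simulates a big batch of similar messages

print(len(small), len(gzip.compress(small)))   # tiny payload: little or no gain
print(len(large), len(gzip.compress(large)))   # large payload: big gain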

Setting batch sizes for messages

Initially, I increased MirrorMaker's batch.size from the default of 16384 up to 1000000 bytes, which is the default topic-level max.message.bytes. The problem with this approach is that Kafka fills a batch based not on the total uncompressed size of the messages, but on an estimate of their compressed size. The actual compression sometimes turns out worse than the estimate, so the compressed batch ends up larger than the topic's max.message.bytes, and the broker rejects the whole batch. When this happens, the broker logs a kafka.common.MessageSizeTooLargeException, and MirrorMaker either drops the messages or stops working, depending on how it is configured.

The first part of the fix was to ensure that the MirrorMaker producer's max.request.size is at least the topic's max.message.bytes, and likewise at least the broker's message.max.bytes. This ensures that regular (unbatched) messages read from the source Kafka cluster can be successfully sent to the aggregator Kafka cluster.

The second part of the fix is to keep the producer's batch.size below "expected compression ratio" * max.message.bytes. The Kafka producer dynamically updates its expected compression ratio based on previous messages, but the estimate can be off, so to keep a compressed batch from exceeding max.message.bytes you should pick a batch.size smaller than max.message.bytes multiplied by the best compression ratio you realistically expect. A conservative value is 20:1, i.e. 0.05. (The theoretical maximum is around 1000:1, but at that point you should question the value of the data being sent, because it is almost completely redundant.) The Kafka producer reports the average compression ratio, which is a good starting point, but a full histogram over all batches is more accurate. In my case I used 0.05 * max.message.bytes = 50000, since we mirror a variety of topics and have no direct control over message content:

batch.size = 50000

Setting the buffer memory for all partitions

The Kafka producer collects messages into batches in a shared block of memory. That block is divided into chunks, each batch.size bytes in size. When messages are sent to the producer, they are added to an existing batch or to a new one. If the number of partitions on the aggregate cluster side is much larger than the number of batches that fit in the buffer, a call to producer.send() will block until a chunk becomes available. This can have a significant negative impact on performance.

The solution was to set the producer's buffer.memory = "total number of partitions on the aggregate cluster" * batch.size. This makes the MirrorMaker process allocate one large static chunk of memory, but the consumer threads never block waiting for buffer space, and the producer can process data as soon as it becomes available, compressing and sending full batches, which maximizes total throughput.
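
To make the arithmetic concrete, here is a small sketch of how these numbers fit together; the partition count is a made-up figure chosen only to reproduce the buffer.memory value used below:

# sizing_sketch.py - deriving the MirrorMaker producer settings
max_message_bytes = 1_000_000    # topic max.message.bytes (Kafka default)
expected_compression = 0.05      # conservative 20:1 compression ratio
total_partitions = 40_000        # assumed partition count on the target cluster

# keep compressed batches safely under max.message.bytes:
batch_size = int(expected_compression * max_message_bytes)   # 50000

# one batch-sized chunk per target partition, so producer.send() never blocks:
buffer_memory = total_partitions * batch_size                # 2000000000

print("batch.size =", batch_size)
print("buffer.memory =", buffer_memory)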

Putting it all together, to make MM work fast, add the following settings to the MirrorMaker producer config:

batch.size = 50000
buffer.memory = 2000000000
compression.type = gzip
linger.ms = 15000
max.request.size = 1000000

That’s all.

The use of a programming language for Kafka synchronization

No examples for now, because I don't want to reinvent the wheel. But in theory (and in practice too), you can write a utility that reads from one stream/topic and writes to another; see the sketch after the list below.

For Python there are several ready-made libraries you can use:

  • kafka-python
  • aiokafka
  • pykafka
  • confluent-kafka-python
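
For example, here is a minimal sketch of such a utility with kafka-python; the broker addresses and the topic name are placeholder assumptions, and a production version would need batching, error handling, and graceful shutdown:

# py_mirror.py - copy records from one cluster/topic to another
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "topic_1",
    bootstrap_servers="source-kafka:9092",   # placeholder source cluster
    group_id="py_mirror",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
)
producer = KafkaProducer(bootstrap_servers="target-kafka:9092", acks="all")

for msg in consumer:
    # re-publish each record to the target cluster, preserving key and value
    producer.send("topic_1", key=msg.key, value=msg.value)
    producer.flush()     # simple but slow; batch flushes in real use
    consumer.commit()    # commit only after the copy is acknowledged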

That's it; the article "Data synchronization between AWS MSK and on-premises Kafka on Unix/Linux" is complete.

Source: linux-notes.org
