Working with Kafka in Unix/Linux

I would like to put together a note (something like a cheat sheet) with commands for working with Kafka. I will give ready-to-use examples of the commands you will need most often when working with Kafka.

Setting up the Kafka scripts in Unix/Linux

To use the Kafka command-line scripts, do a small installation first:

$ KAFKA_VERSION=2.12-2.2.1 && \
wget -q -O - https://www-us.apache.org/dist/kafka/2.2.1/kafka_${KAFKA_VERSION}.tgz | (cd /opt; tar -zxvf -) && cd /opt/kafka_${KAFKA_VERSION}
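The KAFKA_VERSION value combines the Scala version and the Kafka release: 2.12-2.2.1 means Kafka 2.2.1 built for Scala 2.12. The sketch below splits it with Bash parameter expansion and derives the download URL and install directory used in the install command above. The variable names SCALA_VERSION, KAFKA_RELEASE, KAFKA_URL and KAFKA_HOME are illustrative choices, and the URL assumes the same mirror layout as that command.

```shell
#!/usr/bin/env bash
# Split a "<scala>-<kafka>" version string such as 2.12-2.2.1.
# Falls back to 2.12-2.2.1 when KAFKA_VERSION is not already set.
KAFKA_VERSION="${KAFKA_VERSION:-2.12-2.2.1}"

SCALA_VERSION="${KAFKA_VERSION%%-*}"   # text before the first "-", e.g. 2.12
KAFKA_RELEASE="${KAFKA_VERSION#*-}"    # text after the first "-", e.g. 2.2.1

# Same mirror path as in the install command (assumption: layout unchanged).
KAFKA_URL="https://www-us.apache.org/dist/kafka/${KAFKA_RELEASE}/kafka_${KAFKA_VERSION}.tgz"
KAFKA_HOME="/opt/kafka_${KAFKA_VERSION}"

echo "Scala:   ${SCALA_VERSION}"
echo "Kafka:   ${KAFKA_RELEASE}"
echo "URL:     ${KAFKA_URL}"
echo "Install: ${KAFKA_HOME}"
```

Deriving everything from one variable keeps the URL, the directory and the PATH entry in ~/.bashrc in sync when you switch versions.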

For convenience, add the following to ~/.bashrc so the scripts are on your PATH:

export KAFKA_VERSION=2.12-2.2.1
export KAFKA_HEAP_OPTS="-Xmx1g -Xms1g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
export PATH="$PATH:/opt/kafka_${KAFKA_VERSION}/bin"

Then reload the file in the current shell:

$ . ~/.bashrc

Or:

$ source ~/.bashrc

Useful reading:

Install Kafka on Unix/Linux

Get down to work!

Working with Kafka in Unix/Linux

List of topics:

$ kafka-topics.sh --list \
 --zookeeper zookeeper_host:zookeeper_host_port

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

You can also read the connection string from a file:

$ kafka-topics.sh --list \
 --zookeeper $(cat ZooKeeperList.txt)

Where:

  • ZooKeeperList.txt: a file holding the ZooKeeper connection string (host:port pairs; separate several hosts with commas on a single line).
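Because $(cat ZooKeeperList.txt) is substituted unquoted, the file should contain a single line such as zk1:2181,zk2:2181,zk3:2181. A file with one host per line would be split into several separate words on the command line. The sketch below joins a one-host-per-line file into that form with paste. The join_hosts helper and the zookeeper_hosts.txt file name are hypothetical.

```shell
#!/usr/bin/env bash
# Join a one-host-per-line list into a single comma-separated connection string.
# join_hosts and zookeeper_hosts.txt are hypothetical names for this example.
join_hosts() {
  # Drop blank lines, then join the remaining lines with commas.
  grep -v '^[[:space:]]*$' "$1" | paste -sd, -
}

# Demo with a temporary file instead of a real host list.
tmp_list="$(mktemp)"
printf 'zk1:2181\nzk2:2181\n\nzk3:2181\n' > "$tmp_list"

join_hosts "$tmp_list"          # prints zk1:2181,zk2:2181,zk3:2181

# To produce the file used by the commands in this article:
# join_hosts zookeeper_hosts.txt > ZooKeeperList.txt

rm -f "$tmp_list"
```

The same helper works for BrokersList.txt, which the producer and consumer commands read the same way.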

To get information about a topic:

$ kafka-topics.sh --describe \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --topic test_topic

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

Creating a topic:

$ kafka-topics.sh --create \
 --zookeeper $(cat ZooKeeperList.txt) \
 --topic test_topic \
 --replication-factor 2 \
 --partitions 3

Here is a small script to create topics:

#!/usr/bin/env bash
ENV=nonprod


# Each entry has the form name:replication_factor:partitions
# 1: the topic name
# 2: the replication factor
# 3: the number of partitions
KAFKA_TOPIC_LIST=(
"MyKafkaTopic:3:1"
"MyKafkaTopic2:3:1"
)


for topic in "${KAFKA_TOPIC_LIST[@]}" ; do
 a_topic=$(echo "$topic" | cut -d ":" -f1)
 a_replication_factor=$(echo "$topic" | cut -d ":" -f2)
 a_partition=$(echo "$topic" | cut -d ":" -f3)

 kafka-topics.sh --create \
 --zookeeper $(cat /opt/kafka_2.12-2.2.1/mskkafkatest-${ENV}_ZooKeeperList.txt) \
 --topic "$a_topic" \
 --replication-factor "$a_replication_factor" \
 --partitions "$a_partition"
done
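Before running the creation loop against a real cluster, it can help to check the list format. The sketch below assumes the same name:replication_factor:partitions format. parse_topic_spec is a hypothetical helper that splits an entry with read and rejects malformed ones. The loop only prints the kafka-topics.sh commands (a dry run), with <zookeeper_list> as a placeholder.

```shell
#!/usr/bin/env bash
# Validate "name:replication_factor:partitions" entries and print the
# kafka-topics.sh command for each valid one (dry run: nothing is executed).
parse_topic_spec() {
  local spec="$1" name rf parts extra
  IFS=: read -r name rf parts extra <<< "$spec"
  # Exactly three fields; the last two must be positive integers.
  if [[ -z "$name" || -n "$extra" || ! "$rf" =~ ^[1-9][0-9]*$ || ! "$parts" =~ ^[1-9][0-9]*$ ]]; then
    echo "invalid topic spec: $spec" >&2
    return 1
  fi
  echo "$name $rf $parts"
}

KAFKA_TOPIC_LIST=(
  "MyKafkaTopic:3:1"
  "MyKafkaTopic2:3:1"
  "broken-entry:3"      # missing the partition count, gets reported and skipped
)

for topic in "${KAFKA_TOPIC_LIST[@]}"; do
  fields=$(parse_topic_spec "$topic") || continue
  read -r a_topic a_replication_factor a_partition <<< "$fields"
  echo kafka-topics.sh --create --zookeeper '<zookeeper_list>' \
    --topic "$a_topic" \
    --replication-factor "$a_replication_factor" \
    --partitions "$a_partition"
done
```

Once the printed commands look right, the validation can be combined with the real loop above by replacing the echo with the actual call.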

Write messages to a topic:

$ kafka-console-producer.sh \
 --broker-list $(cat BrokersList.txt) \
 --producer.config producer.properties \
 --topic topic_name_here
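The console producer reads one message per line from standard input, so you can pipe test data into it instead of typing it. The sketch below generates keyed test lines. The gen_messages helper, the message text and the ":" separator are illustrative choices. The parse.key and key.separator properties in the commented pipeline tell the console producer to split each line into a key and a value.

```shell
#!/usr/bin/env bash
# Generate N "key:value" lines for feeding into kafka-console-producer.sh.
gen_messages() {
  local count="$1" i
  for ((i = 1; i <= count; i++)); do
    printf 'key%d:test message %d\n' "$i" "$i"
  done
}

gen_messages 3   # prints key1:test message 1 through key3:test message 3

# With a real cluster (not run here):
# gen_messages 100 | kafka-console-producer.sh \
#   --broker-list "$(cat BrokersList.txt)" \
#   --producer.config producer.properties \
#   --topic topic_name_here \
#   --property parse.key=true \
#   --property key.separator=:
```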

Read messages from a topic:

$ kafka-console-consumer.sh \
 --bootstrap-server $(cat BrokersList.txt) \
 --consumer.config producer.properties \
 --topic topic_name_here \
 --from-beginning

To remove a topic:

$ kafka-topics.sh --delete \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --topic test_topic

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

Here is a small script that deletes several topics:

#--------------
# Delete topics
#--------------
ENV=nonprod


KAFKA_TOPIC_LIST=(
"MyKafkaTopic2"
"test"
"test2"
)


for topic in "${KAFKA_TOPIC_LIST[@]}" ; do
 echo "Deleting ${topic} ....."
 kafka-topics.sh --delete \
 --zookeeper $(cat ZooKeeperList.txt) \
 --topic "$topic"
done
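A delete loop is easy to get wrong. The minimal safety sketch below uses a hypothetical deletable_topics helper that skips Kafka's internal topics (names starting with __, such as __consumer_offsets). It prints the topics that would be deleted, so you can review the list before running the real loop.

```shell
#!/usr/bin/env bash
# Print the topics that would be deleted, skipping internal "__" topics.
# Dry run only: nothing is sent to Kafka.
deletable_topics() {
  local topic
  for topic in "$@"; do
    if [[ "$topic" == __* ]]; then
      echo "skipping internal topic: $topic" >&2
      continue
    fi
    echo "$topic"
  done
}

KAFKA_TOPIC_LIST=(
  "MyKafkaTopic2"
  "__consumer_offsets"
  "test"
)

deletable_topics "${KAFKA_TOPIC_LIST[@]}"
# prints MyKafkaTopic2 and test; the internal topic is reported on stderr
```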

To increase the number of partitions (the value is the new total, and partitions can only be added, not removed):

$ kafka-topics.sh --alter \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --topic test_topic \
 --partitions 3

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

View the configuration for a topic:

$ kafka-configs.sh --describe \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --entity-type topics \
 --entity-name test_topic

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

To set the retention time for records in a topic (old method, not recommended):

$ kafka-topics.sh --alter \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --topic test_topic \
 --config retention.ms=1000

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

To set the retention time for records in a topic (current method):

$ kafka-configs.sh --alter \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --entity-type topics \
 --entity-name test_topic \
 --add-config retention.ms=1000

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

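If you need to delete all messages in a topic, you can use this property as an expiration time. First set the retention time to a very low value (1000 ms), wait until the old messages have been removed, and then set the retention back to its previous value. Removal is not instant: the broker applies retention in a periodic check (log.retention.check.interval.ms, 5 minutes by default), so allow a few minutes.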

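Note: without a topic-level override, retention falls back to the broker setting log.retention.hours, which defaults to 168 hours (7 days, 604800000 milliseconds). Check your broker configuration, since clusters often change it (24 hours, for example, is 86400000 milliseconds).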

To return the topic to its previous retention setting, remove the override:

$ kafka-topics.sh --alter \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --topic mytopic \
 --delete-config retention.ms
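You can wrap the purge procedure described above (lower retention.ms, wait, then remove the override) in a single function. The sketch below assumes the kafka-configs.sh form shown earlier. run, hours_to_ms, purge_topic and the DRY_RUN switch are hypothetical helpers, not part of Kafka. DRY_RUN defaults to 1, so the function only prints the commands; set DRY_RUN=0 to execute them against a real cluster.

```shell
#!/usr/bin/env bash
# Purge a topic by temporarily lowering retention.ms.
# DRY_RUN=1 (the default) prints the commands instead of running them.
run() {
  if [[ "${DRY_RUN:-1}" == 1 ]]; then
    echo "$*"
  else
    "$@"
  fi
}

# Convert hours to milliseconds, e.g. for retention.ms values.
hours_to_ms() {
  echo $(( $1 * 60 * 60 * 1000 ))
}

purge_topic() {
  local zk="$1" topic="$2" wait_seconds="${3:-300}"
  # 1. Expire everything older than one second.
  run kafka-configs.sh --alter --zookeeper "$zk" \
    --entity-type topics --entity-name "$topic" \
    --add-config retention.ms=1000
  # 2. Wait for the broker's periodic retention check
  #    (log.retention.check.interval.ms, 5 minutes by default).
  run sleep "$wait_seconds"
  # 3. Remove the override so the topic falls back to the broker default.
  run kafka-configs.sh --alter --zookeeper "$zk" \
    --entity-type topics --entity-name "$topic" \
    --delete-config retention.ms
}

hours_to_ms 24                          # prints 86400000
DRY_RUN=1 purge_topic localhost:2181 test_topic
```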

Show the messages in a specific topic:

$ kafka-console-consumer.sh \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --topic test_topic \
 --from-beginning

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

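Or, using --bootstrap-server (on Kafka 2.0 and later the console consumer no longer accepts --zookeeper, so use this form there):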

$ kafka-console-consumer.sh \
 --bootstrap-server $(cat BrokersList.txt) \
 --consumer.config producer.properties \
 --topic jive_invoices_staging \
 --from-beginning

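To view consumer group offsets for each partition (kafka-consumer-offset-checker.sh ships only with older Kafka releases; on newer ones, use kafka-consumer-groups.sh --describe):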

$ kafka-consumer-offset-checker.sh \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --group group_ID_here \
 --topic your_topic_here

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

To reset a Kafka Streams application so it reprocesses its input topics from the beginning:

$ kafka-streams-application-reset.sh \
 --input-topics your_topic_here \
 --application-id group_ID_here \
 --bootstrap-servers bootstrap_host:bootstrap_port

Where:

  • your_topic_here: the input topic.
  • group_ID_here: the application ID (for Kafka Streams this is also the consumer group ID).
  • bootstrap_host: the broker host.
  • bootstrap_port: the broker port.

To get the earliest offset in a topic:

$ kafka-run-class.sh kafka.tools.GetOffsetShell \
 --broker-list localhost:9092 \
 --topic mytopic \
 --time -2

To get the latest offset in a topic:

$ kafka-run-class.sh kafka.tools.GetOffsetShell \
 --broker-list localhost:9092 \
 --topic mytopic \
 --time -1
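GetOffsetShell prints one topic:partition:offset line per partition. Subtracting the earliest offsets (--time -2) from the latest ones (--time -1) gives a rough count of the messages currently stored. The sketch below does this with awk. The count_messages helper and the sample lines are made up for illustration, and the result is only approximate for compacted topics or topics written with transactions.

```shell
#!/usr/bin/env bash
# Count stored messages from two GetOffsetShell outputs
# (lines of the form topic:partition:offset).
count_messages() {
  local earliest_file="$1" latest_file="$2"
  awk -F: '
    NR == FNR { start[$1 FS $2] = $3; next }              # first file: earliest offsets
    ($1 FS $2) in start { total += $3 - start[$1 FS $2] } # second file: latest offsets
    END { print total + 0 }
  ' "$earliest_file" "$latest_file"
}

# Hypothetical sample data instead of real GetOffsetShell output.
earliest="$(mktemp)"; latest="$(mktemp)"
printf 'mytopic:0:100\nmytopic:1:0\n' > "$earliest"
printf 'mytopic:0:150\nmytopic:1:20\n' > "$latest"

count_messages "$earliest" "$latest"   # prints 70

# Against a real broker (not run here):
# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -2 > earliest.txt
# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 > latest.txt
# count_messages earliest.txt latest.txt

rm -f "$earliest" "$latest"
```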

To view consumer offsets for a topic:

$ kafka-consumer-offset-checker.sh \
 --zookeeper=zookeeper_host:zookeeper_host_port \
 --topic=mytopic \
 --group=my_consumer_group

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

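Read from __consumer_offsets (on Kafka 0.11 and later the formatter class is kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter):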

$ kafka-console-consumer.sh \
 --consumer.config config/consumer.properties \
 --from-beginning \
 --topic __consumer_offsets \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter'

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.

List consumer groups:

$ kafka-consumer-groups.sh --list \
 --zookeeper zookeeper_host:zookeeper_host_port

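Or, through the brokers (the --new-consumer flag is only needed on older Kafka versions; newer ones deprecate or reject it, so drop it there):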

$ kafka-consumer-groups.sh --list \
 --new-consumer \
 --bootstrap-server localhost:9092

View information about a consumer group:

$ kafka-consumer-groups.sh --describe \
 --zookeeper zookeeper_host:zookeeper_host_port \
 --group group_name

Where:

  • zookeeper_host: the ZooKeeper host, for example localhost, 192.168.13.113 and so on.
  • zookeeper_host_port: the ZooKeeper port; the default is 2181.
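The describe output of kafka-consumer-groups.sh includes a LAG column for each partition. The sketch below sums that column with awk. It finds the column from the header line, because the exact set of columns differs between Kafka versions, and it skips entries shown as "-" (no committed offset). The total_lag helper is hypothetical, and the sample text is invented for illustration rather than captured from a real cluster.

```shell
#!/usr/bin/env bash
# Sum the LAG column of "kafka-consumer-groups.sh --describe" output read from stdin.
total_lag() {
  awk '
    # Remember which column holds LAG whenever a header line appears.
    { for (i = 1; i <= NF; i++) if ($i == "LAG") { col = i; next } }
    # Add numeric LAG values only; "-" and other text are ignored.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Hypothetical sample output used instead of a live cluster.
sample='TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST       CLIENT-ID
test_topic 0          90              100             10   c-1          /10.0.0.1  client-1
test_topic 1          50              55              5    c-1          /10.0.0.1  client-1
test_topic 2          -               7               -    -            -          -'

printf '%s\n' "$sample" | total_lag   # prints 15

# With a real cluster (not run here):
# kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group group_name | total_lag
```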

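View the last five messages in partition 0 of a topic with kafkacat (-o -5 starts five messages before the end, -e exits at the end of the partition):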

$ kafkacat -C -b localhost:9092 -t mytopic -p 0 -o -5 -e

Or, with the console consumer (this reads the whole topic from the beginning):

$ kafka-console-consumer.sh \
 --bootstrap-server $(cat BrokersList.txt) \
 --consumer.config producer.properties \
 --topic test2 \
 --from-beginning

To write to a topic:

$ kafka-console-producer.sh \
 --broker-list $(cat BrokersList.txt) \
 --producer.config producer.properties \
 --topic test2

Run the ZooKeeper shell:

$ zookeeper-shell.sh \
 zookeeper_host:zookeeper_host_port

To run a producer performance test:

$ kafka-producer-perf-test.sh \
 --topic topic_here \
 --num-records 1 \
 --record-size 2048 \
 --throughput -1 \
 --producer.config producer.properties \
 --producer-props \
 acks=1 \
 buffer.memory=67108864 \
 compression.type=none \
 batch.size=8196
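To compare several record sizes, you can generate the same test in a loop. This is a dry-run sketch: perf_commands is a hypothetical helper that only prints one command per size, and the sizes, record count and producer properties are example values. --throughput -1 disables throttling.

```shell
#!/usr/bin/env bash
# Print kafka-producer-perf-test.sh commands for several record sizes (dry run).
# Usage: perf_commands <topic> <num_records> <size>...
perf_commands() {
  local topic="$1" records="$2" size
  shift 2
  for size in "$@"; do
    echo "kafka-producer-perf-test.sh --topic $topic --num-records $records" \
      "--record-size $size --throughput -1 --producer.config producer.properties" \
      "--producer-props acks=1 compression.type=none"
  done
}

perf_commands topic_here 100000 512 2048 8192
# Review the output, then pipe it to bash to run the tests one after another.
```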

That concludes the article “Working with Kafka in Unix/Linux”.

Source: linux-notes.org
