南风
Published 2025-07-18

Common Kafka commands (version 3.5.1)

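This article is adapted from "kafka命令大全 - OrcHome" (a Kafka command reference on OrcHome). Some of the commands in that article are outdated in newer Kafka versions, so the content here differs from the original in places.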

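The commands were tested on Windows while this article was being written, so the Windows commands are known to work on Kafka 3.5.1. The Linux commands have not all been verified and still need further testing.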

Every command accepts --help, which lists the options it supports. For example:

bin/windows/kafka-topics.bat --help

This prints the following:

This tool helps to create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions and
                                           replica assignment. Update the
                                           configuration of an existing topic
                                           via --alter is no longer supported
                                           here (the kafka-configs CLI supports
                                           altering topic configs with a --
                                           bootstrap-server option).
--at-min-isr-partitions                  if set when describing topics, only
                                           show partitions whose isr count is
                                           equal to the configured minimum.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used
                                           only with --bootstrap-server option
                                           for describing and altering broker
                                           configs.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered. The
                                           following is a list of valid
                                           configurations:
                                                cleanup.policy
                                                compression.type
                                                delete.retention.ms
                                                file.delete.delay.ms
                                                flush.messages
                                                flush.ms
                                                follower.replication.throttled.
                                           replicas
                                                index.interval.bytes
                                                leader.replication.throttled.replicas
                                                local.retention.bytes
                                                local.retention.ms
                                                max.compaction.lag.ms
                                                max.message.bytes
                                                message.downconversion.enable
                                                message.format.version
                                                message.timestamp.difference.max.ms
                                                message.timestamp.type
                                                min.cleanable.dirty.ratio
                                                min.compaction.lag.ms
                                                min.insync.replicas
                                                preallocate
                                                remote.storage.enable
                                                retention.bytes
                                                retention.ms
                                                segment.bytes
                                                segment.index.bytes
                                                segment.jitter.ms
                                                segment.ms
                                                unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs. It is
                                           supported only in combination with --
                                           create if --bootstrap-server option
                                           is used (the kafka-configs CLI
                                           supports altering topic configs with
                                           a --bootstrap-server option).
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option). Not supported with
                                           the --bootstrap-server option.
--describe                               List details for the given topics.
--exclude-internal                       exclude internal topics when running
                                           list or describe command. The
                                           internal topics will be listed by
                                           default
--help                                   Print usage information.
--if-exists                              if set when altering or deleting or
                                           describing topics, the action will
                                           only execute if the topic exists.
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected). If not supplied
                                           for create, defaults to the cluster
                                           default.
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being
                                           created. If not supplied, defaults
                                           to the cluster default.
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular
                                           expression, except for --create
                                           option. Put topic name in double
                                           quotes and use the '\' prefix to
                                           escape regular expression symbols; e.
                                           g. "test\.topic".
--topic-id <String: topic-id>            The topic-id to describe.This is used
                                           only with --bootstrap-server option
                                           for describing topics.
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-min-isr-partitions               if set when describing topics, only
                                           show partitions whose isr count is
                                           less than the configured minimum.
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--version                                Display Kafka version.

The first column is the option and the second column is its description. For example, running:

## the last option in the list above
bin/windows/kafka-topics.bat --version

## prints the version (3.5.1)
PS D:\javareRource\kafka> bin/windows/kafka-topics.bat --version
3.5.1

The same applies to the other commands: run them with --help to see which options they accept.
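The Linux and Windows commands in the sections below differ only in the script path and extension. As a minimal sketch, assuming the standard layout of the Kafka distribution (bin/ for .sh scripts and bin/windows/ for .bat scripts), a small helper can derive one from the other. The function name kafka_script is hypothetical and not part of Kafka.

```shell
# Hypothetical helper: map a Kafka tool name to its script path.
# Assumes the standard distribution layout (bin/*.sh and bin/windows/*.bat).
kafka_script() {
    tool="$1"       # tool name without extension, e.g. kafka-topics
    platform="$2"   # "windows", or anything else for Linux/macOS
    if [ "$platform" = "windows" ]; then
        printf 'bin/windows/%s.bat\n' "$tool"
    else
        printf 'bin/%s.sh\n' "$tool"
    fi
}

kafka_script kafka-topics linux     # prints bin/kafka-topics.sh
kafka_script kafka-topics windows   # prints bin/windows/kafka-topics.bat
```

The helper only builds the path; it does not check that the script exists.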

1. Administration

## create a topic (4 partitions, replication factor 2)
bin/kafka-topics.sh --create --bootstrap-server broker_host:port --replication-factor 2 --partitions 4 --topic test
## windows command
bin/windows/kafka-topics.bat --create --bootstrap-server broker_host:port --replication-factor 2 --partitions 4 --topic test

## increase the partition count (the count can only grow)
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
## windows command
bin/windows/kafka-topics.bat --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2

## delete a topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic test
## windows command
bin/windows/kafka-topics.bat --bootstrap-server broker_host:port --delete --topic test

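One thing to note: the replication factor must be less than or equal to the number of brokers, otherwise topic creation fails. The number of partitions is not limited by the broker count; when there are several brokers, the partitions are assigned to them in order. The benefit of replicating across brokers is that if one broker fails, its data can still be read from another broker instead of becoming unavailable.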
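The replication-factor rule can be expressed as a quick pre-flight check before running --create. This is only a sketch: the function name check_replication is hypothetical, and the broker count is passed in by hand rather than queried from the cluster.

```shell
# Hypothetical pre-flight check: reject a replication factor larger than the
# number of brokers. The broker count is supplied by the caller; this sketch
# does not talk to Kafka.
check_replication() {
    replication_factor="$1"
    broker_count="$2"
    if [ "$replication_factor" -le "$broker_count" ]; then
        echo "ok"
    else
        echo "replication factor $replication_factor exceeds broker count $broker_count"
        return 1
    fi
}

check_replication 2 3           # prints ok
check_replication 3 2 || true   # prints the error message
```

With a single local broker, for example, only --replication-factor 1 would pass this check.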

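Placing the brokers on different servers introduces some additional risk, but it also makes the cluster more reliable: if one server goes down, the data can still be read from the other servers.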

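On Windows, deleting a topic can leave the broker unusable, and restarting it does not help. The workaround is to delete everything in the directory configured by the broker's log.dir (or log.dirs) setting, and also delete the version folder (for example version-2) under data in the ZooKeeper installation directory. This discards all existing topic data, so it is only suitable for a test environment.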

2. Queries

## describe all topics in the cluster
bin/kafka-topics.sh --describe --bootstrap-server broker_host:port
## windows command
bin/windows/kafka-topics.bat --describe --bootstrap-server broker_host:port

## describe a single topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
## windows command
bin/windows/kafka-topics.bat --bootstrap-server broker_host:port --topic test --describe

## list topics
bin/kafka-topics.sh --list --bootstrap-server broker_host:port
## windows command
bin/windows/kafka-topics.bat --list --bootstrap-server broker_host:port

## list consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server broker_host:port --list
## windows command
bin/windows/kafka-consumer-groups.bat --bootstrap-server broker_host:port --list

## show the consumption details of one consumer group
bin/kafka-consumer-groups.sh --bootstrap-server broker_host:port --describe --group my-group
## windows command
bin/windows/kafka-consumer-groups.bat --bootstrap-server broker_host:port --describe --group demoConsumerGroup
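The --describe output for a consumer group is a table with one row per partition, including a LAG column. As a rough sketch, the total lag of a group can be summed with awk. The function name total_lag is hypothetical, and the sample rows below are made up purely to illustrate the table shape; the column is located by its header name rather than by position.

```shell
# Sum the LAG column of `kafka-consumer-groups --describe` output read from stdin.
# The column is found by its header name, so its position does not matter.
# Rows whose LAG is not a plain number (for example "-") are skipped.
total_lag() {
    awk '
        lag_col == 0 {
            for (i = 1; i <= NF; i++) {
                if ($i == "LAG") lag_col = i
            }
            next
        }
        $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
        END { print sum + 0 }
    '
}

# Illustrative input only; real usage would pipe the CLI output instead:
#   bin/kafka-consumer-groups.sh --bootstrap-server broker_host:port --describe --group my-group | total_lag
printf '%s\n' \
    'GROUP     TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID' \
    'my-group  test   0          10              15              5    -            -     -' \
    'my-group  test   1          7               7               0    -            -     -' | total_lag
```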

3. Producing and consuming


## start a console producer
bin/kafka-console-producer.sh --broker-list broker_host:port --topic test
## windows command
bin/windows/kafka-console-producer.bat --broker-list broker_host:port --topic test

## start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server broker_host:port --topic test --from-beginning --consumer.config config/consumer.properties
## windows command
bin/windows/kafka-console-consumer.bat --bootstrap-server broker_host:port --topic test --from-beginning --consumer.config config/consumer.properties

## kafka-verifiable-consumer.sh (prints consumer events such as offset commits; there is no Windows equivalent)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName

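The --broker-list option is deprecated but still works, as the --help output shows; its replacement is --bootstrap-server. According to dk's answer, specifying several host:port pairs gives cluster-like failover: when one broker is unreachable, the client can switch to another broker.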

--bootstrap-server <String: server to    REQUIRED unless --broker-list
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to. The broker list
                                           string in the form HOST1:PORT1,HOST2:
                                           PORT2.
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server
                                           instead; ignored if --bootstrap-
                                           server is specified.  The broker
                                           list string in the form HOST1:PORT1,
                                           HOST2:PORT2.
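The HOST1:PORT1,HOST2:PORT2 form described in the help text can be built from individual addresses. The sketch below only prints the resulting command instead of running it; join_servers is a hypothetical helper, and broker1, broker2 and port 9092 are placeholder addresses.

```shell
# Join host:port pairs into the comma-separated form HOST1:PORT1,HOST2:PORT2.
join_servers() {
    list=""
    for server in "$@"; do
        list="${list:+$list,}$server"
    done
    printf '%s\n' "$list"
}

# broker1, broker2 and port 9092 are placeholders for real broker addresses.
servers=$(join_servers broker1:9092 broker2:9092)
echo "bin/kafka-console-producer.sh --bootstrap-server $servers --topic test"
```

The printed command uses --bootstrap-server rather than the deprecated --broker-list.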

4. Switching the leader

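Before switching a leader manually, create a topic that supports leader switching, that is, one with more than one replica per partition. The command below creates a topic named leader-test with 4 partitions and 2 replicas per partition.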

bin/windows/kafka-topics.bat --create --bootstrap-server broker_host:port --replication-factor 2 --partitions 4 --topic leader-test
## switch the leader (kafka-preferred-replica-election.sh was removed in Kafka 3.0; kafka-leader-election.sh replaces it)
bin/kafka-leader-election.sh --bootstrap-server broker_host:port --election-type preferred --topic leader-test --partition 0

## windows preferred
bin/windows/kafka-leader-election.bat --bootstrap-server broker_host:port --election-type preferred --topic leader-test --partition 0
## response
Valid replica already elected for partitions leader-test-0

## windows unclean
bin/windows/kafka-leader-election.bat --bootstrap-server broker_host:port --election-type unclean --topic leader-test --partition 3
## response
Valid replica already elected for partitions leader-test-3

Because the topic created above has 4 partitions, the number after --partition can be any value from 0 to 3; anything outside that range produces an error.
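To cover every valid partition index, the election command can be generated once per partition. The sketch below is a dry run that only prints the commands; election_commands is a hypothetical helper, and broker_host:port is the same placeholder used throughout this article.

```shell
# Print one preferred-election command per partition of a topic (dry run only).
election_commands() {
    topic="$1"
    partitions="$2"   # partition count; valid indexes are 0 .. partitions-1
    p=0
    while [ "$p" -lt "$partitions" ]; do
        echo "bin/kafka-leader-election.sh --bootstrap-server broker_host:port --election-type preferred --topic $topic --partition $p"
        p=$((p + 1))
    done
}

election_commands leader-test 4   # prints 4 commands, for partitions 0 to 3
```

The tool's --help output also lists an --all-topic-partitions option, which may be simpler when every partition in the cluster should be considered.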

According to --help, --election-type can be PREFERRED or UNCLEAN. The official description of the difference is:

Type of election to attempt. Possible values are "preferred" for preferred leader election or "unclean" for unclean leader election. If preferred election is selection, the election is only performed if the  current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.

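In short: use preferred when the partition already has a leader. The election only takes place if the current leader is not the preferred replica, in which case leadership moves to the preferred replica. Use unclean when the partition has no leader. A replica is then elected leader even if it is not in sync, which can lose data.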
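Whether a preferred election would change anything can be estimated from the kafka-topics --describe output: the preferred replica is the first broker id in a partition's Replicas list. The sketch below prints the partitions whose current leader differs from it. The function name non_preferred_partitions is hypothetical, and the two sample lines are made up to illustrate the format.

```shell
# Read `kafka-topics --describe` output on stdin and print the partitions whose
# current leader is not the preferred replica (the first id in "Replicas:").
# Lines without both a "Partition:" and a "Replicas:" field are ignored.
non_preferred_partitions() {
    awk '
        {
            partition = ""; leader = ""; replicas = ""
            for (i = 1; i < NF; i++) {
                if ($i == "Partition:") partition = $(i + 1)
                if ($i == "Leader:")    leader    = $(i + 1)
                if ($i == "Replicas:")  replicas  = $(i + 1)
            }
            if (partition == "" || replicas == "") next
            split(replicas, ids, ",")
            if (leader != ids[1]) print partition
        }
    '
}

# Illustrative input only; real usage would pipe the CLI output instead:
#   bin/kafka-topics.sh --bootstrap-server broker_host:port --topic leader-test --describe | non_preferred_partitions
printf '%s\n' \
    'Topic: leader-test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2' \
    'Topic: leader-test Partition: 1 Leader: 1 Replicas: 2,1 Isr: 1,2' | non_preferred_partitions
```

For the sample input, only partition 1 is reported, because its leader (1) is not the first entry of its replica list (2,1).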

5. Kafka load-testing commands


## send 100 records of 1 byte each, throttled to 100 records per second
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=broker_host:port
## windows command
bin/windows/kafka-producer-perf-test.bat --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=broker_host:port

The load-test result looks like this:

## windows command
bin/windows/kafka-producer-perf-test.bat --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=broker_host:port
## response
100 records sent, 99.403579 records/sec (0.00 MB/sec), 11.71 ms avg latency, 245.00 ms max latency, 0 ms 50th, 41 ms 95th, 245 ms 99th, 245 ms 99.9th.
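When comparing several runs, it can help to pull only the throughput and the average latency out of the summary line. The sketch below does this with awk for the result line shown above; perf_summary is a hypothetical helper, and it assumes the summary keeps this comma-separated layout.

```shell
# Extract throughput and average latency from a kafka-producer-perf-test
# summary line read from stdin. Assumes the comma-separated layout shown above.
perf_summary() {
    awk -F', ' '{
        split($2, rate, " ")      # e.g. "99.403579 records/sec (0.00 MB/sec)"
        split($3, latency, " ")   # e.g. "11.71 ms avg latency"
        printf "records/sec=%s avg_latency_ms=%s\n", rate[1], latency[1]
    }'
}

echo '100 records sent, 99.403579 records/sec (0.00 MB/sec), 11.71 ms avg latency, 245.00 ms max latency, 0 ms 50th, 41 ms 95th, 245 ms 99th, 245 ms 99.9th.' | perf_summary
# prints: records/sec=99.403579 avg_latency_ms=11.71
```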

