本文是在这篇文章(kafka命令大全 - OrcHome)的基础上修改而来,因为版本问题有些指令已经过时所以内容和原文会有一些出入。
由于写文章的时候是在windows系统上测试指令的,所以能保证kafka3.5.1版本的windows指令均可用,而linux版本的指令不太能保证都能使用,还需要进一步测试。
命令都可以通过--help来查看有那些可配置项,例如:
bin/windows/kafka-topics.bat --help
可以得到如下提示:
This tool helps to create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions and
replica assignment. Update the
configuration of an existing topic
via --alter is no longer supported
here (the kafka-configs CLI supports
altering topic configs with a --
bootstrap-server option).
--at-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
equal to the configured minimum.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--config <String: name=value> A topic configuration override for the
topic being created or altered. The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs. It is
supported only in combination with --
create if --bootstrap-server option
is used (the kafka-configs CLI
supports altering topic configs with
a --bootstrap-server option).
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option). Not supported with
the --bootstrap-server option.
--describe List details for the given topics.
--exclude-internal exclude internal topics when running
list or describe command. The
internal topics will be listed by
default
--help Print usage information.
--if-exists if set when altering or deleting or
describing topics, the action will
only execute if the topic exists.
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected). If not supplied
for create, defaults to the cluster
default.
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being
created. If not supplied, defaults
to the cluster default.
--topic <String: topic> The topic to create, alter, describe
or delete. It also accepts a regular
expression, except for --create
option. Put topic name in double
quotes and use the '\' prefix to
escape regular expression symbols; e.
g. "test\.topic".
--topic-id <String: topic-id> The topic-id to describe.This is used
only with --bootstrap-server option
for describing topics.
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
less than the configured minimum.
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--version Display Kafka version.
第一列是配置项,第二列是配置项的解释,例如执行:
## 上面指令的最后一个
bin/windows/kafka-topics.bat --version
## 会得到(版本是3.5.1)
PS D:\javareRource\kafka> bin/windows/kafka-topics.bat --version
3.5.1
其他的指令同理,都是可以通过--help来查看可以怎么配置指令。
1、管理类
## 创建topic(4个分区,2个副本)
bin/kafka-topics.sh --create --bootstrap-server broker_host:port --replication-factor 2 --partitions 4 --topic test
## windows指令
bin/windows/kafka-topics.bat --create --bootstrap-server broker_host:port --replication-factor 2 --partitions 4 --topic test
## 分区扩容
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
## windows指令
bin/kafka-topics.bat --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
## 删除topic
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic test
## widows指令
bin/windows/kafka-topics.bat --bootstrap-server broker_host:port --delete --topic test
有个需要注意的点,备份数量必须小于等于broker的数量,否则会创建失败。分区数量则不受限制,如果是有多个broker的话,分区会按照顺序分配。这样的好处是能够防止这个broker失效后它的数据可以从别的地方读取,不至于无法读取。
如果是将不同的broker放在不同的服务器,增加了一定风险的同时,也更加可靠了,当服务器宕机后,可以从其他服务器获取数据。
在windows删除topic会导致broker不可用,重启无效,解决方案是删除broker的log.dir指定的全部文件。同时删除zookeeper安装目录下data里面的version文件夹。
2、查询类
## 查询集群描述
bin/kafka-topics.sh --describe --bootstrap-server broker_host:port
## windows指令
bin/windows/kafka-topics.bat --describe --bootstrap-server broker_host:port
## 查询集群描述(新)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test --describe
##windows指令
bin/windows/kafka-topics.bat --bootstrap-server broker_host:port --topic test --describe
## topic列表查询
bin/kafka-topics.sh --list --bootstrap-server broker_host:port
## windows指令
bin/kafka-topics.sh --list --bootstrap-server broker_host:port
## 消费者列表查询
bin/kafka-consumer-groups.sh --bootstrap-server broker_host:port --list
##windows指令
bin/windows/kafka-consumer-groups.bat --bootstrap-server broker_host:port --list
## 显示某个消费组的消费详情
bin/kafka-consumer-groups.sh --bootstrap-server broker_host:port --describe --group my-group
## windows指令
bin/windows/kafka-consumer-groups.bat --bootstrap-server broker_host:port --describe --group demoConsumerGroup
3、发送和消费
## 创建生产者
bin/kafka-console-producer.sh --broker-list broker_host:port --topic test
## windows指令
bin/windows/kafka-console-producer.bat --broker-list broker_host:port --topic test
## 消费者
bin/kafka-console-consumer.sh --bootstrap-server broker_host:port --topic test --from-beginning --consumer.config config/consumer.properties
## windows指令
bin/windows/kafka-console-consumer.bat --bootstrap-server broker_host:port --topic test --from-beginning --consumer.config config/consumer.properties
## kafka-verifiable-consumer.sh(消费者事件,例如:offset提交等,windows没有对应的指令)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
broker-list这个指令已经废弃,但还能用。通过--help指令可以看到,但是仍然可以用,新的指令是--bootstrap-server。根据dk的回答,设定多个ip和端口的话,可以实现类似集群的效果,当一个broker不可达时,能够切换到另一个broker。
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to. The broker list
string in the form HOST1:PORT1,HOST2:
PORT2.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
4、切换leader
在手动切换leader之前,需要先创建一个支持切换leader的topic,下面的指令是创建一个名为leader-test的拥有4个分区,每个分区有两个备份的topic。
bin/windows/kafka-topics.bat --create --bootstrap-server broker_host:port--replication-factor 2 --partitions 4 --topic leader-test
## 切换leader
bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port --election-type preferred --topic leader-test --partition 0
## windows preferred
bin/windows/kafka-leader-election.bat --bootstrap-server broker_host:port --election-type preferred --topic leader-test --partition 0
## 响应结果
Valid replica already elected for partitions leader-test-0
## windows unclean
bin/windows/kafka-leader-election.bat --bootstrap-server broker_host:port--election-type unclean --topic leader-test --partition 3
Valid replica already elected for partitions leader-test-3
由于上面创建的topic拥有4个分区,所以切换leader的参数中--partition后面的跟随的数字可以是0-3任意一个,超出这个范围就会报错。
同样的根据--help的反馈,--election-type的选择可以是PREFERRED或者UNCLEAN,在官方解释中他们的区别是:
Type of election to attempt. Possible values are "preferred" for preferred leader election or "unclean" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.
当前分区有leader用preferred,如果指定的分区不是原来的leader,将执行切换。如果当前topic没有leader则使用unclean,将该分区设定为leader。
5、kafka压测命令
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=broker_host:port
bin/windows/kafka-producer-perf-test.bat --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=broker_host:9092
压测结果如下:
## windows指令
bin/windows/kafka-producer-perf-test.bat --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=broker_host:port
## 响应结果
100 records sent, 99.403579 records/sec (0.00 MB/sec), 11.71 ms avg latency, 245.00 ms max latency, 0 ms 50th, 41 ms 95th, 245 ms 99th, 245 ms 99.9th.