Setting Up a Kafka Cluster on Windows

Kafka Terminology

  1. Message: the payload to be sent, usually wrapped in a message object.
  2. Topic: put simply, the place where messages are delivered; a container for messages. If a message is an envelope, then the Topic is the mailbox.
  3. Partition && Log
    A partition can be thought of as a logical division, much like the C, D, and E drives on a PC.
    Kafka maintains a log (Log) file for each partition.
  4. Producers
    As in other message queues, the producer is the side that generates messages.
    In Kafka, the producer also decides which partition of the target Topic a message is sent to.
  5. Consumers
    The consumer is the side that uses messages, and a few terms on the consumer side are worth distinguishing.
    Message queues generally offer two consumption modes: queue mode and publish-subscribe mode.
    Queue mode: one-to-one; each message is consumed by exactly one consumer and is not consumed repeatedly. A queue usually allows multiple consumers, but any given message is delivered to only one of them.
    Publish-subscribe mode: one-to-many; a message may be consumed multiple times. The producer publishes messages to a Topic, and every consumer subscribed to that Topic can consume them. (In Kafka both modes are realized through consumer groups; see the console example right after this list.)
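A quick way to see the two modes with the console tools that ship with Kafka (a sketch; it assumes the topic rourou created later in this article and the broker address configured below): consumers sharing a --group id split the messages between them like a queue, while consumers in different groups each receive every message, i.e. publish-subscribe.

:: Queue mode: run this same command in two terminals; both consumers
:: share the group "demo-group", so each message goes to only one of them.
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.1.73:9097 --topic rourou --group demo-group

:: Publish-subscribe: run a consumer with a DIFFERENT group id;
:: this group independently receives every message as well.
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.1.73:9097 --topic rourou --group demo-group-2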

The Kafka Broker Cluster

1. Here we use three Kafka instances on a single server as the example, installing Kafka into three separate directories, e.g.:
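(These paths are inferred from the log.dirs settings below; adjust them to your own layout.)

D:\kafka\kafka_a\kafka_2.12-2.7.0
D:\kafka\kafka_b\kafka_2.12-2.7.0
D:\kafka\kafka_c\kafka_2.12-2.7.0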

Edit server.properties
(1) Open the server.properties file of the kafka_a instance and modify or add the following settings:

# Licensed to the Apache Software Foundation (ASF) under one or more 
# contributor license agreements.  See the NOTICE file distributed with 
# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 
# (the "License"); you may not use this file except in compliance with 
# the License.  You may obtain a copy of the License at 
# 
#    http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 
 
# see kafka.server.KafkaConfig for additional details and defaults 
 
############################# Server Basics ############################# 
 
# The id of the broker. This must be set to a unique integer for each broker.  
# Unique id for this broker
broker.id=0 
 
############################# Socket Server Settings ############################# 
 
 
# The address the socket server listens on. It will get the value returned from  
# java.net.InetAddress.getCanonicalHostName() if not configured. 
#   FORMAT: 
#     listeners = listener_name://host_name:port 
#   EXAMPLE: 
#     listeners = PLAINTEXT://your.host.name:9092 
listeners=PLAINTEXT://192.168.1.73:9097 
 
# Hostname and port the broker will advertise to producers and consumers. If not set,  
# it uses the value for "listeners" if configured.  Otherwise, it will use the value 
# returned from java.net.InetAddress.getCanonicalHostName(). 
advertised.listeners=PLAINTEXT://192.168.1.73:9097 
 
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details 
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL 
 
# The number of threads that the server uses for receiving requests from the network and sending responses to the network 
num.network.threads=3 
 
# The number of threads that the server uses for processing requests, which may include disk I/O 
num.io.threads=8 
 
# The send buffer (SO_SNDBUF) used by the socket server 
socket.send.buffer.bytes=102400 
 
# The receive buffer (SO_RCVBUF) used by the socket server 
socket.receive.buffer.bytes=102400 
 
# The maximum size of a request that the socket server will accept (protection against OOM) 
socket.request.max.bytes=104857600 
 
# Host name and port settings (legacy options, superseded by "listeners" in newer Kafka versions)
host.name=192.168.1.73
port=9097

############################# Log Basics ############################# 
 
# A comma separated list of directories under which to store log files  
# Log data directory for this broker
log.dirs=D:/kafka/kafka_a/kafka_2.12-2.7.0/kafka-log 
 
# The default number of log partitions per topic. More partitions allow greater 
# parallelism for consumption, but this will also result in more files across 
# the brokers. 
num.partitions=1 
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. 
# This value is recommended to be increased for installations with data dirs located in RAID array. 
num.recovery.threads.per.data.dir=1 
 
############################# Internal Topic Settings  ############################# 
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" 
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
 
############################# Log Flush Policy ############################# 
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync 
# the OS cache lazily. The following configurations control the flush of data to disk. 
# There are a few important trade-offs here: 
#    1. Durability: Unflushed data may be lost if you are not using replication. 
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. 
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. 
# The settings below allow one to configure the flush policy to flush data after a period of time or 
# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
 
# The number of messages to accept before forcing a flush of data to disk 
#log.flush.interval.messages=10000 
 
# The maximum amount of time a message can sit in a log before we force a flush 
#log.flush.interval.ms=1000 
 
############################# Log Retention Policy ############################# 
 
# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 
 
# The minimum age of a log file to be eligible for deletion due to age 
log.retention.hours=168 
 
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining 
# segments drop below log.retention.bytes. Functions independently of log.retention.hours. 
#log.retention.bytes=1073741824 
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 
 
# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 
 
############################# Zookeeper ############################# 
 
# Zookeeper connection string (see zookeeper docs for details). 
# This is a comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
# You can also append an optional chroot string to the urls to specify the 
# root directory for all kafka znodes. 
# Single-node ZooKeeper, shared by all three brokers
zookeeper.connect=192.168.1.73:2181 
 
# Timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=18000 
 
 
############################# Group Coordinator Settings ############################# 
 
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. 
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. 
# The default value for this is 3 seconds. 
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. 
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. 
group.initial.rebalance.delay.ms=0 

(2) Open the server.properties file of the kafka_b instance and modify or add the following settings:

# Licensed to the Apache Software Foundation (ASF) under one or more 
# contributor license agreements.  See the NOTICE file distributed with 
# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 
# (the "License"); you may not use this file except in compliance with 
# the License.  You may obtain a copy of the License at 
# 
#    http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 
 
# see kafka.server.KafkaConfig for additional details and defaults 
 
############################# Server Basics ############################# 
 
# The id of the broker. This must be set to a unique integer for each broker. 
# Unique id for this broker
broker.id=1 
 
############################# Socket Server Settings ############################# 
 
 
# The address the socket server listens on. It will get the value returned from  
# java.net.InetAddress.getCanonicalHostName() if not configured. 
#   FORMAT: 
#     listeners = listener_name://host_name:port 
#   EXAMPLE: 
#     listeners = PLAINTEXT://your.host.name:9092 
listeners=PLAINTEXT://192.168.1.73:9098 
 
# Hostname and port the broker will advertise to producers and consumers. If not set,  
# it uses the value for "listeners" if configured.  Otherwise, it will use the value 
# returned from java.net.InetAddress.getCanonicalHostName(). 
advertised.listeners=PLAINTEXT://192.168.1.73:9098 
 
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details 
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL 
 
# The number of threads that the server uses for receiving requests from the network and sending responses to the network 
num.network.threads=3 
 
# The number of threads that the server uses for processing requests, which may include disk I/O 
num.io.threads=8 
 
# The send buffer (SO_SNDBUF) used by the socket server 
socket.send.buffer.bytes=102400 
 
# The receive buffer (SO_RCVBUF) used by the socket server 
socket.receive.buffer.bytes=102400 
 
# The maximum size of a request that the socket server will accept (protection against OOM) 
socket.request.max.bytes=104857600 
 
# Host name and port settings (legacy options, superseded by "listeners" in newer Kafka versions)
host.name=192.168.1.73
port=9098

############################# Log Basics ############################# 
 
# A comma separated list of directories under which to store log files 
# Log data directory for this broker
log.dirs=D:/kafka/kafka_b/kafka_2.12-2.7.0/kafka-log 
 
# The default number of log partitions per topic. More partitions allow greater 
# parallelism for consumption, but this will also result in more files across 
# the brokers. 
num.partitions=1 
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. 
# This value is recommended to be increased for installations with data dirs located in RAID array. 
num.recovery.threads.per.data.dir=1 
 
############################# Internal Topic Settings  ############################# 
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" 
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
 
############################# Log Flush Policy ############################# 
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync 
# the OS cache lazily. The following configurations control the flush of data to disk. 
# There are a few important trade-offs here: 
#    1. Durability: Unflushed data may be lost if you are not using replication. 
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. 
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. 
# The settings below allow one to configure the flush policy to flush data after a period of time or 
# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
 
# The number of messages to accept before forcing a flush of data to disk 
#log.flush.interval.messages=10000 
 
# The maximum amount of time a message can sit in a log before we force a flush 
#log.flush.interval.ms=1000 
 
############################# Log Retention Policy ############################# 
 
# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 
 
# The minimum age of a log file to be eligible for deletion due to age 
log.retention.hours=168 
 
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining 
# segments drop below log.retention.bytes. Functions independently of log.retention.hours. 
#log.retention.bytes=1073741824 
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 
 
# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 
 
############################# Zookeeper ############################# 
 
# Zookeeper connection string (see zookeeper docs for details). 
# This is a comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
# You can also append an optional chroot string to the urls to specify the 
# root directory for all kafka znodes. 
# Single-node ZooKeeper, shared by all three brokers
zookeeper.connect=192.168.1.73:2181 
 
# Timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=18000 
 
 
############################# Group Coordinator Settings ############################# 
 
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. 
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. 
# The default value for this is 3 seconds. 
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. 
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. 
group.initial.rebalance.delay.ms=0 

(3) Open the server.properties file of the kafka_c instance and modify or add the following settings:

# Licensed to the Apache Software Foundation (ASF) under one or more 
# contributor license agreements.  See the NOTICE file distributed with 
# this work for additional information regarding copyright ownership. 
# The ASF licenses this file to You under the Apache License, Version 2.0 
# (the "License"); you may not use this file except in compliance with 
# the License.  You may obtain a copy of the License at 
# 
#    http://www.apache.org/licenses/LICENSE-2.0 
# 
# Unless required by applicable law or agreed to in writing, software 
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and 
# limitations under the License. 
 
# see kafka.server.KafkaConfig for additional details and defaults 
 
############################# Server Basics ############################# 
 
# The id of the broker. This must be set to a unique integer for each broker. 
# Unique id for this broker
broker.id=2 
 
############################# Socket Server Settings ############################# 
 
 
# The address the socket server listens on. It will get the value returned from  
# java.net.InetAddress.getCanonicalHostName() if not configured. 
#   FORMAT: 
#     listeners = listener_name://host_name:port 
#   EXAMPLE: 
#     listeners = PLAINTEXT://your.host.name:9092 
listeners=PLAINTEXT://192.168.1.73:9099 
 
# Hostname and port the broker will advertise to producers and consumers. If not set,  
# it uses the value for "listeners" if configured.  Otherwise, it will use the value 
# returned from java.net.InetAddress.getCanonicalHostName(). 
advertised.listeners=PLAINTEXT://192.168.1.73:9099 
 
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details 
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL 
 
# The number of threads that the server uses for receiving requests from the network and sending responses to the network 
num.network.threads=3 
 
# The number of threads that the server uses for processing requests, which may include disk I/O 
num.io.threads=8 
 
# The send buffer (SO_SNDBUF) used by the socket server 
socket.send.buffer.bytes=102400 
 
# The receive buffer (SO_RCVBUF) used by the socket server 
socket.receive.buffer.bytes=102400 
 
# The maximum size of a request that the socket server will accept (protection against OOM) 
socket.request.max.bytes=104857600 
 
# Host name and port settings (legacy options, superseded by "listeners" in newer Kafka versions)
host.name=192.168.1.73
port=9099

############################# Log Basics ############################# 
 
# A comma separated list of directories under which to store log files 
# Log data directory for this broker
log.dirs=D:/kafka/kafka_c/kafka_2.12-2.7.0/kafka-log 
 
# The default number of log partitions per topic. More partitions allow greater 
# parallelism for consumption, but this will also result in more files across 
# the brokers. 
num.partitions=1 
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. 
# This value is recommended to be increased for installations with data dirs located in RAID array. 
num.recovery.threads.per.data.dir=1 
 
############################# Internal Topic Settings  ############################# 
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" 
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. 
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
 
############################# Log Flush Policy ############################# 
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync 
# the OS cache lazily. The following configurations control the flush of data to disk. 
# There are a few important trade-offs here: 
#    1. Durability: Unflushed data may be lost if you are not using replication. 
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. 
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. 
# The settings below allow one to configure the flush policy to flush data after a period of time or 
# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
 
# The number of messages to accept before forcing a flush of data to disk 
#log.flush.interval.messages=10000 
 
# The maximum amount of time a message can sit in a log before we force a flush 
#log.flush.interval.ms=1000 
 
############################# Log Retention Policy ############################# 
 
# The following configurations control the disposal of log segments. The policy can 
# be set to delete segments after a period of time, or after a given size has accumulated. 
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens 
# from the end of the log. 
 
# The minimum age of a log file to be eligible for deletion due to age 
log.retention.hours=168 
 
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining 
# segments drop below log.retention.bytes. Functions independently of log.retention.hours. 
#log.retention.bytes=1073741824 
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created. 
log.segment.bytes=1073741824 
 
# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies 
log.retention.check.interval.ms=300000 
 
############################# Zookeeper ############################# 
 
# Zookeeper connection string (see zookeeper docs for details). 
# This is a comma separated host:port pairs, each corresponding to a zk 
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
# You can also append an optional chroot string to the urls to specify the 
# root directory for all kafka znodes. 
# Single-node ZooKeeper, shared by all three brokers
zookeeper.connect=192.168.1.73:2181 
 
# Timeout in ms for connecting to zookeeper 
zookeeper.connection.timeout.ms=18000 
 
 
############################# Group Coordinator Settings ############################# 
 
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. 
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. 
# The default value for this is 3 seconds. 
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. 
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. 
group.initial.rebalance.delay.ms=0 

Starting the Services
In each of the three Kafka installation directories, create a start.bat script with the following command:

.\bin\windows\kafka-server-start.bat .\config\server.properties

Run start.bat in each of the three Kafka directories to start the corresponding broker.
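Note that all three brokers point at a ZooKeeper instance on 192.168.1.73:2181 (the zookeeper.connect setting above), so ZooKeeper must be running first. Kafka ships with a convenience script for this; assuming the default zookeeper.properties from any one of the three installations:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

To avoid opening three terminals by hand, a small launcher can start all three brokers at once. This is only a sketch and assumes the D:\kafka\kafka_x\kafka_2.12-2.7.0 layout shown earlier:

@echo off
:: start_cluster.bat - launch each broker in its own window
for %%d in (kafka_a kafka_b kafka_c) do (
    start "%%d" /D "D:\kafka\%%d\kafka_2.12-2.7.0" .\bin\windows\kafka-server-start.bat .\config\server.properties
)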

Kafka Commands

Because we are now operating a cluster, some of the familiar single-node Kafka commands change, for example:
Creating a topic:
Standalone mode:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic rourou 
or, from the bin\windows directory: 
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic rourou 

Cluster mode (the key difference: with three brokers the replication factor can be raised up to 3):

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic rourou 
or, from the bin\windows directory: 
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic rourou 
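To check that the replicas really are spread across all three brokers, describe the topic (in Kafka 2.7 you can also pass --bootstrap-server instead of the deprecated --zookeeper flag):

.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic rourou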

Listing topics (the command is the same in standalone and cluster mode, since both query the same ZooKeeper):

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list 
or, from the bin\windows directory: 
kafka-topics.bat --zookeeper localhost:2181 --list 

Other commands (such as setting user read/write permissions, group permissions, and so on) work in a similar way.
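As a final smoke test (reusing the rourou topic created above), produce a few messages against the full broker list and read them back:

:: Producer: type messages one per line, Ctrl+C to stop
.\bin\windows\kafka-console-producer.bat --bootstrap-server 192.168.1.73:9097,192.168.1.73:9098,192.168.1.73:9099 --topic rourou

:: Consumer: replay the topic from the beginning
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.1.73:9097,192.168.1.73:9098,192.168.1.73:9099 --topic rourou --from-beginning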
