> 本节推荐非常白的小白上手,如果有基础的,请看下一节进阶配置。
### 拉取最新的kafka版本
[下载地址](https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz)
```bash
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
```
### 解压kafka
```bash
tar -xzf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0
```
### 启动zk
zk官方启动教程:https://zookeeper.apache.org/doc/r3.7.0/zookeeperStarted.html
### 启动kafka
```bash
bin/kafka-server-start.sh config/server.properties
```
成功启动9092端口,该端口用于监听客户端的连接请求。

下图证明kafka已经成功启动了。

### 新建Topic
```bash
bin/kafka-topics.sh --create --topic mytopic --bootstrap-server localhost:9092
```
### 查看Topic分区信息
```bash
bin/kafka-topics.sh --describe --topic mytopic --bootstrap-server localhost:9092
```
### 新建Producer向Topic中打入消息
```bash
bin/kafka-console-producer.sh --topic mytopic --bootstrap-server localhost:9092
```
### 新建Consumer消费Topic中的消息
```bash
bin/kafka-console-consumer.sh --topic mytopic --from-beginning --bootstrap-server localhost:9092
```
kafka的topic信息都存储在zk中。目录如下
连接到zk
`$ ls /brokers/topics`

**上图中所指示的topic信息就是我们刚刚创建的topic信息。**
## kafka进阶
### 修改kafka的配置文件
`vi server.properties`
1. 注意每个broker的id都不同
```properties
broker.id=0
```
2. 修改log路径
```properties
# log.dirs=/tmp/kafka-logs
log.dirs=/var/kafka-data
```
3. 修改zookeeper连接
```properties
# zookeeper.connect=localhost:2181
# 追加下面的路径配置,使得在zk的目录下,会将kafka的根路径创建进去。否则,所有的数据目录,将会散列到zk的根路径下,比较难看。
zookeeper.connect=localhost:2181/kafka
```
### 新建多分区
在同一个broker中新建两个分区提供给数据写入和读取。
```bash
bin/kafka-topics.sh --create --topic ooxx --partitions 2 --bootstrap-server localhost:9092
```
### 展示所有Topic信息
```bash
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```

### 显示指定Topic详情
```bash
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic ooxx
```

### 查看指定消费组消费消息的偏移量
```bash
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group msb --offsets
```

三个参数的意义:
1. CURRENT-OFFSET: 当前消费进度。
2. LOG-END-OFFSET:末尾最大消息长度。
3. LAG:二者差值。
## 配置文件解析
位置:`conf/server.properties`
配置注意点如下
1、如果要启动多个kafka进程构成集群,那么每个kafka的broker.id**不应相同**。
```properties
broker.id=0
```
2、log.dirs=/tmp/kafka-logs
kafka 快速开始