1.写各种文件

1.1.写docker-compose文件

1.1.1.文件内容

文件如下:

services:
  kafka-0:
    image: docker.io/bitnami/kafka:3.9
    ports:
      # kafka-0 暴露 9094 端口
      - "9094:9094"
    environment:
      # Kafka KRaft 配置
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # 监听器配置 - 修改为支持SASL
      - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093,SASL_EXTERNAL://0.0.0.0:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka-0:9092,SASL_EXTERNAL://你的ip:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,SASL_EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
      # SASL配置
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # cluster
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # 认证配置
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
    volumes:
      - kafka_0_data:/bitnami/kafka

  kafka-1:
    image: docker.io/bitnami/kafka:3.9
    ports:
      # kafka-1 暴露 9095 端口
      - "9095:9094"
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # 监听器配置 - 修改为支持SASL
      - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093,SASL_EXTERNAL://0.0.0.0:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka-1:9092,SASL_EXTERNAL://你的ip:9095
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,SASL_EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
      # SASL配置
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # cluster
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # 认证配置
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
    volumes:
      - kafka_1_data:/bitnami/kafka

  kafka-2:
    image: docker.io/bitnami/kafka:3.9
    ports:
      # kafka-2 暴露 9096 端口
      - "9096:9094"
    environment:
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # 监听器配置 - 修改为支持SASL
      - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093,SASL_EXTERNAL://0.0.0.0:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka-2:9092,SASL_EXTERNAL://你的ip:9096
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,SASL_EXTERNAL:SASL_PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
      # SASL配置
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # cluster
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      # 认证配置
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
    volumes:
      - kafka_2_data:/bitnami/kafka

volumes:
  kafka_0_data:
    driver: local
  kafka_1_data:
    driver: local
  kafka_2_data:
    driver: local

其中,有3处地方的你的ip需要改成linux系统对本电脑开放的ip地址,我使用的是wsl,用以下指令获取:

ip a | grep inet

获取到如下内容:

root@Losita-PC:/home/losita# ip a | grep inet
    inet 127.0.0.1/8 scope host lo
    inet 10.255.255.254/32 brd 10.255.255.254 scope global lo
    inet6 ::1/128 scope host
    inet 172.23.75.194/20 brd 172.23.79.255 scope global eth0
    inet6 fe80::215:5dff:fe47:46c9/64 scope link
    inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0

然后丢给ai,问它需要选择哪个ip作为kafka的上面代码块的ip(把整坨代码丢给他),一般来说都是中间的这个,我也不确定。

然后我选择了172.23.75.194,用它填充了上面composer的你的ip,后面docker容器成功跑通。

1.1.2.将其保存到linux系统的根目录

首先创建空文件:

sudo touch /docker-compose.yml

然后打开编辑器进行编辑:

sudo nano /docker-compose.yml

然后在 nano 编辑器中输入上方的 docker-compose.yml 内容,编辑完成后:

  • Ctrl + X 退出
  • 输入 Y 确认保存
  • Enter 退出

1.1.3.docker-compose运行

输入shell命令:

docker-compose -p <你的项目名称> up -d

记得替换<你的项目名称>为你自己的项目名(一整个换掉),然后回车,等待部署完毕后就ok了。

1.2.写代码

1.2.1.消费者代码

package consumer

import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"log"
"os"
"os/signal"
"syscall"
"time"
)

func StartConsumer() {
// 配置信息
topic := "my-topic"
groupID := "my-consumer-group"
brokers := []string{"172.23.75.194:9094"}

// 创建SASL认证机制
mechanism := plain.Mechanism{
Username: "user",
Password: "password",
}

// 配置Dialer (不使用TLS,因为我们使用的是SASL_PLAINTEXT)
dialer := &kafka.Dialer{
Timeout:       10 * time.Second,
DualStack:     true,
SASLMechanism: mechanism,
}

// 创建Reader配置
r := kafka.NewReader(kafka.ReaderConfig{
Brokers:         brokers,
Topic:           topic,
GroupID:         groupID,           // 消费者组ID
MinBytes:        10e3,              // 10KB 最小批处理大小
MaxBytes:        10e6,              // 10MB 最大批处理大小
MaxWait:         1 * time.Second,   // 最长等待时间
StartOffset:     kafka.FirstOffset, // 从最早的消息开始(可选用 kafka.LastOffset 从最新的开始)
ReadLagInterval: -1,                // 禁用滞后报告
Dialer:          dialer,            // 使用带SASL的dialer
})

// 捕获中断信号以优雅退出
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

// 创建上下文,允许我们控制消费循环
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 在单独的goroutine中处理信号
go func() {
sig := <-sigchan
fmt.Printf("捕获到信号: %v, 正在关闭消费者...\n", sig)
cancel()
}()

fmt.Println("开始消费消息,按 Ctrl+C 停止...")

// 消费消息循环
for {
select {
case <-ctx.Done():
fmt.Println("上下文已取消,退出消费循环")
if err := r.Close(); err != nil {
log.Fatalf("关闭reader失败: %v", err)
}
return
default:
// 读取消息
m, err := r.ReadMessage(ctx)
if err != nil {
// 检查是否因为上下文取消而中断
if ctx.Err() != nil {
continue
}
log.Printf("读取消息失败: %v", err)
continue
}

// 处理消息
fmt.Printf("收到消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s\n",
m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))

// 这里可以添加您的业务逻辑来处理消息

// kafka-go 自动处理提交偏移量,除非您使用了CommitMessages方法手动控制
}
}
}

1.2.2.生产者代码

package producer

import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"log"
"time"
)

func StartProducer() {
topic := "my-topic"
partition := 0

// 创建SASL认证机制(使用用户名和密码)
mechanism := plain.Mechanism{
Username: "user",
Password: "password",
}

// 创建无TLS的Dialer(因为我们配置的是SASL_PLAINTEXT)
dialer := &kafka.Dialer{
Timeout:       10 * time.Second,
DualStack:     true,
SASLMechanism: mechanism,
}

// 连接至Kafka集群的Leader节点
conn, err := dialer.DialLeader(context.Background(), "tcp", "172.23.75.194:9094", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}

// 设置发送消息的超时时间
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

// 发送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("原神启动!")},
kafka.Message{Value: []byte("星铁启动!")},
kafka.Message{Value: []byte("绝区零启动!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
fmt.Println("write messages success")

// 关闭连接
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}

1.2.3.主文件

package main

import (
"fmt"
"kafka/consumer"
"kafka/producer"
"os"
"os/signal"
"syscall"
)

func main() {
fmt.Println("Kafka 生产者 & 消费者启动...")

// 创建退出信号监听
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

// 启动生产者和消费者
go producer.StartProducer()
go consumer.StartConsumer()

// 等待终止信号
<-stop
fmt.Println("收到终止信号,正在退出...")
}

注:生产者和消费者代码中均有一处需要填写ip地址,填写的ip地址和上一节中获取的地址是一样的。

2.运行

2.1.运行docker

直接在docker for desktop中启动镜像就行了。

2.2.运行go文件

go run .

3.检查效果

运行完毕后会呈现以下内容:

PS E:\GoProjects\LearningGo\2nd_semester\kafka> go run .
Kafka 生产者 & 消费者启动...
开始消费消息,按 Ctrl+C 停止...
write messages success
收到消息: 主题=my-topic, 分区=0, 偏移量=0, 键=, 值=原神启动!
收到消息: 主题=my-topic, 分区=0, 偏移量=1, 键=, 值=星铁启动!
收到消息: 主题=my-topic, 分区=0, 偏移量=2, 键=, 值=绝区零启动!
收到终止信号,正在退出...

最后一句是按ctrl+c之后才出现的,然后就退出了。

最后修改:2025 年 04 月 01 日
如果觉得我的文章对你有用,请随意赞赏