1.写各种文件
1.1.写docker-compose文件
1.1.1.文件内容
文件如下:
services:
kafka-0:
image: docker.io/bitnami/kafka:3.9
ports:
# kafka-0 暴露 9094 端口
- "9094:9094"
environment:
# Kafka KRaft 配置
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# 监听器配置 - 修改为支持SASL
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093,SASL_EXTERNAL://0.0.0.0:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka-0:9092,SASL_EXTERNAL://你的ip:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,SASL_EXTERNAL:SASL_PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
# SASL配置
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
# cluster
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
# 认证配置
- KAFKA_CLIENT_USERS=user
- KAFKA_CLIENT_PASSWORDS=password
volumes:
- kafka_0_data:/bitnami/kafka
kafka-1:
image: docker.io/bitnami/kafka:3.9
ports:
# kafka-1 暴露 9095 端口
- "9095:9094"
environment:
- KAFKA_CFG_NODE_ID=1
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# 监听器配置 - 修改为支持SASL
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093,SASL_EXTERNAL://0.0.0.0:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka-1:9092,SASL_EXTERNAL://你的ip:9095
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,SASL_EXTERNAL:SASL_PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
# SASL配置
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
# cluster
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
# 认证配置
- KAFKA_CLIENT_USERS=user
- KAFKA_CLIENT_PASSWORDS=password
volumes:
- kafka_1_data:/bitnami/kafka
kafka-2:
image: docker.io/bitnami/kafka:3.9
ports:
# kafka-2 暴露 9096 端口
- "9096:9094"
environment:
- KAFKA_CFG_NODE_ID=2
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
- KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
# 监听器配置 - 修改为支持SASL
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093,SASL_EXTERNAL://0.0.0.0:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka-2:9092,SASL_EXTERNAL://你的ip:9096
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,SASL_EXTERNAL:SASL_PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
# SASL配置
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
# cluster
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
# 认证配置
- KAFKA_CLIENT_USERS=user
- KAFKA_CLIENT_PASSWORDS=password
volumes:
- kafka_2_data:/bitnami/kafka
volumes:
kafka_0_data:
driver: local
kafka_1_data:
driver: local
kafka_2_data:
driver: local
其中,有3处地方的你的ip
需要改成linux系统对本电脑开放的ip地址,我使用的是wsl
,用以下指令获取:
ip a | grep inet
获取到如下内容:
root@Losita-PC:/home/losita# ip a | grep inet
inet 127.0.0.1/8 scope host lo
inet 10.255.255.254/32 brd 10.255.255.254 scope global lo
inet6 ::1/128 scope host
inet 172.23.75.194/20 brd 172.23.79.255 scope global eth0
inet6 fe80::215:5dff:fe47:46c9/64 scope link
inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
然后丢给ai,问它需要选择哪个ip作为kafka的上面代码块的ip(把整坨代码丢给他),一般来说都是中间的这个,我也不确定。
然后我选择了172.23.75.194
,用它填充了上面composer的你的ip
,后面docker容器成功跑通。
1.1.2.将其保存到linux系统的根目录
首先创建空文件:
sudo touch /docker-compose.yml
然后打开编辑器进行编辑:
sudo nano /docker-compose.yml
然后在 nano
编辑器中输入上方的 docker-compose.yml
内容,编辑完成后:
- 按
Ctrl + X
退出 - 输入
Y
确认保存 - 按
Enter
退出
1.1.3.docker-compose
运行
输入shell命令:
docker-compose -p <你的项目名称> up -d
记得替换<你的项目名称>
为你自己的项目名(一整个换掉),然后回车,等待部署完毕后就ok了。
1.2.写代码
1.2.1.消费者代码
package consumer
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func StartConsumer() {
// 配置信息
topic := "my-topic"
groupID := "my-consumer-group"
brokers := []string{"172.23.75.194:9094"}
// 创建SASL认证机制
mechanism := plain.Mechanism{
Username: "user",
Password: "password",
}
// 配置Dialer (不使用TLS,因为我们使用的是SASL_PLAINTEXT)
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
// 创建Reader配置
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID, // 消费者组ID
MinBytes: 10e3, // 10KB 最小批处理大小
MaxBytes: 10e6, // 10MB 最大批处理大小
MaxWait: 1 * time.Second, // 最长等待时间
StartOffset: kafka.FirstOffset, // 从最早的消息开始(可选用 kafka.LastOffset 从最新的开始)
ReadLagInterval: -1, // 禁用滞后报告
Dialer: dialer, // 使用带SASL的dialer
})
// 捕获中断信号以优雅退出
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
// 创建上下文,允许我们控制消费循环
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 在单独的goroutine中处理信号
go func() {
sig := <-sigchan
fmt.Printf("捕获到信号: %v, 正在关闭消费者...\n", sig)
cancel()
}()
fmt.Println("开始消费消息,按 Ctrl+C 停止...")
// 消费消息循环
for {
select {
case <-ctx.Done():
fmt.Println("上下文已取消,退出消费循环")
if err := r.Close(); err != nil {
log.Fatalf("关闭reader失败: %v", err)
}
return
default:
// 读取消息
m, err := r.ReadMessage(ctx)
if err != nil {
// 检查是否因为上下文取消而中断
if ctx.Err() != nil {
continue
}
log.Printf("读取消息失败: %v", err)
continue
}
// 处理消息
fmt.Printf("收到消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s\n",
m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 这里可以添加您的业务逻辑来处理消息
// kafka-go 自动处理提交偏移量,除非您使用了CommitMessages方法手动控制
}
}
}
1.2.2.生产者代码
package producer
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"log"
"time"
)
func StartProducer() {
topic := "my-topic"
partition := 0
// 创建SASL认证机制(使用用户名和密码)
mechanism := plain.Mechanism{
Username: "user",
Password: "password",
}
// 创建无TLS的Dialer(因为我们配置的是SASL_PLAINTEXT)
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}
// 连接至Kafka集群的Leader节点
conn, err := dialer.DialLeader(context.Background(), "tcp", "172.23.75.194:9094", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 设置发送消息的超时时间
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// 发送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("原神启动!")},
kafka.Message{Value: []byte("星铁启动!")},
kafka.Message{Value: []byte("绝区零启动!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
fmt.Println("write messages success")
// 关闭连接
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
1.2.3.主文件
package main
import (
"fmt"
"kafka/consumer"
"kafka/producer"
"os"
"os/signal"
"syscall"
)
func main() {
fmt.Println("Kafka 生产者 & 消费者启动...")
// 创建退出信号监听
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
// 启动生产者和消费者
go producer.StartProducer()
go consumer.StartConsumer()
// 等待终止信号
<-stop
fmt.Println("收到终止信号,正在退出...")
}
注:生产者和消费者代码中均有一处需要填写ip地址,填写的ip地址和上一节中获取的地址是一样的。
2.运行
2.1.运行docker
直接在docker for desktop
中启动镜像就行了。
2.2.运行go文件
go run .
3.检查效果
运行完毕后会呈现以下内容:
PS E:\GoProjects\LearningGo\2nd_semester\kafka> go run .
Kafka 生产者 & 消费者启动...
开始消费消息,按 Ctrl+C 停止...
write messages success
收到消息: 主题=my-topic, 分区=0, 偏移量=0, 键=, 值=原神启动!
收到消息: 主题=my-topic, 分区=0, 偏移量=1, 键=, 值=星铁启动!
收到消息: 主题=my-topic, 分区=0, 偏移量=2, 键=, 值=绝区零启动!
收到终止信号,正在退出...
最后一句是按ctrl+c
之后才出现的,然后就退出了。
1 条评论
123