docker安装单机版kafka
启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
启动kafka
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.43.50:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.43.50:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
中间两个参数的192.168.43.50改为宿主机器的IP地址,如果不这么设置,可能会导致在别的机器上访问不到kafka。
创建一个topic
进入容器
docker exec -it kafka /bin/bash
cd opt/kafka
创建一个主题
bin/kafka-topics.sh --create --zookeeper 192.168.43.50:2181 --replication-factor 1 --partitions 1 --topic mykafka
运行一个生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
运行一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning
kafka设置分区数量
分区数量的作用:有多少分区就能负载多少个消费者,生产者会自动分配给分区数据,每个消费者只消费自己分区的数据,每个分区有自己独立的offset
#进入kafka容器
vi opt/kafka/config/server.properties
修改run.partitions=2
#退出容器
ctrl+p+q
#重启容器
docker restart kafka
#修改指定topic
./kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3 --topic topicname
python代码
生产者
# -*- coding: UTF-8 -*-
from kafka import KafkaProducer
import json
import datetime
topic='myTopic'
producer = KafkaProducer(bootstrap_servers='192.168.43.50:9092',value_serializer=lambda m:json.dumps(m).encode("utf-8"))
# 连接kafka
# 参数bootstrap_servers:指定kafka连接地址
# 参数value_serializer:指定序列化的方式,我们定义json来序列化数据,当字典传入kafka时自动转换成bytes
# 用户密码登入参数
# security_protocol="SASL_PLAINTEXT"
# sasl_mechanism="PLAIN"
# sasl_plain_username="maple"
# sasl_plain_password="maple"
for i in range(1000):
data={"num":i,"ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
producer.send(topic,data)
producer.close()
消费者
# -*- coding: UTF-8 -*-
from kafka import KafkaConsumer
import time
topic = 'myTopic'
consumer = KafkaConsumer(topic, bootstrap_servers=['192.168.43.50:9092'], group_id="test", auto_offset_reset="earliest")
# 参数bootstrap_servers:指定kafka连接地址
# 参数group_id:如果2个程序的topic和group_id相同,那么他们读取的数据不会重复,2个程序的topic相同,group_id不同,那么他们各自消费相同的数据,互不影响
# 参数auto_offset_reset:默认为latest表示offset设置为当前程序启动时的数据位置,earliest表示offset设置为0,在你的group_id第一次运行时,还没有offset的时候,给你设定初始offset。一旦group_id有了offset,那么此参数就不起作用了
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(recv)
运行消费者结果
test:0:3212: key=None value=b'{"num": 981, "ts": "2021-02-23 16:38:14"}'
test:0:3213: key=None value=b'{"num": 982, "ts": "2021-02-23 16:38:14"}'
test:0:3214: key=None value=b'{"num": 987, "ts": "2021-02-23 16:38:14"}'
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦