Kafka入门教程(非常详细)
Kafka 是 LinkedIn 开源的分布式消息订阅系统,目前归属 Apache 顶级开源项目,主要特点是基于 Pull 模式来处理消息消费,追求高吞吐量,一开始用于日志的收集和传输,适合大数据的数据收集业务
一、Kafka简介
Kafka 专为高容量发布/订阅消息和流而设计,旨在持久、快速和可扩展。从本质上讲,Kafka 提供了一个持久的消息存储,类似于日志,其具备的特点如表1所示。特点 | 具体作用 |
---|---|
解耦 | 允许独立地扩展或修改消费者和生产者的处理过程,只要确保它们遵守同样的接口约束即可。 |
冗余 | 消息队列把数据进行持久化直到它们已经被完全处理,这规避了数据丢失的风险。 |
灵活性 | 在访问量剧增的情况下,应用仍然需要继续发挥作用,使用消息队列能够使关键组件顶住突发的访问压力,不会因为突发的超负荷的请求而完全崩溃。 |
可恢复性 | 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程失效,加入队列的消息仍然可以在系统恢复后被处理。 |
有序性 | 在大多数使用场景下,数据处理的顺序很重要。大部分消息队列是有序的,并且能保证数据会按照特定的顺序来处理,Kafka 能保证一个分区内的消息的有序性。 |
缓冲 | 有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的问题。 |
异步通信 | 在很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但不立即处理它,用户想放入队列多少消息就放入多少,然后在需要的时候去处理它们。 |
Kafka 采用 Scala 和 Java 编写,图1中包含2个生产者、1个主题、3个分区、3个副本、3个 Kafka 实例和1个消费者组,1个消费者组包含3个消费者。
图1:Kafka基础架构
下面我们逐一介绍图1中的概念。
1) 生产者(Producer)
顾名思义,生产者是生产消息的,即发送消息的。生产者发送的每一条消息必须有一个主题,即消息的类别。生产者会源源不断地向 Kafka 服务器发送消息。
2) 主题(Topic)
类似我们传统数据库中的表名,例如发送一条主题为 order(订单)的消息,那么 order 下会有多条关于订单的消息。3) 分区(Partition)
生产者发送的消息主题会被存储在分区中,Kafka 把数据分成多个块,让消息合理地分布在不同的分区,分区被分在不同的 Kafka 实例也就是服务器上,这样就实现了大量消息的负载均衡。每个主题可以指定多个分区,但是至少指定一个分区。每个分区存储的数据都是有序的,不同分区间的数据不保证有序性。因为如果有多个分区,消费消息是各个分区独立开始的,有的分区消费得慢,有的分区消费得快,因此不能保证有序。那么当需要保证消息顺序消费时,我们可以将消息设置为一个分区,这就可以保证有序了。
为了保证 Kafka 的吞吐量,一个主题可以设置多个分区,而同一分区只能被一个消费者订阅。
4) 副本(Replica)
副本是分区中数据的备份,是 Kafka 为了防止数据丢失或者服务器宕机采取的保护数据完整性的措施,一般的数据存储工具都会有这个功能。假如我们有3个分区,由于不同分区分别存放了部分数据,因此为了全部数据的完整性,我们必须备份所有分区。这时候我们的一个副本包括3个分区,每个分区有一个副本,两个副本就包含6个分区,一个分区两个副本。
Kafka 制作副本之后会把副本放到不同的服务器上,保证负载均衡。
5) Kafka实例或节点(Broker)
启动一个 Kafka 就产生一个 Kafka 实例,多个 Kafka 实例构成一个 Kafka 集群,这体现了分布式。服务器多了,吞吐率效率将会提高。6) 消费者组(Consumer Group)和消费者(Consumer)
消费者读取 Kafka 中的消息,可以消费任何主题的数据。多个消费者组成一个消费者组,一般消费者必须有一个组(Group)名,如果没有的话会被分一个默认的组名。一个组可以有多个消费者,一条消息在一个组中,只会被一个消费者获取。
提示
对于传统的消息队列,一般消费过的消息会被删除,而在 Kafka 中消费过的消息不会被删除,始终保留所有的消息,只记录一个消费者消费消息的偏移量(offset,用于记录消费位置)作为标记。Kafka 允许消费者自己设置这个偏移量,允许消费者重复消费一些消息。但始终不删除消费过的消息,日积月累,消息势必会越来越多,占用空间也越来越大。Kafka 提供了两种策略来删除消息:一种是基于时间,另一种是基于分区文件的大小,我们可以通过配置来决定使用哪种方式。
Kafka 可以处理消费者规模的网站中的所有动作流数据。Kafka 的优势如下:
- 高吞吐量、低延迟。Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
- 可扩展。Kafka 集群支持热扩展。
- 持久、可靠。消息被持久化到本地磁盘,并且支持数据备份,防止数据丢失。
- 容错。Kafka 允许集群中出现节点故障(若副本数量为 n,则允许 n-1 个节点故障)。
- 高并发。Kafka 支持数千个客户端同时读写。
Kafka 适合如下应用场景:
- 日志收集。我们可以用 Kafka 收集各种服务的日志,通过 Kafka 以统一接口服务的方式开放给消费者。
- 消息系统。Kafka 可以解耦生产者和消费者、缓存消息等。
- 用户活动跟踪。Kafka 经常被用来记录 Web 用户或者 APP 用户的各种活动,如浏览网页、搜索、点击等,这些活动信息被各服务器发布到 Kafka 的主题中,然后消费者通过订阅这些主题可以进行实时的监控分析,或保存到数据库。
- 运营指标。Kafka 经常用来记录运营监控数据,包括收集各种分布式应用的数据、生产各种操作的集中反馈,如报警和报告。
- 流式处理。例如 Spark Streaming 和 Storm。
二、Kafka 的使用
1. 安装部署
Kafka 运行在 JVM 上,因此我们要先确保计算机安装了 JDK,Kafka 需要 Java 运行环境。旧版的 Kafka 还需要 ZooKeeper,新版的 Kafka 已经内置了一个 ZooKeeper 环境,所以可以直接使用。
在本教程,我们将使用 Kafka_2.12-3.1.0,待部署的环境的服务器系统版本为 CentOS Linux release 7.6.1810 (Core) ,内核版本为 3.10.0-1127.13.1.el7.x86_64。1) 首先下载源码包后解压并进入目录:
tar -zxvf Kafka_2.12-3.1.0.tgz cd Kafka_2.12-3.1.0/
2) 修改配置文件。
在 Kafka 解压后的目录下有一个 config 文件夹,里面放置如下3个配置文件:
- consumer.properites:消费者配置,该配置文件用于配置消费者,此处我们使用默认的即可。
- producer.properties:生产者配置,该配置文件用于配置生产者,此处我们使用默认的即可。
- server.properties:Kafka 服务器的配置,该配置文件用来配置 Kafka 服务器,目前仅介绍几个最基础的配置,如表2所示。
基本配置名称 | 描述 |
---|---|
broker.id | 声明当前 Kafka 服务器在集群中的唯一 ID,需配置为 integer,并且集群中的每一个 Kafka 服务器的 ID 都应是唯一的,我们这里采用默认配置即可。 |
listeners |
声明当前 Kafka 服务器需要监听的端口号:
|
zookeeper.connect | 声明当前 Kafka 服务器连接的 ZooKeeper 的地址,需配置为 ZooKeeper 的地址,由于本次使用的 Kafka 版本中自带 ZooKeeper,因此使用默认配置 zookeeper.connect= localhost:2181 即可。 |
log.dirs Kafka | 存放日志数据目录。 |
log.retention.hours | 保留日志数据时间,默认为 7 天,超过该时间就分段(segment)。 |
log.segment.bytes | 日志分段的大小,默认为 1GB,超过该大小就分段(segment)。 |
delete.topic.enable | 生产环境不允许删除主题数据,测试环境可以将该配置设置为 true。 |
3) 启动相关的服务。
执行如下命令,先启动 ZooKeeper,再启动 Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties bin/Kafka-server-start.sh config/server.propertiesZooKeeper 启动成功如图2所示,Kafka 启动成功如图3所示。
图2:ZooKeeper 启动成功
图3:Kafka 启动成功
4) 验证是否启动成功。执行如下命令:
ps ax | grep -i 'Kafka\.Kafka' | grep java | grep -v grep
若成功启动 Kafka 服务器端,则如图4所示,可以看到 Kafka 的后台进程。图4:Kafka 后台进程
至此,我们完成了 Kafka 的服务器端进程的部署。
2. 使用说明
Python 中用于连接 Kafka 客户端的标准库有3种:PyKafka、kafka-python 和 confluent-Kafka。其中,kafka-python 使用的人多,是比较成熟的库。在本教程中,我们使用 kafka-python 2.0.2。我们可以通过执行命令来安装:
pip install kafka-python
也可以在 PyCharm 的集成工具中安装,如图5所示。图5:在 PyCharm 的集成工具中安装 kafka-python
3. KafkaProducer
KafkaProducer 用于发送消息到 Kafka 服务器,它是线程安全的且共享单一生产者实例。我们要往 Kafka 写入消息,首先要创建一个生产者对象,并设置一些属性,服务器在收到消息之后会返回一个响应:- 如果消息成功写入 Kafka,就返回一个 RecordMetadate 对象,包含主题和分区信息,以及记录在分区里的偏移量;
- 如果写入失败,则会返回一个错误。
KafkaProducer 有3种发送消息的方法:
- 立即发送。只管发送消息到服务器端,不管消息是否成功发送。大部分情况下,这种发送方式会成功,因为 Kafka 具有高可用性,所以生产者会自动重试,但有时也会丢失消息。
- 同步发送。通过 send() 方法发送消息,并返回 Future 对象。get() 方法会等待 Future 对象,看 send() 方法是否成功。
- 异步发送。通过带有回调函数的 send() 方法发送消息,在生产者收到 Kafka 实例的响应时会触发回调函数。
注意,对于以上所有情况,我们一定要关注发送消息可能会失败的异常处理。
单个生产者启用多个线程发送消息如代码清单1所示。
代码清单1:producerDemo1
# -*- coding: utf-8 -*- # @Time : 2023/7/27 10:41 上午 # @Project : msgUtil # @File : producerDemo1.py # @Version: Python3.9.8 from Kafka import KafkaProducer, KafkaConsumer from Kafka.errors import Kafka_errors import traceback import json def producer_demo(): # 假设生产的消息为键值对(不是一定为键值对),且序列化方式为JSON producer = KafkaProducer( bootstrap_servers=['localhost:9092'], key_serializer=lambda k: json.dumps(k).encode(), value_serializer=lambda v: json.dumps(v).encode()) # 发送3条消息 for i in range(0, 3): # producer默认是异步的 future = producer.send( 'Kafka_demo', key='count_num', # 同一个key值,会被送至同一个分区 value=str(i), partition=1) # 向分区1发送消息 print("send {}".format(str(i))) try: # 加了get()方法就变成了同步,即要等待获取服务器端返回的结果后再往下执行 future.get(timeout=10) # 监控是否发送成功 except Kafka_errors: # 发送失败抛出Kafka_errors traceback.format_exc()运行上述代码后,生产者往消息队列发送消息成功,如图6所示。
图6:生产者发送消息成功
分区
分区是 Kafka 中一个很重要的部分,合理使用分区,可以提升 Kafka 的整体性能。Kafka 分区有如下好处:
- 便于合理使用存储资源。不同分区在不同 Kafka 实例上存储,我们可以把海量的数据按照分区切割成一块一块的数据存储在多个 Kafka 实例上。合理控制分区的任务,可以实现负载均衡。
- 提高并行度。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。
- 在某些情况下,可以实现顺序消费。
在生产环境中,我们需要保证消费者的消费速度大于生产者的生产速度,所以需要检测 Kafka 中的剩余堆积量是在增加还是在减小,时刻监测队列消息剩余量。
查看 Kafka 堆积剩余量如代码清单2所示。
代码清单2:producerDemo2
# -*- coding: utf-8 -*- # @Time : 2023/7/27 11:41 上午 # @Project : msgUtil # @File : producerDemo2.py # @Version: Python3.9.8 from Kafka import KafkaProducer, KafkaConsumer consumer = KafkaConsumer(topic, **kwargs) partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)] print("start to cal offset:") # total toff = consumer.end_offsets(partitions) toff = [(key.partition, toff[key]) for key in toff.keys()] toff.sort() print("total offset: {}".format(str(toff))) # current coff = [(x.partition, consumer.committed(x)) for x in partitions] coff.sort() print("current offset: {}".format(str(coff))) # cal sum and left toff_sum = sum([x[1] for x in toff]) cur_sum = sum([x[1] for x in coff if x[1] is not None]) left_sum = toff_sum - cur_sum print("Kafka left: {}".format(left_sum))在代码清单2中,在 KafkaProducer 初始化的时候,除了需要参数 servers、key_serializer 和 value_serializer,还需要其他初始化参数,如表4所示。
初始化参数 | 含义 |
---|---|
bootstrap.servers |
指定 Kafka 实例的地址清单,地址格式为 host:port。 清单不需要包含所有的 Kafka 实例地址,生产者会从给定的 Kafka 实例中查找其他的 Kafka 实例信息。建议至少提供两个 Kafka 实例信息,这样即使其中一个宕机,生产者仍然能连接集群。 |
buffer_memory |
生产者缓存消息的缓冲区大小,默认为 33 554 432 字节(32MB)。 如果采用异步发送消息,那么生产者启动后会创建一个内存缓冲区用于存放待发送的消息,然后由专属线程发送放在缓冲区的消息。如果生产者要给很多分区发消息,那么需防止参数设置过小而降低吞吐量。 |
compression_type |
是否启用压缩,默认是 none,可选类型为 gzip、lz4 和 snappy。 压缩会降低网络 IO,但是会增加生产者端的 CPU 消耗。Kafka 实例端的压缩设置和生产者的压缩设置不同也会给 Kafka 实例带来重新解压缩和重新压缩的 CPU 负担。 |
retries | 重试次数,即当消息发送失败后会尝试几次重发,默认为 0。一般考虑网络抖动或者分区的 leader 切换,而不是服务器端真的故障,所以我们可以设置为重试3次。 |
retry_backoff_ms | 每次重试间隔多少毫秒,默认为 100 毫秒。 |
max_in_flight_ requests_per_ connection |
生产者会将多个发送请求缓存在内存中,默认可以缓存5个发送请求。 如果我们开启了重试,即设置了 retries 参数,那么可能导致同一分区的消息出现顺序错乱。为了防止这种情况,我们需要把该参数设置为 1,来保障同一分区的消息顺序 |
batch_size |
该参数值默认为 16 384 字节(16KB)。 我们可以将 buffer_memory 看作池子,将 batch_size 看作池子中装有消息的盒子。生产者会把发往同一分区的消息放在一个 batch 中,当 batch 满了就会发送里面的消息,但是不一定非要等到满了才发送。 如果该参数值大,那么生产者吞吐量就高,但是性能会降低,因为盒子太大会占用内存,此时发送的数据量也会大。该参数对调优生产者吞吐量和延迟性能指标有重要的作用。 |
max_request_size | 最大请求大小,可以理解为一条消息的最大大小,默认为 1 048 576 字节。 |
request_timeout_ms | 生产者发送消息后,Kafka 实例需要在规定时间内将处理结果返回给生产者,规定的时间就是该参数控制的,默认为 30 000 毫秒,即 30 秒。如果 Kafka 实例在 30 秒内没有给生产者响应,那么生产者会认为请求超时,并在回调函数中进行特殊处理,或者进行重试。 |
key_serializer | Kafka 在生产者端序列化消息,序列化后,消息才能在网络上传输。该参数就是 key 指定的序列化方式,通常可以指定为 lambda k: json.dumps(k).encode('utf-8')。 |
value_serializer |
该参数指定 value 的序列化方式,通常可以设置为 lambda v: json.dumps(v).encode('utf-8')。 注意,无论是 key 还是value,它们的序列化和反序列化实现都是一样的 |
acks |
Kafka 收到消息的响应数:
|
linger_ms | 逗留时间,即消息不立即发送,而是逗留一定时间后一起发送,默认为 0。有时候消息产生比消息发送快,该参数完美地实现了人工延迟,使得大批量消息可以聚合到一个 batch 里一起发送。 |
4. KafkaConsumer
首先我们需要明确消费者的关键术语,方便后面的理解,如表4所示。关键术语 | 含义 |
---|---|
消费者 | 从 Kafka 中拉取数据并进行处理。 |
消费者组 | 一个消费者组由一个或者多个消费者实例组成。 |
偏移量 | 记录当前分区消费数据的位置。 |
偏移提交(offset commit) | 将消费完成的消息的最大偏移提交确认。 |
偏移主题(_consumer_offset) | 保存消费偏移的主题。 |
Kafka 的消费模式有3种:最多一次、最少一次和正好一次。
1) 最多一次
在这种消费模式下,客户端在收到消息后,处理消息前自动提交,这样 Kafka 将认为消费者已经消费,偏移量增加。具体的实现方法是,设置 enable.auto.commit 为 true,设置 auto.commit.interval.ms 为一个较小的时间间隔,客户端不调用 commitSync(),Kafka 在特定的时间间隔内自动提交。
2) 最少一次
在这种消费模式下,客户端收到消息,先处理消息,再提交。这可能出现在消息处理完,提交前,网络中断或者程序终止的情况,此时 Kafka 认为这个消息还没有被消费者消费,从而产生重复消息推送。具体的实现方法是,设置 enable.auto.commit 为 false,客户端调用 commitSync(),增加消息偏移量。
3) 正好一次
在这种消费模式下,消息处理和提交在同一个事务中,即有原子性。具体的实现方法是,控制消息的偏移量,记录当前的偏移量,对消息的处理和偏移必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到 MySQL 数据库并更新此时消息的偏移。
消费者的两种消息处理方式——定时拉取和实时处理示例如代码清单3所示。
代码清单3:consumerDemo
# -*- coding: utf-8 -*- # @Time : 2023/7/27 11:48 上午 # @Project : msgUtil # @File : consumerDemo.py # @Version: Python3.9.8 from Kafka import KafkaConsumer, KafkaProducer import json def consumer_demo(): consumer = KafkaConsumer( 'Kafka_demo', bootstrap_servers=':9092', group_id='test' ) # 实时处理Kafka消息 for message in consumer: print("receive, key: {}, value: {}".format( json.loads(message.key.decode()), json.loads(message.value.decode()) ) ) # 指定拉取数据间隔,定时拉取 # 在特定时候为了性能考虑,需要以固定时间从Kafka中拉取数据列表,这样可以降低服务器端压力 poll_interval = 5000 while True: msgs = consumer.poll(poll_interval, max_records=50) for msg in msgs: print( msgs.get(msg)[0]) # 返回ConsumerRecord对象,可以通过字典的形式获取内容 print(msgs.get(msg)[0].value)
表5列举了一些 KafkaConsumer 初始化时的重要参数,大家可以根据自己的需要有选择地添加参数。
初始化参数 | 含义 |
---|---|
group_id |
标识一个消费者组的名称。高并发量需要多个消费者协作,消费进度由该消费者组统一。 例如,消费者 A 与消费者 B 在初始化时使用同一个 group_id,在进行消费时,一条消息被消费者 A 消费,在 Kafka 中会被标记,这条消息将不会再被 B 消费(前提是A消费后正确提交)。 |
auto_offset_reset | 消费者启动时,消息队列中或许已经有堆积的未消费消息,有时候需求是从上一次未消费消息的位置开始读取(此时该参数应设置为 earliest),有时候需求是读取当前时刻之后产生的未消费消息,之前产生的数据不再消费(此时该参数应设置为 latest)。 |
enable_auto_commit | 是否自动提交,当前消费者消费完消息后,需要提交,才可以将消费完的消息传回消息队列的控制中心,enable_auto_commit 设置为 True 后,消费者将自动提交。 |
auto_commit_interval_ms | 消费者两次自动提交的时间间隔为 auto_commit_interval_ms。 |
key_deserializer | Kafka 反序列化消息在消费端,反序列化后,消息才能被正常解读。该参数指定 key 的反序列化方式,通常可以设置为 lambda k: json.loads(k, encoding="utf-8")。 |
value_deserializer | 该参数指定 value 的反序列化方式,通常可以设置为 lambda v: json.loads(v, encoding="utf-8")。 |
session.timeout.ms | 消费者和群组协调器的最大心跳时间,如果超过该时间,群组协调器将认为该消费者已经死亡或者故障,需要将其从消费者组中删除。 |
max.poll.interval .ms | 一次轮询消息间隔的最大时间。 |
connections.max.idle .ms | 消费者默认和 Kafka 实例建立长连接,当连接的空闲时间超过该参数值,连接将断开,在下一次使用时重新连接。 |
fetch.max.bytes | 消费者端一次拉取数据的最大字节数。 |
max.poll.records | 消费者端一次拉取数据的最大条数。 |
三、封装示例
为了方便日常编写代码,我们封装了简单的 Kafka 功能,以提升工作效率,大家也可以在此基础上扩展或优化。封装的代码内容包括:
- producerKafka 类封装了生产者同步发送消息和异步发送消息(如代码清单4所示);
- consumerKafka 类封装了消费者手动拉取消息和非手动拉取消息(如代码清单5所示)。
代码清单4:producerKafka
# -*- coding: utf-8 -*- # @Time : 2023/7/27 10:41 上午 # @Project : msgUtil # @File : producerKafka.py # @Version: Python3.9.8 import time import random import sys from Kafka import KafkaProducer from Kafka.errors import KafkaError, KafkaTimeoutError import json '''生产者, 一个生产者其实是两个线程,后台有一个IO线程用于真正发送消息出去,前台有一个线程用于把消息发送到本地缓冲区''' class Producer(object): def __init__(self, KafkaServerList=['127.0.0.1:9092'], ClientId="Producer01", Topic='TestTopic'): self._kwargs = { "bootstrap_servers": KafkaServerList, "client_id": ClientId, "acks": 1, "buffer_memory": 33554432, 'compression_type': None, "retries": 3, "batch_size": 1048576, "linger_ms": 100, "key_serializer": lambda m: json.dumps(m).encode('utf-8'), "value_serializer": lambda m: json.dumps(m).encode('utf-8'), } self._topic = Topic try: self._producer = KafkaProducer(**self._kwargs) except Exception as err: print(err) def _onSendSuccess(self, record_metadata): """ 异步发送成功的回调函数,也就是真正发送到Kafka集群且成功才会执行的函数。如果发送到缓冲区,则不会执行回调函数 :param record_metadata: :return: """ print("发送成功") print("被发往的主题:", record_metadata.topic) print("被发往的分区:", record_metadata.partition) print("队列位置:", record_metadata.offset) # 这个偏移量是相对偏移量,也就是相对起止位置,也就是队列偏移量。 def _onSendFailed(self): print("发送失败") # 异步发送数据 def sendMessage_asyn(self, value=None, partition=None): if not value: return None # 发送的消息必须是序列化后的,或者是字节 # message = json.dumps(msg, encoding='utf-8', ensure_ascii=False) kwargs = { "value": value, # value必须为字节或者被序列化为字节,由于之前我们初始化时已经通过value_serializer实现了序列化,因此上面的语句已注释 "key": None, # 与value对应的key,可选,也就是把一个key关联到这个消息上,key相同就会把消息发送到同一分区,所以如果有该设置,则可以设置key,key也需要序列化 "partition": partition # 发送到哪个分区,值为整型。如果不指定分区将会自动分配 } try: # 异步发送,发送到缓冲区,同时注册两个回调函数,一个是发送成功的回调,一个是发送失败的回调。 # send()的返回值是RecordMetadata,即记录的元数据,包括主题、分区和偏移量 future = self._producer.send(self._topic, **kwargs).add_callback(self._onSendSuccess).add_errback( self._onSendFailed) print("发送消息:", value) # 注册回调也可以这样写,上面的写法是为了简化 # future.add_callback(self._onSendSuccess) # future.add_errback(self._onSendFailed) except KafkaTimeoutError as err: print(err) except Exception as err: print(err) def closeConnection(self, timeout=None): # 关闭生产者,可以指定超时时间,即等待关闭成功最多等待多久 self._producer.close(timeout=timeout) def sendNow(self, timeout=None): # 调用flush()方法可以让所有在缓冲区的消息立即发送,即使ligner_ms值大于0 # 此时后台发送消息的线程立即发送消息并且阻塞在这里,等待消息发送成功,当然是否阻塞取决于acks的值。 # 如果不调用flush()方法,那么什么时候发送消息取决于ligner_ms或者batch,满足任意一个条件都会发送。 try: self._producer.flush(timeout=timeout) except KafkaTimeoutError as err: print(err) except Exception as err: print(err) # 同步发送数据 def sendMessage_sync_(self, data): """ 同步发送数据 :param topic: 主题 :param data_li: 发送数据 :return: """ future = self._producer.send(self._topic, data) record_metadata = future.get(timeout=10) # 同步确认消费 partition = record_metadata.partition # 数据所在的分区 offset = record_metadata.offset # 数据所在分区的位置 print("save success, partition: {}, offset: {}".format(partition, offset)) def main(): p = Producer(KafkaServerList=["172.21.26.54:9092"], ClientId="Producer01", Topic="TestTopic") for i in range(10): time.sleep(1) closePrice = random.randint(1, 500) msg = { "Publisher": "Producer01", "股票代码": 60000 + i, "昨日收盘价": closePrice } # p.sendMessage_asyn(value=msg,partition=0) p.sendMessage_sync_(msg) # p.sendNow() p.closeConnection() if __name__ == "__main__": try: main() finally: sys.exit()
代码清单5:consumerKafka
# -*- coding: utf-8 -*- # @Time : 2023/7/27 10:48 上午 # @Project : msgUtil # @File : consumerKafka.py # @Version: Python3.9.8 import sys import traceback from Kafka import KafkaConsumer, TopicPartition import json '''单线程消费者''' class Consumer(object): def __init__(self, KafkaServerList=['172.21.26.54:9092'], GroupID='TestGroup', ClientId="Test", Topics=['TestTopic', ]): """ 用于设置消费者配置项,这些配置项可以从Kafka模块的源代码中找到,下面为必要参数。 :param KafkaServerList: Kafka服务器的IP地址和端口列表 :param GroupID: 消费者组ID :param ClientId: 消费者名称 :param Topic: 主题 """ """ 初始化一个消费者实例,消费者不是线程安全的,所以建议一个线程实现一个消费者,而不是一个消费者让多个线程共享 下面是可选参数,可以在初始化KafkaConsumer实例的时候传送进去 enable_auto_commit表示是否自动提交,默认是true auto_commit_interval_ms表示自动提交间隔的毫秒数 auto_offset_reset="earliest"表示重置偏移量,earliest指移到最早的可用消息,latest指移到最新的消息,默认为latest """ self._kwargs = { "bootstrap_servers": KafkaServerList, "client_id": ClientId, "group_id": GroupID, "enable_auto_commit": False, "auto_offset_reset": "latest", "key_deserializer": lambda m: json.loads(m.decode('utf-8')), "value_deserializer": lambda m: json.loads(m.decode('utf-8')), } try: self._consumer = KafkaConsumer(**self._kwargs) self._consumer.subscribe(topics=(Topics)) except Exception as err: print("Consumer init failed, %s" % err) def consumeMsg(self): try: while True: # 手动拉取消息 data = self._consumer.poll(timeout_ms=5, max_records=100) # 拉取消息,使用字典类型 if data: for key in data: consumerrecord = data.get(key)[0] # 返回ConsumerRecord对象,可以通过字典的形式获取内容。 if consumerrecord != None: # 消息消费逻辑 message = { "Topic": consumerrecord.topic, "Partition": consumerrecord.partition, "Offset": consumerrecord.offset, "Key": consumerrecord.key, "Value": consumerrecord.value } print(message) # 消费逻辑执行完成后提交偏移量 self._consumer.commit() else: print("%s consumerrecord is None." % key) # 非手动拉取消息 ''' for consumerrecord in self._consumer: if consumerrecord: message = { "Topic": consumerrecord.topic, "Partition": consumerrecord.partition, "Offset": consumerrecord.offset, "Key": consumerrecord.key, "Value": consumerrecord.value } print(message) self._consumer.commit() ''' except Exception as err: print(err) # 获取规定数量的数据(可修改为无限、持续地获取数据) def get_message(self, count=1): """ :param topic: topic :param count: 获取数据条数 :return: msg """ counter = 0 msg = [] try: for message in self._consumer: print( "%s:%d:%d: key=%s value=%s header=%s" % ( message.topic, message.partition, message.offset, message.key, message.value, message.headers ) ) msg.append(message.value) counter += 1 if count == counter: break else: continue self._consumer.commit() except Exception as e: print("{0}, {1}".format(e, traceback.print_exc())) return None return msg # 查看剩余量 def get_count(self, topic): """ :param topic: topic :return: count """ try: partitions = [TopicPartition(topic, p) for p in self._consumer.partitions_for_topic(topic)] # print("start to cal offset:") # total toff = self._consumer.end_offsets(partitions) toff = [(key.partition, toff[key]) for key in toff.keys()] toff.sort() # print("total offset: {}".format(str(toff))) # current coff = [(x.partition, self._consumer.committed(x)) for x in partitions] coff.sort() # print("current offset: {}".format(str(coff))) # cal sum and left toff_sum = sum([x[1] for x in toff]) cur_sum = sum([x[1] for x in coff if x[1] is not None]) left_sum = toff_sum - cur_sum # print("Kafka left: {}".format(left_sum)) except Exception as e: print("{0}, {1}".format(e, traceback.print_exc())) return None return left_sum def closeConnection(self): # 关闭消费者 self._consumer.close() def main(): try: c = Consumer(KafkaServerList=['172.21.26.54:9092'], Topics=['TestTopic']) # c.consumeMsg() c.get_message(2) print(c.get_count('TestTopic')) except Exception as err: print(err) if __name__ == "__main__": try: main() finally: sys.exit()
生产者和消费者的简易示例,首先我们执行 consumerKafka,启动消费并进行监听,然后启动 producerKafka 生成消息,生产者消息情况如图7所示,消费者消息情况如图8所示。
图7:生产者消息情况
图8:消费者消息情况
在部署 Kafka 的服务器上执行以下命令,就可以查看目前的消费队列和消息堆积情况:
./kafka-consumer-groups.sh --bootstrap-server 172.21.26.54:9092 --describe --group TestGroup
其中,LOG-END-OFFSET 表示下一条被加入日志的消息的偏移;CURRENT-OFFSET 表示当前消费的偏移;LAG 表示消息堆积量,即消息队列服务器端留存的消息与消费掉的消息之间的差值,如图9所示。图9:Kafka 消息队列信息
四、Kafka 常见命令
Kafka 的常见命令行操作如下。1) 创建主题:
kafka-topics.sh --create --zookeeper master:2181/Kafka2 --replication-factor 2 --partitions 3 --topic mydemo5
2) 列出主题:
kafka-topics.sh --list --zookeeper master:2181/Kafka2
3) 查看主题描述:
kafka-topics.sh --describe --zookeeper master:2181/Kafka2 --topic mydemo5
4) 生产者生产消息:
kafka-console-producer.sh --broker-list master:9092 --topic mydemo5
5) 消费者消费消息并指定消费者组名:
kafka-console-consumer.sh --bootstrap-server master:9092,node01:9092,node02:9092 --new-consumer --consumer-property group.id=test_Kafka_game_x_g1 --topic mydemo5
6) 查看正在运行的消费者组:
kafka-consumer-groups.sh --bootstrap-server master:9092 --list --new-consumer
7) 计算消息的消息堆积情况:
kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_Kafka_game_x_g