架构师_程序员_码农网

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 50163|回复: 17

[资料] .NET/C# 消息队列之 Kafka 操作[附源码]

[复制链接]
发表于 2021-4-13 11:45:31 | 显示全部楼层 |阅读模式
Kafka 是LinkedIn 开发的一个高性能、分布式的消息系统,广泛用于日志收集、流式数据处理、在线和离线消息分发等场景。虽然不是作为传统的MQ来设计,在大部分情况,Kafaka 也可以代替原先ActiveMQ 等传统的消息系统。

Kafka 将消息流按Topic 组织,保存消息的服务器称为Broker,消费者可以订阅一个或者多个Topic。为了均衡负载,一个Topic 的消息又可以划分到多个分区(Partition),分区越多,Kafka并行能力和吞吐量越高。

Kafka 集群需要zookeeper 支持来实现集群,最新的kafka 发行包中已经包含了zookeeper,部署的时候可以在一台服务器上同时启动一个zookeeper Server 和 一个Kafka Server,也可以使用已有的其他zookeeper集群。

和传统的MQ不同,消费者需要自己保留一个offset,从kafka 获取消息时,只拉去当前offset 以后的消息。Kafka 的scala/java 版的client 已经实现了这部分的逻辑,将offset 保存到zookeeper 上。每个消费者可以选择一个id,同样id 的消费者对于同一条消息只会收到一次。一个Topic 的消费者如果都使用相同的id,就是传统的 Queue;如果每个消费者都使用不同的id, 就是传统的pub-sub.

回顾:

Windows下将ActiveMQ添加到系统服务
https://www.itsvse.com/thread-6210-1-1.html

activemq控制面板里的NumberOfPendingMessages、MessagesEnqueued、Messag...
https://www.itsvse.com/thread-4954-1-1.html

关于activemq和rabbitmq资料汇总
https://www.itsvse.com/thread-4659-1-1.html

【实战】centos activemq添加到服务
https://www.itsvse.com/thread-4617-1-1.html

【实战】Centos 6.2 64位安装activemq教程
https://www.itsvse.com/thread-4616-1-1.html

【实战】ActiveMQ5.15.3不能启动,UnsupportedClassVersionError报错
https://www.itsvse.com/thread-4615-1-1.html

activemq 主题topic权限设置
https://www.itsvse.com/thread-4495-1-1.html

User itsvse is not authorized to read from: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

c# activemq客户端订阅源码
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq设置连接账号和密码
https://www.itsvse.com/thread-4282-1-1.html

ACTIVEMQ主题、队列设置用户名密码
https://www.itsvse.com/thread-4281-1-1.html

activemq修改网站管理密码
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store is Full
https://www.itsvse.com/thread-4125-1-1.html

.net/c# activemq操作示例[源码]
https://www.itsvse.com/thread-3907-1-1.html

Activemq用户权限配置
https://www.itsvse.com/thread-3906-1-1.html

activemq Queue与Topic区别
https://www.itsvse.com/thread-3863-1-1.html

.Net平台下ActiveMQ入门实例
https://www.itsvse.com/thread-3452-1-1.html

ActiveMQ持久订阅设置
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos消费者并行处理限制
https://www.itsvse.com/thread-4667-1-1.html

【实战】rabbitMQ Queue队列消息持久化[附源码]
https://www.itsvse.com/thread-4657-1-1.html

【实战】rabbitMQ控制台添加账户信息
https://www.itsvse.com/thread-4655-1-1.html

深入解析RabbitMQ消息应答ack机制
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ 连接断开处理-断线重连
https://www.itsvse.com/thread-4636-1-1.html

RabbitMQ三种Exchange模式(fanout,direct,topic)介绍
https://www.itsvse.com/thread-4635-1-1.html

【实战】RabbitMQ安装Web管理插件
https://www.itsvse.com/thread-4631-1-1.html

【实战】RabbitMQ在Windows下安装教程
https://www.itsvse.com/thread-4630-1-1.html
kafka 消费

1.相同group_id的消费者,只有一个消费者能够消费到消息(queue 队列模式

2.不同group_id的消费者,接受到的消息都是一样的

Kafka 的优点

分布式可高可扩展。Kafka 集群可以透明的扩展,增加新的服务器进集群。

高性能。Kafka 的性能大大超过传统的ActiveMQ、RabbitMQ等MQ 实现,尤其是Kafka 还支持batch 操作。下图是linkedin 的消费者性能压测结果:

容错。Kafka每个Partition的数据都会复制到几台服务器上。当某个Broker故障失效时,ZooKeeper服务将通知生产者和消费者,生产者和消费者转而使用其它Broker。

Kafka 的缺点

重复消息。Kafka 只保证每个消息至少会送达一次,虽然几率很小,但一条消息有可能会被送达多次。
消息乱序。虽然一个Partition 内部的消息是保证有序的,但是如果一个Topic 有多个Partition,Partition 之间的消息送达不保证有序。
复杂性。Kafka需要zookeeper 集群的支持,Topic通常需要人工来创建,部署和维护较一般消息队列成本更高

.NET/C# 消息队列之 Kafka 操作

首先使用 .NET Core 3.1 新建两个控制台项目,项目名称分别是:Kafka-Consumer(消费者)、Kafka-Producer(生产者)

使用 nuget 引用如下 Confluent.Kafka 包,命令如下:

GitHub 地址:https://github.com/confluentinc/confluent-kafka-dotnet/

我们首先启动 Producer 生产者程序,如果先启动消费者,会报错如下:
Error occured: Broker: Unknown topic or partition

本文将消费设置 EnableAutoOffsetStore 为 false,也就是手动设置偏移存储(类似手动确认消息)

消费者消费完后不设置 OffsetStore

尝试使用生产者生产两条消息,打开消费者消费,MaxPollIntervalMs = 10000 // 10 秒不手动设置,允许别的客户端进行消费,当然 10 秒内不会被其他客户端消费

MaxPollIntervalMs 说明
//对于高级使用者,在调用之间消耗消息的最大允许时间(例如rd_kafka_consumer_poll())。如果超过此时间间隔,则认为该使用者失败了,该组将重新平衡,以便将分区重新分配给另一个使用者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理的应用程序设置“ enable.auto.offset.store = false”,然后在消息处理后*之后显式存储偏移量(使用offsets_store()),以确保不会自动提交偏移量在处理完成之前。每秒检查一次间隔两次。有关更多信息,请参见KIP-62。

效果图如下:

QQ截图20210413110224.jpg

消费者消费完后设置 OffsetStore

代码

设置完成后,等 10 秒后,还是会接收到最后一条消息,(当消费者连接到broker之后,从offset的位置开始消费)如果设置 c.Commit(cr); 则不会重复接收到最后一条消息。

查看源码

QQ截图20210413112304.jpg

commit 操作会将 Offset + 1 进行提交,最终都会调用 Librdkafka.topic_partition_list_destroy(cOffsets);

https://github.com/confluentinc/confluent-kafka-dotnet/blob/dddfb9313fc9b01c16c81ccd28766218b60c3900/src/Confluent.Kafka/Consumer.cs
https://github.com/confluentinc/confluent-kafka-dotnet/blob/dddfb9313fc9b01c16c81ccd28766218b60c3900/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs

设置不同 GroupId

尝试通过命令行参数设置不同 GroupId,然后通过生产者发送消息,如下图:

QQ截图20210413113003.jpg

clinet1 和 client2 都会收到历史消息,并且生产者发出消息后,两个几乎都会同时收到消息

新的消费者只接收新的消息

如何让新的客户端只接收新的消息,忽略历史数据呢?

设置如下:

如下图:

QQ截图20210413114021.jpg

生产者代码

如下:

消费者代码

如下:

源码下载

游客,如果您要查看本帖隐藏内容请回复






上一篇:.NET/C# 使用腾讯企业邮箱异常:The operation has timed out.
下一篇:NuGet 清除缓存遇到的问题
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-4-15 09:31:05 | 显示全部楼层
.net Kafka 客户端断线后,并不会抛出异常,等待网络正常后,会重新连接
%4|1618450028.267|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Disconnected (after 59926ms in state UP)
%3|1618450028.267|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Disconnected (after 59926ms in state UP)
%3|1618450028.267|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT)
%3|1618450028.268|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT)
%3|1618450028.357|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 1 identical error(s) suppressed)
%3|1618450028.357|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 1 identical error(s) suppressed)
%3|1618450062.882|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 8 identical error(s) suppressed)
%3|1618450062.882|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 8 identical error(s) suppressed)
%3|1618450098.255|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 11ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450098.255|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 11ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450138.243|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450138.244|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450168.254|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 3 identical error(s) suppressed)
%3|1618450168.254|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 3 identical error(s) suppressed)
QQ截图20210415092941.jpg
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-4-13 16:26:58 | 显示全部楼层
消息的消费原理:

在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的?kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 样 的 consumer ,这些consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费,那么同一个consumer group 里面的 consumer 是怎么去分配该消费哪个分区里的数据的呢?举个简单的例子就是如果存在的分区输,即partiton的数量于comsumer数量一致的时候,每个comsumer对应一个分区,如果comsumer数量多于分区,那么多出来的数量的comsumer将不工作,相反则是其中将会有comsumer消费多个分区。

分区分配策略:

在 kafka 中,存在两种分区分配策略,一种是 Range(默认)、另 一 种 另 一 种 还 是 RoundRobin ( 轮 询 )。 通 过comsumer的配置partition.assignment.strategy 这个参数来设置。


查看所有 topic


查看某一个 topic 详情


QQ截图20210413162645.jpg

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-5-8 17:17:33 | 显示全部楼层
Kafka 删除消费组


QQ截图20210508171602.jpg
Deletion of requested consumer groups ('itsvse') was successful.


有可能报错如下:

Error: Deletion of some consumer groups failed:
* Group 'itsvse' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
解决方案

消费完所有消息,或则设置偏移量

Kafka 手动设置 offset 偏移量
https://www.itsvse.com/thread-9641-1-1.html
然后,再次删除!

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-4-13 15:40:48 | 显示全部楼层
power shell 命令

QQ截图20210413153947.jpg

每个消费者客户端会和Kafka服务保持2个连接
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-5-7 12:37:06 | 显示全部楼层
kafka,查看指定group下topic的堆积数量

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
发表于 2021-6-16 12:41:09 | 显示全部楼层
请问下为什么代码无法查看的~
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-6-25 10:50:06 | 显示全部楼层
Kafka  获取 topic size 命令:



QQ截图20210625104953.jpg
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
 楼主| 发表于 2021-7-18 10:15:01 | 显示全部楼层
Kafka  命令行创建 topic:

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
发表于 2021-9-3 11:52:41 | 显示全部楼层
kafka还有不少坑,学习了
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

免责声明:
码农网所发布的一切软件、编程资料或者文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。如有侵权请邮件与我们联系处理。

Mail To:help@itsvse.com

QQ|手机版|小黑屋|架构师 ( 鲁ICP备14021824号-2 )|网站地图

GMT+8, 2024-3-29 22:36

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表