架构师_程序员_码农网

查看: 683|回复: 7
打印 上一主题 下一主题

[资料] .NET/C# 消息队列之 Kafka 操作[附源码]

[复制链接]
跳转到指定楼层
楼主
发表于 2021-4-13 11:45:31 | 只看该作者
Kafka 是LinkedIn 开发的一个高性能、分布式的消息系统,广泛用于日志收集、流式数据处理、在线和离线消息分发等场景。虽然不是作为传统的MQ来设计,在大部分情况,Kafaka 也可以代替原先ActiveMQ 等传统的消息系统。

Kafka 将消息流按Topic 组织,保存消息的服务器称为Broker,消费者可以订阅一个或者多个Topic。为了均衡负载,一个Topic 的消息又可以划分到多个分区(Partition),分区越多,Kafka并行能力和吞吐量越高。

Kafka 集群需要zookeeper 支持来实现集群,最新的kafka 发行包中已经包含了zookeeper,部署的时候可以在一台服务器上同时启动一个zookeeper Server 和 一个Kafka Server,也可以使用已有的其他zookeeper集群。

和传统的MQ不同,消费者需要自己保留一个offset,从kafka 获取消息时,只拉去当前offset 以后的消息。Kafka 的scala/java 版的client 已经实现了这部分的逻辑,将offset 保存到zookeeper 上。每个消费者可以选择一个id,同样id 的消费者对于同一条消息只会收到一次。一个Topic 的消费者如果都使用相同的id,就是传统的 Queue;如果每个消费者都使用不同的id, 就是传统的pub-sub.

回顾:

Windows下将ActiveMQ添加到系统服务
https://www.itsvse.com/thread-6210-1-1.html

activemq控制面板里的NumberOfPendingMessages、MessagesEnqueued、Messag...
https://www.itsvse.com/thread-4954-1-1.html

关于activemq和rabbitmq资料汇总
https://www.itsvse.com/thread-4659-1-1.html

【实战】centos activemq添加到服务
https://www.itsvse.com/thread-4617-1-1.html

【实战】Centos 6.2 64位安装activemq教程
https://www.itsvse.com/thread-4616-1-1.html

【实战】ActiveMQ5.15.3不能启动,UnsupportedClassVersionError报错
https://www.itsvse.com/thread-4615-1-1.html

activemq 主题topic权限设置
https://www.itsvse.com/thread-4495-1-1.html

User itsvse is not authorized to read from: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

c# activemq客户端订阅源码
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq设置连接账号和密码
https://www.itsvse.com/thread-4282-1-1.html

ACTIVEMQ主题、队列设置用户名密码
https://www.itsvse.com/thread-4281-1-1.html

activemq修改网站管理密码
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store is Full
https://www.itsvse.com/thread-4125-1-1.html

.net/c# activemq操作示例[源码]
https://www.itsvse.com/thread-3907-1-1.html

Activemq用户权限配置
https://www.itsvse.com/thread-3906-1-1.html

activemq Queue与Topic区别
https://www.itsvse.com/thread-3863-1-1.html

.Net平台下ActiveMQ入门实例
https://www.itsvse.com/thread-3452-1-1.html

ActiveMQ持久订阅设置
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos消费者并行处理限制
https://www.itsvse.com/thread-4667-1-1.html

【实战】rabbitMQ Queue队列消息持久化[附源码]
https://www.itsvse.com/thread-4657-1-1.html

【实战】rabbitMQ控制台添加账户信息
https://www.itsvse.com/thread-4655-1-1.html

深入解析RabbitMQ消息应答ack机制
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ 连接断开处理-断线重连
https://www.itsvse.com/thread-4636-1-1.html

RabbitMQ三种Exchange模式(fanout,direct,topic)介绍
https://www.itsvse.com/thread-4635-1-1.html

【实战】RabbitMQ安装Web管理插件
https://www.itsvse.com/thread-4631-1-1.html

【实战】RabbitMQ在Windows下安装教程
https://www.itsvse.com/thread-4630-1-1.html
kafka 消费

1.相同group_id的消费者,只有一个消费者能够消费到消息(queue 队列模式

2.不同group_id的消费者,接受到的消息都是一样的

Kafka 的优点

分布式可高可扩展。Kafka 集群可以透明的扩展,增加新的服务器进集群。

高性能。Kafka 的性能大大超过传统的ActiveMQ、RabbitMQ等MQ 实现,尤其是Kafka 还支持batch 操作。下图是linkedin 的消费者性能压测结果:

容错。Kafka每个Partition的数据都会复制到几台服务器上。当某个Broker故障失效时,ZooKeeper服务将通知生产者和消费者,生产者和消费者转而使用其它Broker。

Kafka 的缺点

重复消息。Kafka 只保证每个消息至少会送达一次,虽然几率很小,但一条消息有可能会被送达多次。
消息乱序。虽然一个Partition 内部的消息是保证有序的,但是如果一个Topic 有多个Partition,Partition 之间的消息送达不保证有序。
复杂性。Kafka需要zookeeper 集群的支持,Topic通常需要人工来创建,部署和维护较一般消息队列成本更高

.NET/C# 消息队列之 Kafka 操作

首先使用 .NET Core 3.1 新建两个控制台项目,项目名称分别是:Kafka-Consumer(消费者)、Kafka-Producer(生产者)

使用 nuget 引用如下 Confluent.Kafka 包,命令如下:

GitHub 地址:https://github.com/confluentinc/confluent-kafka-dotnet/

我们首先启动 Producer 生产者程序,如果先启动消费者,会报错如下:
Error occured: Broker: Unknown topic or partition

本文将消费设置 EnableAutoOffsetStore 为 false,也就是手动设置偏移存储(类似手动确认消息)

消费者消费完后不设置 OffsetStore

尝试使用生产者生产两条消息,打开消费者消费,MaxPollIntervalMs = 10000 // 10 秒不手动设置,允许别的客户端进行消费,当然 10 秒内不会被其他客户端消费

MaxPollIntervalMs 说明
//对于高级使用者,在调用之间消耗消息的最大允许时间(例如rd_kafka_consumer_poll())。如果超过此时间间隔,则认为该使用者失败了,该组将重新平衡,以便将分区重新分配给另一个使用者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理的应用程序设置“ enable.auto.offset.store = false”,然后在消息处理后*之后显式存储偏移量(使用offsets_store()),以确保不会自动提交偏移量在处理完成之前。每秒检查一次间隔两次。有关更多信息,请参见KIP-62。

效果图如下:



消费者消费完后设置 OffsetStore

代码

设置完成后,等 10 秒后,还是会接收到最后一条消息,(当消费者连接到broker之后,从offset的位置开始消费)如果设置 c.Commit(cr); 则不会重复接收到最后一条消息。

查看源码



commit 操作会将 Offset + 1 进行提交,最终都会调用 Librdkafka.topic_partition_list_destroy(cOffsets);

https://github.com/confluentinc/confluent-kafka-dotnet/blob/dddfb9313fc9b01c16c81ccd28766218b60c3900/src/Confluent.Kafka/Consumer.cs
https://github.com/confluentinc/confluent-kafka-dotnet/blob/dddfb9313fc9b01c16c81ccd28766218b60c3900/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs

设置不同 GroupId

尝试通过命令行参数设置不同 GroupId,然后通过生产者发送消息,如下图:



clinet1 和 client2 都会收到历史消息,并且生产者发出消息后,两个几乎都会同时收到消息

新的消费者只接收新的消息

如何让新的客户端只接收新的消息,忽略历史数据呢?

设置如下:

如下图:



生产者代码

如下:

消费者代码

如下:

源码下载

游客,如果您要查看本帖隐藏内容请回复






上一篇:.NET/C# 使用腾讯企业邮箱异常:The operation has timed out.
下一篇:NuGet 清除缓存遇到的问题
帖子永久地址: 

架构师_程序员_码农网 - 论坛版权1、本主题所有言论和图片纯属会员个人意见,与本论坛立场无关
2、本站所有主题由该帖子作者发表,该帖子作者与架构师_程序员_码农网享有帖子相关版权
3、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和架构师_程序员_码农网的同意
4、帖子作者须承担一切因本文发表而直接或间接导致的民事或刑事法律责任
5、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责
6、如本帖侵犯到任何版权问题,请立即告知本站,本站将及时予与删除并致以最深的歉意
7、架构师_程序员_码农网管理员和版主有权不事先通知发贴者而删除本文

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
沙发
 楼主| 发表于 2021-4-13 15:40:48 | 只看该作者
power shell 命令



每个消费者客户端会和Kafka服务保持2个连接
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
板凳
 楼主| 发表于 2021-4-13 16:26:58 | 只看该作者
消息的消费原理:

在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的?kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 样 的 consumer ,这些consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费,那么同一个consumer group 里面的 consumer 是怎么去分配该消费哪个分区里的数据的呢?举个简单的例子就是如果存在的分区输,即partiton的数量于comsumer数量一致的时候,每个comsumer对应一个分区,如果comsumer数量多于分区,那么多出来的数量的comsumer将不工作,相反则是其中将会有comsumer消费多个分区。

分区分配策略:

在 kafka 中,存在两种分区分配策略,一种是 Range(默认)、另 一 种 另 一 种 还 是 RoundRobin ( 轮 询 )。 通 过comsumer的配置partition.assignment.strategy 这个参数来设置。


查看所有 topic


查看某一个 topic 详情




码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
地板
 楼主| 发表于 2021-4-15 09:31:05 | 只看该作者
.net Kafka 客户端断线后,并不会抛出异常,等待网络正常后,会重新连接
%4|1618450028.267|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Disconnected (after 59926ms in state UP)
%3|1618450028.267|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Disconnected (after 59926ms in state UP)
%3|1618450028.267|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT)
%3|1618450028.268|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT)
%3|1618450028.357|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 1 identical error(s) suppressed)
%3|1618450028.357|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 1 identical error(s) suppressed)
%3|1618450062.882|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 8 identical error(s) suppressed)
%3|1618450062.882|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 8 identical error(s) suppressed)
%3|1618450098.255|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 11ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450098.255|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 11ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450138.243|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450138.244|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450168.254|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 3 identical error(s) suppressed)
%3|1618450168.254|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 3 identical error(s) suppressed)

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
5#
 楼主| 发表于 2021-5-7 12:37:06 | 只看该作者
kafka,查看指定group下topic的堆积数量

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
6#
 楼主| 发表于 2021-5-8 17:17:33 | 只看该作者
Kafka 删除消费组



Deletion of requested consumer groups ('itsvse') was successful.


有可能报错如下:

Error: Deletion of some consumer groups failed:
* Group 'itsvse' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
解决方案

消费完所有消息,或则设置偏移量

Kafka 手动设置 offset 偏移量
https://www.itsvse.com/thread-9641-1-1.html
然后,再次删除!

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
7#
发表于 2021-6-16 12:41:09 | 只看该作者
请问下为什么代码无法查看的~
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
8#
 楼主| 发表于 1 小时前 | 只看该作者
Kafka  获取 topic size 命令:



码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

免责声明:
码农网所发布的一切软件、编程资料或者文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。如有侵权请邮件与我们联系处理。

Mail To:help@itsvse.com

QQ|手机版|小黑屋|架构师 ( 鲁ICP备14021824号-2 )|网站地图

GMT+8, 2021-6-25 12:26

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表