架构师_程序员

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 263|回复: 1

[插件库] 消息队列收发消息Demo

[复制链接]
跳转到指定楼层
楼主
发表于 2019-5-30 22:58:30
zu



前言


项目中,有些地方为了保证信息处理的准确性,会使用到消息队列。比如:高并发情况下,网站进行数据处理。今天本文记录RabbitMQ简单的一个demo。在进入本文之前,RbMQ环境必须提前安装好(因为我本机RbMQ环境之前安装好了,怕卸载造成一些注册表清不干净的问题,这里就不记录环境的安装)。



环境&工具



>Windows 10 系统
>RabbitMQ环境服务
>VS 2017



正文



1:新建两个控制台应用程序,分别为 “RabbitMQClient” 和 “RabbitMQServer”





2:引用 “RabbitMQ.Client” dll

3:在RabbitMQClient 项目,Program.cs中,贴入以下代码

  1. using RabbitMQ.Client;
  2. using RabbitMQ.Client.Events;
  3. using System;
  4. using System.Text;

  5. namespace RabbitMQClientt
  6. {
  7.     class Program
  8.     {
  9.         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
  10.         {
  11.             HostName = "localhost",
  12.             UserName = "guest",
  13.             Password = "guest",
  14.             Port = 5672
  15.         };

  16.         static void Main(string[] args)
  17.         {
  18.             Worker();
  19.         }


  20.         private static void Worker()
  21.         {
  22.             string exchangeName = "SendEmail.exchange";
  23.             const string queueName = "SendEmail.queue";
  24.             using (IConnection conn = rabbitMqFactory.CreateConnection())
  25.             {
  26.                 using (IModel channel = conn.CreateModel())
  27.                 {
  28.                     //2 定义一个exchange
  29.                     channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null);

  30.                     //4 定义两个queue
  31.                     channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

  32.                     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

  33.                     Console.WriteLine(" [*] Waiting for messages.");

  34.                     var consumer = new EventingBasicConsumer(channel);

  35.                     consumer.Received += (model, ea) =>
  36.                     {
  37.                         var msgBody = Encoding.UTF8.GetString(ea.Body);
  38.                         Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
  39.                         channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  40.                     };
  41.                     channel.BasicConsume(queueName, autoAck:false, consumer: consumer);

  42.                     Console.WriteLine("按任意值,退出程序");
  43.                     Console.ReadKey();
  44.                 }
  45.             }
  46.         }
  47.     }
  48. }
复制代码


4:在RabbitMQServer项目,Program.cs中,贴入以下代码

  1. using RabbitMQ.Client;
  2. using System;
  3. using System.Text;

  4. namespace RabbitMQServer
  5. {
  6.     class Program
  7.     {
  8.         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
  9.         {
  10.             HostName = "localhost",
  11.             UserName = "guest",
  12.             Password = "guest",
  13.             Port = 5672
  14.         };

  15.         static void Main(string[] args)
  16.         {
  17.             CommonSample();
  18.         }

  19.         public static void SendMessage(IModel channel,IBasicProperties props,String exchangeName, String queueName,  String message)
  20.         {
  21.             var msgBody = Encoding.UTF8.GetBytes(message);
  22.             channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: msgBody);
  23.             Console.WriteLine(string.Format("发送完成,发送时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
  24.         }

  25.         private static void CommonSample()
  26.         {
  27.             string exchangeName = "SendEmail.exchange";
  28.             const string queueName = "SendEmail.queue";
  29.             using (IConnection conn = rabbitMqFactory.CreateConnection())
  30.             using (IModel channel = conn.CreateModel())
  31.             {

  32.                 //2 定义一个exchange
  33.                 channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null);

  34.                 //4 定义两个queue
  35.                 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

  36.                 //3 定义exchange到queue的binding
  37.                 channel.QueueBind(queueName, exchangeName, routingKey: queueName);
  38.                 var props = channel.CreateBasicProperties();
  39.                 props.Persistent = true;

  40.                 ConsoleKeyInfo kinfo;
  41.                 StringBuilder result = new StringBuilder();
  42.                 do
  43.                 {

  44.                     kinfo = Console.ReadKey();

  45.                     if (kinfo.Key == ConsoleKey.Enter)
  46.                     {
  47.                         String content = result.ToString();
  48.                         Console.WriteLine();
  49.                         Console.WriteLine($"您输入的是:{content}");
  50.                         SendMessage(channel, props, exchangeName, queueName, content);
  51.                         result.Clear();
  52.                     }
  53.                     else
  54.                     {
  55.                         result.Append(kinfo.KeyChar);
  56.                     }
  57.                 } while (kinfo.Key != ConsoleKey.Escape);
  58.             }
  59.         }
  60.     }
  61. }
复制代码


5:F5 启动 RabbitMQServer 项目,在控制台中,输入信息,回车。



6:不要关闭 RabbitMQServer 项目控制台,右击  RabbitMQClient  项目——> “调试”——>“启动新实例”






可在右边 RabbitMQServer 控制台里面输入信息,即可显示在左边。RabbitMQServer 项目进行消息的生产。RabbitMQClient项目则对消息进行处理(消费)。这就是消息队列中的生产者和消费者。代码里还涉及到路由,信道。

在消息生产者这边,把要处理的信息丢到消息队列中。消费者这边在接收到消息后,可对这条数据进行相应的处理。

环境安装好的话,本地默认的端口地址应该是 http://localhost:15672/#/queues。直接 guest 账户访问,也可自己敲命令行配置账户和相应的权限。



刚才在RabbitMQServer 控制台输入的信息,其实都先存在 消息队列中,可先将 RabbitMQClient 窗口关闭,再在窗口输入几条信息,观察消息队列,可看到队列中有三条数据。



当我们RabbitMQClient  窗口启动,消息在正常接收后,会自动在消息队列中消失。这个是消息队列的确认机制,一旦准确接收到了我们需要的信息,消费者这边会向消息队列返回一个状态,用来销毁这条消息。如果消费者没有接收到消息,或者出现异常,没有返回这个状态值,那么这条消息将会一直在队列中,直到消息被正常的消费,这样确保了消息的准确性。



尾声




很遗憾,在项目中我用到消息队列的地方很少。没有进行深入的了解,只会基础的使用。

上面demo 里面有一个坑,在RabbitMQClient 里面 RbMQ 的连接 套了一层 using。因为是异步线程,这样会造成程序跑的时候,还没接收完消息,就把RbMQ Connection 给回收了,会报错。处理方法是:去掉using,手动释放。













上一篇:"npm ERR! Error: EPERM: operation not permitted"问题解决
下一篇:九种常规的视觉软件介绍
帖子永久地址: 

架构师_程序员 - 论坛版权1、本主题所有言论和图片纯属会员个人意见,与本论坛立场无关
2、本站所有主题由该帖子作者发表,该帖子作者与架构师_程序员享有帖子相关版权
3、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和架构师_程序员的同意
4、帖子作者须承担一切因本文发表而直接或间接导致的民事或刑事法律责任
5、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责
6、如本帖侵犯到任何版权问题,请立即告知本站,本站将及时予与删除并致以最深的歉意
7、架构师_程序员管理员和版主有权不事先通知发贴者而删除本文

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
沙发
发表于 2019-5-31 09:12:21
最好把文章的 demo 源码,上传到:https://down.itsvse.com/
码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

免责声明:
码农网所发布的一切软件、编程资料或者文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。

Mail To:help@itsvse.com

QQ|Archiver|手机版|小黑屋|架构师 ( 鲁ICP备14021824号-2 )|网站地图

GMT+8, 2019-9-19 01:59

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表