|
因为项目需要一些数据得推送和接收,因为数据量比较大,一天大概3百万以上的数据
一开始采用http post的方式来提交数据,发现太慢了,而且发送的速度远远小于接收的速度,
所以,想用socket长连接来推送和接收数据,自己写socket服务端和客户端吧,一是浪费大量时间,而是可能做的不太好
最后,采用activemq来推送和接收数据
首先,生产者代码如下:
消费者,有两种模式:
1:普通订阅,不需要持久“订阅”,也就是在连接的时候,能收到消息,连接之前的消息,就收不到了,用到的是“CreateConsumer”
2:持久订阅,连接之后,能收到之前未推送过来的数据,用到的是“CreateDurableConsumer”
有关CreateDurableConsumer介绍的链接:http://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-
在指定的主题上创建非共享持久订阅(如果尚不存在),并在该持久订阅上创建一个消费者。此方法创建持久订阅而不使用消息选择器,noLocal值为false。
应用程序使用持久订阅,该应用程序需要接收在主题上发布的所有消息,包括当没有与之相关联的活动消费者时发布的消息。 JMS提供程序保留此持久订阅的记录,并确保主题发布者的所有邮件都将被保留,直到它们被传递给消费者并由该持久订阅确认,直到它们已过期为止。
持久订阅将继续累积邮件,直到使用取消订阅方式删除邮件。
此方法只能与非共享持久订阅一起使用。使用此方法创建的任何持久订阅将被取消共享。这意味着一次只能存在订阅上的一个活动(即不关闭)消费者。术语“消费者”这里表示任何客户端中的TopicSubscriber,MessageConsumer或JMSConsumer对象。
非共享持久订阅由客户端指定的名称和必须设置的客户端标识符来标识。随后希望在非共享持久订阅上创建消费者的应用程序必须使用相同的客户端标识符。
如果已经存在具有相同名称和客户端标识符的未共享持久订阅,并且已经指定了相同的主题,消息选择器和noLocal值,并且持久订阅上没有消费者已经活动(即未关闭),则此方法创建一个MessageConsumer上现有的耐用订阅。
如果已经存在具有相同名称和客户端标识符的非共享持久订阅,并且持久订阅上有消费者已经活动(即未关闭),则将抛出JMSException。
如果已经存在具有相同名称和客户端标识符但不同主题的非共享持久订阅,则已经指定了消息选择器或noLocal值,并且持久订阅上没有消费者已经活动(即未关闭),那么这等同于取消订阅(删除)旧的并创建一个新的。
共享持久订阅和非共享持久订阅可能不具有相同的名称和客户端标识。如果已经存在具有相同名称和客户端标识符的共享持久订阅,则抛出JMSException。
持久订阅和具有相同名称和clientId的共享非持久订阅没有限制。这样的订阅将是完全分开的。
此方法与相应的createDurableSubscriber方法相同,除了它返回一个MessageConsumer而不是TopicSubscriber来表示消费者。
参数:
主题 - 非临时主题订阅
name - 用于标识此订阅的名称
抛出:
InvalidDestinationException - 如果指定了无效主题。
IllegalStateException - 如果客户机标识符未设置
JMSException -
如果会话由于某些内部错误而无法创建非共享持久订阅和MessageConsumer
如果已经存在具有相同名称和客户端标识符的非共享持久订阅,并且存在消费者已经活动
如果已经存在具有相同名称和客户端标识符的共享持久订阅
以来:
JMS 2.0
IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);
其中messageSelector为消息选择器;noLocal标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name标识订阅主题所对应的自己的唯一标识,持久订阅时需要设置此参数。
activemq .net需要引用的包下载:http://archive.apache.org/dist/activemq/apache-nms/1.7.0/
完整的源码下载:
|
评分
-
查看全部评分
上一篇:Activemq用户权限配置下一篇:Unknown: Input variables exceeded 1000. To increase the limit change max_inpu...
|