C#中关于ActiveMQ的应用详解

activemq是个好东东,不必多说。activemq提供多种语言支持,如java, c, c++, c#, ruby, perl, python, php等。由于我在windows下开发gui,比较关心c++和c#,其中c#的activemq很简单,apache提供nms(.net messaging service)支持.net开发,只需如下几个步骤即能建立简单的实现。c++的应用相对麻烦些,稍后写文章介绍。

1、去ActiveMQ官方网站下载最新版的ActiveMQ,我之前下的是5.3.1,5.3.2现在也已经出来了。

2、去ActiveMQ官方网站下载最新版的Apache.NMS,需要下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的NMS.ActiveMQ,Apache.NMS.ActiveMQ.dll在实际使用中有个bug,即停止ActiveMQ应用时会抛WaitOne函数异常,查看src包中的源码发现是由于Apache.NMS.ActiveMQ-1.2.0-src\src\main\csharp\Transport\InactivityMonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。

private void StopMonitorThreads()           {               lock(monitor)               {                   if(monitorStarted.CompareAndSet(true, false))                   {                       AutoResetEvent shutdownEvent = new AutoResetEvent(false);                       // Attempt to wait for the Timers to shutdown, but don't wait                       // forever, if they don't shutdown after two seconds, just quit.                       this.readCheckTimer.Dispose(shutdownEvent);                       shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                       this.writeCheckTimer.Dispose(shutdownEvent);                       shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                                                       //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)                       this.asyncTasks.Shutdown();                       this.asyncTasks = null;                       this.asyncWriteTask = null;                       this.asyncErrorTask = null;                   }               }           }       private void StopMonitorThreads()         {             lock(monitor)             {                 if(monitorStarted.CompareAndSet(true, false))                 {                     AutoResetEvent shutdownEvent = new AutoResetEvent(false);                    // Attempt to wait for the Timers to shutdown, but don't wait                     // forever, if they don't shutdown after two seconds, just quit.                     this.readCheckTimer.Dispose(shutdownEvent);                     shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                     this.writeCheckTimer.Dispose(shutdownEvent);                     shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));                                                     //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)                     this.asyncTasks.Shutdown();                     this.asyncTasks = null;                     this.asyncWriteTask = null;                     this.asyncErrorTask = null;                 }             }         }

3、运行ActiveMQ,找到ActiveMQ解压后的bin文件夹:…\apache-activemq-5.3.1\bin,执行activemq.bat批处理文件即可启动ActiveMQ服务器,默认端口为61616,这可在配置文件中修改。

4、写C#程序实现ActiveMQ的简单应用。新建C#工程(一个Producter项目和一个Consumer项目),WinForm或Console程序均可,这里建的是Console工程,添加对Apache.NMS.dll和Apache.NMS.ActiveMQ.dll的引用,然后即可编写实现代码了,简单的Producer和Consumer实现代码如下:

producer:

using System;   using System.Collections.Generic;   using System.Text;   using Apache.NMS;   using Apache.NMS.ActiveMQ;   using System.IO;   using System.Xml.Serialization;   using System.Runtime.Serialization.Formatters.Binary;   namespace Publish   {       class Program       {           static void Main(string[] args)           {               try              {                   //Create the Connection Factory                   IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                   using (IConnection connection = factory.CreateConnection())                   {                       //Create the Session                       using (ISession session = connection.CreateSession())                       {                           //Create the Producer for the topic/queue                           IMessageProducer prod = session.CreateProducer(                               new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));                           //Send Messages                           int i = 0;                           while (!Console.KeyAvailable)                           {                               ITextMessage msg = prod.CreateTextMessage();                               msg.Text = i.ToString();                               Console.WriteLine("Sending: " + i.ToString());                               prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);                               System.Threading.Thread.Sleep(5000);                               i++;                           }                       }                   }                   Console.ReadLine();              }               catch (System.Exception e)               {                   Console.WriteLine("{0}",e.Message);                   Console.ReadLine();               }           }       }   }

consumer:

using System;   using System.Collections.Generic;   using System.Text;   using Apache.NMS;   using Apache.NMS.ActiveMQ;   using System.IO;   using System.Xml.Serialization;   using System.Runtime.Serialization.Formatters.Binary;   namespace Subscribe   {       class Program       {           static void Main(string[] args)           {               try              {                   //Create the Connection factory                   IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                   //Create the connection                   using (IConnection connection = factory.CreateConnection())                   {                       connection.ClientId = "testing listener";                       connection.Start();                       //Create the Session                       using (ISession session = connection.CreateSession())                       {                           //Create the Consumer                           IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);                           consumer.Listener += new MessageListener(consumer_Listener);                           Console.ReadLine();                       }                       connection.Stop();                       connection.Close();                   }               }               catch (System.Exception e)               {                   Console.WriteLine(e.Message);               }           }           static void consumer_Listener(IMessage message)           {               try              {                   ITextMessage msg = (ITextMessage)message;                   Console.WriteLine("Receive: " + msg.Text);              }               catch (System.Exception e)               {                   Console.WriteLine(e.Message);               }           }       }   }

程序实现的功能:生产者producer建立名为testing的主题,并每隔5秒向该主题发送消息,消费者consumer订阅了testing主题,因此只要生产者发送testing主题的消息到ActiveMQ服务器,服务器就将该消息发送给订阅了testing主题的消费者。

编译生成producer.exe和consumer.exe,并执行两个exe,即可看到消息的发送与接收了。

这个例子是建的主题(Topic),ActiveMQ还支持另一种方式:Queue,即P2P,两者有什么区别呢?区别在于,Topic是广播,即如果某个Topic被多个消费者订阅,那么只要有消息到达服务器,服务器就将该消息发给全部的消费者;而Queue是点到点,即一个消息只能发给一个消费者,如果某个Queue被多个消费者订阅,没有特殊情况的话消息会一个一个地轮流发给不同的消费者,比如:

msg1–>consumer A

msg2–>consumer B

msg3–>consumer C

msg4–>consumer A

msg5–>consumer B

msg6–>consumer C

特殊情况是指:ActiveMQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

Properties和Selector该如何设置呢?请看如下代码:

producer:

public void SetProperties(){ITextMessage msg = prod.CreateTextMessage();                               msg.Text = i.ToString();                               msg.Properties.SetString("myFilter", "test1");                               Console.WriteLine("Sending: " + i.ToString());                               prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  ITextMessage msg = prod.CreateTextMessage();                             msg.Text = i.ToString();                             msg.Properties.SetString("myFilter", "test1");                             Console.WriteLine("Sending: " + i.ToString());                             prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);}

consumer:

public void SetSelector(){//生成consumer时通过参数设置Selector   IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");  //生成consumer时通过参数设置Selector IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");}

以上就是C#中关于ActiveMQ的应用详解的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1432608.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月17日 08:27:32
下一篇 2025年12月14日 13:44:47

相关推荐

  • .net中关于异步性能测试的示例代码

    很久没有写博客了,今年做的产品公司这两天刚刚开了发布会,稍微清闲下来,想想我们做的产品还有没有性能优化空间,于是想到了.net的异步可以优化性能,但到底能够提升多大的比例呢?恰好有一个朋友正在做各种语言的异步性能测试(有关异步和同步的问题,请参考客《aio与bio接口性能对比》),于是我今天写了一个…

    2025年12月17日 好文分享
    000
  • .NET Core中遇到的一些坑的图文详解

     最近.net core升级到2.0后开始慢慢捣鼓的多了起来,但遇到了不少坑,所以特来记录下。 第一个坑  条件编译符   我们在编写一些方法的时候通常会为Debug模式增加一些输出日志等以便我们检查,也会为Release模式增加或修改一些特定的参数,但今天我在写这些的时候就遇到了这个坑#if !D…

    2025年12月17日 好文分享
    000
  • C#实现表格隔行换色

    这篇文章主要介绍了c# 根据表格偶数、奇数加载不同颜色,需要的朋友可以参考下 效果图:        //偶数随机 Random evenRanm = new Random(); //奇数随机 Random oddRanm = new Random(); string[] listColor = n…

    2025年12月17日
    000
  • C#中关于表达式树的简单介绍

    表达式树可以说是linq的核心之一,为什么是linq的核心之一呢?因为表达式树使得c#不再是仅仅能编译成il,我们可以通过c#生成一个表达式树,将结果作为一个中间格式,在将其转换成目标平台上的本机语言。比如sql。我们常用的linq to sql就是这样生成sql的。 表达式树是.NET 3.5之后…

    2025年12月17日
    000
  • C# WinForm跨线程访问控件的图文详解

     问题出现:  在WinForm 处理多线程访问主线程的控件时候,就会出现如图所示的错误对话框:          解决方案:      方案一:去掉线程访问主线程UI控件的安全检查,使用: Control.CheckForIllegalCrossThreadCalls = false;    方案…

    2025年12月17日 好文分享
    000
  • C#中VB.NET给Word文档添加/撤销书签的实例

    在现代办公环境中,阅读或者编辑较长篇幅的word文档时,想要在文档中某一处或者几处留下标记,方便日后查找、修改时,需要在相对应的文档位置插入书签。那对于开发者而言,在c#或者vb.net语言环境中,如何来快速、简便的插入书签呢,我分享一下我的经验。这里我用到了一款e-iceblue公司发布的一款免费…

    2025年12月17日
    000
  • .Net实现微信JS-SDK分享功能代码展示

    这篇文章主要介绍了微信js-sdk分享功能的.net实现代码的相关资料,需要的朋友可以参考下 JS-SDK接口是什么? 为了方便开发者实现微信内的网页(基于微信浏览器访问的网页)功能,比如拍照、选图、语音、位置等手机系统的能力,并方便开发者直接使用微信分享、扫一扫等微信特有的能力,微信推出了JS-S…

    2025年12月17日
    000
  • C#如何通过对象属性名修改值的实例

    摘自:csdn 给一个对象属性赋值可以通过PropertyInfo.SetValue()方式进行赋值,但要注意值的类型要与属性保持一致。    创建对象实例的两种方法:  1.  var obj = Assembly.Load(“AssemblyName”).CreateInstance(“Asse…

    好文分享 2025年12月17日
    000
  • C#中引用类型之特例string的详细介绍

        在c#编程的时候经常会使用字符串(string)类型,它也是引用类型,但是处处都不作为引用的用法来使用,实属特例,下来我一一罗列出来,供自己记忆方便:      1)字符串的直接赋值:本身字符串就是引用类型,应该使用  new 对象方法一个实例,但是微软为了方便大家,可以直接定义字符串变量 …

    2025年12月17日
    000
  • C#中关于List的并集与交集以及差集解析

    集合的并集是合并集合的项,如下图所示: List ls1 = new List() { 1,2,3,5,7,9 };List ls2 = new List() { 2,4,6,8,9,10};IEnumerable unionLs = ls1.Union(ls2);foreach (int item…

    2025年12月17日 好文分享
    000
  • C#中Socket框架的使用教程

    最近一个项目因为要用到socket传输问题,所以决定学习一下,将自己学习的内容总结分享出来,下面这篇文章主要给大家介绍了关于c# .net中socket简单实用框架使用的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下。 前言 一说到Socket,想必大家都或多或少有所涉及,从最初的计…

    2025年12月17日
    000
  • C#编写SqlHelper类的使用详解

    本篇文章主要介绍了使用c#编写sqlhelper类,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧 无聊的周末,学习、编码无力。想找点事干但又不知道干点什么,猛然发现自己学过的SqlHelper快忘记了。于是乎虎躯一震心想怎能如此堕落下去,立马打开电脑,双手摸上键盘。写下…

    好文分享 2025年12月17日
    000
  • .Net中Core使用Socket与树莓派进行通信的实例分析(图文)

    前言 去年买的树莓派一直放在抽屉里吃灰,前些阵子debian 9发布,也不出意外的支持了树莓派。 于是重新拿出读卡器又重新了装上了Debian桌面版系统。 介绍 现在这个东西目前的程度只是了解一下Python和.Net的通信。最佳的版本应该是,可以通过服务器端远程执行树莓派命令。 这样做的原因大家也…

    2025年12月17日
    000
  • .NET中core如何利用Redis发布订阅的实例分析

    本篇文章主要介绍了.net core如何使用redis发布订阅,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧 Redis是一个性能非常强劲的内存数据库,它一般是作为缓存来使用,但是他不仅仅可以用来作为缓存,比如著名的分布式框架dubbo就可以用Redis来做服务注册中心…

    2025年12月17日 好文分享
    000
  • C#中关于反射和dynamic最佳组合的示例分享

    这篇文章主要介绍了c# 反射与dynamic最佳组合示例代码,需要的朋友可以参考下 在 C# 中反射技术应用广泛,至于什么是反射………你如果不了解的话,请看下段说明,否则请跳过下段。广告一下:喜欢我文章的朋友请关注一下我的blog,这也有助于提高本人写作的动力。 …

    2025年12月17日
    000
  • C#如何使用Socket发送HTTP/HTTPS请求实例详解

    这篇文章主要介绍了c#使用socket发送http/https请求的实现代码,需要的朋友可以参考下 C# 自带的HttpWebRequest效率太低,对于自组HTTP封包不好操作。 在写超级SQL注入工具时,研究了很长一段时间如何使用Socket来发送HTTP、HTTPS请求。 经过一年的修改和测试…

    2025年12月17日
    000
  • C#中关于Dictionary的用法详解

    1.要使用Dictionary集合,需要导入C#泛型命名空间 System.Collections.Generic //程序集:mscorlib 2.Dictionary的描述 从一组键(Key)到一组值(Value)的映射,每一个添加项都是由一个值及其相关连的键组成 任何键都必须是唯一的 键不能为…

    2025年12月17日
    000
  • C#如何使用键值对取代Switch…Case语句的示例

    swich….case 条件分支多了之后,会严重的破坏程序的美观性。比如这个 上述代码是用于两个进程之间通信的代码,由于通信的枚举特别的多,所以case的分支特别的多。导致了代码的可读性,可维护性严重下降。经过查找资料和重构,想到了一种可行的在这种情况替代switch…cas…

    2025年12月17日
    000
  • C#中关于Cookies的读取实例详解

    C#中Cookies的读取 链接: 一 、写入Cookie   1. Name 和 Value 属性由程序设定,默认值都是空引用。   2. Domain属性的默认值为当前URL的域名部分,不管发出这个cookie的页面在哪个目录下的。 Domain属性缺省就是www.kent.com ,可以由程序…

    好文分享 2025年12月17日
    000
  • C#异步之APM模式异步程序开发的示例分享

    c#已有10多年历史,单从微软2年一版的更新进度来看活力异常旺盛,c#中的异步编程也经历了多个版本的演化,从今天起着手写一个系列博文,记录一下c#中的异步编程的发展历程。广告一下:喜欢我文章的朋友,请点下面的“关注我”。谢谢 我是2004年接触并使用C#的,那时C#版本为1.1,所以我们就从就那个时…

    2025年12月17日
    000

发表回复

登录后才能评论
关注微信