当前位置: 首页 > 编程日记 > 正文

ActiveMQ在C#中的应用

ActiveMQ是个好东东,不必多说。ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等。由于我在windows下开发GUI,比较关心C++和C#,其中C#的ActiveMQ很简单,Apache提供NMS(.Net Messaging Service)支持.Net开发,只需如下几个步骤即能建立简单的实现。C++的应用相对麻烦些,稍后写文章介绍。

1、去ActiveMQ官方网站下载最新版的ActiveMQ,网址:http://activemq.apache.org/download.html。我之前下的是5.3.1,5.3.2现在也已经出来了。

2、去ActiveMQ官方网站下载最新版的Apache.NMS,网址:http://activemq.apache.org/nms/download.html,需要下载Apache.NMS和Apache.NMS.ActiveMQ两个bin包,如果对源码感兴趣,也可下载src包。这里要提醒一下,如果下载1.2.0版本的NMS.ActiveMQ,Apache.NMS.ActiveMQ.dll在实际使用中有个bug,即停止ActiveMQ应用时会抛WaitOne函数异常,查看src包中的源码发现是由于Apache.NMS.ActiveMQ-1.2.0-src\src\main\csharp\Transport\InactivityMonitor.cs中的如下代码造成的,修改一下源码重新编译即可。看了一下最新版1.3.0已经修复了这个bug,因此下载最新版即可。

view plaincopy to clipboardprint?
private void StopMonitorThreads()  
        {  
            lock(monitor)  
            {  
                if(monitorStarted.CompareAndSet(true, false))  
                {  
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);  
                    // Attempt to wait for the Timers to shutdown, but don't wait  
                    // forever, if they don't shutdown after two seconds, just quit.  
                    this.readCheckTimer.Dispose(shutdownEvent);  
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));  
                    this.writeCheckTimer.Dispose(shutdownEvent);  
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));  
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)  
                    this.asyncTasks.Shutdown();  
                    this.asyncTasks = null;  
                    this.asyncWriteTask = null;  
                    this.asyncErrorTask = null;  
                }  
            }  
        } 
private void StopMonitorThreads()
        {
            lock(monitor)
            {
                if(monitorStarted.CompareAndSet(true, false))
                {
                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);

// Attempt to wait for the Timers to shutdown, but don't wait
                    // forever, if they don't shutdown after two seconds, just quit.
                    this.readCheckTimer.Dispose(shutdownEvent);
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
                    this.writeCheckTimer.Dispose(shutdownEvent);
                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000));
                                                    //WaitOne的定义:public virtual bool WaitOne(TimeSpan timeout,bool exitContext)
                    this.asyncTasks.Shutdown();
                    this.asyncTasks = null;
                    this.asyncWriteTask = null;
                    this.asyncErrorTask = null;
                }
            }
        }

3、运行ActiveMQ,找到ActiveMQ解压后的bin文件夹:...\apache-activemq-5.3.1\bin,执行activemq.bat批处理文件即可启动ActiveMQ服务器,默认端口为61616,这可在配置文件中修改。

4、写C#程序实现ActiveMQ的简单应用。新建C#工程(一个Producter项目和一个Consumer项目),WinForm或Console程序均可,这里建的是Console工程,添加对Apache.NMS.dll和Apache.NMS.ActiveMQ.dll的引用,然后即可编写实现代码了,简单的Producer和Consumer实现代码如下:

producer:

view plaincopy to clipboardprint?
using System;  
using System.Collections.Generic;  
using System.Text;  
using Apache.NMS;  
using Apache.NMS.ActiveMQ;  
using System.IO;  
using System.Xml.Serialization;  
using System.Runtime.Serialization.Formatters.Binary;  
namespace Publish  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            try 
            {  
                //Create the Connection Factory  
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");  
                using (IConnection connection = factory.CreateConnection())  
                {  
                    //Create the Session  
                    using (ISession session = connection.CreateSession())  
                    {  
                        //Create the Producer for the topic/queue  
                        IMessageProducer prod = session.CreateProducer(  
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));  
                        //Send Messages  
                        int i = 0;  
                        while (!Console.KeyAvailable)  
                        {  
                            ITextMessage msg = prod.CreateTextMessage();  
                            msg.Text = i.ToString();  
                            Console.WriteLine("Sending: " + i.ToString());  
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);  
                            System.Threading.Thread.Sleep(5000);  
                            i++;  
                        }  
                    }  
                }  
                Console.ReadLine();  
           }  
            catch (System.Exception e)  
            {  
                Console.WriteLine("{0}",e.Message);  
                Console.ReadLine();  
            }  
        }  
    }  

using System;
using System.Collections.Generic;
using System.Text;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.IO;
using System.Xml.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //Create the Connection Factory
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    //Create the Session
                    using (ISession session = connection.CreateSession())
                    {
                        //Create the Producer for the topic/queue
                        IMessageProducer prod = session.CreateProducer(
                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));
                        //Send Messages
                        int i = 0;

while (!Console.KeyAvailable)
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = i.ToString();
                            Console.WriteLine("Sending: " + i.ToString());
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

System.Threading.Thread.Sleep(5000);
                            i++;
                        }
                    }
                }

Console.ReadLine();
           }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}",e.Message);
                Console.ReadLine();
            }
        }
    }
}

consumer:

view plaincopy to clipboardprint?
using System;  
using System.Collections.Generic;  
using System.Text;  
using Apache.NMS;  
using Apache.NMS.ActiveMQ;  
using System.IO;  
using System.Xml.Serialization;  
using System.Runtime.Serialization.Formatters.Binary;  
namespace Subscribe  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            try 
            {  
                //Create the Connection factory  
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");  
                //Create the connection  
                using (IConnection connection = factory.CreateConnection())  
                {  
                    connection.ClientId = "testing listener";  
                    connection.Start();  
                    //Create the Session  
                    using (ISession session = connection.CreateSession())  
                    {  
                        //Create the Consumer  
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);  
                        consumer.Listener += new MessageListener(consumer_Listener);  
                        Console.ReadLine();  
                    }  
                    connection.Stop();  
                    connection.Close();  
                }  
            }  
            catch (System.Exception e)  
            {  
                Console.WriteLine(e.Message);  
            }  
        }  
        static void consumer_Listener(IMessage message)  
        {  
            try 
            {  
                ITextMessage msg = (ITextMessage)message;  
                Console.WriteLine("Receive: " + msg.Text);  
           }  
            catch (System.Exception e)  
            {  
                Console.WriteLine(e.Message);  
            }  
        }  
    }  

using System;
using System.Collections.Generic;
using System.Text;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System.IO;
using System.Xml.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace Subscribe
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //Create the Connection factory
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                //Create the connection
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.ClientId = "testing listener";
                    connection.Start();

//Create the Session
                    using (ISession session = connection.CreateSession())
                    {
                        //Create the Consumer
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener", null, false);
                        consumer.Listener += new MessageListener(consumer_Listener);

Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
           }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }
}

程序实现的功能:生产者producer建立名为testing的主题,并每隔5秒向该主题发送消息,消费者consumer订阅了testing主题,因此只要生产者发送testing主题的消息到ActiveMQ服务器,服务器就将该消息发送给订阅了testing主题的消费者。

编译生成producer.exe和consumer.exe,并执行两个exe,即可看到消息的发送与接收了。

这个例子是建的主题(Topic),ActiveMQ还支持另一种方式:Queue,即P2P,两者有什么区别呢?区别在于,Topic是广播,即如果某个Topic被多个消费者订阅,那么只要有消息到达服务器,服务器就将该消息发给全部的消费者;而Queue是点到点,即一个消息只能发给一个消费者,如果某个Queue被多个消费者订阅,没有特殊情况的话消息会一个一个地轮流发给不同的消费者,比如:

msg1-->consumer A

msg2-->consumer B

msg3-->consumer C

msg4-->consumer A

msg5-->consumer B

msg6-->consumer C

特殊情况是指:ActiveMQ支持过滤机制,即生产者可以设置消息的属性(Properties),该属性与消费者端的Selector对应,只有消费者设置的selector与消息的Properties匹配,消息才会发给该消费者。Topic和Queue都支持Selector。

Properties和Selector该如何设置呢?请看如下代码:

producer:

view plaincopy to clipboardprint?
ITextMessage msg = prod.CreateTextMessage();  
                            msg.Text = i.ToString();  
                            msg.Properties.SetString("myFilter", "test1");  
                            Console.WriteLine("Sending: " + i.ToString());  
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); 
ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = i.ToString();
                            msg.Properties.SetString("myFilter", "test1");
                            Console.WriteLine("Sending: " + i.ToString());
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);

consumer:

view plaincopy to clipboardprint?
//生成consumer时通过参数设置Selector  
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'"); 
//生成consumer时通过参数设置Selector
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("testing"), "myFilter='test1'");

本文来自CSDN博客,出处:http://blog.csdn.net/bodybo/archive/2010/06/04/5647968.aspx

转载于:https://www.cnblogs.com/guthing/archive/2010/06/17/1759333.html

相关文章:

数据中心防雷SPD技术漫谈

雷电是大自然里一种普遍现象,在世界的任意角落都会有雷电天气出现,只不过数量不同而已。雷电对大地及地面物体的放电现象成为雷击,这种放电过程会产生强烈的闪电并伴随巨大的声音,对被击物体有严重的危害,会在物体的两…

【怎样写代码】确保对象的唯一性 -- 单例模式(六):扩展案例

如果喜欢这里的内容,你能够给我最大的帮助就是转发,告诉你的朋友,鼓励他们一起来学习。 If you like the content here, you can give me the greatest help is forwarding, tell your friends, encourage them to learn together.

Python数据分析可以应用到哪些领域

随着大数据的应用越来越广泛,应用的行业也越来越多,我们每天都可以看到一些关于数据分析的新鲜应用,从而帮助人们获取到有价值的信息。例如,网购时经常发现电商平台向我们推荐商品,往往这类商品都是我们最近浏览的&…

printf(%d, -10u); 这个输出什么呀, 0或1?

printf("%d", -1<0u); 这个输出什么呀, 0或1?周银辉 既然我这么问了, 那么答案自然不是1&#xff0c;而是0看看下面的代码: 对于-10u输出为-1&#xff0c;似乎理所当然&#xff0c;但为什么-1<0u却输出0呢&#xff0c;也就是说-1不小于0u&#xff0c;好神奇啊…

Enterprise Library 4 缓存应用程序块的设计

缓存应用程序为以下目的而设计&#xff1a; 提供一个大小可管理的 API 集合。允许开发人员添加标准的缓存操作到他们的应用程序中&#xff0c;而不用学习应用程序块的内部工作。用 Enterprise Library 配置控制台来简化配置。有效率的执行。线程安全。某些东西在被多个程序线程…

【软件测试培训】了解jmeter分布式测试

一提到分布式测试&#xff0c;大家肯定会觉得&#xff0c;哇!好高大上&#xff0c;一定很高深的吧&#xff0c;这里老师推出不做傻白甜系列文章&#xff0c;带同学们一步一步理解jmeter的分布式测试。 首先我们来看下jmeter自动化测试的流程 &#xff0c;见如下图1 【软件测试…

存储过程中返回结果集

存储过程中返回结果 从存储过程中返回结果有三种方式&#xff1a; 1、 返回结果集 这是客户端应用程序返回结果的最通用的方法。结果集是通过使用select语句选择数据产生的。结果集可以从永久表、临时表或局部变量中产生。将结果返回到另一个存储过程不是一种有效的方法。存储…

我的股票交易策略

投资股市已经成为我们进行资产配置的一个重要选择。在这个市场中主流的投资方式无外乎 价值投资 和 趋势投资 两种。价值投资需要了解各种财务指标来评价各支股票&#xff0c;在被市场低估时买入&#xff0c;在被市场高估时卖出。这对于非财务背景的人来说门槛偏高。趋势投资需…

你负责选歌,索尼负责用 AI 谱出风格相似的曲子

近日&#xff0c;索尼巴黎计算机科学实验室&#xff08;CSL&#xff09;正在开发一套算法系统 Flow Machines&#xff0c;该系统根据用户的品味谱写歌曲&#xff0c;其歌曲在迎合用户口味的基础上&#xff0c;适用于所有现有音乐风格。 技术人员搭建了一个拥有 13000 多首音乐的…

java培训:Java的十大算法

想要学好java语言&#xff0c;就要打好基础&#xff0c;java要学习的东西有很多&#xff0c;今天小编就来和大家说下java的十大算法。 算法一&#xff1a;快速排序算法 快速排序是由东尼霍尔所发展的一种排序算法。在平均状况下&#xff0c;排序 n 个项目要Ο(nlogn) 次比较。…

PetShop之表示层设计 - 《解剖PetShop》系列之六

六 PetShop之表示层设计 表示层&#xff08;Presentation Layer&#xff09;的设计可以给系统客户最直接的体验和最十足的信心。正如人与人的相交相识一样&#xff0c;初次见面的感觉总是永难忘怀的。一件交付给客户使用的产品&#xff0c;如果在用户界面&#xff08;User Inte…

物联网时代营销怎么做?

这几年和大数据、O2O一样热门的词还包括“物联网”&#xff0c; 物联网是新一代信息技术的重要组成部分&#xff0c;也是“信息化”时代的重要发展阶段。其英文名称是&#xff1a;“internet of things(IoT)”。顾名思义&#xff0c;物联网就是物物相连的互联网。 说的再通俗一…

怎样去掉 CSDN-Markdown 上传图片中的水印

自己一直使用 Markdown 写技术图文&#xff0c;我们知道在 Markdown 中插入图片需要这个图片的URL地址&#xff0c;那么必须得找一个图床来存放图片。 // Markdown 中插入图片的语法结构![这里写图片描述](这里写图片的URL地址) 我的方法是这样的&#xff1a;在 CSDN 上发一篇…

Python编程比较好的机构怎么选择

想要进入到IT行业&#xff0c;很多人的首要选择都是学习Python编程语言&#xff0c;因为Python编程语言入门是相对比较简单的&#xff0c;但是想要学好Python技术&#xff0c;找一个好的Python培训机构是非常重要的&#xff0c;那么Python编程比较好的机构怎么选择呢?来看看下…

赚钱是刚需,如何正确的交易股票?

试着考虑下面这些情景&#xff1a; 听隔壁老王说&#xff0c;最近股市行情不错&#xff0c;自己也想买点股票&#xff0c;但不知道该买那支&#xff1f;犹犹豫豫到处打听消息。 买入一支股票后&#xff0c;时不时就会拿出手机看看它的价格&#xff0c;有时候甚至一天要看很多…

C# Windows CE使用小技巧实例

C# Windows CE使用的一些感受&#xff1a;使用Windows的开发机上用C#启动一个外部程序的方法有很多&#xff0c;但这些方法用在使用WinCE的目标工控机上都无能为力。 C# Windows CE使用1、 现在以打开一个IE为例&#xff0c;介绍如何在WinCE下使用C#来打开一个外部文件&#xf…

Ovirt 安装部署方法

官方的安装文档&#xff1a;http://www.ovirt.org/documentation/install-guide/Installation_GuideOvirt note 系统镜像的下载&#xff1a;http://www.ovirt.org/download http://mirror.isoc.org.il/pub/ovirt/ovirt-4.1/iso/ovirt-node-ng-installer-ovirt/4.1-2017072816/…

零基础如何选择适合的Java培训课程

很多人都想要学习java技术&#xff0c;但是害怕自己是零基础学不好&#xff0c;所以想要找专业的java培训机构进行学习&#xff0c;但是零基础如何选择适合的Java培训课程成了他们比较头疼的事情&#xff0c;下面小编就为大家做下详细的介绍。 ​  零基础如何选择适合的Java培…

Android Intent的几种用法全面总结

Android Intent的几种用法全面总结 Intent, 用法Intent应该算是Android中特有的东西。你可以在Intent中指定程序要执行的动作&#xff08;比如&#xff1a;view,edit,dial&#xff09;&#xff0c;以及程序执行到该动作时所需要的资料。都指定好后&#xff0c;只要调用startAct…

七天学会「股票数据分析软件」的开发(中)

两天前&#xff0c;我写了 七天学会「股票数据分析软件」的开发&#xff08;上&#xff09;&#xff0c;号召大家尝试着写写代码&#xff0c;不知道大家进度如何。 如果存在掌握一种技能的刚需&#xff0c;而且知道正确的学习方法&#xff0c;经过刻意练习&#xff0c;这门技能…

《Oracle高性能SQL引擎剖析:SQL优化与调优机制详解》一1.1 生成执行计划

1.1 生成执行计划 在Oracle中&#xff0c;任何一条语句在解析过程中都会生成一个唯一的数值标识&#xff0c;即SQL_ID。而同一条语句&#xff0c;在解析过程中&#xff0c;可能会因为执行环境的改变&#xff08;例如某些优化参数被改变&#xff09;而生成多个版本的游标&#…

软件测试培训:高薪测试技术要掌握哪些

职业技能一 1. 软件测试&#xff1a; 1) 熟练灵活地运用等价类、边界值、判定表法、因果图法等各种方法设计测试用例&#xff0c;包括单元测试、集成测试、系统测试用例设计。 2) 牢固掌握了软件测试计划、测试日报、测试报告的写作方法与要领。 3) 针对B/S、C/S架构及不同…

MFC-4简单的窗口重绘(非部分重绘)

#include <afxwin.h> #include "resource.h" #include <afxtempl.h> //定义模板类的头文件class MyDocument : public CDocument { public:CArray<CPoint,CPoint &> pArray;//<保存的数据类型,读取保存数据类型的返回值>void AddPoint…

七天学会「股票数据分析软件」的开发(下)

昨天下午把《我不是药神》这部电影看了&#xff0c;搞得我哭的稀里哗啦&#xff0c;里面有一些情节触痛了内心中最薄弱的地方。药厂没有错&#xff0c;他们要收回前期投入的研发成本。主人公 程勇 只能算是整个事件的牺牲品&#xff0c;通过他的牺牲让国家关注到白血病人这个群…

那些对混合云开发和应用程序环境的错误认识

企业架构师们在开发混合云计算应用程序并为支持程序云计算而选择云计算供应商之前&#xff0c;他们应当好好完成他们的准备工作。选择错误的供应商和在错误的环境中开发应用程序都会对应用程序的运行性能、工作流程、变更成本、以及与客户的沟通和交易带来负面影响。 在本文中&…

2021年UI设计培训机构哪个好

想要学好UI设计&#xff0c;选择报一个专业的UI设计培训机构是非常明智的选择&#xff0c;为什么这么说呢?因为培训机构的课程都是根据当下的行业环境制作出来的&#xff0c;而且有专业老师&#xff0c;比自学要好很多&#xff0c;那么在2021年UI设计培训机构哪个好呢?来看看…

UVA 116 Unidirectional TSP DP

题目链接&#xff1a; https://uva.onlinejudge.org/index.php?optioncom_onlinejudge&Itemid8&category3&pageshow_problem&problem52 题目描述&#xff1a; 一个整数矩阵&#xff0c; 求第一列到最后一列的最小整数和&#xff0c; 只能从第一列出发向右&…

C++ 数据类型转换

wchar_t*,wchar_t,wchat_t数组,char,char*,char数组,std::string,std::wstring,CString....#include <string>// 使用CString必须使用MFC&#xff0c;并且不可包含<windows.h>#define _AFXDLL#include <afx.h>using namespace std;//-----------------------…

如何准备数学建模竞赛!

昨天早晨&#xff0c;我到教十一实验室的时候遇到史会峰老师&#xff0c;他说正准备给学生们进行数学建模的培训。今天早晨&#xff0c;我又遇到了孔令才老师&#xff0c;他同样也说准备给学生们进行数学建模的培训。看到这么多同事在做这个事情&#xff0c;想想自己也应该贡献…

UI设计培训:UI设计师离不开的基本版式设计

不管你是UI设计&#xff0c;还是工业设计&#xff0c;甚至动画设计&#xff0c;终究离不开基本的版式设计&#xff0c;所以版式设计这块非常考验设计师的基础功力。 1. 大且醒目&美观的排版设计 版面设计大概是一位设计师重要的部分&#xff0c;今年的版面设计会围绕着大且…