原创
car_5

消息队列(MQ)可以用在很多地方,如用户登录第一步需要判断帐号密码是否正确,第二步需要记录该用户本次登录IP、时间等等,为了加快客户端响应,第二步可以放到消息队列中异步执行;当用户浏览一篇文章时,需要给文章浏览数+1,也可以消息队列异步处理;当用户购买商品或抢购商品时,存在高并发,也可以采用消息队列,来解决高并发引起的库存超量卖出。

比较出名的消息队列有很多,如ActiveMQ、ZeroMQ、MSMQ等等,通过各方面对比了解以及考虑服务器环境,本站决定采用MSMQ,因此也做了一个MsmqHelper.cs帮助类,集成于Dos.Common中。

本站也大量用到了该MsmqHelper,如文章浏览数+1,新增、修改文章时异步更新Lucene索引等等。

使用方法:

1、安装消息队列服务

开始—》控制面板—》程序—》程序和功能—》打开或关闭Windows功能—》依次展开Microsoft Message Queue (MSMQ) 服务器、Microsoft Message Queue (MSMQ) 服务器核心—》 功能全部勾选并确定

2、发送消息

//加入队列。(构造函数可以传入一些参数)
new MsmqHelper().Send<T>(new T());
//或使用静态方法。(在MsmqParma中传入一些参数)
MsmqHelper.Send<T>(new MsmqParam<T>()
{
    Message = new T()
});

3、接收消息

//读取队列并移除该队列。(构造函数可以传入一些参数)
new MsmqHelper().ReceiveAsyn<T>(MyCallBack);
//或使用静态方法。(在MsmqParma中传入一些参数)
MsmqHelper.ReceiveAsyn<T>(new MsmqParam<Abc>()
{
    CallBack = MyCallBack
});
private static void MyCallBack(T t)
{
    //你的业务逻辑...
}
//也可以这样传入某个方法:
new MsmqHelper().ReceiveAsyn<T>(d => new BizOrderLogic().AddOrder(d));

4、实际应用场景举例

//订单参数类
public class Order
{
    //购买用户
    public string User {get; set;}
    //购买数量
    public int Count {get; set;}
}
//抢购小米手机方法
public class BaseResult AddOrder(Order param)
{
    var queueName = "自定义队列名";
    
    //创建一个队列凭证。可以将此凭证写入redis或其它介质中,状态为等待处理。
    var token = Guid.NewGuid();
    
    //加入队列
    MsmqHelper.Send<Order>(queueName, new Order(){
                User = "ITdos",
                Count = 2,
                Token = token      
            });
    //接收队列,并且异步处理业务逻辑
    MsmqHelper.ReceiveAsyn<Order>(queueName, MyCallBack);
    
    //以上两个步骤也可以直接使用以下方法:
    MsmqHelper.SendAndReceiveAsyn(queueName, new Order(){
                User = "ITdos",
                Count = 2,
                Token = token      
            }, MyCallBack)
    
    //当然,接收消息队列最好还是单独做一个监听程序,应用程序只发送消息。
    
    //前端页面显示排队中,并且开启轮循请求一个“消息处理结果接口”。如果你服务器够强悍,长连接也可以。
    return new BaseResult(true, token);
}
//业务逻辑
private static void MyCallBack(Order model)
{
    //查询库存
    //下单
    //...
    //标记Model.Token处理结果,也就达到了通知用户抢购成功或失败
}

5、附一个我的测试例子(控制台程序),确认子线程是按顺序执行,解决高并发。

public class Program
{
	public static void Main(string[] args)
	{
		Stopwatch t = new Stopwatch();
		Console.WriteLine("主线程开始");
		t.Restart();
		MsmqHelper.Send("Test", new Abc() { cc = 1 });
		MsmqHelper.Send("Test", new Abc() { cc = 2 });
		MsmqHelper.Send("Test", new Abc() { cc = 3 });
		MsmqHelper.Send("Test", new Abc() { cc = 4 });
		MsmqHelper.Send("Test", new Abc() { cc = 5 });
		MsmqHelper.Send("Test", new Abc() { cc = 6 });
		MsmqHelper.Send("Test", new Abc() { cc = 7 });
		MsmqHelper.Send("Test", new Abc() { cc = 8 });
		MsmqHelper.Send("Test", new Abc() { cc = 9 });
		MsmqHelper.Send("Test", new Abc() { cc = 10 });
		MsmqHelper.ReceiveAsynLoop<Abc>("Test", MyCallBack);
		t.Stop();
		Console.WriteLine("主线程结束");
		Console.WriteLine("主线程执行时间:" + t.ElapsedMilliseconds);
		//等待子线程执行完毕
		Thread.Sleep(15000);
	}
	private static void MyCallBack(Abc message)
	{
		Stopwatch t = new Stopwatch();
		Console.WriteLine(message.cc + "_次线程开始");
		t.Restart();
		for (int i = 0; i < 1000; i++)
		{
		        //此段代码只是为了增加子线程的执行时间
			var a = 8121 * 128787 - 328787 / 12878 + 7888787;
			var b = 9212 * 3212 - 1287 / 12878 + 12387 * 123;
			var c = a * b / 2;
			LogHelper.Debug(message.cc + "_次线程_" + message.cc);
		}
		t.Stop();
		Console.WriteLine(message.cc + "_次线程结束");
		Console.WriteLine(message.cc + "_次线程执行时间:" + t.ElapsedMilliseconds);
	}
	public class Abc
	{
		public object cc { get; set; }
	}
}

执行结果达到预期:

也可以在计算机管理中查看未被处理的消息队列:


【本文原创自:http://www.itdos.com/CSharp/201604/20153855656.html 转载请注明链接,谢谢。】