当前位置: 首页 > 编程日记 > 正文

CQRS学习——最小单元的Cqrs(CommandEvent)[其一]


【说明:博主采用边写边思考的方式完成这一系列的博客,所以代码以附件为准,文中代码仅为了说明。】

结构

在学习和实现CQRS的过程中,首要参考的项目是这个【http://www.cnblogs.com/yangecnu/p/Introduction-CQRS.html】。所以Dpfb.Cqrs中的整体结构都是参考这个例子来的,在这个基础之上添加和改进。总的来说,.Cqrs项目的整体结构如下所示:

主要包含了(命令,事件,通信,命令处理,事件处理这几个方面)。具体的角色则如下图所示:

通信中包含了事件总线和命令总线。由于查询入口(QueryEntry)如何处置暂时没想好,所以先放一个文件夹卖萌。

实现

此时的目标是实现一个最小单元的CQRS(主要是命令和事件部分)。其中命令总线和事件总线的实现比较固定,他们的职责就是为命令和事件找到对应的处理类,然后依次调用对象的接口方法,从而将一般的直接调用“打断”,同时提供各种额外操作的注入点。为了更好的控制他们如何寻找处理类,这里引入两个接口ICommandHandlerSearcher以及IEventHandlerSearcher。放在项目的Configuration名称空间下面。现在,可以为这些接口提供一个基础的实现,供测试使用。以下是接口定义:

    public interface ICommand{Guid Id { get; set; } } public interface IEvent { Guid Id { get; set; } } public interface IEventHandler<T> where T : IEvent { void Handle(T @event); } public interface ICommandHandler<T> where T: ICommand { void Execute(T command); } public interface ICommandBus { void Send<T>(T command) where T : ICommand; } public interface IEventBus { void Publish<T>(T @event) where T : IEvent; } public interface ICommandHandlerSearcher { ICommandHandler<T> Find<T>() where T : ICommand; } public interface IEventHandlerSearcher { IEnumerable<IEventHandler<T>> Find<T>() where T : IEvent; }
接口定义

以及通信(Buses)部分的实现:

public class DpfbEventBus : IEventBus{void IEventBus.Publish<T>(T @event){foreach (var handler in EventHandlerSearcher.Find<T>()){handler.Handle(@event);}}public IEventHandlerSearcher EventHandlerSearcher { get; set; }}public class DpfbCommandBus : ICommandBus{void ICommandBus.Send<T>(T command){var handler = CommandHandlerSearcher.Find<T>();handler.Execute(command);}public ICommandHandlerSearcher CommandHandlerSearcher { get; set; }}
Buses实现

其中,两个ISearcheres的实现包含了如何寻找Handlers以及到哪里寻找Handlers,这些具体的内容,.Cqrs这层其实并不关心。处于测试的目的,将这些实现代码移到Test名称空间下。并实现本程序集内的查找:

 public class TestCommandHandlerSearcher:ICommandHandlerSearcher{ICommandHandler<T> ICommandHandlerSearcher.Find<T>(){var assembly = Assembly.GetCallingAssembly();var declaredType = typeof (ICommandHandler<T>);var handlers = assembly.GetTypes().Where(i => declaredType.IsAssignableFrom(i)).Select(i => Activator.CreateInstance(i)).ToArray();if (handlers.Count() != 1)throw new Exception();return handlers.First() as ICommandHandler<T>;}}public class TestEventHandlerSearcher : IEventHandlerSearcher{IEnumerable<IEventHandler<T>> IEventHandlerSearcher.Find<T>(){var assembly = Assembly.GetCallingAssembly();var declaredType = typeof (IEventHandler<T>);var handlers = assembly.GetTypes().Where(i => declaredType.IsAssignableFrom(i)).Select(i => Activator.CreateInstance(i));return handlers.Cast<IEventHandler<T>>();}}
Searchers实现

测试

接下来添加一些实现代码,以进行测试:

    public class TestEventTwo : IEvent{Guid IEvent.Id{get { throw new NotImplementedException(); }set { throw new NotImplementedException(); }}}public class TestEventOne : IEvent{Guid IEvent.Id{get { throw new NotImplementedException(); }set { throw new NotImplementedException(); }}}public class TestEventHandler : IEventHandler<TestEventOne>,IEventHandler<TestEventTwo>{void IEventHandler<TestEventOne>.Handle(TestEventOne @event){Console.WriteLine("处理了事件eventOne");Configuration.EventBus.Publish(new TestEventTwo());Console.WriteLine("引发了事件eventOne");}void IEventHandler<TestEventTwo>.Handle(TestEventTwo @event){Console.WriteLine("处理了事件eventTwo");}}public class TestCommandHandler : ICommandHandler<TestCommand>{void ICommandHandler<TestCommand>.Execute(TestCommand command){Console.WriteLine("获取并处理消息:" + command.Message);var eventOne = new TestEventOne();Configuration.EventBus.Publish(eventOne);}}public class TestCommand : ICommand{Guid ICommand.Id{get { throw new NotImplementedException(); }set { throw new NotImplementedException(); }}public string Message { get; set; }}public static class Configuration{public static ICommandBus CommandBus = new DpfbCommandBus();public static IEventBus EventBus = new DpfbEventBus();}
测试代码

其中,Buses作为单例存在于一个静态类的字段中。
最后,用一个单元测试运行:

[TestMethod]public void TestMethod1(){var command = new TestCommand {Message = "国庆记得回家"};Configuration.CommandBus.Send(command);}
UnitTest

结果(倒序是正常的,总的来说,事件链的执行会是一个深度优先的调用):

审计(以及Session)

在上个测试例子中,使用了控制台输出来表明各个事件的执行顺序。在正常开发过程中,我们总不能到处输出到控制台,就算输出了也不见得有用。所以我们使用另外的方法来实现这一点。博主第一次接触审计这个概念是在接触一个名为ABP的框架的时候【此处应有链接】,里面包含了很多的信息(用户身份,性能计数,时间等...),当时非常震惊。于是就把ABP源码中的一个文件夹扒了下来自己用。ABP实现了Service方法的审计,用的是动态代理(Castle)。而Cqrs本身就留了无数的注入点,所以实现起来更加直观和方便。另外,博主同时参考了ABP的Session的实现,然后搞出一个土鳖版的Session。虽然比较无耻,但是对于以后ABP的理解应该会有帮助。

博主当初匆匆忙忙的撸了一个CQRS的框架,只是想试水,所以没有想很多,Auditing部分真的是直接扒下来改了改,现在开始写博客,有更多的时间思考,所以打算从头开始实现一个。首先开始抽象,Auditing是一种消息,同时还要考虑它的存放问题,所以定义了以下两个接口(由于Abudting还包含了其他方面,如web,所有接口定义移到Dpfb层):

public interface IAuditInfo{}public interface IAuditStorage{IEnumerable<IAuditInfo> Retrive();}
Auditing接口

顺便实现一个基于内存的存储:

 public class MemoryAuditStorage:IAuditStorage{private List<IAuditInfo> _inMemory = new List<IAuditInfo>();IEnumerable<IAuditInfo> IAuditStorage.Retrive(){return _inMemory;}void IAuditStorage.Save(IAuditInfo auditInfo){_inMemory.Add(auditInfo);}}
MemoryAuditStorage

接下来实现Cqrs的命令和时间的审计对象。由于事件的调用链是棵树,这里在Dpfb层引入一些必须的数据结构(见DaaStructure名称空间)。以下是针对Cqrs的AuditInfo实现:

public class CommandEventAuditInfo : ExtendedTreeNode<CommandEventAuditInfo>, IAuditInfo{public DateTime InvokedTime { get; set; }/// <summary>/// 单位为毫秒/// </summary>public int InvokingDuration { get; set; }public IDpfbSession DpfbSession { get; set; }public Type CommandEventHandlerType { get; set; }public Type CommandEventType { get; set; }public Type DeclaredHandlerType { get; set; }//todo 待扩展public object ThreadInfo { get; set; }public bool Stopped { get; private set; }private Stopwatch _stopwatch = new Stopwatch();public void Start(){_stopwatch.Start();}public void Stop(){_stopwatch.Stop();InvokingDuration = (int) _stopwatch.ElapsedMilliseconds;Stopped = true;Current = Parent;}public static ConcurrentDictionary<int, CommandEventAuditInfo> ConcurrentDic =new ConcurrentDictionary<int, CommandEventAuditInfo>();public override string ToString(){return PrintTree();}public string PrintTree(){var sb = new StringBuilder();var userStr = DpfbSession != null ? DpfbSession.ToString() : "匿名用户";var @abstract = string.Format("命令事件调用分析[{3}]:用户[{1}]引发了[{0}],总耗时[{2}]毫秒。调用链:",CommandEventType.Name, userStr, InvokingDuration, InvokedTime);sb.Append(@abstract);var recursionDepth = 1;return PrintTree(this, ref recursionDepth, sb);}private string PrintTree(CommandEventAuditInfo auditInfo, ref int recursionDepth, StringBuilder sb){sb.AppendLine();var span = recursionDepth == 0? "": string.Join("", new string[recursionDepth].Select(i => "         ").ToArray());sb.Append(span + "|---");sb.AppendFormat("[{2}]处理[{0}],耗时[{1}毫秒]", auditInfo.CommandEventType.Name,auditInfo.InvokingDuration, auditInfo.CommandEventType.Name);if (auditInfo.Children.Any())recursionDepth++;foreach (var commandEventAuditInfo in auditInfo.Children){PrintTree(commandEventAuditInfo, ref recursionDepth, sb);}return sb.ToString();}private static CommandEventAuditInfo CreateUnstoppedOnCurrentThread(){Func<int, CommandEventAuditInfo> creator = k => new CommandEventAuditInfo(){ThreadInfo = k};var threadName = Thread.CurrentThread.ManagedThreadId;var auditInfo = ConcurrentDic.GetOrAdd(threadName, creator);if (auditInfo.Stopped){return ConcurrentDic.AddOrUpdate(threadName, creator, (k, nv) => creator((int) nv.ThreadInfo));}return auditInfo;}public static CommandEventAuditInfo StartNewForCommand<TCommand>(Type handlerType) where TCommand : ICommand{var root = CreateUnstoppedOnCurrentThread();root.InvokedTime = DateTime.Now;root.CommandEventHandlerType = handlerType;root.CommandEventType = typeof (TCommand);root.DeclaredHandlerType = typeof (ICommandHandler<TCommand>);Current = root;return root;}public static CommandEventAuditInfo StartNewForEvent<TEvent>(Type handlerType) where TEvent : IEvent{var auditInfo = new CommandEventAuditInfo(){CommandEventType = typeof (TEvent),CommandEventHandlerType = handlerType,DeclaredHandlerType = typeof (IEventHandler<TEvent>),InvokedTime = DateTime.Now};Current.Children.Add(auditInfo);auditInfo.Parent = Current;return auditInfo;}public static CommandEventAuditInfo Root{get { return CreateUnstoppedOnCurrentThread(); }}public static CommandEventAuditInfo Current { get; private set; }}
AuditInfo4Cqrs

稍作修改之后,重新运行单元测试:

[TestMethod]public void TestAuditing(){var command = new TestCommand {Message = "国庆记得回家"};Configuration.CommandBus.Send(command);command = new TestCommand {Message = "国庆记得回家"};Configuration.CommandBus.Send(command);foreach (var auditInfo in Configuration.AuditStorage.Retrive()){Console.WriteLine(auditInfo.ToString());}}
TestAuditing

这里是运行结果[事件调用链的关系有误,请移步第二篇查看]:

至于Session,此时实现并不能表示其作用,所以只是定义了一个接口。

补丁

至此,一个可以发挥作用的Cqrs就完成了。同时也遗留下一些需要思考的问题:

【事件的继承关系】

在博主的目前实现中,是不考虑事件的继承关系的。

【事件的先后顺序】

使用Handler实现方法上的Attribute实现,这个仅仅作为辅助功能实现。逻辑上的先后顺序由事件链指定。

【CqrsAuditng的生命周期】

目前博主使用Unity作IoC,基于线程的生命周期管理,会存在并发问题(ASP.NET),需要继续考虑。

【Searchers的性能优化】

实际上调用链在编译期间就确定了,所以,可以使用ExpressionTree作缓存。

【事件/命令异步执行的支持】

在ASP.NET中,使用基于TPL的一套异步编程框架可以提高吞吐。这个可以大致这么理解:在web系统中,存在两种角色,请求入口(IIS)【标记为A】,以及请求处理者(实现代码)【标记为B】。我们先假定整个系统只有一个A。那么,在改进之前,情况是“A接到了一个请求告诉B,然后看着B把事情干完,再把结果告诉请求方”。改进之后的情况是“A接到了一个请求B,告诉B处理完事情之后给个反馈——A马上开始处理其他请求。”显然,在改进之前A的等待时间很长,改进之后A当了甩手掌柜(只等报告),等待时间很短。于是乎,A单位时间能的处理量就上去了,吞吐就上去了。然而,对于一次请求而言,时长取决于B,除非B的工作效率有所改进,不然并不会提升性能。

这些是博主的个人理解,并不求多少准确,仅希望能大致描述这么一个意思。

然而博主做了很多测试,并没有反应出这一点。详细内容,请移步这篇文章:【此处应有链接】

【Audting的配置】

这是ABP做的好的地方,也是工作量最大的地方,配置的优先级等。

此篇完成时,所使用的代码:【http://pan.baidu.com/s/1o6IeNXK】

转载于:https://www.cnblogs.com/lightluomeng/p/4738147.html

相关文章:

PHP APC安装与使用

最简单的方法&#xff0c;找到php安装目录的pecl 自动安装&#xff1a; # /usr/local/php/bin/pecl install apc 下面按提示一步步完成即可 配置/etc/php.ini 末尾加入extensionapc.so 手动安装&#xff1a; 官网 http://cn2.php.net/manual/zh/book.apc.php 下载http://p…

AIphaCode 并不能取代程序员,而是开发者的工具

编译 | 禾木木 出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; DeepMind 是 AI 研究实验室&#xff0c;它引入了一种深度学习模型&#xff0c;可以生成具有显著效果的软件源代码。该模型名为 AIphaCode&#xff0c;是基于 Transformers&#xff0c;OpenAI 在其代码生…

源码阅读:SDWebImage(六)——SDWebImageCoderHelper

该文章阅读的SDWebImage的版本为4.3.3。 这个类提供了四个方法&#xff0c;这四个方法可分为两类&#xff0c;一类是动图处理&#xff0c;一类是图像方向处理。 1.私有函数 先来看一下这个类里的两个函数 /**这个函数是计算两个整数a和b的最大公约数*/ static NSUInteger gcd(N…

基于 OpenCV 的网络实时视频流传输

作者 | 努比来源 | 小白学视觉大多数人会选择使用IP摄像机&#xff08;Internet协议摄像机&#xff09;而不是CCTV&#xff08;闭路电视&#xff09;&#xff0c;因为它们具有更高的分辨率并降低了布线成本。在本文中&#xff0c;我们将重点介绍IP摄像机。IP摄像机是一种数字 摄…

【转】让Chrome化身成为摸鱼神器,利用Chorme运行布卡漫画以及其他安卓APK应用教程...

下周就是十一了&#xff0c;无论是学生党还是工作党&#xff0c;大家的大概都会有点心不在焉&#xff0c;为了让大家更好的心不在焉&#xff0c;更好的在十一前最后一周愉快的摸鱼&#xff0c;今天就写一个如何让Chrome&#xff08;google浏览器&#xff09;运行安卓APK应用的教…

PHP安装parsekit扩展查看opcode

也可以通过VLD查看&#xff0c;具体请看本人写的http://blog.csdn.net/21aspnet/article/details/7002644安装parsekit扩展 http://pecl.php.net/package/parsekit 下载最新的 #wget http://pecl.php.net/get/parsekit-1.3.0.tgz 安装过程省略 可以参考 本人写的http://blog.c…

group by 查找订单的最新状态 join

Order&#xff1a;snProcedures&#xff1a;sn,status1、 有订单表和流程表。订单表含有订单的详细信息【假设没有订单状态哈】&#xff0c;每个订单有好多种状态&#xff1a;已付款、处理中、待收货等等。现在的需求可能是查询订单状态是待收货的所有订单的信息。【答】先找到…

Xcache安装与使用

官网&#xff1a;http://xcache.lighttpd.net 最新版本下载地址&#xff1a;http://xcache.lighttpd.net/wiki/Release-1.3.2 安装&#xff1a; # wget http://xcache.lighttpd.net/pub/Releases/1.3.2/xcache-1.3.2.tar.gz # tar zvxf xcache-1.3.2.tar.gz # cd xcache-1.3…

安装mysql_python的适合遇到mysql_config not found解决方案(mac)

为什么80%的码农都做不了架构师&#xff1f;>>> 安装mysql_python的适合遇到mysql_config not found解决方案&#xff08;mac&#xff09; 用pip安装MySQL-python时候遇到报错&#xff1a; ------我是分割线------ Complete output from command python setup.py e…

推荐 6 个好用到爆的 Pycharm 插件

作者 | 小欣来源 | Python爱好者集中营相信对于不少的Python程序员们都是用Pycharm作为开发时候的IDE来使用的&#xff0c;今天小编来分享几个好用到爆的Pycharm插件&#xff0c;在安装上之后&#xff0c;你的编程效率、工作效率都能够得到极大地提升。安装方法插件的安装方法一…

Kibana 用户指南(使用Flight仪表盘探索Kibana)

使用Flight仪表盘探索Kibana 你是Kibana的新手并希望尝试一下&#xff0c;只需单击一下&#xff0c;你就可以安装Flights样本数据并开始与Kibana交互。 Flight数据集包含四家航空公司的数据&#xff0c;你可以从Kibana主页加载数据和预配置的仪表盘。 在主页上&#xff0c;单击…

php扩展xdebug安装以及用kcachegrind系统分析

一&#xff1a;安装 安装方法一&#xff1a;编译安装1、下载PHP的XDebug扩展&#xff0c;网址&#xff1a;http://xdebug.org/# wget http://pecl.php.net/get/xdebug-2.1.2.tgz# tar -xzf xdebug-2.1.2.tgz# xdebug-2.1.2# cd xdebug-2.1.2# /usr/local/php/bin/phpize# ./con…

Meta AI 新研究,统一模态的自监督新里程碑

作者 | 青苹果来源 | 数据实战派虽然 AI 领域不断涌现出新的突破和进展&#xff0c;却始终难以逃离单一领域的束缚——一种用于个性化语音合成的新颖方法&#xff0c;却并不能用于识别人脸的表情。为了解决这个问题&#xff0c;不少研究人员正在致力于开发功能更强大、应用更广…

细说Debug和Release区别

VC下Debug和Release区别 最近写代码过程中&#xff0c;发现 Debug 下运行正常&#xff0c;Release 下就会出现问题&#xff0c;百思不得其解&#xff0c;而Release 下又无法进行调试&#xff0c;于是只能采用printf方式逐步定位到问题所在处&#xff0c;才发现原来是给定的一个…

26期20180601目录管理

6月1日任务2.1/2.2 系统目录结构2.3 ls命令2.4 文件类型2.5 alias命令系统目录结构ls - list所有的用户在系统里都有自己的家目录&#xff0c;比如现在登陆的是root用户&#xff0c;登陆进去就是在root的家目录中&#xff0c;可以看到之前创建的公钥文件也是在这。但是如果是其…

thttpd安装与调试

http://www.acme.com/software/thttpd/ thttpd是一个非常小巧的轻量级web server&#xff0c;它非常非常简单&#xff0c;仅仅提供了HTTP/1.1和简单的CGI支持&#xff0c;在其官方网站上有一个与其他web server&#xff08;如Apache, Zeus等&#xff09;的对比图Benchmark&…

7 款可替代 top 命令的工具!(二)

作者 | JackTian来源 | 杰哥的IT之旅上一篇文章中给大家介绍了《11 款可替代 top 命令的工具&#xff01;》&#xff0c;今天我再来给大家推荐 7 款可替代 top 命令的工具&#xff0c;看完这两篇替代品的文章相信能让你对 Linux 操作系统下一个小小的命令大开眼界。一、atopato…

Error:Execution failed for task ':app:dexDebug'. com.android.ide.common.process.ProcessException

异常Log&#xff1a; Error:Execution failed for task ‘:app:dexDebug’. > com.android.ide.common.process.ProcessException: org.gradle.process.internal.ExecException: Process ‘command ‘/Library/……/java” finished with non-zero exit value 2 错误原因&am…

# 学号 2017-2018-20172309 《程序设计与数据结构》第十一周学习总结

---恢复内容开始--- 学号 2017-2018-20172309 《程序设计与数据结构》第十一周学习总结 教材学习内容总结 第23章 初识Android操作系统&#xff1a;一个多用户的Linux系统&#xff0c;一个运用程序运行时与其他的运用运行是独立的。发展&#xff1a;在Android4.4之前所有的应用…

php扩展xdebug基本使用

官网&#xff1a;http://www.xdebug.org/ 使用&#xff1a;http://www.xdebug.org/docs/安装 http://blog.csdn.net/21aspnet/article/details/7036087使用1.获取文件名&#xff0c;行号&#xff0c;函数名 xdebug_call_class() <?php function fix_string($a) { …

基于 Opencv 实现眼睛控制鼠标

作者 | 小白来源 | 小白学视觉如何用眼睛来控制鼠标&#xff1f;一种基于单一前向视角的机器学习眼睛姿态估计方法。在此项目中&#xff0c;每次单击鼠标时&#xff0c;我们都会编写代码来裁剪你们的眼睛图像。使用这些数据&#xff0c;我们可以反向训练模型&#xff0c;从你们…

linux 安装安装rz/sz 和 ssh

安装rz&#xff0c;sz yum install lrzsz; 安装ssh yum install openssh-server 查看已安装包 rpm -qa | grep ssh 更新yum源 1、备份 mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup 2、下载新的CentOS-Base.repo 到/etc/yum.repos.d/ CentO…

css左固定右自适应常用方法

下面是几种方法的公用部分&#xff08;右自适应也是一样的&#xff0c;换一下方向&#xff09; html: <div class"demo"> <div class"sidebar">我是固定的</div> <div class"content">我是自适应的</div> </di…

nginx或httpd实现负载均衡tomcat(三)

接博客nginx或httpd实现反向代理tomcat并实现会话保持&#xff08;二&#xff09;实例四&#xff1a;使用httpd负载均衡后端tomcat服务第一步&#xff1a;准备两个tomcat服务器节172.16.240.203修改tomcat的server.xml配置文件&#xff0c;添加一个host。<Host name"to…

为 PHP 应用提速、提速、再提速

原文地址&#xff1a; http://www.ibm.com/developerworks/cn/opensource/os-php-fastapps1/ http://www.ibm.com/developerworks/cn/opensource/os-php-fastapps2/index.html为 PHP 应用提速、提速、再提速&#xff01;PHP 是一种脚本语言&#xff0c;常用于创建 Web 应用程序…

冬奥会夺金的背后杀手锏,竟是位 AI 虚拟教练

整理 | 禾木木 出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; 近日&#xff0c;一则消息登上了热搜&#xff1a; 2月14日晚&#xff0c;在北京冬奥会自由式滑雪女子空中技巧决赛中&#xff0c;徐梦桃为中国代表团再添一金。她选择了难度系数4.293的动作&#xff0c…

Socket-实例

import socket,os,time server socket.socket() server.bind(("localhost",9999)) server.listen()while True:conn,addrserver.accept()print("new conn",addr)while True:print("等待新指令")data conn.recv(1024)if not data:print("客…

kcachegrind安装

http://kcachegrind.sourceforge.net/cgi-bin/show.cgi/KcacheGrindDownload http://hi.baidu.com/wangxinhui419/blog/item/4a7409c78c22b4c8d100608a.html http://wxiner.blog.sohu.com/156841393.html说明&#xff1a;linux下如果安装不上&#xff0c;直接下载windows版的吧…

Java【小考】

课上&#xff0c; 老师出了一个题: 考察&#xff1a;1、类的定义 2、类的属性 3、类的方法、重载、构造方法、代码块 题目是这样的&#xff1a; 设计 一个 类&#xff1a;Tree 要求&#xff1a; 1、包含main方法 2、属性&#xff1a;静态&#xff1a; String name ; double hei…

首个深度强化学习AI,能控制核聚变,成功登上《Nature》

编译 | 禾木木 出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09; 最近&#xff0c;DeepMind 开发出了世界上第一个深度强化学习 AI &#xff0c;可以在模拟环境和真正的核聚变装置中实现对等离子体的自主控制。 这项研究成果登上了《Nature》杂志。 托卡马克是一种用于…