当前位置: 首页 > 编程日记 > 正文

SmartRoute之大规模消息转发集群实现

为什么80%的码农都做不了架构师?>>>   hot3.png

消息转发的应用场景在现实中的应用非常普遍,我们常用的IM工具也是其中之一;现有很多云平台也提供了这种基础服务,可以让APP更容易集成相关功能而不必投入相应的开发成本。对于实现这样一个简单功能并不复杂,对于现有的技术来说用.net提个通讯服务器支持几十W用户相信也不是件困难的事情;但如果考虑可用性和更大规模那就需要下点功夫,并且对相关技术有深入的了解才能实现了。而在这里主要讲解一下如何通过SmartRoute来实现一个大规模的消息转发集群的基础服务。

说到集群那肯定由N个服务组成的一组应,那做一个消息转发集群的基础服务需要那些服务节点呢?分析一下主要包括两大块:注册中心和消息网关;网关用于和应用对接,而注册中心则是明确应用所在位置。为了达到更好的可用性和更大规模支撑注册中心和网关都是N-N的关系。

​​

看到这样一个图估计会把很不了解这方面的朋友会卡住,这样一个东西实现会很复杂吧!其实在SmartRoute基础之上实现这样这样一个集群服务并不困难,不过对于消息交互原理和设计还是需要了解一下。接下来讲解一下如何用SmartRoute实现相应注册中心和网关服务。

注册中心

注册中心的作用很简单就是保存应用标识所在位置,当网关需要转发消息的时候告诉网关这个应用标识在那个位置上。除了这一功能外当然还要考虑可用性,主要包括多中心发现和注册信息现步等;同样网关也具行指向多台中心的负载能力。

public interface ICenter : IDisposable{String ID { get; }INode Node { get; set; }IUserService UserService { get; set; }void Open();}

中心的接口定义很简单,主要都是内部针对SmartRoute的INode进行相关消息操作。

public void Open(){mCenterSubscriber = Node.Register<EventSubscriber>(ID);mCenterSubscriber.Register<Protocol.SyncUsers>(OnSyncUsers);mCenterSubscriber.Register<Protocol.CenterStarted>(OnOtherCenterStarted);mCenterSubscriber.Register<Protocol.Register>(OnSyncUser);mCenterSubscriber.Register<Protocol.UnRegister>(OnSyncUnRegister);Node.SubscriberRegisted += OnSubscriberRegisted;mStartServiceTimer = new System.Threading.Timer(OnOpen, null, 5000, 5000);Node.Loger.Process(LogType.INFO, "search other center...");}//处理用户上线所在网关信息private void OnReceiveUsersInfo(Message msg, Protocol.GetUsersInfo e){string[] users = e.Receiver.Split(';');Protocol.GetUserInfoResponse response = new Protocol.GetUserInfoResponse();response.RequestID = e.RequestID;Protocol.OperationStatus status = new Protocol.OperationStatus();foreach (string user in users){Protocol.UserInfo info = UserService.GetUserInfo(user, status);if (info != null)response.Items.Add(info);}msg.Reply(response);}//网关用户下线private void OnUserUnregister(Message msg, Protocol.UnRegister e){Protocol.OperationStatus status = new Protocol.OperationStatus();UserService.Remove(e.Name, status);msg.Reply(status);Node.Loger.Process(LogType.INFO, "{0} user unregister", e.Name);//同步到其他中心节点if (mHasOtherCenter)mCenterSubscriber.Publish(CENTER_OTHER_TAG, e, ReceiveMode.Regex);}//网关用户上线private void OnUserRegister(Message msg, Protocol.Register e){Protocol.OperationStatus status = new Protocol.OperationStatus();UserService.Register(new Protocol.UserInfo() { Name = e.Name, Gateway = e.GatewayID }, status);msg.Reply(status);Node.Loger.Process(LogType.INFO, "{0} user register from {1}", e.Name, e.GatewayID);//同步到其他中心节点if (mHasOtherCenter)mCenterSubscriber.Publish(CENTER_OTHER_TAG, e, ReceiveMode.Regex);}//同步下线private void OnSyncUnRegister(Message msg, Protocol.UnRegister e){Protocol.OperationStatus status = new Protocol.OperationStatus();UserService.Remove(e.Name, status);Node.Loger.Process(LogType.INFO, "{0} user unregister", e.Name);}//同步上线private void OnSyncUser(Message msg, Protocol.Register e){Protocol.OperationStatus status = new Protocol.OperationStatus();UserService.Register(new Protocol.UserInfo() { Name = e.Name, Gateway = e.GatewayID }, status);Node.Loger.Process(LogType.INFO, "{0} user register from {1}", e.Name, e.GatewayID);}//同步其他中心上线信息private void OnSyncUsers(Message msg, Protocol.SyncUsers e){Node.Loger.Process(LogType.INFO, "sync user info to local!");Protocol.OperationStatus status = new Protocol.OperationStatus();foreach (Protocol.UserInfo item in e.Items){UserService.Register(item, status);}}private void OnSubscriberRegisted(INode node, ISubscriber subscriber){//发现其他中心服务,向服务发起同步用户请求if (subscriber.Name.IndexOf(CENTER_TAG) == 0 && subscriber.Name != ID){mHasOtherCenter = true;mReadyToStart = false;Node.Loger.Process(LogType.INFO, "find {0} center", subscriber.Name);Protocol.CenterStarted started = new Protocol.CenterStarted();started.Name = ID;mCenterSubscriber.Publish(subscriber.Name, started);Node.Loger.Process(LogType.INFO, "request sync user info ....");}}public INode Node{get; set;}

实现并不复杂,主要是开启相关订阅并注册消息处理方法即可,主要针对注册,同步和获取用户所在网关信息。

网关

网关的作用主要是接收消息,从注册中心获取用户标识对应的网关并把消息推送过去;所以功能也并不复杂主要也是针对INode的操作。

public interface IGateway : IDisposable{INode Node { get; set; }Protocol.OperationStatus Register(UserToken userToken);Protocol.OperationStatus UnRegister(string username);void SendMessage(string receivers, string sender, object message);void Open();}

功能比较简单用户标识注册和注销功能,还加上一个消息推送方法即可。

public OperationStatus Register(UserToken userToken){OperationStatus result;Register register = new Register();register.Name = userToken.Name;register.GatewayID = Node.DefaultEventSubscriber.Name;result = Node.DefaultSwitchSubscriber.SyncToService<Protocol.OperationStatus>(Center.USER_SERVICE_TAG, register);mUserActions[userToken.Name] = userToken;return result;}public void SendMessage(string receivers, string sender, object message){MessageQueue.MessageItem item = new MessageQueue.MessageItem();item.ID = GetRequestID();item.Receives = receivers;item.Sender = sender;item.Data = message;mMsgQueue.Push(item);GetUsersInfo getinfo = new GetUsersInfo();getinfo.RequestID = item.ID;getinfo.Receiver = receivers;Node.DefaultSwitchSubscriber.ToService(Center.USER_SERVICE_TAG, getinfo);}public void Dispose(){if (mMsgQueue != null)mMsgQueue.Dispose();}public void Open(){mMsgQueue = new MessageQueue(this, 2);mMsgQueue.Open();Node.DefaultSwitchSubscriber.DefaultEventSubscriber.Register<GetUserInfoResponse>(OnGetUserInfoRequest);Node.DefaultEventSubscriber.Register<UserMessage>(OnUserMessage);}public OperationStatus UnRegister(string username){UnRegister unregister = new UnRegister();unregister.Name = username;UserToken token = null;mUserActions.TryRemove(username, out token);return Node.DefaultSwitchSubscriber.SyncToService<OperationStatus>(Center.USER_SERVICE_TAG, unregister);}

中心启动

由于基于SmartRoute的设计,所以中心的启动并不需要进行其他配置,直接开启动行即可;对于多节点的中心怎办?如果有需要多启一个实例即可达到多中心负载能力。

public class Program{public static void Main(string[] args){INode node = NodeFactory.Default;node.Loger.Type = LogType.ALL;node.AddLogHandler(new SmartRoute.ConsoleLogHandler(LogType.ALL));node.Open();MRC.MCRFactory.Center.Open();System.Threading.Thread.Sleep(-1);		}}

网关应用

网关的启动和中心一样,不过需要根据实际需要发起用户标识注册,注册后就可以向集群中的任何标识发送消息。

public class Program{public static void Main(string[] args){INode node = NodeFactory.Default;node.Loger.Type = LogType.ALL;node.AddLogHandler(new SmartRoute.ConsoleLogHandler(LogType.ALL));node.Open();MRC.MCRFactory.Gateway.Open();System.Threading.ThreadPool.QueueUserWorkItem(OnTest);System.Threading.Thread.Sleep(-1);}private static void OnTest(object state){System.Threading.Thread.Sleep(10000);UserToken token = new UserToken("ken");token.Register();token.Receive = OnUserReceive;}private static void OnUserReceive(UserToken token, Protocol.UserMessage e){Console.WriteLine("receive message from {0} {1}", e.Sender, e.Data);}}

构建相应标识的UserToken注册到网关,网关会自动把标识同步到中心;然后定义UserToken相应的消息接收方法即可处理接收的消息。实际应用中可以继承UserToken并挂相应的客户端连接然后当接收消息做相应的网络转发就可以达到用户和用户间的通讯。

由一这样功能并不复杂所以已经封装起方便扩展应用,具体项目地址:https://github.com/IKende/SmartRoute.MRC

转载于:https://my.oschina.net/ikende/blog/855487

相关文章:

unity项目警告之 LF CRLF问题

unity中创建的脚本&#xff0c;以LF结尾。 Visual studio中创建的脚本&#xff0c;以 CRLF结尾。 当我们创建一个unity脚本后&#xff0c;再用VS打开编辑保存后&#xff0c;这个文件既有LF结尾符&#xff0c;也有CRLF结尾符。 解决办法&#xff1a;更改unity的代码生成模板&…

Eigen向量化内存对齐/Eigen的SSE兼容,内存分配/EIGEN_MAKE_ALIGNED_OPERATOR_NEW

1.总结 对于基本数据类型和自定义类型&#xff0c;我们需要用预编译指令来保证栈内存的对齐&#xff0c;用重写operator new的方式保证堆内存对齐。对于嵌套的自定义类型&#xff0c;申请栈内存时会自动保证其内部数据类型的对齐&#xff0c;而申请堆内存时仍然需要重写operat…

c/c++文件遍历

//CBrowseDir.h#pragma once#include <stdlib.h> #include <direct.h> #include <string.h> #include <io.h> #include <stdio.h> #include <iostream> using namespace std; class CBrowseDir { protected: //存放初始目录的绝对…

优化应用启动时的体验

2019独角兽企业重金招聘Python工程师标准>>> 对于应用的启动时间&#xff0c;只能是尽量的避免一些耗时的、非必要性的操作在主线程中&#xff0c;这样相对减少一部分启动的耗时&#xff0c;同时在等待第一帧显示的时间里&#xff0c;可以加入一些配置用来增加用户体…

系列四、SpringMVC响应数据和结果视图

2019独角兽企业重金招聘Python工程师标准>>> 项目结构如下 一、返回值分类 一 返回字符串 Controller方法返回字符串可以指定逻辑视图的名称&#xff0c;根据视图解析器为物理视图的地址&#xff0c;根据字符串最后跳转到对应jsp页面 第一步、导入依赖坐标文件、配置…

numpy数组切片:一维/二维/数组

文章目录numpy数组切片操作一维数组&#xff08;冒号&#xff1a;&#xff09;1、一个参数&#xff1a;a[i]2、两个参数&#xff1a;ba[i:j]3、三个参数&#xff1a;格式b a[i:j:s]4、例子二维数组&#xff08;逗号&#xff0c;&#xff09;取元素 X[n0,n1]切片 X[s0:e0,s1:e1…

行列式求值、矩阵求逆

#include <iostream> #include <string> #include <assert.h> #include <malloc.h> #include <iostream> #include <stdlib.h> #include <memory.h> #include <time.h>using namespace std;//动态分配大小位size的一维数组 te…

IP 地址子网划分

1.你所选择的子网掩码将会产生多少个子网2的x次方-2(x代表网络位借用主机的位数&#xff0c;即2进制为1的部分&#xff0c;现在的网络中&#xff0c;已经不需要-2&#xff0c;已经可以全部使用&#xff0c;不过需要加上相应的配置命令&#xff0c;例如CISCO路由器需要加上ip su…

git rebase 和 git merger

& git merge 在上图中&#xff0c;每一个绿框均代表一个commit。除了c1&#xff0c;每一个commit都有一条有向边指向它在当前branch当中的上一个commit。 图中的项目&#xff0c;在c2之后就开了另外一个branch&#xff0c;名为experiment。在此之后&#xff0c;master下的修…

matplotlib绘制三维轨迹图

1. 绘制基本三维曲线 # import necessary module from mpl_toolkits.mplot3d import axes3d import matplotlib.pyplot as plt import numpy as np# load data from file # you can replace this using with open data1 np.loadtxt("./pos.txt") # print (data1) n…

求一个矩阵的最大子矩阵

#include <iostream> #include <string> #include <assert.h> #include <malloc.h> #include <iostream> #include <stdlib.h> #include <memory.h> #include <time.h> #include <limits.h>using namespace std;//动态分…

tcpdump抓包文件提取http附加资源

2019独角兽企业重金招聘Python工程师标准>>> 前面几篇文章铺垫了一大批的协议讲解&#xff0c;主要是为了提取pcap文件中http协议附加的资源。 1、解析pcap文件&#xff0c;分为文件格式头&#xff0c;后面就是数据包头和包数据了 2、分析每个包数据&#xff0c;数据…

smobiler介绍(二)

类似开发WinForm的方式&#xff0c;使用C#开发Android和IOS的移动应用&#xff1f;听起来感觉不可思议&#xff0c;那么Smobiler平台到底是如何实现的呢&#xff0c;这里给大家介绍一下。客户端Smobiler分为两种客户端&#xff0c;一种是开发版&#xff0c;一种是打包版开发版&…

Matplotlib基本用法

Matplotlib Matplotlib 是Python中类似 MATLAB 的绘图工具&#xff0c;熟悉 MATLAB 也可以很快的上手 Matplotlib。 1. 认识Matploblib 1.1 Figure 在任何绘图之前&#xff0c;我们需要一个Figure对象&#xff0c;可以理解成我们需要一张画板才能开始绘图。 import matplot…

HDU1201 18岁生日【日期计算】

18岁生日 Time Limit: 2000/1000 MS (Java/Others) Memory Limit: 65536/32768 K (Java/Others) Total Submission(s): 32851 Accepted Submission(s): 10649Problem DescriptionGardon的18岁生日就要到了&#xff0c;他当然很开心&#xff0c;可是他突然想到一个问题&am…

TypeScript 从听说到入门(上篇)

我为什么会这样念念又不忘 / 你用什么牌的箭刺穿我心脏 我也久经沙场 / 戎马生涯 / 依然 / 被一箭刺伤 ——李荣浩《念念又不忘》 接下来我会分上、下两篇文章介绍 TypeScript。 我也是 TypeScript 初学者&#xff0c;这两篇文章是我的学习笔记&#xff0c;来源于一个系列的免费…

SLAM前端中的视觉里程计和回环检测

1. 通常的惯例是把 VSLAM 分为前端和后端。前端为视觉里程计和回环检测&#xff0c;相当于是对图像数据进行关联&#xff1b;后端是对前端输出的结果进行优化&#xff0c;利用滤波或非线性优化理论&#xff0c;得到最优的位姿估计和全局一致性地图。 1 前端&#xff1a;图像数…

粗心导致的bug

不管是调试程序还是直接看输出i都为2&#xff0c;下面是运行时输出的&#xff1a; 用vs2010以前遇到更奇葩的事&#xff0c;这次用vs2013也是遇到奇葩&#xff0c;taskNumber值为2一定&#xff0c;下面两个循环体&#xff0c;每个循环体各执行一次&#xff0c;程序输出i 2真是…

gearman中任务的优先级和返回状态

gearman中任务的优先级和返回状态 一、任务的优先级 同步阻塞调用&#xff0c;等待返回结果 doLow:最低优先 doNomal:正常优先级 doHigh:最优先执行 异步派发任务&#xff0c;不等待返回结果&#xff0c;返回任务句柄&#xff0c;通过该句柄可获取任务运行状态信息 doLowBackgr…

VMware学习使用笔记

本人在学习基础上&#xff0c;结合实际项目实现总结的笔记。以下内容都是基于VMware vSphere 6.7的官方文档中vSAN的规划和部署而来&#xff0c;网址https://docs.vmware.com/cn/VMware-vSphere/index.html。 对于ESXi系统 对于内存不足512G&#xff0c;可以从USB或Micro SD引导…

【原】Java学习笔记020 - 面向对象

1 package cn.temptation;2 3 public class Sample01 {4 public static void main(String[] args) {5 // 成员方法的参数列表&#xff1a;6 // 1、参数列表中的数据类型是值类型7 // 2、参数列表中的数据类型是引用类型8 // A&#xff1a;…

win32 wmi编程获取系统信息

//GetSysInfo.h#pragma once#include <afxtempl.h>class GetSysInfo { public:GetSysInfo(void);~GetSysInfo(void);public: /********获取操作系统版本&#xff0c;Service pack版本、系统类型************/ void GetOSVersion(CString &strOSVersion,CString &…

cmake 注意事项

1. add_subdirectory()调用 CMake将在每次add_subdirectory()调用时创建一个新的变量作用域,因此这个参数最好的用法是放在cmaklists的最后使用&#xff0c;这样的话创建的新的变量的作用范围与内存的变化就不会影响到后面的变量的使用。 查看并打印在cmake里面定义的宏在&am…

Jmeter 使用自定义变量

有些情况下比如发起测试时URL的主机名和端口需要在采样器中出现多次&#xff0c;这样就有个问题&#xff0c;当测试的主机更改时&#xff0c; 我们需要修改主机名称&#xff0c;这时就需要修改多个地方&#xff0c;如果多的情况会有遗漏。如果我们在配置脚本的时候&#xff0c;…

Kubernetes1.5源码分析(二) apiServer之资源注册

源码版本 Kubernetes v1.5.0 简介 k8s里面有各种资源&#xff0c;如Pod、Service、RC、namespaces等资源&#xff0c;用户操作的其实也就是这一大堆资源。但这些资源并不是杂乱无章的&#xff0c;使用了GroupVersion的方式组织在一起。每一种资源都属于一个Group&#xff0c;而…

opencv3 视频稳像

OpneCV3.x中提供了专门应用于视频稳像技术的模块&#xff0c;该模块包含一系列用于全局运动图像估计的函数和类。结构体videostab::RansacParams实现了RANSAC算法&#xff0c;这个算法用来实现连续帧间的运动估计。videostab::MotionEstimatorBase是基类中所有全局运动估计方法…

perf+火焰图 = 性能分析利器

perf 1. perf安装 sudo apt install linux-tools-common检查是否安装好 perf如果出现 You may need to install the following packages for this specific kernel:推荐安装可以按照提示将推荐安装包全部安装好 sudo apt-get install linux-tools-对应版本-generic linux-c…

3- MySQL数据类型

MySQL表字段类型 MySQL数据表的表示一个二维表&#xff0c;由一个或多个数据列构成。 每个数据列都有它的特定类型&#xff0c;该类型决定了MySQL如何看待该列数据&#xff0c;并且约束列存放相应类型的数据。 MySQL中的列表有三种&#xff1a;数值类&#xff0c;字符串类和日期…

AddressSanitizer+cmake

1. AddressSanitizercmake(Linux) 编译指令&#xff1a; CXXFLAGS通常需要加上 -fsanitizeaddress -fno-omit-frame-pointer #打印函数调用路径 -fsanitize-recoveraddress #AddressSanitizer遇到错误时能够继续-fsanitizeaddress-fno-omit-frame-pointer-fsanitize-rec…

vibe前景提取改进算法

// improveVibeAlgorithm.h #ifndef IMPROVED_VIBE_ALGORITHM_H #define IMPROVED_VIBE_ALGORITHM_H#include <opencv2/opencv.hpp> using namespace std;#define WINSIZE 5 // Vibe改进算法, Barnich, Olivier & Droogenbroeck, Marc. (2009). // ViBE: A powerfu…