memcached(二)事件模型源码分析
在memcachedd中,作者为了专注于缓存的设计,使用了libevent来开发事件模型。memcachedd的时间模型同nginx的类似,拥有一个主进行(master)以及多个工作者线程(woker)。
流程图
在memcached中,是先对工作者线程进行初始化并启动,然后才会创建启动主线程。
工作者线程
初始化
memcached对工作者线程进行初始化,参数分别为线程数量以及`main_base`,
/* start up worker threads if MT mode */ thread_init(settings.num_threads, main_base);

/** Initializes the thread subsystem, creating various worker threads.** nthreads Number of worker event handler threads to spawn* main_base Event base for main thread*/ void thread_init(int nthreads, struct event_base *main_base) {int i;int power;pthread_mutex_init(&cache_lock, NULL);pthread_mutex_init(&stats_lock, NULL);pthread_mutex_init(&init_lock, NULL);pthread_cond_init(&init_cond, NULL);pthread_mutex_init(&cqi_freelist_lock, NULL);cqi_freelist = NULL;/* Want a wide lock table, but don't waste memory */if (nthreads < 3) {power = 10;} else if (nthreads < 4) {power = 11;} else if (nthreads < 5) {power = 12;} else {/* 8192 buckets, and central locks don't scale much past 5 threads */power = 13;}item_lock_count = hashsize(power);item_lock_hashpower = power;item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));if (! item_locks) {perror("Can't allocate item locks");exit(1);}for (i = 0; i < item_lock_count; i++) {pthread_mutex_init(&item_locks[i], NULL);}pthread_key_create(&item_lock_type_key, NULL);pthread_mutex_init(&item_global_lock, NULL);threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));if (! threads) {perror("Can't allocate thread descriptors");exit(1);}dispatcher_thread.base = main_base;dispatcher_thread.thread_id = pthread_self();for (i = 0; i < nthreads; i++) {int fds[2];if (pipe(fds)) {perror("Can't create notify pipe");exit(1);}threads[i].notify_receive_fd = fds[0];threads[i].notify_send_fd = fds[1];setup_thread(&threads[i]);/* Reserve three fds for the libevent base, and two for the pipe */stats.reserved_fds += 5;}/* Create threads after we've done all the libevent setup. */for (i = 0; i < nthreads; i++) {create_worker(worker_libevent, &threads[i]);}/* Wait for all the threads to set themselves up before returning. */pthread_mutex_lock(&init_lock);wait_for_thread_registration(nthreads);pthread_mutex_unlock(&init_lock); }
在memcachedd中为了避免多线程共享资源的使用使用了很多锁,这里对锁不做介绍。
线程的结构体
typedef struct {pthread_t thread_id; /* unique ID of this thread 线程ID*/struct event_base *base; /* libevent handle this thread uses libevent事件*/struct event notify_event; /* listen event for notify pipe 注册事件*/int notify_receive_fd; /* receiving end of notify pipe 管道中接收端*/int notify_send_fd; /* sending end of notify pipe 管道中发送端*/struct thread_stats stats; /* Stats generated by this thread 线程状态*/struct conn_queue *new_conn_queue; /* queue of new connections to handle 消息队列*/cache_t *suffix_cache; /* suffix cache */uint8_t item_lock_type; /* use fine-grained or global item lock */ } LIBEVENT_THREAD;
初始化工作者线程
for (i = 0; i < nthreads; i++) {int fds[2];/* 创建管道 */if (pipe(fds)) {perror("Can't create notify pipe");exit(1);}/* 设置线程管道的读写入口 */threads[i].notify_receive_fd = fds[0];threads[i].notify_send_fd = fds[1];/* 设置线程属性 */setup_thread(&threads[i]);/* Reserve three fds for the libevent base, and two for the pipe */stats.reserved_fds += 5;}
设置线程属性
/** Set up a thread's information.*/ static void setup_thread(LIBEVENT_THREAD *me) {me->base = event_init(); //初始化线程事件if (! me->base) {fprintf(stderr, "Can't allocate event base\n");exit(1);}/* 初始化监听事件 *//* Listen for notifications from other threads */event_set(&me->notify_event, me->notify_receive_fd,EV_READ | EV_PERSIST, thread_libevent_process, me);/* 把事件绑定到线程事件 */event_base_set(me->base, &me->notify_event);/* 注册事件到监听状态 */if (event_add(&me->notify_event, 0) == -1) {fprintf(stderr, "Can't monitor libevent notify pipe\n");exit(1);}... }
READ回调函数
/** Processes an incoming "handle a new connection" item. This is called when* input arrives on the libevent wakeup pipe.*/ static void thread_libevent_process(int fd, short which, void *arg) {.../* 从管道读取消息 */if (read(fd, buf, 1) != 1)if (settings.verbose > 0)fprintf(stderr, "Can't read from libevent pipe\n");item = cq_pop(me->new_conn_queue); //读取连接 ... }
启动工作者线程
/* Create threads after we've done all the libevent setup. */ for (i = 0; i < nthreads; i++) {create_worker(worker_libevent, &threads[i]); }
`create_woker`函数创建工作者线程,
/** Creates a worker thread.*/ static void create_worker(void *(*func)(void *), void *arg) {pthread_t thread;pthread_attr_t attr;int ret;pthread_attr_init(&attr);if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {fprintf(stderr, "Can't create thread: %s\n",strerror(ret));exit(1);} }
`worker_libevent`函数进入线程循环监听状态,
/** Worker thread: main event loop*/ static void *worker_libevent(void *arg) {LIBEVENT_THREAD *me = arg;/* Any per-thread setup can happen here; thread_init() will block until* all threads have finished initializing.*//* set an indexable thread-specific memory item for the lock type.* this could be unnecessary if we pass the conn *c struct through* all item_lock calls...*/me->item_lock_type = ITEM_LOCK_GRANULAR;pthread_setspecific(item_lock_type_key, &me->item_lock_type);register_thread_initialized();event_base_loop(me->base, 0);return NULL; }
主线程
初始化
static struct event_base* mian_base;/* initialize main thread libevent instance */ main_base = event_init();
在`memcached.c`的主函数中,使用`libevent`的事件初始化函数来初始化`main_base`。
初始化socket
这里只介绍tcp连接,其中使用`server_sockets`来调用`server_socket`来初始化连接。
if (settings.port && server_sockets(settings.port, tcp_transport, portnumber_file)) {vperror("failed to listzhefen on TCP port %d", settings.port);exit(EX_OSERR); }
static int server_sockets(int port, enum network_transport transport,FILE *portnumber_file) {if (settings.inter == NULL) {return server_socket(settings.inter, port, transport, portnumber_file);}... }
而在`server_socket`中完成了socket的初始化、绑定等操作。

/*** Create a socket and bind it to a specific port number* @param interface the interface to bind to* @param port the port number to bind to* @param transport the transport protocol (TCP / UDP)* @param portnumber_file A filepointer to write the port numbers to* when they are successfully added to the list of ports we* listen on.*/ static int server_socket(const char *interface,int port,enum network_transport transport,FILE *portnumber_file) {int sfd;struct linger ling = {0, 0};struct addrinfo *ai;struct addrinfo *next;struct addrinfo hints = { .ai_flags = AI_PASSIVE,.ai_family = AF_UNSPEC };char port_buf[NI_MAXSERV];int error;int success = 0;int flags =1;hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;if (port == -1) {port = 0;}snprintf(port_buf, sizeof(port_buf), "%d", port);error= getaddrinfo(interface, port_buf, &hints, &ai);if (error != 0) {if (error != EAI_SYSTEM)fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));elseperror("getaddrinfo()");return 1;}for (next= ai; next; next= next->ai_next) {conn *listen_conn_add;if ((sfd = new_socket(next)) == -1) {/* getaddrinfo can return "junk" addresses,* we make sure at least one works before erroring.*/if (errno == EMFILE) {/* ...unless we're out of fds */perror("server_socket");exit(EX_OSERR);}continue;}#ifdef IPV6_V6ONLYif (next->ai_family == AF_INET6) {error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));if (error != 0) {perror("setsockopt");close(sfd);continue;}} #endifsetsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));if (IS_UDP(transport)) {maximize_sndbuf(sfd);} else {error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));if (error != 0)perror("setsockopt");error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));if (error != 0)perror("setsockopt");error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));if (error != 0)perror("setsockopt");}if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {if (errno != EADDRINUSE) {perror("bind()");close(sfd);freeaddrinfo(ai);return 1;}close(sfd);continue;} else {success++;if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {perror("listen()");close(sfd);freeaddrinfo(ai);return 1;}if (portnumber_file != NULL &&(next->ai_addr->sa_family == AF_INET ||next->ai_addr->sa_family == AF_INET6)) {union {struct sockaddr_in in;struct sockaddr_in6 in6;} my_sockaddr;socklen_t len = sizeof(my_sockaddr);if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {if (next->ai_addr->sa_family == AF_INET) {fprintf(portnumber_file, "%s INET: %u\n",IS_UDP(transport) ? "UDP" : "TCP",ntohs(my_sockaddr.in.sin_port));} else {fprintf(portnumber_file, "%s INET6: %u\n",IS_UDP(transport) ? "UDP" : "TCP",ntohs(my_sockaddr.in6.sin6_port));}}}}if (IS_UDP(transport)) {int c;for (c = 0; c < settings.num_threads_per_udp; c++) {/* Allocate one UDP file descriptor per worker thread;* this allows "stats conns" to separately list multiple* parallel UDP requests in progress.** The dispatch code round-robins new connection requests* among threads, so this is guaranteed to assign one* FD to each thread.*/int per_thread_fd = c ? dup(sfd) : sfd;dispatch_conn_new(per_thread_fd, conn_read,EV_READ | EV_PERSIST,UDP_READ_BUFFER_SIZE, transport);}} else {if (!(listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base))) {fprintf(stderr, "failed to create listening connection\n");exit(EXIT_FAILURE);}listen_conn_add->next = listen_conn;listen_conn = listen_conn_add;}}freeaddrinfo(ai);/* Return zero iff we detected no errors in starting up connections */return success == 0; }
主线程事件
在主线程中通过`conn_new`函数来建立主线程和工作者线程之间的关系。
/* 设置线程事件 */ event_set(&c->event, sfd, event_flags, event_handler, (void *)c); event_base_set(base, &c->event); c->ev_flags = event_flags;/* 注册事件到监听 */ if (event_add(&c->event, 0) == -1) {perror("event_add");return NULL; }
事件处理
上面中设置了事件的回调函数`event_handler`,而在`event_handler`中,主要调用了`driver_machine`函数。
driver_machine看名字就知道,想发动机一样的函数,那么该函数主要是处理各种事件以及相应的处理方法。
这里只简要介绍一个函数调用`dispatch_conn_new`。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,int read_buffer_size, enum network_transport transport) {CQ_ITEM *item = cqi_new();char buf[1];if (item == NULL) {close(sfd);/* given that malloc failed this may also fail, but let's try */fprintf(stderr, "Failed to allocate memory for connection object\n");return ;}int tid = (last_thread + 1) % settings.num_threads;LIBEVENT_THREAD *thread = threads + tid; //循环获取工作者线程 last_thread = tid;item->sfd = sfd;item->init_state = init_state;item->event_flags = event_flags;item->read_buffer_size = read_buffer_size;item->transport = transport;cq_push(thread->new_conn_queue, item); //连接加入懂啊队列 memcachedD_CONN_DISPATCH(sfd, thread->thread_id);buf[0] = 'c';if (write(thread->notify_send_fd, buf, 1) != 1) {//向管道写入消息perror("Writing to thread notify pipe");} }
本文 由 cococo点点 创作,采用 知识共享 署名-非商业性使用-相同方式共享 3.0 中国大陆 许可协议进行许可。欢迎转载,请注明出处:
转载自:cococo点点 http://www.cnblogs.com/coder2012
相关文章:

【MATLAB】MATLAB的控制流
1、if-else-end if expressioncommands1 elseif expression2commands2 ... else commandsn end 2、switch-case switch valuecase1 test1%如果value等于test1,执行command1,并结束此结构command1case2 test2command2...case3 testkcommandk otherw…

Linux查看本机端口
查看指定的端口 # lsof -i:port 查看所有端口 # netstat -aptn 安装telnet #yum install -y telnet.x86_64 #telnet ip 端口

Node.js安装
通过nvm安装 下载nvm并执行wget -qO- https://raw.github.com/creationix/nvm/v0.33.11/install.sh | sh将命令输出到终端命令中~/.bashrcexport NVM_DIR"$HOME/.nvm"更新文件source .bashrc通过nvm安装node.jsnvm install 10.13安装的版本是10.13的版本 通过命令查看…

mongodb常用语句以及SpringBoot中使用mongodb
普通查询 某个字段匹配数组内的元素数量的,假如region只有一个元素的 db.getCollection(map).find({region:{$size:1}}) 假如region只有0个元素的 db.getCollection(map).find({region:{$size:0}}) db.getCollection(map).find({region:{$size:1}}).count() db.get…

2002高教社杯---A车灯线光源的优化设计
A题 车灯线光源的优化设计 安装在汽车头部的车灯的形状为一旋转抛物面,车灯的对称轴水平地指向正前方, 其开口半径36毫米,深度21.6毫米。经过车灯的焦点,在与对称轴相垂直的水平方向,对称地放置一定长度的均匀分布的线光源。要求…

从Date类型转为中文字符串
//主方法public static String DateToCh(Date date) {Calendar cal Calendar.getInstance();cal.setTime(date);int year cal.get(Calendar.YEAR);int month cal.get(Calendar.MONTH) 1;int day cal.get(Calendar.DAY_OF_MONTH);return getYear(year) getTenString(month…

第十四课 如何在DAPP应用实现自带钱包转账功能?
1,为什么DAPP生态需要自带钱包功能? 区块链是一个伟大的发明,它改变了生产关系。很多生态,有了区块链技术,可以由全公司员工的"全员合伙人"变成了全平台的”全体合伙人”了,是真正的共享经济模式…

为什么jdk源码推荐ThreadLocal使用static
ThreadLocal是线程私有变量,本身是解决多线程环境线程安全,可以说单线程实际上没必要使用。 既然多线程环境本身不使用static,那么又怎么会线程不安全。所以这个问题本身并不是问题,只是有人没有理解ThreadLocal的真正使用场景&a…

C与C++之间相互调用
1、导出C函数以用于C或C的项目 如果使用C语言编写的DLL,希望从中导出函数给C或C的模块访问,则应使用 __cplusplus 预处理器宏确定正在编译的语言。如果是从C语言模块使用,则用C链接声明这些函数。如果使用此技术并为DLL提供头文件,…
【MATLAB】三维图形的绘制mesh
步骤如下: (1)确定自变量x和y的取值范围和取值间隔 x x1 :dx :x2 , y y1 : dy : y2 (2)构成xoy平面上的自变量采样“格点”矩阵 ①利用格点矩阵的原理生成矩阵。 xx1:dx:x2; yy1:dy:y2; Xones(size(y))*x; Yy*o…

ORA-01919: role 'PLUSTRACE' does not exist
环境:Oracle 10g,11g.现象:在一次迁移测试中,发现有这样的角色赋权会报错不存在: SYSorcl> grant PLUSTRACE to jingyu; grant PLUSTRACE to jingyu* ERROR at line 1: ORA-01919: role PLUSTRACE does not exist 查询发现这个…

Java反射以及应用
需求:需要通过反射动态获取类的字段类型,然后做特殊处理 Java反射getDeclaredField和getField的区别 getDeclaredFiled 只能获取类本身的属性成员(包括私有、共有、保护) getField 仅能获取类(及其父类可以自己测试) public属性…

【MATLAB】雅可比矩阵jacobi matrix
参考页面: https://baike.baidu.com/item/%E9%9B%85%E5%8F%AF%E6%AF%94%E7%9F%A9%E9%98%B5/10753754?fraladdin#1 在向量微积分中,雅可比矩阵是一阶偏导数以一定方式排列成的矩阵,其行列式称为雅可比行列式。 由球坐标系到直角坐标系的转…

Laravel:使用Migrations
1、首先利用artisan创建一个可迁移的数据表模板,该命令运行后会在database/migrations目录下生成一个文件 php artisan make:migration create_fees_count_table --createfees_count 2、生成的文件包含up和down两个方法,其中up中是包含了添加表ÿ…

基于libevent和unix domain socket的本地server
https://www.pacificsimplicity.ca/blog/libevent-echo-server-tutorial 根据这一篇写一个最简单的demo。然后开始写client。 client调优 client最初的代码如下: 1 #include <sys/socket.h>2 #include <sys/un.h>3 #include <stdio.h>4 #include …

软件体系架构模式之一什么是软件架构模式
什么是软件架构模式 计划启动未开发的软件项目?然后选择正确的架构模式将对项目的结果起关键作用。选择市场上最流行或最新的技术并不总是意味着会带来最好的结果。但是,选择最合适的解决方案将为行之有效的问题和反复出现的问题提供可靠的解决方案。 …

HP 服务器 iLO 远程控制软件 介绍
iLO了解:iLO 是一组芯片,内部是vxworks的嵌入操作系统,在服务器的背后有一个标准RJ45口对外连接生产用交换机或者带外管理的交换机。iLO 全名是 Integrated Lights-out,它是惠普某些型号的服务器上集成的远程管理端口,它能够允许用…
【MATLAB】数据分析之数据插值
插值:求过已知有限个数据点的近似函数。 区别于拟合: 拟合:已知有限个数据点求近似函数,不要求过已知数据点,只要求在某种意义下它在这些点上的总偏差最小。 基本常用的插值方法:拉格朗日多项式插值&…

迈斯!啊呸~数学
1.数论 快速幂 int po(int x,int y) {int ans1;while(y){if(y%21)ans1ll*ans*x%p;x1ll*x*x%p;y/2;}return ans; } 乘法逆元(保证模域p与求逆元的数互质) po(a,p-2);//a为需要求逆元的数 扩展欧几里得(exgcd) #include<cstdio&g…

软件体系架构模式之二分层体系结构
分层体系结构模式是n层模式,其中组件被组织在水平层中。这是设计大多数软件的传统方法,并且具有独立性。这意味着所有组件都是互连的,但彼此之间不依赖。 图1:分层架构 在此体系结构中有四层,其中每一层在模块和其中的…

linux下mysql的root密码忘记解决方法
1.首先确认服务器出于安全的状态,最安全的状态是到服务器的Console上面操作,并且拔掉网线,或者可以使用--skip-networking限制只能从本地连接2.修改MySQL的登录设置: # vim /etc/my.cnf在[mysqld]的段中加上…
【Python】turtle库的小应用
心血来潮,哈哈哈,画的不好,请多见谅 大家如果想要尝试turtle库,可以借鉴: https://www.cnblogs.com/nowgood/p/turtle.html 导入库,我的pycharm里可以直接使用,哈哈哈,不行就pip…

[转]MySQL修改时区的方法小结
本文转自:https://www.cnblogs.com/mracale/p/6064447.html 这篇文章主要介绍了MySQL修改时区的方法,总结分析了三种常见的MySQL时区修改技巧,包括命令行模式、配置文件方式及代码方式,需要的朋友可以参考下 方法一:通过mysql命令行模式下动态修改 1.1 查…

swift轮播图代码
import UIKit private let CycleCellID "CycleCellID" class BannerCycleView: UIView { var layout : UICollectionViewFlowLayout! var collectionView :UICollectionView! var pageContol : UIPageControl? var cycleTimer : Timer? let timeInterval : TimeI…

软件体系架构模式之三微内核体系架构
当您的软件系统或产品包含许多与外部实体的集成点时,微内核体系结构模式将非常有用。最重要的是,您将无法准确判断出将来将哪些实体集成到您的系统中。可以将微内核架构模式识别为基于插件的模式。。也称为插件架构模式,它由两个主要组件组成…
【MATLAB】交互式绘图(ginput,gtext,zoom)
1、ginput指令: [x,y]ginput(n) %(其功能是用鼠标从二维图形中获取n个点的数据坐标) x-8*pi:pi/100:8*pi; y1sin(x); y24*sin(x/4); plot(x,y1,r-,x,y2,k-); legend(sin(x),4sin(x/4)); grid on; [x,y]ginput(8); %在下方命令窗…

【斗医】【18】Web应用开发20天
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。http://qingkechina.blog.51cto.com/5552198/1544928 本文通过获取首页内容来讲解Web页面拖拽获取当前页数据,功能类似在google查看图…

数据库和缓存一致性的问题
经常看到有人问怎么解决数据库和缓存一致性的问题,这个问题我觉得是不要去解决。 如果你不信你先看我列的几种情况 假设 数据库一开始和缓存都是1元。 用户更新数据库的同时双写缓存。 1.双写不删 写库充值10元>>>返回成功 ----- 时间间隔 <<<…
【MATLAB】数据分析之求函数的极限(limit)
在MATLAB中采用limit函数求某个具体函数的极限,其调用格式如下: limit(expr,x,a):当xa时,对函数expr求极限,返回值为函数极限。limit(expr):默认当x0时,对函数expr求极限,返回值为函…

Spring Boot集成Swagger导入YApi@无界编程
接口APi开发现状 现在开发接口都要在类似YApi上写文档,这样方便不同的团队之间协作,同步更新接口,提高效率。 但是如果接口很多,你一个个手工在YApi去录入无疑效率很低。 如果是使用Spring Boot集成Swagger可以直接导入YApi非常…