前置知识: java,nio,多线程
看了几天的源码,写一些自己心得,若有错误请指出。
RPCServer的作用:负责创建listener,reader,responser,handler来处理client端的请求。
RPCServer中重要的子类有:Listener,Reader,Call,Connection,Responser
其中Reader是Listener的子类
listener负责监听client端的请求,主要做nio操作中的accept操作。
while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isValid()) {if (key.isAcceptable())doAccept(key);}} catch (IOException ignored) {}key = null; }
与client创建连接,生成新的channel,并将新的channel注册在reader上。
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {……SocketChannel channel;……Reader reader = getReader();try {reader.startAdd();SelectionKey readKey = reader.registerChannel(channel); //listener接受的连接注册在reader上c = getConnection(channel, System.currentTimeMillis());readKey.attach(c);…… }
reader负责处理listener传过来的channel,依次读取数据,
void doRead(SelectionKey key) throws InterruptedException {int count = 0;Connection c = (Connection)key.attachment();……try {count = c.readAndProcess();} catch (InterruptedException ieo) {…… }
这里调用Connection里面的readAndProcess()方法,这个方法的做用是读取客户端的数据,存入一个buffer字节数组中,给processRequest()方法进行处理,
processRequest方法:
protected void processRequest(byte[] buf) throws IOException, InterruptedException {……//这里的call构造方法中的参数都是由buf中的数据解析出来的,前面省略的代码做了这部分工作Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,totalRequestSize,traceInfo);//这里的scheduler是一个调度器,可以简单理解为一个线程池的控制器,它初始化时会生成默认大小的线程池,参数可由REGION_SERVER_HANDLER_COUNT来指定//也就是jstack文件中的handler线程,默认是30//dispatch方法会获取线程池中的一个线程,执行callRunner中的run()方法。run()方法的功能有:查询结果,并调用sendResponseIfReady()方法来返回数据。scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider)); }
call的run()方法:
public void run() {……//查询数据,存在resultPair中resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,……if (!call.isDelayed() || !call.isReturnValueDelayed()) {Message param = resultPair != null ? resultPair.getFirst() : null;CellScanner cells = resultPair != null ? resultPair.getSecond() : null;call.setResponse(param, cells, errorThrowable, error);}//调用Responser call.sendResponseIfReady();…… }
其中rpcServer的call方法为:
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException {……//此句进行查询Message result = service.callBlockingMethod(md, controller, param);……//返回给Call对象return new Pair<Message, CellScanner>(result,controller != null? controller.cellScanner(): null); …… }
再详细点的还没看。看了这些主要解决了以下几个疑惑:
reader的线程数在哪指定生成,handler的线程池在哪维护,监听连接请求的线程有几个?responser的线程又有几个?
listener只有一个,
listener中有一个Reader数组,默认是10,也就是说读取请求数据的连接池大小为10。
private class Listener extends Thread { ……private Reader[] readers = null;
handler的线程池由RPCServer中的scheduler维护,默认是30,
listener监听到一个请求后,生成对应的channel发送给Reader,然后Reader会为每一个channel创建一个connection,
connection中保存了连接的信息。然后调用connection的方法来读取请求参数,并生成call对象,这时将调用scheduler,
使用handler线程池(默认30)来查询数据,(这里就开始并行了),结果存在call对象用,call对象最后再调用responser类的方法,将结果返回给client。
responser只有一个线程,它维护了一个call链表,采用非阻塞的方式(这里要说也是并行),依次将call对象送出。
大致过程就是这样