当前位置: 首页 > 编程日记 > 正文

RPC-原理及RPC实例分析

还有就是:RPC支持的BIO,NIO的理解

(1)BIO: Blocking IO;同步阻塞;

(2)NIO:Non-Blocking IO, 同步非阻塞;

参考:IO多路复用,同步,异步,阻塞和非阻塞 区别

在学校期间大家都写过不少程序,比如写个hello world服务类,然后本地调用下,如下所示。这些程序的特点是服务消费方和服务提供方是本地调用关系。

 
public class Test {public static void main(String[] args) {HelloWorldService helloWorldService = new HelloWorldServiceImpl();helloWorldService.sayHello("test");}
}

而一旦踏入公司尤其是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不同的机器上,由不同的团队负责。

这时就会遇到两个问题:

  1. 要搭建一个新服务,免不了需要依赖他人的服务,而现在他人的服务都在远端,怎么调用?
  2. 其它团队要使用我们的新服务,我们的服务该怎么发布以便他人调用?下文将对这两个问题展开探讨。

1.  如何调用他人的远程服务?

由于各服务部署在不同机器,服务间的调用免不了网络通信过程,服务消费方每调用一个服务都要写一坨网络通信相关的代码,不仅复杂而且极易出错。

如果有一种方式能让我们像调用本地服务一样调用远程服务,而让调用者对网络通信这些细节透明,那么将大大提高生产力,比如服务消费方在执行helloWorldService.sayHello(“test”)时,实质上调用的是远端的服务。这种方式其实就是RPC(Remote Procedure Call Protocol),在各大互联网公司中被广泛使用,如阿里巴巴的hsf、dubbo(开源)、Facebook的thrift(开源)、Google grpc(开源)、Twitter的finagle(开源)等。

要让网络通信细节对使用者透明,我们需要对通信细节进行封装,我们先看下一个RPC调用的流程涉及到哪些通信细节:

  1. 服务消费方(client)调用以本地调用方式调用服务;
  2. client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  3. client stub找到服务地址,并将消息发送到服务端;
  4. server stub收到消息后进行解码;
  5. server stub根据解码结果调用本地的服务;
  6. 本地服务执行并将结果返回给server stub;
  7. server stub将返回结果打包成消息并发送至消费方;
  8. client stub接收到消息,并进行解码;
  9. 服务消费方得到最终结果。

RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。

1.1 怎么做到透明化远程服务调用?

怎么封装通信细节才能让用户像以本地调用方式调用远程服务呢?对java来说就是使用代理!java代理有两种方式:

  1. jdk 动态代理
  2. 字节码生成

尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,大部分公司实现RPC框架时还是选择动态代理方式。

下面简单介绍下动态代理怎么实现我们的需求。我们需要实现RPCProxyClient代理类,代理类的invoke方法中封装了与远端服务通信的细节,消费方首先从RPCProxyClient获得服务提供方的接口,当执行helloWorldService.sayHello(“test”)方法时就会调用invoke方法。

 
public class RPCProxyClient implements java.lang.reflect.InvocationHandler{private Object obj;public RPCProxyClient(Object obj){this.obj=obj;}/*** 得到被代理对象;*/public static Object getProxy(Object obj){return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(),obj.getClass().getInterfaces(), new RPCProxyClient(obj));}/*** 调用此方法执行*/public Object invoke(Object proxy, Method method, Object[] args)throws Throwable {//结果参数;Object result = new Object();// ...执行通信相关逻辑// ...return result;}
}
public class Test {public static void main(String[] args) {HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.class);helloWorldService.sayHello("test");}}

 

1.2  怎么对消息进行编码和解码?

1.2.1 确定消息数据结构

上节讲了invoke里需要封装通信细节(通信细节再后面几章详细探讨),而通信的第一步就是要确定客户端和服务端相互通信的消息结构。客户端的请求消息结构一般需要包括以下内容:

1)接口名称

在我们的例子里接口名是“HelloWorldService”,如果不传,服务端就不知道调用哪个接口了;

2)方法名

一个接口内可能有很多方法,如果不传方法名服务端也就不知道调用哪个方法;

3)参数类型&参数值

参数类型有很多,比如有bool、int、long、double、string、map、list,甚至如struct(class);以及相应的参数值;

4)超时时间

5)requestID,标识唯一请求id,在下面一节会详细描述requestID的用处。

同理服务端返回的消息结构一般包括以下内容。

1)返回值

2)状态code

3)requestID

1.2.2 序列化

一旦确定了消息的数据结构后,下一步就是要考虑序列化与反序列化了。

什么是序列化?序列化就是将数据结构或对象转换成二进制串的过程,也就是编码的过程。

什么是反序列化?将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。

为什么需要序列化?转换为二进制串后才好进行网络传输嘛!

为什么需要反序列化?将二进制转换为对象才好进行后续处理!

现如今序列化的方案越来越多,每种序列化方案都有优点和缺点,它们在设计之初有自己独特的应用场景,那到底选择哪种呢?从RPC的角度上看,主要看三点:

  1. 通用性,比如是否能支持Map等复杂的数据结构;
  2. 性能,包括时间复杂度和空间复杂度,由于RPC框架将会被公司几乎所有服务使用,如果序列化上能节约一点时间,对整个公司的收益都将非常可观,同理如果序列化上能节约一点内存,网络带宽也能省下不少;
  3. 可扩展性,对互联网公司而言,业务变化飞快,如果序列化协议具有良好的可扩展性,支持自动增加新的业务字段,而不影响老的服务,这将大大提供系统的灵活度。

目前互联网公司广泛使用Protobuf、Thrift、Avro等成熟的序列化解决方案来搭建RPC框架,这些都是久经考验的解决方案。

1.3  通信

消息数据结构被序列化为二进制串后,下一步就要进行网络通信了。目前有两种常用IO通信模型:1)BIO;2)NIO。一般RPC框架需要支持这两种IO模型。

如何实现RPC的IO通信框架呢?

  1. 使用java nio方式自研,这种方式较为复杂,而且很有可能出现隐藏bug,但也见过一些互联网公司使用这种方式;
  2. 基于mina,mina在早几年比较火热,不过这些年版本更新缓慢;
  3. 基于netty,现在很多RPC框架都直接基于netty这一IO通信框架,省力又省心,比如阿里巴巴的HSF、dubbo,Twitter的finagle等。

1.4  消息里为什么要有requestID?

如果使用netty的话,一般会用channel.writeAndFlush()方法来发送消息二进制串,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来说是一个异步的,即对于当前线程来说,将请求发送出来后,线程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:

  1. 怎么让当前线程“暂停”,等结果回来后,再向后执行?
  2. 如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?

如下图所示,线程A和线程B同时向client socket发送请求requestA和requestB,socket先后将requestB和requestA发送至server,而server可能将responseA先返回,尽管requestA请求到达时间更晚。我们需要一种机制保证responseA丢给ThreadA,responseB丢给ThreadB。

怎么解决呢?

  1. client线程每次通过socket调用一次远程接口前,生成一个唯一的ID,即requestID(requestID必需保证在一个Socket连接里面是唯一的),一般常常使用AtomicLong从0开始累计数字生成唯一ID;
  2. 将处理结果的回调对象callback,存放到全局ConcurrentHashMap里面put(requestID, callback);
  3. 当线程调用channel.writeAndFlush()发送消息后,紧接着执行callback的get()方法试图获取远程返回的结果。在get()内部,则使用synchronized获取回调对象callback的锁,再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
  4. 服务端接收到请求并处理后,将response结果(此结果中包含了前面的requestID)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到requestID,再从前面的ConcurrentHashMap里面get(requestID),从而找到callback对象,再用synchronized获取callback上的锁,将方法调用结果设置到callback对象里,再调用callback.notifyAll()唤醒前面处于等待状态的线程。
 
public Object get() {synchronized (this) { // 旋锁while (!isDone) { // 是否有结果了wait(); //没结果是释放锁,让当前线程处于等待状态
            }}
}
private void setDone(Response res) {this.res = res;isDone = true;synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了notifyAll(); // 唤醒处于等待的线程
        }}

 

2 如何发布自己的服务?

如何让别人使用我们的服务呢?有同学说很简单嘛,告诉使用者服务的IP以及端口就可以了啊。确实是这样,这里问题的关键在于是自动告知还是人肉告知。

人肉告知的方式:如果你发现你的服务一台机器不够,要再添加一台,这个时候就要告诉调用者我现在有两个ip了,你们要轮询调用来实现负载均衡;调用者咬咬牙改了,结果某天一台机器挂了,调用者发现服务有一半不可用,他又只能手动修改代码来删除挂掉那台机器的ip。现实生产环境当然不会使用人肉方式。

有没有一种方法能实现自动告知,即机器的增添、剔除对调用方透明,调用者不再需要写死服务提供方地址?当然可以,现如今zookeeper被广泛用于实现服务自动注册与发现功能!

简单来讲,zookeeper可以充当一个服务注册表(Service Registry),让多个服务提供者形成一个集群,让服务消费者通过服务注册表获取具体的服务访问地址(ip+端口)去访问具体的服务提供者。如下图所示:

具体来说,zookeeper就是个分布式文件系统,每当一个服务提供者部署后都要将自己的服务注册到zookeeper的某一路径上: /{service}/{version}/{ip:port}, 比如我们的HelloWorldService部署到两台机器,那么zookeeper上就会创建两条目录:分别为/HelloWorldService/1.0.0/100.19.20.01:16888  /HelloWorldService/1.0.0/100.19.20.02:16888。

zookeeper提供了“心跳检测”功能,它会定时向各个服务提供者发送一个请求(实际上建立的是一个 Socket 长连接),如果长期没有响应,服务中心就认为该服务提供者已经“挂了”,并将其剔除,比如100.19.20.02这台机器如果宕机了,那么zookeeper上的路径就会只剩/HelloWorldService/1.0.0/100.19.20.01:16888。

服务消费者会去监听相应路径(/HelloWorldService/1.0.0),一旦路径上的数据有任务变化(增加或减少),zookeeper都会通知服务消费方服务提供者地址列表已经发生改变,从而进行更新。

更为重要的是zookeeper与生俱来的容错容灾能力(比如leader选举),可以确保服务注册表的高可用性。

3.Hadoop中RPC实例分析

ipc.RPC类中有一些内部类,为了大家对RPC类有个初步的印象,就先罗列几个我们感兴趣的分析一下吧:

Invocation :用于封装方法名和参数,作为数据传输层。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server :是ipc.Server的实现类。

 
public Object invoke(Object proxy, Method method, Object[] args)throws Throwable {•••ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);•••return value.get();}

如果你发现这个invoke()方法实现的有些奇怪的话,那你就对了。一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg);  这句代码。而上面代码中却没有,这是为什么呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信。而网络通信就是下面的这段代码实现的:

 
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);

Invocation类在这里封装了方法名和参数。其实这里网络通信只是调用了Client类的call()方法。那我们接下来分析一下ipc.Client源码吧。和第一章一样,同样是3个问题

  1. 客户端和服务端的连接是怎样建立的?
  2. 客户端是怎样给服务端发送数据的?
  3. 客户端是怎样获取服务端的返回数据的?

3.1 客户端和服务端的连接是怎样建立的?

public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {Call call = new Call(param);       //将传入的数据封装成call对象Connection connection = getConnection(remoteId, call);   //获得一个连接connection.sendParam(call);     // 向服务端发送call对象boolean interrupted = false;synchronized (call) {while (!call.done) {try {call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程} catch (InterruptedException ie) {// 因中断异常而终止,设置标志interrupted为trueinterrupted = true;}}if (interrupted) {Thread.currentThread().interrupt();}if (call.error != null) {if (call.error instanceof RemoteException) {call.error.fillInStackTrace();throw call.error;} else { // 本地异常throw wrapException(remoteId.getAddress(), call.error);}} else {return call.value; //返回结果数据
      }}}

 

具体代码的作用我已做了注释,所以这里不再赘述。但到目前为止,你依然不知道RPC机制底层的网络连接是怎么建立的。分析代码后,我们会发现和网络通信有关的代码只会是下面的两句了:

 
Connection connection = getConnection(remoteId, call);   //获得一个连接connection.sendParam(call);      // 向服务端发送call对象

先看看是怎么获得一个到服务端的连接吧,下面贴出ipc.Client类中的getConnection()方法。

 
private Connection getConnection(ConnectionId remoteId,Call call)throws IOException, InterruptedException {if (!running.get()) {// 如果client关闭了throw new IOException("The client is stopped");}Connection connection;
//如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。
//但请注意,该//连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接。do {synchronized (connections) {connection = connections.get(remoteId);if (connection == null) {connection = new Connection(remoteId);connections.put(remoteId, connection);}}} while (!connection.addCall(call)); //将call对象放入对应连接中的calls池,就不贴出源码了//这句代码才是真正的完成了和服务端建立连接哦~
    connection.setupIOstreams();return connection;}

下面贴出Client.Connection类中的setupIOstreams()方法:

 
private synchronized void setupIOstreams() throws InterruptedException {•••try {•••while (true) {setupConnection();  //建立连接InputStream inStream = NetUtils.getInputStream(socket);     //获得输入流OutputStream outStream = NetUtils.getOutputStream(socket);  //获得输出流
         writeRpcHeader(outStream);•••this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));   //将输入流装饰成DataInputStreamthis.out = new DataOutputStream(new BufferedOutputStream(outStream));   //将输出流装饰成DataOutputStream
         writeHeader();// 跟新活动时间
         touch();//当连接建立时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
         start();return;}} catch (IOException e) {markClosed(e);close();}}

再有一步我们就知道客户端的连接是怎么建立的啦,下面贴出Client.Connection类中的setupConnection()方法:

 
private synchronized void setupConnection() throws IOException {short ioFailures = 0;short timeoutFailures = 0;while (true) {try {this.socket = socketFactory.createSocket(); //终于看到创建socket的方法了this.socket.setTcpNoDelay(tcpNoDelay);•••// 设置连接超时为20sNetUtils.connect(this.socket, remoteId.getAddress(), 20000);this.socket.setSoTimeout(pingInterval);return;} catch (SocketTimeoutException toe) {/* 设置最多连接重试为45次。* 总共有20s*45 = 15 分钟的重试时间。*/handleConnectionFailure(timeoutFailures++, 45, toe);} catch (IOException ie) {handleConnectionFailure(ioFailures++, maxRetries, ie);}}}

终于,我们知道了客户端的连接是怎样建立的了,其实就是创建一个普通的socket进行通信。

3.2 客户端是怎样给服务端发送数据的?

下面贴出Client.Connection类的sendParam()方法吧:

 
public void sendParam(Call call) {if (shouldCloseConnection.get()) {return;}DataOutputBuffer d=null;try {synchronized (this.out) {if (LOG.isDebugEnabled())LOG.debug(getName() + " sending #" + call.id);//创建一个缓冲区d = new DataOutputBuffer();d.writeInt(call.id);call.param.write(d);byte[] data = d.getData();int dataLength = d.getLength();out.writeInt(dataLength);        //首先写出数据的长度out.write(data, 0, dataLength); //向服务端写数据
          out.flush();}} catch(IOException e) {markClosed(e);} finally {IOUtils.closeStream(d);}}

3.3 客户端是怎样获取服务端的返回数据的?

下面贴出Client.Connection类和Client.Call类中的相关方法:

方法一: public void run() {•••while (waitForWork()) {receiveResponse();  //具体的处理方法
      }close();•••
}方法二:
private void receiveResponse() {if (shouldCloseConnection.get()) {return;}touch();try {int id = in.readInt();                    // 阻塞读取idif (LOG.isDebugEnabled())LOG.debug(getName() + " got value #" + id);Call call = calls.get(id);    //在calls池中找到发送时的那个对象int state = in.readInt();     // 阻塞读取call对象的状态if (state == Status.SUCCESS.state) {Writable value = ReflectionUtils.newInstance(valueClass, conf);value.readFields(in);           // 读取数据//将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
          call.setValue(value);             calls.remove(id);               //删除已处理的call   } else if (state == Status.ERROR.state) {•••} else if (state == Status.FATAL.state) {•••}} catch (IOException e) {markClosed(e);}
}方法三:
public synchronized void setValue(Writable value) {this.value = value;callComplete();   //具体实现
}
protected synchronized void callComplete() {this.done = true;notify();         // 唤醒client等待线程}

完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面我们来分析Server端的源码吧。

3.4 ipc.Server源码分析

为了让大家对ipc.Server有个初步的了解,我们先分析一下它的几个内部类吧:

Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :连接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

 
private void initialize(Configuration conf) throws IOException {•••// 创建 rpc serverInetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);if (dnSocketAddr != null) {int serviceHandlerCount =conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);//获得serviceRpcServerthis.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),dnSocketAddr.getPort(), serviceHandlerCount,false, conf, namesystem.getDelegationTokenSecretManager());this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();setRpcServiceServerAddress(conf);
}
//获得serverthis.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager());•••this.server.start();  //启动 RPC server   Clients只允许连接该serverif (serviceRpcServer != null) {serviceRpcServer.start();  //启动 RPC serviceRpcServer 为HDFS服务的server
    }startTrashEmptier(conf);}

查看Namenode初始化源码得知:RPC的server对象是通过ipc.RPC类的getServer()方法获得的。下面咱们去看看ipc.RPC类中的getServer()源码吧:

 
public static Server getServer(final Object instance, final String bindAddress, final int port,final int numHandlers,final boolean verbose, Configuration conf,SecretManager<? extends TokenIdentifier> secretManager) throws IOException {return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);}

这时我们发现getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象。哈哈,现在你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数还是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。

初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:

 
/** 启动服务 */public synchronized void start() {responder.start();  //启动responderlistener.start();   //启动listenerhandlers = new Handler[handlerCount];for (int i = 0; i < handlerCount; i++) {handlers[i] = new Handler(i);handlers[i].start();   //逐个启动Handler
   }}

分析过ipc.Client源码后,我们知道Client端的底层通信直接采用了阻塞式IO编程,当时我们曾做出猜测:Server端是不是也采用了阻塞式IO。现在我们仔细地分析一下吧,如果Server端也采用阻塞式IO,当连接进来的Client端很多时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,所以他们采用了java  NIO来实现Server端,那Server端采用java NIO是怎么建立连接的呢?分析源码得知,Server端采用Listener监听客户端的连接,下面先分析一下Listener的构造函数吧:

 
public Listener() throws IOException {address = new InetSocketAddress(bindAddress, port);// 创建ServerSocketChannel,并设置成非阻塞式acceptChannel = ServerSocketChannel.open();acceptChannel.configureBlocking(false);// 将server socket绑定到本地端口
     bind(acceptChannel.socket(), address, backlogLength);port = acceptChannel.socket().getLocalPort();// 获得一个selectorselector= Selector.open();readers = new Reader[readThreads];readPool = Executors.newFixedThreadPool(readThreads);//启动多个reader线程,为了防止请求多时服务端响应延时的问题for (int i = 0; i < readThreads; i++) {      Selector readSelector = Selector.open();Reader reader = new Reader(readSelector);readers[i] = reader;readPool.execute(reader);}// 注册连接事件
     acceptChannel.register(selector, SelectionKey.OP_ACCEPT);this.setName("IPC Server listener on " + port);this.setDaemon(true);}

在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法:

 
public void run() {•••while (running) {SelectionKey key = null;try {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {key = iter.next();iter.remove();try {if (key.isValid()) {if (key.isAcceptable())doAccept(key);     //具体的连接方法
             }} catch (IOException e) {}key = null;}} catch (OutOfMemoryError e) {•••        }

下面贴出Server.Listener类中doAccept()方法中的关键源码吧:

 
void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {Connection c = null;ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel channel;while ((channel = server.accept()) != null) { //建立连接channel.configureBlocking(false);channel.socket().setTcpNoDelay(tcpNoDelay);Reader reader = getReader();  //从readers池中获得一个readertry {reader.startAdd(); // 激活readSelector,设置adding为trueSelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件c = new Connection(readKey, channel, System.currentTimeMillis());//创建一个连接对象readKey.attach(c);   //将connection对象注入readKeysynchronized (connectionList) {connectionList.add(numConnections, c);numConnections++;}•••} finally {
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了。
          reader.finishAdd();}}}

当reader被唤醒,reader接着执行doRead()方法。

下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:

 
法一:  void doRead(SelectionKey key) throws InterruptedException {int count = 0;Connection c = (Connection)key.attachment();  //获得connection对象if (c == null) {return; }c.setLastContact(System.currentTimeMillis());try {count = c.readAndProcess();    // 接受并处理请求 } catch (InterruptedException ieo) {•••}•••   
}方法二:
public int readAndProcess() throws IOException, InterruptedException {while (true) {•••if (!rpcHeaderRead) {if (rpcHeaderBuffer == null) {rpcHeaderBuffer = ByteBuffer.allocate(2);}//读取请求头count = channelRead(channel, rpcHeaderBuffer);if (count < 0 || rpcHeaderBuffer.remaining() > 0) {return count;}// 读取请求版本号 int version = rpcHeaderBuffer.get(0);byte[] method = new byte[] {rpcHeaderBuffer.get(1)};••• data = ByteBuffer.allocate(dataLength);}// 读取请求 count = channelRead(channel, data);if (data.remaining() == 0) {•••if (useSasl) {•••} else {processOneRpc(data.array());//处理请求
          }•••}}return count;}}

下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。

 
方法一:  private void processOneRpc(byte[] buf) throws IOException,InterruptedException {if (headerRead) {processData(buf);} else {processHeader(buf);headerRead = true;if (!authorizeConnection()) {throw new AccessControlException("Connection from " + this+ " for protocol " + header.getProtocol()+ " is unauthorized for user " + user);}}
}
方法二:private void processData(byte[] buf) throws  IOException, InterruptedException {DataInputStream dis =new DataInputStream(new ByteArrayInputStream(buf));int id = dis.readInt();      // 尝试读取idWritable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
      param.readFields(dis);       Call call = new Call(id, param, this);  //封装成callcallQueue.put(call);   // 将call存入callQueueincRpcCount();  // 增加rpc请求的计数}

4. RPC与web service

RPC:

Web service

web service接口就是RPC中的stub组件,规定了server能够提供的服务(web service),这在server和client上是一致的,但是也是跨语言跨平台的。同时,由于web service规范中的WSDL文件的存在,现在各平台的web service框架,都可以基于WSDL文件,自动生成web service接口 。

其实两者差不多,只是传输的协议不同。

参考:你应该知道的RPC原理

参考:RPC简介,及与web service的对比

相关文章:

hdu 4608 I-number

http://acm.hdu.edu.cn/showproblem.php?pid4608 直接暴力 代码&#xff1a; #include<iostream> #include<cstdio> #include<string> #include<cstring> #include<cmath> #include<set> #include<map> #include<stack> #inc…

php tab标签,JavaScript代码分享:tab标签的切换

本文实例讲述了js实现点击切换TAB标签。分享给大家供大家参考。具体如下&#xff1a;这里演示的选项卡效果代码&#xff0c;无jq,纯JS来实现&#xff0c;灰色风格&#xff0c;没有怎么美化&#xff0c;或许看上去比较普通&#xff0c;不过兼容性和操作起来挺舒服的&#xff0c;…

二进制,十进制,十六进制

生活中其实很多地方的计数方法都多少有点不同进制的影子。 比如我们最常用的10进制&#xff0c;其实起源于人有10个指头。如果我们的祖先始终没有摆脱手脚不分的境况&#xff0c;我想我们现在一定是在使用20进制。 至于二进制……没有袜子称为0只袜子&#xff0c;有一只袜子称为…

D3.js系列——初步使用、选择元素与绑定数据

D3 的全称是&#xff08;Data-Driven Documents&#xff09;&#xff0c;顾名思义可以知道是一个被数据驱动的文档。听名字有点抽象&#xff0c;说简单一点&#xff0c;其实就是一个 JavaScript 的函数库&#xff0c;使用它主要是用来做数据可视化的。 D3 提供了各种简单易用的…

秦州:西瓜书 + 南瓜书 吃瓜系列 12. 聚类

Datawhale南瓜书是经典机器学习教材《机器学习》&#xff08;西瓜书&#xff09;的公式推导解析指南&#xff0c;旨在让在学习西瓜书的过程中&#xff0c;再也没有难推的公式&#xff0c;学好机器学习。 航路开辟者&#xff1a;谢文睿、秦州开源内容&#xff1a;https://githu…

php 5/0,PHP 5.5.0 released.该怎么解决

当前位置:我的异常网 PHP PHP 5.5.0 released.该怎么解决PHP 5.5.0 released.该怎么解决www.myexceptions.net 网友分享于&#xff1a;2013-08-02 浏览&#xff1a;12次PHP 5.5.0 released.The PHP development team is proud to announce the immediate availability of PH…

Windows下SVN权限配置过程详解

本节讲解一下Windows下SVN权限配置说明&#xff0c;针对的是一个目录下多库的情况&#xff0c;下面是具体的介绍&#xff0c;希望通过本文的学习&#xff0c;你能够对SVN权限配置问题有更加深刻的认识。 1、本文档适用于对Subvesion的自带服务svnserve进行权限配置&#xff0c;…

胡小明:大数据应用方向思考

一、警惕大数据过热 1.1 过热产生盲目性 国内大数据的宣传早已过热&#xff0c;很多区县级政府也在考虑成立大数据局&#xff0c;政府对大数据热几乎没有抵抗力&#xff0c;企业没有紧跟就对了&#xff0c;在大数据高潮中反省政府的大数据行为、冷静一下头脑是有益的&#xff0…

Datawhale组队学习周报(第040周)

本周报总结了从 11月15日至11月21日&#xff0c;Datawhale组队学习的运行情况&#xff0c;我们一直秉承“与学习者一起成长的理念”&#xff0c;希望这个活动能够让更多的学习者受益。 第 31 期组队学习已经与大家见面了&#xff0c;这次组队学习一共 11 门开源课程&#xff0…

matlab 无序数对,MATLAB中sort函数对矩阵数进行排序

(1)Bsort(A) 对一维或二维数组进行升序排序,并返回排序后的数组,当A为二维时,对数组每一列进行排序.eg: A[1,5,3],则sort(A)[1,3,5]A[1,5,3;2,4,1],则sort(A)[1,4,1;2,5,3](2)Bsort(A,dim),对数组按指定方向进行升序排序,dim 1,表示对每一列进行排序,,dim2表示对每一行进行排序…

云智易获上海CIO联盟“年度物联网云平台技术创新奖”

6月23日&#xff0c;云智易作为国内领先物联网云平台&#xff0c;受邀出席“跨界融合 聚势谋远”上海CIO联盟峰会。本次大会汇聚了全球各地各行业300位精英翘楚&#xff0c;共话未来发展趋势。 在本次大会中&#xff0c;云智易物联云平台凭借强大的技术实力、成熟的平台架构、…

d3d导致cairo不正常

最近要把cairo集成到项目中,却发现cairo不能工作了 折腾了两天才找到了原因:cairo的一个trick导致浮点数计算错误: http://blog.163.com/lvan100yeah/blog/static/6811721420131191434556/ 给d3dcreate加上D3DCREATE_FPU_PRESERVE之后一切正常 如果我直接调cairo的代码就能早点…

青少年编程竞赛交流群周报(第038周)

2021年11月21日&#xff08;周日&#xff09;晚20:00我们在青少年编程竞赛交流群开展了第三十八期直播活动。 一、直播内容 我们直播活动的主要内容如下&#xff1a; 讲解了上次测试中小朋友们做错的题目 Scratch青少年编程能力等级测试模拟题&#xff08;四级&#xff09;。…

php的遍历方法,PHP数组遍历方法总结

在PHP中数组分为两类&#xff1a; 数字索引数组和关联数组。其中数字索引数组和C语言中的数组一样&#xff0c;下标是为0&#xff0c;1&#xff0c;2…而关联数组下标可能是任意类型&#xff0c;与其它语言中的hash&#xff0c;map等结构相似。下面介绍PHP中遍历关联数组的三种…

二、Silverlight中使用MVVM(二)——提高

在第一篇文章中的示例中&#xff0c;我们已经简单的了解了应用MVVM模式的流程&#xff0c;我的本意是你已经了解了一点MVVM的概念&#xff0c;然后又没有一个较好的例子学习&#xff0c;可以跟着我一起学习MVVM模式&#xff0c;所以这个部分&#xff0c;都是没有理论知识的&…

中国电子学会青少年编程能力等级测试图形化四级编程题:太空大战

「青少年编程竞赛交流群」已成立&#xff08;适合6至18周岁的青少年&#xff09;&#xff0c;公众号后台回复【Scratch】或【Python】&#xff0c;即可进入。如果加入了之前的社群不需要重复加入。 我们将有关编程题目的教学视频已经发布到抖音号21252972100&#xff0c;小马老…

Pycharm开发环境设置与熟悉。

练习基本输入输出&#xff1a; print(你好,{}..format(name)) print(sys.argv) 库的使用方法&#xff1a; import ... from ... import ... 条件语句&#xff1a; if (abs(pos()))<1: break 循环语句&#xff1a; for i in range(5): while True: 函数定义&#xff1a; def …

oracle 默认表空间 10g,Oracle10g 表空间管理

1.表空间分类&#xff1a;(1)临时表空间&#xff1a;存放临时数据(2)数据表空间&#xff1a;又称为用户表空间&#xff0c;用来存放永久数据(不删除永久存在)(3)日志表空间&#xff1a;用来存放日志信息一般情况下&#xff0c;用户使用时只需要创建数据表空间即可&#xff0c;根…

POJ1022 Packing Unit 4D Cubes

题目来源&#xff1a;http://poj.org/problem?id1022 题目大意&#xff1a; 有一些4维的单位体积的立方体盒子&#xff0c;每个立方体有8个面。要用一个大的4为盒子将它们包起来&#xff0c;求最小的大盒子体积。 输入&#xff1a;第一行为测试用例数。每个用例的第一行为单位…

中国电子学会青少年编程能力等级测试图形化三级编程题:海底寻宝

「青少年编程竞赛交流群」已成立&#xff08;适合6至18周岁的青少年&#xff09;&#xff0c;公众号后台回复【Scratch】或【Python】&#xff0c;即可进入。如果加入了之前的社群不需要重复加入。 我们将有关编程题目的教学视频已经发布到抖音号21252972100&#xff0c;小马老…

用Ajax请求服务器的图片,并显示在浏览器中(转)

前言 一直在数据库里面存的都是图片在服务器的地址&#xff0c;然后再到浏览器中显示&#xff0c;但是发现两个问题 第一&#xff1a;为了安全起见&#xff0c;js是无法读取本地的图片的&#xff0c;不然你写一个js&#xff0c;岂不是可以获取任何人电脑里面的文件了。 第二&am…

pb设置Oracle事务的隔离级别,Oracle的事务隔离级别

ANSI/ISO SQL规定了四种事务隔离级别&#xff0c;分别是&#xff1a;read uncommitted,read committed,repeatable read,serializableORACE提供了SQ92标准中的read committed和seriaizabe&#xff0c;同时提供了非SQ92标准的read-ony。read committed&#xff1a;这是ORACE缺省…

inux php pdo mysql 扩展

今天在本机部署了一个pdo项目&#xff0c;发现一些问题&#xff0c;真没想到pdo mysql&#xff0c;不容易装啊&#xff0c;哈哈&#xff0c;我说的不容易&#xff0c;是因为php5.3以前版本&#xff0c;yum源里面根本没有。部署后就报&#xff0c;Undefined class constant MYSQ…

Maven项目Spring Boot启动

1. pom.xml中增加配置 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><dependencies><dependency><gr…

中国电子学会青少年编程能力等级测试图形化四级模拟题

「青少年编程竞赛交流群」已成立&#xff08;适合6至18周岁的青少年&#xff09;&#xff0c;公众号后台回复【Scratch】或【Python】&#xff0c;即可进入。如果加入了之前的社群不需要重复加入。 我们将有关编程题目的教学视频已经发布到抖音号21252972100&#xff0c;小马老…

Oracle设置date数据比较,ORACLE DATE和TIMESTAMP数据类型的比较

DATE数据类型这个数据类型我们实在是太熟悉了&#xff0c;当我们需要表示日期和时间的话都会想到date类型。它可以存储月&#xff0c;年&#xff0c;日&#xff0c;世纪&#xff0c;时&#xff0c;分和秒。它典型地用来表示什 么时候事情已经发生或将要发生。DATE数据类型的问题…

POJ 1552 Doubles (C++ STL set使用)

题目&#xff1a; 题意&#xff1a;题意&#xff1a;给出几个正数&#xff08;2~15个&#xff09;&#xff0c;然后就是求有这些数字的2倍有没有和原先的正数相同的&#xff0c;求出有几个&#xff0c;没有就是0. 分析&#xff1a;水题。用数组解决&#xff0c;开一个数组存正数…

凌亮:动手学数据分析笔记

凌亮是华北电力大学数理系大二的学生&#xff0c;LSGO软件技术团队&#xff08;Dreamtech算法组&#xff09;成员&#xff0c;参加了多期Datawhale的组队学习。 这篇图文是他在线下组队学习时&#xff0c;为大家分享自己学习“动手学数据分析”的笔记。 希望参与我们线下组队…

【H.264/AVC视频编解码技术详解】十九:熵编码算法(5)——H.264的CABAC(上):语法元素的二值化方法...

《H.264/AVC视频编解码技术详解》视频教程已经在“CSDN学院”上线&#xff0c;视频中详述了H.264的背景、标准协议和实现&#xff0c;并通过一个实战工程的形式对H.264的标准进行解析和实现&#xff0c;欢迎观看&#xff01; “纸上得来终觉浅&#xff0c;绝知此事要躬行”&…

oracle anbob,Tag Archives: oracle安装 | ANBOB

2016/08/02363 viewsUsing ‘opatch lsinventory’ show patched is real? (看到的补丁信息真的靠谱么&#xff1f;)已关闭评论去年在排查SCN 天花板问题修改相关的几个参数时&#xff0c;发现了这个问题&#xff0c;尤其如果是从别人手中接手的数据库&#xff0c;通常从opatc…