RPC(一):基于python实现简单的RPC框架(上)

September 7, 2016 at 10:36 pm

似乎又要放着以前无数个坑不填来挖一个新的坑了,不过这次并不打算挖很深的坑。其实挺早就有对现有的RPC框架有一个简单的理解的想法,所以本文的初步想法是模仿tinyRPC来简单模拟一下RPC框架,这也是本篇的主要内容。如果后面有续篇的话,会花些时间看看像gRPC这样的库的实现机制,不过目前暂时也没有这方面的打算,所以目前而言本系列应该仅此一篇。如果一切顺利的话,之前的坑会在回到学校之后慢慢开始填。废话不多说了,先进入正题吧。

本文主要包含以下内容:
1. 客户端的类本地化调用
2. 序列化协议的设计
3. 通信协议的设计

客户端的类本地化调用

RPC框架中最基本的两个要素是客户端和服务器端,客户端是远程过程的调用者,而服务器端则是过程的实现者,所谓的RPC(Remote Procedure call),就是希望客户端可以像调用本地函数一样调用服务器端的函数。为了做成一个框架,我们需要让客户端可以实现任意函数的调用,由于本文使用python实现,这里利用python的__getattr__(self, name)函数来较为简单、干净地实现这个功能(在python中,如果访问一个类的属性,包括方法不存在时,会调用这个类的__getattr__()来试图返回这个属性或者这个方法的值,对于方法而言,此时可以返回一个函数,这个新的函数将会被调用。这个函数名字有一定的困扰,但在这里使用它可以极大简化代码)。
通过这个函数,我们可以将所有的远程过程调用统一到client的call方法里去:

class RPCClient(object):
    def __getattr__(self, name):
        func = lambda *args, **kwargs: self.call(name, args, kwargs)
        return func
    def call(self, method, args, kwargs):
        print 'function ' + method + ' called with argument ' + str(args)

if __name__ == '__main__':
    client = RPCClient()
    client.reverse_string('hello, world!')
    client.strcpy('hello, ', 'RPC!')

为了使代码更容易理解,上面增加了一些简单的调试用代码。从实际运行结果可以看到,无论client调用什么名字的函数,都最终汇聚到call方法中,call方法获取了函数名、函数的参数。

序列化协议的设计

所谓的RPC框架,就是要让客户端觉得调用一个远程的函数和调用一个本地函数差不多,所以上一小节我们设计了一个简单的客户端调用模式。但为了完成它实质的功能,我们必须要访问远程服务器,这个过程对客户端来说应该是透明的,我们把它设计在call函数中,也就是说我们需要修改第6行的代码。在通用的RPC框架模型中,客户端承担实际序列化、通信、反序列化的部分称为client stub(客户端的存根),本节开始讨论client stub的设计。
为了使我们要调用的函数和其参数通过网络进行传输,我们需要先将它们序列化,并在服务器端反序列化。通常来说,对请求进行序列化和反序列化的方法是RPC框架设计中较为核心的部分之一,它对最终框架的性能有很大的影响,这主要取决于序列化、反序列化的速度和结果的大小。大部分主流的RPC框架都有自己的序列化协议,也有像google的protobuf这样的较为通用的序列化协议(当然,gRPC使用该协议作为通信协议)。

由于我们只是做一个简单的模型,所以这里我们直接采用python标准库包含的json库来进行序列化和反序列化。我们定义两个新的类,RPCRequest类,用来封装我们的请求;RPCProtocol类,用来产生、解析请求。我们在call函数中调用RPCProtocol类对象来产生请求,并且我们将协议作为client的一个参数传入,这样我们的协议就具有了可扩展性。经过了一定的修改之后,我们的代码变成了这样:

class RPCRequest(object):
    def serialize(self):
        return json.dumps(self._to_dict())
    def _to_dict(self):
        data = { 'method': self.method }
        if self.args:
            data['params'] = self.args
        if self.kwargs:
            data['params'] = self.kawrgs
        return data

class RPCProtocol(object):
    def create_request(self, method, args=None, kwargs=None):
        request = RPCRequest()
        request.method = method
        request.args = args
        request.kwargs = kwargs
        print 'IN create request ' + request.method
        print 'IN create request ' + str(request.args)
        return request
    def parse_reply(self, data):
        rep = json.loads(data)
        return rep

class RPCClient(object):
    def __init__(self, protocol):
        self.protocol = protocol
    def __getattr__(self, name):
        func = lambda *args, **kwargs: self.call(name, args, kwargs)
        return func
    def call(self, method, args, kwargs):
        req = self.protocol.create_request(method, args, kwargs)
        return self._send_and_handle_reply(req)
        #[DEBUG] print 'function ' + method + ' called with argument ' + str(args)
    def _send_and_handle_reply(self, req):
        return None

if __name__ == '__main__':
    client = RPCClient(RPCProtocol())
    client.reverse_string('hello, world!')
    client.strcpy('hello, ', 'RPC!')

这样,我们只需要调用RPCRequest.serialize就可以将它序列化了。现在我们将重心移动到_send_and_handle_reply的设计上来,讨论如何将序列化了的信息发出去。

通信协议的设计

client stub的另外一部分功能就是进行和服务器端的通信。这里我们在设计上将通信协议和序列化协议分开,使得整个框架可以有更大的变化空间。关于通信协议的内容实际上已经超出了本文的讨论范围,或许以后有机会的话探讨一下通信协议吧。这里为了简单起见,我们使用一个简单的zmq客户端来发起通信(使用zmq的一个原因是原来的tinyRPC里也提供了这种通信方式。对它不了解也没有关系,可以简单理解为一个更强大的socket。除了需要初始化context以外,这里对它的使用和socket无异。如前所述,对于通信方式的讨论已经超出本文的范围):

import json
import zmq

class ClientTransport(object):
    def __init__(self, socket):
        self.socket = socket
    def send_message(self, message, expect_reply=True):
        self.socket.send(message)
        if expect_reply:
            self.socket.recv()
    @classmethod
    def create(cls, zmq_context, endpoint):
        socket = zmq_context.socket(zmq.REQ)
        socket.connect(endpoint)
        return cls(socket)

class RPCRequest(object):
    def serialize(self):
        return json.dumps(self._to_dict())
    def _to_dict(self):
        data = { 'method': self.method }
        if self.args:
            data['params'] = self.args
        if self.kwargs:
            data['params'] = self.kawrgs
        return data

class RPCProtocol(object):
    def create_request(self, method, args=None, kwargs=None):
        request = RPCRequest()
        request.method = method
        request.args = args
        request.kwargs = kwargs
        #[DEBUG] print 'IN create request ' + request.method
        #[DEBUG] print 'IN create request ' + str(request.args)
        return request
    def parse_reply(self, data):
        rep = json.loads(data)
        return rep

class RPCClient(object):
    def __init__(self, protocol, transport):
        self.protocol = protocol
    def __getattr__(self, name):
        func = lambda *args, **kwargs: self.call(name, args, kwargs)
        return func
    def call(self, method, args, kwargs):
        req = self.protocol.create_request(method, args, kwargs)
        return self._send_and_handle_reply(req)
        #[DEBUG] print 'function ' + method + ' called with argument ' + str(args)
    def _send_and_handle_reply(self, req):
        reply = self.transport.send_message(req.serialize())
        response = self.protocol.parse_reply(reply)
        return response

if __name__ == '__main__':
    ctx = zmq.Context()
    client = RPCClient(RPCProtocol(), ClientTransport.create(ctx, 'tcp://127.0.0.1:5001'))
    client.reverse_string('hello, world!')
    client.strcpy('hello, ', 'RPC!')

为了验证代码是否工作,我们需要接下来要补上服务器端的代码。由于时间的关系,本文只能先到这里了,不出意外的话明天会把剩下来的部分补充完整,合成一篇,在这之前,姑且称这个为上篇吧。