Hybrid sync & async Python request/response protocol client implementations

Arakoon, our in-house developed key-value store, is one of our flagship projects. Since a server isn’t of much use if no clients can talk to it, we also developed a couple of client libraries, including an OCaml, C, PHP and Python client.

Next to the Python client maintained inside the main Arakoon repository, an alternative client was developed as well (source, source). One of the goals of this alternative client was supporting the Twisted asynchronous networking framework.

In this post, I’ll present the approach taken to achieve this. It maintains a clear separation between the protocol (the bytes going over the wire) and the transport (the wire itself). Both synchronous as well as asynchronous transports can be implemented, and new request/response commands can be added easily, at a single place in the source code.

Throughout this post, we’ll write a client for this server, which implements a protocol similar to the Arakoon protocol:

import socket
import struct
import threading

HOST = 'localhost'
PORT = 8080

COMMAND_STRUCT = struct.Struct('<I')

SUCCESS_CODE = struct.pack('<I', 0)
ERROR_CODE = struct.pack('<I', 1)
ERROR_MESSAGE = 'Invalid request'
ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
    len(ERROR_MESSAGE), ERROR_MESSAGE)

LEN_STRUCT = struct.Struct('<I')

def handle(conn):
    while True:
        command_data = ''
        while len(command_data) < COMMAND_STRUCT.size:
            data = conn.recv(COMMAND_STRUCT.size - len(command_data))

            if not data:
                return

            command_data += data

        command, = COMMAND_STRUCT.unpack(command_data)

        if command == 1:
            len_data = ''

            while len(len_data) < LEN_STRUCT.size:
                data = conn.recv(LEN_STRUCT.size - len(len_data))
                len_data += data

            len_, = LEN_STRUCT.unpack(len_data)

            data = ''
            while len(data) < len_:
                data += conn.recv(len_ - len(data))

            conn.send(SUCCESS_CODE)
            conn.send(struct.pack('<L%ds' % len(data), len(data), data[::-1]))
        else:
            conn.send(ERROR_CODE)
            conn.send(ERROR_MESSAGE_DATA)

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, PORT))
    sock.listen(1)

    while True:
        conn, addr = sock.accept()
        print 'Connect: %r' % (addr, )

        threading.Thread(target=lambda: handle(conn)).start()

if __name__ == '__main__':
    main()

Every message sent by a client starts with a 32bit integer, the command identifier. Currently only one command, ‘reverse’ with ID 1, is implemented. This takes a single string as argument. Strings are encoded in two parts: first, a 32bit integer containing the length of the string, followed by the actual string data as characters.

When the server receives a command, it sends a 32bit integer denoting success (0x00) or failure (0x01). The ‘reverse’ command simply returns a the input string, reversed, using the same string encoding as described before. Once this cycle is complete, a new command can be sent by a client.

Now, on to the client side. We’ll need some imports:

import socket
import struct
import logging
import functools
import collections

from twisted.python import log
from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic, stateful

import utils

The ‘utils’ module contains some helpers, and is contained in the Pyrakoon repository.

We need a way to communicate between the protocol layer and the transport layer. Only the protocol side knows how much data is expected, and only the transport side can provide these bytes, even though it might not have the required amount of data available immediately, and want to yield execution (in case of asynchronous networking). To make development within these constraints easier, coroutines are used throughout the system to encapsulate intermediate state whenever possible, whilst maintaining a simple API.

Here’s how the API works: a single protocol action (e.g. reading the response of a request) is backed by a coroutine, which yields one or more ‘Request’ objects, which encapsulate the number of bytes that should be provided to the coroutine for the protocol to be able to construct a value, or a ‘Result’ object which encapsulate the final value. Whenever the upper layer receives a ‘Request’ object, it should read the requested number of bytes from the transport, then ‘send’ the data into the coroutine, which will yield another ‘Request’, or finally  a ‘Response’.

The definitions are very simple:

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

Next, the protocol uses a couple of different types of values: 32bit (unsigned) integers, and strings. The latter uses the first in its internal encoding.

Every type has 3 methods: ‘check’, ‘serialize’ and ‘receive’.

‘check’ performs input validation for values of the given type (type check, boundary check,…). ‘serialize’ is a generator which yields the encoded data for a given value. ‘receive’ is a coroutine which yield ‘Request’ and ‘Result’ objects to receive a value of the type.

For basic types (e.g. integers) packed values using the ‘struct’ module can be used, so the base implementations provides the required functionality for this. Here’s the base type, and implementations for both 32bit unsigned integers and strings:

class Type(object):
    PACKER = None

    def check(self, value):
        raise NotImplementedError

    def serialize(self, value):
        if not self.PACKER:
            raise NotImplementedError

        yield self.PACKER.pack(value)

    def receive(self):
        if not self.PACKER:
            raise NotImplementedError

        data = yield Request(self.PACKER.size)
        result, = self.PACKER.unpack(data)

        yield Result(result)

class UnsignedInt32(Type):
    PACKER = struct.Struct('<I')
    MAX_INT = (2 ** 32) - 1

    def check(self, value):
        if not isinstance(value, (int, long)):
            raise TypeError

        if value < 0:
            raise ValueError('Unsigned integer expected')

        if value > self.MAX_INT:
            raise ValueError('Integer overflow')

UNSIGNED_INT32 = UnsignedInt32()

class String(Type):
    def check(self, value):
        if not isinstance(value, str):
            raise TypeError

    def serialize(self, value):
        length = len(value)

        for bytes_ in UNSIGNED_INT32.serialize(length):
            yield bytes_

        yield struct.pack('<%ds' % length, value)

    def receive(self):
        length_receiver = UNSIGNED_INT32.receive()
        request = length_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = length_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        length = request.value

        if length == 0:
            result = ''
        else:
            data = yield Request(length)
            result, = struct.unpack('<%ds' % length, data)

        yield Result(result)

STRING = String()

Now the basic types are defined, we can describe the request/response messages transferred between client and server. Every message has a tag (its identifier), zero or more arguments, and a return type. Messages can be serialized and received similar to the corresponding methods on ‘Type’. Most, if not all necessary plumbing can be hidden inside the ‘Message’ class, so command-specific classes can be very short and simple. This makes it easy to add new protocol commands to the client as well!

Here’s the ‘Message’ definition, as well as the implementation of our ‘Reverse’ command. Note how simple the definition of the latter is.

class Message(object):
    TAG = None
    ARGS = None
    RETURN_TYPE = None

    def serialize(self):
        for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
            yield bytes_

        for arg in self.ARGS:
            name, type_ = arg

            for bytes_ in type_.serialize(getattr(self, name)):
                yield bytes_

    def receive(self):
        code_receiver = UNSIGNED_INT32.receive()
        request = code_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = code_receiver.send(value)

        if not isinstance(request, Result):
            yield TypeError

        code = request.value

        if code == 0x00:
            result_receiver = self.RETURN_TYPE.receive()
        else:
            result_receiver = STRING.receive()

        request = result_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = result_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        result = request.value

        if code == 0x00:
            yield Result(result)
        else:
            raise Exception('Error %d: %s' % (code, result))

class Reverse(Message):
    TAG = 0x01
    ARGS = ('text', STRING),
    RETURN_TYPE = STRING

    def __init__(self, text):
        super(Reverse, self).__init__()

        self.text = text

Next up, we’ll write the base class for all actual client implementations. Some dynamic method construction is used on the go, based on the following 2 utility functions:

def validate_types(specs, args):
    for spec, arg in zip(specs, args):
        name, type_ = spec[:2]

        try:
            type_.check(arg)
        except TypeError:
            raise TypeError('Invalid type of argument "%s"' % name)
        except ValueError:
            raise ValueError('Invalid value of argument "%s"' % name)

def call(message_type):
    def wrapper(fun):
        argspec = ['self']
        for arg in message_type.ARGS:
            argspec.append(arg[0])

        @utils.update_argspec(*argspec)
        @functools.wraps(fun)
        def wrapped(**kwargs):
            self = kwargs['self']

            if not self.connected:
                raise RuntimeError('Not connected')

            args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
            validate_types(message_type.ARGS, args)

            message = message_type(*args)

            return self._process(message)

        return wrapped

    return wrapper

The ‘Client’ base class becomes extremely simple. Whenever a new command is added to the protocol, adding it to this class (as done for the ‘reverse’ call) is obvious.

class Client(object):
    connected = False

    @call(Reverse)
    def reverse(self):
        assert False

    def _process(self, message):
        raise NotImplementedError

That’s about all there is. What’s left is transport-specific client implementations.

Starting with a synchronous socket client is the easiest. All we need is a ‘connect’ method to set up a socket, and implement ‘Client._process’ to handle interaction between the protocol and the transport. The implementation is pretty straight-forward:

class SyncClient(Client):
    def __init__(self):
        self._socket = None

    def connect(self, addr, port):
        self._socket = socket.create_connection((addr, port))

    @property
    def connected(self):
        return self._socket is not None

    def _process(self, message):
        try:
            for part in message.serialize():
                self._socket.sendall(part)

            receiver = message.receive()
            request = receiver.next()

            while isinstance(request, Request):
                data = ''

                while len(data) < request.count:
                    d = self._socket.recv(request.count - len(data))
                    if not d:
                        raise Exception

                    data += d

                request = receiver.send(data)

            if not isinstance(request, Result):
                raise TypeError

            utils.kill_coroutine(receiver, logging.exception)

            return request.value
        except Exception:
            try:
                self._socket.close()
            finally:
                self._socket = None

            raise

The Twisted protocol is somewhat more complex, and won’t be covered in detail in this post. If you ever wrote a Twisted protocol yourself, it should be easy to follow though. The implementation piggy-backs on ‘twisted.protocol.stateful.StatefulProtocol’, which simplifies a lot.

class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
    _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

    def __init__(self):
        Client.__init__(self)

        self._handlers = collections.deque()
        self._currentHandler = None

        self._connected = False
        self._deferredLock = defer.DeferredLock()

    def _process(self, message):
        deferred = defer.Deferred()
        self._handlers.append((message.receive(), deferred))

        def process(_):
            try:
                for data in message.serialize():
                    self.transport.write(data)
            finally:
                self._deferredLock.release()

        self._deferredLock.acquire().addCallback(process)

        return deferred

    def getInitialState(self):
        self._currentHandler = None

        return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

    def _responseCodeReceived(self, data):
        self._currentHandler = None

        try:
            self._currentHandler = handler = self._handlers.pop()
        except IndexError:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        request = handler[0].next()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            if request.count != self._INITIAL_REQUEST_SIZE:
                handler[1].errback(ValueError('Unexpected request count'))
                self.transport.loseConnection()

                return None

            return self._handleRequest(data)
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            handler[1].errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleRequest(self, data):
        if not self._currentHandler:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        receiver, deferred = self._currentHandler

        try:
            request = receiver.send(data)
        except Exception, exc: #pylint: disable-msg=W0703
            log.err(exc, 'Exception raised by message receive loop')
            deferred.errback(exc)

            return self.getInitialState()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            return self._handleRequest, request.count
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            deferred.errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleResult(self, result):
        receiver, deferred = self._currentHandler
        self._currentHandler = None

        # To be on the safe side...
        utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

        deferred.callback(result.value)

        return self.getInitialState()

    def connectionLost(self, reason=protocol.connectionDone):
        self._connected = False

        self._cancelHandlers(reason)

        return stateful.StatefulProtocol.connectionLost(self, reason)

    def _cancelHandlers(self, reason):
        while self._handlers:
            receiver, deferred = self._handlers.popleft()

            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

            deferred.errback(reason)

 

That’s it! Finally, we can test our clients against the server:

HOST = 'localhost'
PORT = 8080

def test_sync():
    client = SyncClient()
    client.connect(HOST, PORT)
    r = client.reverse('sync')
    print 'sync =>', r
    print r, '=>', client.reverse(r)

def test_twisted():
    def create_client(host, port):
        client = protocol.ClientCreator(reactor, TwistedProtocol)
        return client.connectTCP(host, port)

    @defer.inlineCallbacks
    def run(proto):
        result = yield proto.reverse('twisted')
        print 'twisted =>', result
        result2 = yield proto.reverse(result)
        print result2, '=>', result
        proto.transport.loseConnection()

    deferred = create_client(HOST, PORT)
    deferred.addCallback(run)
    deferred.addBoth(lambda _: reactor.stop())

    reactor.run()

if __name__ == '__main__':
    test_sync()
    test_twisted()

 

If for example an ‘add’ method is added to the server, which returns the sum of two given 32bit unsigned integers, we could define a new command like this:

class Add(Message):
    TAG = 0x02
    ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
    RETURN_TYPE = UNSIGNED_INT32

    def __init__(self, a, b):
        super(Add, self).__init__()

        self.a = a
        self.b = b

Next, add it to the ‘Client’ class like this:

@call(Add)
def add(self):
    assert False

Once this is done, the ‘add(self, a, b)’ method will be available on all clients and work as expected!

 

This is just a basic example. The Arakoon protocol contains more complex types as well, including ‘option’ types and lists. See the Pyrakoon source-code to see how this is handled. Only a type definition should be added, multiple commands can use them as-is easily.

Using the approach described in this post, it becomes easy to provide client implementations using several different backends (blocking, non-blocking, sockets or anything else as transport,…), and simplify adding new commands/calls to all clients at once (keeping them in sync). This simplifies client maintenance a lot.

About these ads

One Comment on “Hybrid sync & async Python request/response protocol client implementations”


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.