Hybrid sync & async Python request/response protocol client implementations
Posted: December 13, 2011 Filed under: Programming, Python | Tags: python
Arakoon, our in-house developed key-value store, is one of our flagship projects. Since a server isn’t of much use if no clients can talk to it, we also developed a couple of client libraries, including an OCaml, C, PHP and Python client.
Next to the Python client maintained inside the main Arakoon repository, an alternative client was developed as well (source, source). One of the goals of this alternative client was supporting the Twisted asynchronous networking framework.
In this post, I’ll present the approach taken to achieve this. It maintains a clear separation between the protocol (the bytes going over the wire) and the transport (the wire itself). Both synchronous and asynchronous transports can be implemented, and new request/response commands can be added easily, at a single place in the source code.
Throughout this post, we’ll write a client for this server, which implements a protocol similar to the Arakoon protocol:
import socket
import struct
import threading

HOST = 'localhost'
PORT = 8080

COMMAND_STRUCT = struct.Struct('<I')
LEN_STRUCT = struct.Struct('<I')

SUCCESS_CODE = struct.pack('<I', 0)
ERROR_CODE = struct.pack('<I', 1)
ERROR_MESSAGE = 'Invalid request'
ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
                                 len(ERROR_MESSAGE), ERROR_MESSAGE)


def recv_exactly(conn, count):
    # Read exactly 'count' bytes; return None if the peer disconnects first
    data = ''
    while len(data) < count:
        chunk = conn.recv(count - len(data))
        if not chunk:
            return None
        data += chunk
    return data


def handle(conn):
    while True:
        command_data = recv_exactly(conn, COMMAND_STRUCT.size)
        if command_data is None:
            return
        command, = COMMAND_STRUCT.unpack(command_data)

        if command == 1:
            # 'reverse': read a length-prefixed string, send it back reversed
            len_data = recv_exactly(conn, LEN_STRUCT.size)
            if len_data is None:
                return
            len_, = LEN_STRUCT.unpack(len_data)
            data = recv_exactly(conn, len_)
            if data is None:
                return
            conn.sendall(SUCCESS_CODE)
            conn.sendall(struct.pack('<I%ds' % len(data), len(data), data[::-1]))
        else:
            conn.sendall(ERROR_CODE)
            conn.sendall(ERROR_MESSAGE_DATA)


def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, PORT))
    sock.listen(1)

    while True:
        conn, addr = sock.accept()
        print 'Connect: %r' % (addr, )
        # Pass 'conn' as an argument so each thread keeps its own connection
        threading.Thread(target=handle, args=(conn, )).start()


if __name__ == '__main__':
    main()
Every message sent by a client starts with a 32-bit integer, the command identifier. Currently only one command, ‘reverse’ with ID 1, is implemented. It takes a single string as argument. Strings are encoded in two parts: first a 32-bit integer containing the length of the string, followed by the actual string data as characters. All integers are unsigned and little-endian.
When the server receives a command, it sends a 32-bit integer denoting success (0x00) or failure (0x01). On success, the ‘reverse’ command returns the input string, reversed, using the same string encoding as described before. On failure, an error message follows in that same encoding. Once this cycle is complete, a new command can be sent by a client.
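To make the wire format concrete, here is a small sketch that builds the bytes of a ‘reverse’ request and decodes a response by hand. It is written for Python 3 (the listings in this post target Python 2), and the helper names `encode_string`, `encode_reverse` and `decode_response` are made up for this illustration; they are not part of any client.

```python
import struct

UINT32 = struct.Struct('<I')  # little-endian unsigned 32-bit integer


def encode_string(value):
    # Length prefix followed by the raw bytes
    return UINT32.pack(len(value)) + value


def encode_reverse(text):
    # Command identifier 1 ('reverse'), then the string argument
    return UINT32.pack(1) + encode_string(text)


def decode_response(data):
    # Status code, then a length-prefixed string (result or error message)
    code, = UINT32.unpack_from(data, 0)
    length, = UINT32.unpack_from(data, UINT32.size)
    start = 2 * UINT32.size
    return code, data[start:start + length]


request = encode_reverse(b'abc')
print(request)

# A successful response, built the way the server above would send it
response = UINT32.pack(0) + encode_string(b'cba')
print(decode_response(response))
```

A request for ‘abc’ therefore takes 11 bytes on the wire: 4 for the command, 4 for the length and 3 for the characters.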
Now, on to the client side. We’ll need some imports:
import socket
import struct
import logging
import functools
import collections

from twisted.python import log
from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic, stateful

import utils
The ‘utils’ module contains some helpers, and is contained in the Pyrakoon repository.
We need a way to communicate between the protocol layer and the transport layer. Only the protocol side knows how much data is expected, and only the transport side can provide these bytes. The transport might not have the required amount of data available immediately, and may want to yield execution in the meantime (in case of asynchronous networking). To make development within these constraints easier, coroutines are used throughout the system to encapsulate intermediate state whenever possible, whilst maintaining a simple API.
Here’s how the API works: a single protocol action (e.g. reading the response of a request) is backed by a coroutine. This coroutine yields ‘Request’ objects, each encapsulating the number of bytes that should be provided to the coroutine for the protocol to be able to construct a value, and finally a ‘Result’ object which encapsulates the final value. Whenever the upper layer receives a ‘Request’ object, it should read the requested number of bytes from the transport, then ‘send’ the data into the coroutine, which will yield another ‘Request’, or finally a ‘Result’.
The definitions are very simple:
class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value
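The handshake between the two layers can be sketched as a small driver loop. This is a minimal Python 3 illustration, not code from the client: `receive_uint32` and `drive` are names invented here, and `read` stands for whatever function a transport offers to read a given number of bytes (an in-memory buffer in this case).

```python
import io
import struct


class Request(object):
    def __init__(self, count):
        self.count = count


class Result(object):
    def __init__(self, value):
        self.value = value


def receive_uint32():
    # Ask the driver for 4 bytes, then hand back the decoded integer
    data = yield Request(4)
    value, = struct.unpack('<I', data)
    yield Result(value)


def drive(coroutine, read):
    # Feed the coroutine the bytes it asks for until it yields a Result
    item = next(coroutine)
    while isinstance(item, Request):
        item = coroutine.send(read(item.count))
    if not isinstance(item, Result):
        raise TypeError('Expected Request or Result')
    coroutine.close()
    return item.value


buffer_ = io.BytesIO(struct.pack('<I', 42))
value = drive(receive_uint32(), buffer_.read)
print(value)
```

The coroutine never touches the buffer itself. It only states how many bytes it needs, which is what lets the same protocol code run on top of blocking and non-blocking transports.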
Next, the protocol uses a couple of different types of values: 32-bit (unsigned) integers, and strings. The latter uses the former in its internal encoding.
Every type has 3 methods: ‘check’, ‘serialize’ and ‘receive’.
‘check’ performs input validation for values of the given type (type check, boundary check,…). ‘serialize’ is a generator which yields the encoded data for a given value. ‘receive’ is a coroutine which yields ‘Request’ and ‘Result’ objects to receive a value of the type.
For basic types (e.g. integers), packed values using the ‘struct’ module can be used, so the base implementation provides the required functionality for this. Here’s the base type, and implementations for both 32-bit unsigned integers and strings:
class Type(object):
    PACKER = None

    def check(self, value):
        raise NotImplementedError

    def serialize(self, value):
        if not self.PACKER:
            raise NotImplementedError

        yield self.PACKER.pack(value)

    def receive(self):
        if not self.PACKER:
            raise NotImplementedError

        data = yield Request(self.PACKER.size)
        result, = self.PACKER.unpack(data)

        yield Result(result)


class UnsignedInt32(Type):
    PACKER = struct.Struct('<I')
    MAX_INT = (2 ** 32) - 1

    def check(self, value):
        if not isinstance(value, (int, long)):
            raise TypeError

        if value < 0:
            raise ValueError('Unsigned integer expected')

        if value > self.MAX_INT:
            raise ValueError('Integer overflow')

UNSIGNED_INT32 = UnsignedInt32()


class String(Type):
    def check(self, value):
        if not isinstance(value, str):
            raise TypeError

    def serialize(self, value):
        length = len(value)

        for bytes_ in UNSIGNED_INT32.serialize(length):
            yield bytes_

        yield struct.pack('<%ds' % length, value)

    def receive(self):
        # Forward the requests of the length receiver to our own caller
        length_receiver = UNSIGNED_INT32.receive()
        request = length_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = length_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        length = request.value

        if length == 0:
            result = ''
        else:
            data = yield Request(length)
            result, = struct.unpack('<%ds' % length, data)

        yield Result(result)

STRING = String()
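The manual forwarding loop in `String.receive` is what Python 2 requires. As a variation, and not what the code in this post does, Python 3.3 and later can express the same delegation with `yield from`, letting a receiver `return` its value instead of yielding a ‘Result’. The sketch below assumes that changed convention; `receive_uint32`, `receive_string` and `drive` are illustrative names.

```python
import io
import struct

UINT32 = struct.Struct('<I')


class Request(object):
    def __init__(self, count):
        self.count = count


def receive_uint32():
    data = yield Request(UINT32.size)
    value, = UINT32.unpack(data)
    return value  # becomes the value of a surrounding 'yield from'


def receive_string():
    # Delegate to the integer receiver; its Requests pass through unchanged
    length = yield from receive_uint32()
    if length == 0:
        return b''
    data = yield Request(length)
    return data


def drive(coroutine, read):
    # Answer every Request; the final value arrives via StopIteration
    try:
        request = next(coroutine)
        while True:
            request = coroutine.send(read(request.count))
    except StopIteration as exc:
        return exc.value


wire = io.BytesIO(UINT32.pack(5) + b'hello')
text = drive(receive_string(), wire.read)
print(text)
```

The trade-off is that the driver can no longer tell a finished receiver from a misbehaving one by inspecting the yielded object, so the explicit ‘Result’ marker used in this post remains the more defensive choice.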
Now that the basic types are defined, we can describe the request/response messages transferred between client and server. Every message has a tag (its identifier), zero or more arguments, and a return type. Messages can be serialized and received through methods matching those on ‘Type’. Most, if not all, of the necessary plumbing can be hidden inside the ‘Message’ class, so command-specific classes can be very short and simple. This makes it easy to add new protocol commands to the client as well!
Here’s the ‘Message’ definition, as well as the implementation of our ‘Reverse’ command. Note how simple the definition of the latter is.
class Message(object):
    TAG = None
    ARGS = None
    RETURN_TYPE = None

    def serialize(self):
        for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
            yield bytes_

        for arg in self.ARGS:
            name, type_ = arg

            for bytes_ in type_.serialize(getattr(self, name)):
                yield bytes_

    def receive(self):
        # First read the response code...
        code_receiver = UNSIGNED_INT32.receive()
        request = code_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = code_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        code = request.value

        # ...then either the return value or an error message string
        if code == 0x00:
            result_receiver = self.RETURN_TYPE.receive()
        else:
            result_receiver = STRING.receive()

        request = result_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = result_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        result = request.value

        if code == 0x00:
            yield Result(result)
        else:
            raise Exception('Error %d: %s' % (code, result))


class Reverse(Message):
    TAG = 0x01
    ARGS = ('text', STRING),
    RETURN_TYPE = STRING

    def __init__(self, text):
        super(Reverse, self).__init__()

        self.text = text
Next up, we’ll write the base class for all actual client implementations. Some dynamic method construction is used on the go, based on the following 2 utility functions:
def validate_types(specs, args):
    for spec, arg in zip(specs, args):
        name, type_ = spec[:2]

        try:
            type_.check(arg)
        except TypeError:
            raise TypeError('Invalid type of argument "%s"' % name)
        except ValueError:
            raise ValueError('Invalid value of argument "%s"' % name)


def call(message_type):
    def wrapper(fun):
        # Expose the message arguments as the signature of the method
        argspec = ['self']
        for arg in message_type.ARGS:
            argspec.append(arg[0])

        @utils.update_argspec(*argspec)
        @functools.wraps(fun)
        def wrapped(**kwargs):
            self = kwargs['self']

            if not self.connected:
                raise RuntimeError('Not connected')

            args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
            validate_types(message_type.ARGS, args)

            message = message_type(*args)

            return self._process(message)

        return wrapped

    return wrapper
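`utils.update_argspec` comes from the Pyrakoon repository and is not shown in this post. To give an idea of what such a decorator has to achieve, here is a rough Python 3 stand-in built on `inspect.Signature`. It is an assumption-laden sketch, not the Pyrakoon helper: this simplified `call` takes a list of argument names rather than a message type, and the `EchoClient` class with its two-argument `_process` exists only for the demonstration.

```python
import functools
import inspect


def call(arg_names):
    # Describe the desired signature: (self, <arg_names...>)
    parameters = [
        inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD)
        for name in ['self'] + list(arg_names)]
    signature = inspect.Signature(parameters)

    def wrapper(fun):
        @functools.wraps(fun)
        def wrapped(*args, **kwargs):
            # Raises TypeError when arguments are missing or unexpected
            bound = signature.bind(*args, **kwargs)
            values = dict(bound.arguments)
            self = values.pop('self')
            return self._process(fun.__name__, values)

        # Make introspection tools report the generated signature
        wrapped.__signature__ = signature
        return wrapped

    return wrapper


class EchoClient(object):
    @call(['text'])
    def reverse(self):
        assert False  # never executed; the wrapper replaces the body

    def _process(self, name, values):
        return name, values


client = EchoClient()
print(client.reverse('abc'))
print(inspect.signature(EchoClient.reverse))
```

The decorated method accepts its arguments positionally or by keyword, even though the function written in the class body only takes `self`.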
The ‘Client’ base class becomes extremely simple. Whenever a new command is added to the protocol, adding it to this class (as done for the ‘reverse’ call) is obvious.
class Client(object):
    connected = False

    @call(Reverse)
    def reverse(self):
        # Never executed: 'call' replaces the body of this method
        assert False

    def _process(self, message):
        raise NotImplementedError
That’s about all there is. What’s left is transport-specific client implementations.
Starting with a synchronous socket client is the easiest. All we need is a ‘connect’ method to set up a socket, and an implementation of ‘Client._process’ to handle interaction between the protocol and the transport. The implementation is pretty straightforward:
class SyncClient(Client):
    def __init__(self):
        self._socket = None

    def connect(self, addr, port):
        self._socket = socket.create_connection((addr, port))

    @property
    def connected(self):
        return self._socket is not None

    def _process(self, message):
        try:
            for part in message.serialize():
                self._socket.sendall(part)

            receiver = message.receive()
            request = receiver.next()

            while isinstance(request, Request):
                # Block until the requested number of bytes has arrived
                data = ''

                while len(data) < request.count:
                    d = self._socket.recv(request.count - len(data))
                    if not d:
                        raise Exception('Connection closed by peer')
                    data += d

                request = receiver.send(data)

            if not isinstance(request, Result):
                raise TypeError

            utils.kill_coroutine(receiver, logging.exception)

            return request.value
        except Exception:
            # The stream state is unknown after a failure: drop the connection
            try:
                self._socket.close()
            finally:
                self._socket = None

            raise
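Nothing in this loop is specific to sockets. As an illustration of that point, here is a Python 3 sketch of a client that talks to any pair of file-like objects, exercised against an in-memory canned response. `StreamClient` is a hypothetical name, and the `Reverse` class below is a stripped-down stand-in for the message class of this post, kept short so the example is self-contained.

```python
import io
import struct

UINT32 = struct.Struct('<I')


class Request(object):
    def __init__(self, count):
        self.count = count


class Result(object):
    def __init__(self, value):
        self.value = value


class Reverse(object):
    # Stripped-down stand-in for the 'Reverse' message of this post
    def __init__(self, text):
        self.text = text

    def serialize(self):
        yield UINT32.pack(1)
        yield UINT32.pack(len(self.text))
        yield self.text

    def receive(self):
        code, = UINT32.unpack((yield Request(UINT32.size)))
        length, = UINT32.unpack((yield Request(UINT32.size)))
        data = (yield Request(length)) if length else b''
        if code != 0:
            raise Exception('Error %d: %r' % (code, data))
        yield Result(data)


class StreamClient(object):
    # Works with any pair of file-like objects
    def __init__(self, reader, writer):
        self._reader = reader
        self._writer = writer

    def _process(self, message):
        for part in message.serialize():
            self._writer.write(part)
        receiver = message.receive()
        item = next(receiver)
        while isinstance(item, Request):
            item = receiver.send(self._reader.read(item.count))
        receiver.close()
        return item.value


canned = io.BytesIO(UINT32.pack(0) + UINT32.pack(3) + b'cba')
sent = io.BytesIO()
client = StreamClient(canned, sent)
result = client._process(Reverse(b'abc'))
print(result)
print(sent.getvalue())
```

A setup like this is also handy for unit-testing message definitions without starting a server.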
The Twisted protocol is somewhat more complex, and won’t be covered in detail in this post. If you ever wrote a Twisted protocol yourself, it should be easy to follow though. The implementation piggy-backs on ‘twisted.protocols.stateful.StatefulProtocol’, which simplifies a lot.
class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
    _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

    def __init__(self):
        Client.__init__(self)

        self._handlers = collections.deque()
        self._currentHandler = None
        self._connected = False
        self._deferredLock = defer.DeferredLock()

    @property
    def connected(self):
        # Checked by the methods generated through 'call'
        return self._connected

    def connectionMade(self):
        self._connected = True

    def _process(self, message):
        deferred = defer.Deferred()
        self._handlers.append((message.receive(), deferred))

        def process(_):
            try:
                for data in message.serialize():
                    self.transport.write(data)
            finally:
                self._deferredLock.release()

        self._deferredLock.acquire().addCallback(process)

        return deferred

    def getInitialState(self):
        self._currentHandler = None

        return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

    def _responseCodeReceived(self, data):
        self._currentHandler = None

        try:
            # Responses arrive in request order, so take the oldest handler
            self._currentHandler = handler = self._handlers.popleft()
        except IndexError:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        request = handler[0].next()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            if request.count != self._INITIAL_REQUEST_SIZE:
                handler[1].errback(ValueError('Unexpected request count'))
                self.transport.loseConnection()

                return None

            return self._handleRequest(data)
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            handler[1].errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleRequest(self, data):
        if not self._currentHandler:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        receiver, deferred = self._currentHandler

        try:
            request = receiver.send(data)
        except Exception as exc: #pylint: disable-msg=W0703
            log.err(exc, 'Exception raised by message receive loop')
            deferred.errback(exc)

            return self.getInitialState()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            return self._handleRequest, request.count
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            deferred.errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleResult(self, result):
        receiver, deferred = self._currentHandler
        self._currentHandler = None

        # To be on the safe side...
        utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

        deferred.callback(result.value)

        return self.getInitialState()

    def connectionLost(self, reason=protocol.connectionDone):
        self._connected = False

        self._cancelHandlers(reason)

        return stateful.StatefulProtocol.connectionLost(self, reason)

    def _cancelHandlers(self, reason):
        while self._handlers:
            receiver, deferred = self._handlers.popleft()

            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

            deferred.errback(reason)
That’s it! Finally, we can test our clients against the server:
HOST = 'localhost'
PORT = 8080

def test_sync():
    client = SyncClient()
    client.connect(HOST, PORT)
    r = client.reverse('sync')
    print 'sync =>', r
    print r, '=>', client.reverse(r)

def test_twisted():
    def create_client(host, port):
        client = protocol.ClientCreator(reactor, TwistedProtocol)
        return client.connectTCP(host, port)

    @defer.inlineCallbacks
    def run(proto):
        result = yield proto.reverse('twisted')
        print 'twisted =>', result
        result2 = yield proto.reverse(result)
        print result, '=>', result2
        proto.transport.loseConnection()

    deferred = create_client(HOST, PORT)
    deferred.addCallback(run)
    deferred.addBoth(lambda _: reactor.stop())

    reactor.run()

if __name__ == '__main__':
    test_sync()
    test_twisted()
If for example an ‘add’ method is added to the server, which returns the sum of two given 32bit unsigned integers, we could define a new command like this:
class Add(Message):
    TAG = 0x02
    ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
    RETURN_TYPE = UNSIGNED_INT32

    def __init__(self, a, b):
        super(Add, self).__init__()

        self.a = a
        self.b = b
Next, add it to the ‘Client’ class like this:
    @call(Add)
    def add(self):
        assert False
Once this is done, the ‘add(self, a, b)’ method will be available on all clients and work as expected!
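On the server side, the matching branch for command 2 could look roughly like the Python 3 sketch below. It is written as a pure function on bytes so it can be tried without sockets. `handle_add` is a hypothetical name, and reporting an overflowing sum as an error is an assumption of this sketch; the post does not specify what the server should do in that case.

```python
import struct

UINT32 = struct.Struct('<I')
ADD_ARGS = struct.Struct('<II')  # the two operands, back to back
MAX_UINT32 = (2 ** 32) - 1


def handle_add(payload):
    # 'payload' holds the 8 bytes following the command identifier
    a, b = ADD_ARGS.unpack(payload)
    total = a + b
    if total > MAX_UINT32:
        # ASSUMPTION: overflow is reported as an error, like unknown commands
        message = b'Integer overflow'
        return UINT32.pack(1) + UINT32.pack(len(message)) + message
    # Success code followed by the sum as a 32-bit unsigned integer
    return UINT32.pack(0) + UINT32.pack(total)


reply = handle_add(ADD_ARGS.pack(2, 3))
print(reply)
```

Because the error branch reuses the string encoding, the client-side `Message.receive` would turn such a reply into an exception without any changes.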
This is just a basic example. The Arakoon protocol contains more complex types as well, including ‘option’ types and lists. See the Pyrakoon source code to see how these are handled. Only a type definition needs to be added; multiple commands can then use it as-is.
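To give a feel for how a composite type can be layered on top of the basic ones, here is a Python 3 sketch of a list type. The wire encoding used (an element count followed by the elements in order) is an assumption made for this example and is not necessarily what Arakoon or Pyrakoon use. The names `List`, `delegate` and `drive` are likewise illustrative.

```python
import io
import struct


class Request(object):
    def __init__(self, count):
        self.count = count


class Result(object):
    def __init__(self, value):
        self.value = value


class UnsignedInt32(object):
    PACKER = struct.Struct('<I')

    def serialize(self, value):
        yield self.PACKER.pack(value)

    def receive(self):
        data = yield Request(self.PACKER.size)
        value, = self.PACKER.unpack(data)
        yield Result(value)


def delegate(receiver):
    # Forward the Requests of a nested receiver, return its final value
    item = next(receiver)
    while isinstance(item, Request):
        item = receiver.send((yield item))
    return item.value


class List(object):
    # ASSUMED encoding: element count as uint32, then each element in order
    def __init__(self, inner):
        self.inner = inner

    def serialize(self, values):
        yield struct.pack('<I', len(values))
        for value in values:
            for part in self.inner.serialize(value):
                yield part

    def receive(self):
        count = yield from delegate(UnsignedInt32().receive())
        values = []
        for _ in range(count):
            values.append((yield from delegate(self.inner.receive())))
        yield Result(values)


def drive(receiver, read):
    # Minimal blocking driver, reading from a file-like object
    item = next(receiver)
    while isinstance(item, Request):
        item = receiver.send(read(item.count))
    return item.value


numbers = List(UnsignedInt32())
wire = b''.join(numbers.serialize([1, 2, 3]))
decoded = drive(numbers.receive(), io.BytesIO(wire).read)
print(decoded)
```

Since `List` only relies on the `serialize` and `receive` methods of its element type, the same definition works for lists of strings or of other composite types.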
Using the approach described in this post, it becomes easy to provide client implementations using several different backends (blocking, non-blocking, sockets or anything else as transport,…), and simplify adding new commands/calls to all clients at once (keeping them in sync). This simplifies client maintenance a lot.
