Shampoo is a asyncio websocket protocol implementation for Autobahn.
Shampoo will connect incomming websockets to user defined endpoint classes based on their path. The connecting client can make calls to the endpoint using simple JSON messages and the endpoint can send the client push messages.
Note: Only python versions 3.5 and up are supported.
$ pip install shampooFirst setup the Autobahn websocket server:
#!/usr/bin/env python
import asyncio
from autobahn.asyncio.websocket import WebSocketServerFactory
from shampoo.shampoo import ShampooProtocol
import txaio
if __name__ == '__main__':
txaio.use_asyncio()
factory = WebSocketServerFactory('ws://localhost:9007')
factory.protocol = ShampooProtocol
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '', 9007)
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.close()This will give us a running server. Now we set up an example endpoint. We're
creating a 'number' endpoint. We can connect to the endpoint for the numbers
0 - 9 and create an endpoint to add a number. The endpoint connection will be
at ws://localhost:9007/number/<number>/.
from shampoo import shampoo
@shampoo.websocket_endpoint('r/number/(?P<number>\d+)/?')
class NumberEndpoint():
def __init__(self, number, protocol, peer, params, headers):
"""Connection initialization for the number endpoint"""
self.number = int(number)
// We only accept numbers 0 - 9.
if self.number not in range(10):
// Connection will be refused with http status code 404.
raise shampoo.ShampooEndpointInitializationError(
404, 'Number not available')
def cleanup(self):
"""Called when the connection is closed. We can clean up here"""
@shampoo.websocket_method()
def add_number(self, request_data):
"""Method that can be called by the client.
Needs to return a tuple with an `object`, `status_code` and a
`message`. You can ommit the status code and the message.
"""
return {
"number": self.number + request_data['number']
}, 200, 'ok' ## You can ommit the status code and messageWe registered the endpoint using a decorator. Note that we also had to use a
decorator for the add_number method. Only allowed methods can be remotely
called, this is to prevent someone from calling the __init__ method or any
method you would not want to expose. You can manually set the method as
callable by by setting NumberEndpoint.add_number.is_endpoint_method = True.
You can also manually register the endpoint itself using register_endpoint.
from shampoo.shampoo import register_endpoint
register_endpoint(r'/number/(?P<number>\d+)/?', NumberEndpoint)That's it, now we can connect to this server and make calls. For example make
a connection to ws://localhost:9007/number/3/. The connection needs to
request the shampoo protocol, otherwise the connection will be refused.
When the connection is established we can make a call by sending a JSON
message:
{
"method": "is_prime",
"request_data": {
"number": 4
},
"request_id": 1
}This will give the following response:
{
"response_data": {
"number": 7
},
"message": "ok",
"status": 200,
"request_id": 1
}For the exact specification of the request and response JSON see
request.json and
response.json.
You can notifiy a connected client of any events with push messages. This is
an example using redis pubsub.
import asyncio
import json
import asyncio_redis
from shampoo import shampoo
@shampoo.websocket_endpoint('r/notifications/?')
class NotificationEndpoint():
def __init__(protocol, **kwargs):
self.protocol = protocol
sef.protocol.register_coroutine(self.notifications)
@asyncio.coroutine
def notifications(self):
self.subscription = Subscription()
yield from self.subscription.initialize()
while True:
message = yield from self.subscription.next_message()
self._protocol.push_message({'message': message})
class Subscription():
"""Listens to Redis and relay messages to all subscribers
This class sets up one connection to Redis and distributes all
messages to all subscribers. Without this there would be a Redis
connection per subscriber (websocket connection).
"""
redis = None
subscriptions = []
@asyncio.coroutine
def initialize():
if self.redis is None:
self.redis = yield from asyncio_redis.Connection.create(
host='127.0.0.1', port=6379)
self.pubsub = yield from self.redis.start_subscribe()
yield from self.pubsub.subscribe(['notifications'])
asyncio.get_event_loop().create_task(
Subscription.activity_monitor())
@asyncio.coroutine
def activity_monitor()
while True:
result = yield from pubsub.next_published()
message = json.loads(result.value)
subscriptions = Subscription.subscriptions
Subscription.subscriptions = []
for subscription in subscriptions:
subscription.future.set_result(message)
@asyncio.coroutine
def next_message():
self.future = asyncio.Future()
self.subscriptions.append(self)
return futureWhen you publish a message to redis with
redis.publish('notifications', 'This is a notification!'), the client
gets the following message:
{
"push_data": {
"message": "This is a notification!"
}
}See for the exact specification push_message.json.
For JSON decoding and encoding the standard python json module is used. If
you want to use a custom encoder or decoder you can set them like this:
from shampoo import shampoo
from custom_json import CustomJSONEncoder, CustomJSONDecoder
shampoo.set_json_encoder(CustomJSONEncoder)
shampoo.set_json_decoder(CustomerJSONDecoder)