nameko.testing.websocket 源代码

from __future__ import absolute_import

import json
import uuid
from collections import defaultdict

from eventlet.event import Event
from eventlet.queue import Queue

from nameko.exceptions import deserialize


[文档] def make_virtual_socket(host, port, path='/ws'): from websocket import WebSocketApp result_handlers = {} class Socket(object): def __init__(self): self._event_queues = defaultdict(Queue) def get_event_queue(self, event_type): return self._event_queues[event_type] def wait_for_event(self, event_type): return self.get_event_queue(event_type).get() def rpc(self, _method, **data): id = str(uuid.uuid4()) event = Event() result_handlers[id] = event.send ws_app.send(json.dumps({ 'method': _method, 'data': data, 'correlation_id': id, })) rv = event.wait() if rv['success']: return rv['data'] raise deserialize(rv['error']) sock = Socket() def on_message(ws, message): msg = json.loads(message) if msg['type'] == 'event': sock.get_event_queue(msg['event']).put((msg['event'], msg['data'])) elif msg['type'] == 'result': result_id = msg['correlation_id'] handler = result_handlers.pop(result_id, None) if handler is not None: handler(msg) ready_event = Event() def on_open(ws): ready_event.send(None) def on_error(ws, err): ready_event.send(err) ws_app = WebSocketApp( 'ws://%s:%d%s' % (host, port, path), on_message=on_message, on_open=on_open, on_error=on_error, ) def connect_socket(): err = ready_event.wait() if err is not None: raise err # (www.logilab.org/ticket/3207) pylint: disable=E0702 return sock return ws_app, connect_socket