nameko.testing.rabbit 源代码

import json

import six
from requests import ConnectionError, HTTPError, Session
from requests.auth import HTTPBasicAuth
from requests.utils import get_auth_from_url, urldefragauth
from six.moves.urllib.parse import quote  # pylint: disable=E0401


__all__ = ['Client', 'HTTPError']


def _quote(value):
    return quote(value, '')


[文档] class Client(object): """Pyrabbit replacement using requests instead of httplib2 """ def __init__(self, uri): # move basic auth creds into headers to avoid # https://github.com/requests/requests/issues/4275 username, password = get_auth_from_url(uri)
[文档] uri = urldefragauth(uri)
[文档] self._base_url = '{}/api'.format(uri)
[文档] self._session = Session()
[文档] self._session.auth = HTTPBasicAuth(username, password)
self._session.headers['content-type'] = 'application/json' self._verify_api_connection()
[文档] def _build_url(self, args): args = map(_quote, args) return '{}/{}'.format( self._base_url, '/'.join(args), )
[文档] def _request(self, method, *args, **kwargs): url = self._build_url(args) json_data = kwargs.pop('json', None) if json_data is not None: kwargs['data'] = json.dumps(json_data) try: result = self._session.request(method, url, **kwargs) except ConnectionError as exc: six.raise_from(Exception( 'Connection error for the RabbitMQ management HTTP' ' API at {}, is it enabled?'.format(url) ), exc) result.raise_for_status() if result.content: return result.json()
[文档] def _get(self, *args, **kwargs): return self._request('GET', *args, **kwargs)
[文档] def _put(self, *args, **kwargs): return self._request('PUT', *args, **kwargs)
[文档] def _delete(self, *args, **kwargs): return self._request('DELETE', *args, **kwargs)
[文档] def _post(self, *args, **kwargs): return self._request('POST', *args, **kwargs)
[文档] def _verify_api_connection(self): self._get('overview')
[文档] def get_connections(self): return self._get('connections')
[文档] def delete_connection(self, name): return self._delete('connections', name)
[文档] def get_exchanges(self, vhost): return self._get('exchanges', vhost)
[文档] def get_all_vhosts(self): return self._get('vhosts')
[文档] def create_vhost(self, vhost): return self._put('vhosts', vhost)
[文档] def delete_vhost(self, vhost): return self._delete('vhosts', vhost)
[文档] def set_vhost_permissions(self, vhost, username, configure, read, write): permissions = { 'configure': configure, 'read': read, 'write': write, } return self._put( 'permissions', vhost, username, json=permissions)
[文档] def get_queue(self, vhost, name): return self._get('queues', vhost, name)
[文档] def create_queue(self, vhost, name, **properties): return self._put('queues', vhost, name, json=properties)
[文档] def get_queues(self, vhost): return self._get('queues', vhost)
[文档] def get_queue_bindings(self, vhost, name): return self._get('queues', vhost, name, 'bindings')
[文档] def create_queue_binding(self, vhost, exchange, queue, routing_key): body = { 'routing_key': routing_key, } return self._post( 'bindings', vhost, 'e', exchange, 'q', queue, json=body )
[文档] def publish(self, vhost, name, routing_key, payload, properties=None): body = { 'routing_key': routing_key, 'payload': payload, 'properties': properties or {}, 'payload_encoding': 'string', } return self._post('exchanges', vhost, name, 'publish', json=body)
[文档] def get_messages(self, vhost, name, count=1, requeue=False): body = { 'count': count, 'encoding': 'auto', 'requeue': requeue, } return self._post('queues', vhost, name, 'get', json=body)