import json
import six
from requests import ConnectionError, HTTPError, Session
from requests.auth import HTTPBasicAuth
from requests.utils import get_auth_from_url, urldefragauth
from six.moves.urllib.parse import quote # pylint: disable=E0401
__all__ = ['Client', 'HTTPError']
def _quote(value):
return quote(value, '')
class Client(object):
    """Small RabbitMQ management HTTP API client.

    Pyrabbit replacement using requests instead of httplib2.
    """

    def __init__(self, uri):
        """Open a session against the management API at *uri* and probe it.

        Args:
            uri: Base URL of the management API. Basic-auth credentials
                embedded in the URL are extracted and sent via the session's
                auth handler instead.

        Raises:
            Exception: If the connection to the API fails (see ``_request``).
            HTTPError: If the initial ``overview`` request returns an error
                status.
        """
        # move basic auth creds into headers to avoid
        # https://github.com/requests/requests/issues/4275
        username, password = get_auth_from_url(uri)
        uri = urldefragauth(uri)
        self._base_url = self._make_base_url(uri)
        self._session = Session()
        self._session.auth = HTTPBasicAuth(username, password)
        self._session.headers['content-type'] = 'application/json'
        self._verify_api_connection()

    @staticmethod
    def _make_base_url(uri):
        """Return the ``/api`` root for *uri*.

        Trailing slashes are stripped first so that ``http://host/`` does not
        yield ``http://host//api``.
        """
        return '{}/api'.format(uri.rstrip('/'))

    def _build_url(self, args):
        """Join the percent-encoded path segments *args* onto the API root."""
        path = '/'.join(map(_quote, args))
        return '{}/{}'.format(self._base_url, path)

    def _request(self, method, *args, **kwargs):
        """Send an HTTP request and return the decoded JSON response.

        Args:
            method: HTTP verb passed to the session.
            *args: Path segments; each is percent-encoded and appended to
                the API root.
            **kwargs: Extra keyword arguments forwarded to
                ``Session.request``. A ``json`` keyword, when not ``None``,
                is serialized with ``json.dumps`` and sent as ``data``.

        Returns:
            The parsed JSON body, or ``None`` when the response is empty.

        Raises:
            Exception: If the session raises ``ConnectionError``; the
                original error is chained as the cause.
            HTTPError: If the response carries an error status code.
        """
        url = self._build_url(args)
        json_data = kwargs.pop('json', None)
        # ``is not None`` (rather than truthiness) so an empty dict is still
        # sent as the body ``{}``.
        if json_data is not None:
            kwargs['data'] = json.dumps(json_data)
        try:
            result = self._session.request(method, url, **kwargs)
        except ConnectionError as exc:
            six.raise_from(Exception(
                'Connection error for the RabbitMQ management HTTP'
                ' API at {}, is it enabled?'.format(url)
            ), exc)
        result.raise_for_status()
        if result.content:
            return result.json()
        return None

    def _get(self, *args, **kwargs):
        """Issue a ``GET`` request; see ``_request``."""
        return self._request('GET', *args, **kwargs)

    def _put(self, *args, **kwargs):
        """Issue a ``PUT`` request; see ``_request``."""
        return self._request('PUT', *args, **kwargs)

    def _delete(self, *args, **kwargs):
        """Issue a ``DELETE`` request; see ``_request``."""
        return self._request('DELETE', *args, **kwargs)

    def _post(self, *args, **kwargs):
        """Issue a ``POST`` request; see ``_request``."""
        return self._request('POST', *args, **kwargs)

    def _verify_api_connection(self):
        """Request ``overview`` so an unreachable API fails at construction."""
        self._get('overview')

    def get_connections(self):
        """Return the response of ``GET connections``."""
        return self._get('connections')

    def delete_connection(self, name):
        """Send ``DELETE connections/<name>``."""
        return self._delete('connections', name)

    def get_exchanges(self, vhost):
        """Return the response of ``GET exchanges/<vhost>``."""
        return self._get('exchanges', vhost)

    def get_all_vhosts(self):
        """Return the response of ``GET vhosts``."""
        return self._get('vhosts')

    def create_vhost(self, vhost):
        """Send ``PUT vhosts/<vhost>`` with no request body."""
        return self._put('vhosts', vhost)

    def delete_vhost(self, vhost):
        """Send ``DELETE vhosts/<vhost>``."""
        return self._delete('vhosts', vhost)

    def set_vhost_permissions(self, vhost, username, configure, read, write):
        """Send ``PUT permissions/<vhost>/<username>``.

        The JSON body carries the ``configure``, ``read`` and ``write``
        values exactly as given.
        """
        permissions = {
            'configure': configure,
            'read': read,
            'write': write,
        }
        return self._put('permissions', vhost, username, json=permissions)

    def get_queue(self, vhost, name):
        """Return the response of ``GET queues/<vhost>/<name>``."""
        return self._get('queues', vhost, name)

    def create_queue(self, vhost, name, **properties):
        """Send ``PUT queues/<vhost>/<name>`` with *properties* as JSON body."""
        return self._put('queues', vhost, name, json=properties)

    def get_queues(self, vhost):
        """Return the response of ``GET queues/<vhost>``."""
        return self._get('queues', vhost)

    def get_queue_bindings(self, vhost, name):
        """Return the response of ``GET queues/<vhost>/<name>/bindings``."""
        return self._get('queues', vhost, name, 'bindings')

    def create_queue_binding(self, vhost, exchange, queue, routing_key):
        """Send ``POST bindings/<vhost>/e/<exchange>/q/<queue>``.

        The JSON body contains only ``routing_key``.
        """
        body = {
            'routing_key': routing_key,
        }
        return self._post(
            'bindings', vhost, 'e', exchange, 'q', queue, json=body
        )

    def publish(self, vhost, name, routing_key, payload, properties=None):
        """Send ``POST exchanges/<vhost>/<name>/publish``.

        Args:
            vhost: Virtual host path segment.
            name: Exchange name path segment.
            routing_key: Sent as ``routing_key`` in the body.
            payload: Sent as ``payload``; ``payload_encoding`` is always
                ``'string'``.
            properties: Sent as ``properties``; any falsy value is replaced
                by an empty dict.
        """
        body = {
            'routing_key': routing_key,
            'payload': payload,
            'properties': properties or {},
            'payload_encoding': 'string',
        }
        return self._post('exchanges', vhost, name, 'publish', json=body)

    def get_messages(self, vhost, name, count=1, requeue=False):
        """Send ``POST queues/<vhost>/<name>/get``.

        The JSON body carries ``count`` and ``requeue`` as given, with
        ``encoding`` fixed to ``'auto'``.
        """
        body = {
            'count': count,
            'encoding': 'auto',
            'requeue': requeue,
        }
        return self._post('queues', vhost, name, 'get', json=body)