各种示例¶
rpc 调用¶
from nameko.rpc import rpc, RpcProxy
class ServiceY:
    """Service registered under the name ``service_y``."""

    name = "service_y"

    @rpc
    def append_identifier(self, value):
        """Return ``value`` formatted with a ``-y`` suffix."""
        suffixed = u"{}-y".format(value)
        return suffixed
class ServiceX:
    """Service ``service_x``; hands its formatted value on to ``service_y``."""

    name = "service_x"

    # Proxy used to call methods on the service named "service_y".
    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        """Append ``-x`` to ``value`` and pass the result to ``service_y``.

        Returns whatever ``append_identifier`` on the proxy returns.
        """
        return self.y.append_identifier(u"{}-x".format(value))
异步 rpc 调用¶
with ClusterRpcProxy(config) as cluster_rpc:
    # Issue both calls up front; neither result is collected yet.
    hello_res, world_res = [
        cluster_rpc.service_x.remote_method.call_async(word)
        for word in ("hello", "world")
    ]
    # do work while waiting
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"
本质上就是不等待消费线程返回,就再次调用远程方法;需要结果时再调用 result() 获取,见 call_async()。
Travis Web 服务¶
import requests
from nameko.extensions import DependencyProvider
from nameko.rpc import rpc
# Repository endpoint; the two placeholders are filled with owner and repo.
URL_TEMPLATE = "https://api.travis-ci.org/repos/{}/{}"


class ApiWrapper:
    """Thin wrapper that issues repository queries through a session."""

    def __init__(self, session):
        # ``session`` must offer ``get(url)`` returning an object with ``json()``.
        self.session = session

    def repo_status(self, owner, repo):
        """Request ``URL_TEMPLATE`` filled with ``owner`` and ``repo``.

        Returns the value produced by the response's ``json()`` call.
        """
        response = self.session.get(URL_TEMPLATE.format(owner, repo))
        return response.json()
class TravisWebservice(DependencyProvider):
    """Dependency provider that hands workers an ``ApiWrapper``.

    A single ``requests.Session`` is created in ``setup`` and shared by every
    wrapper returned from ``get_dependency``.
    """

    def setup(self):
        """Create the HTTP session shared by all workers."""
        self.session = requests.Session()

    def stop(self):
        """Close the shared session so its pooled connections are released."""
        # ``setup`` may never have run if start-up failed early, so the
        # attribute is looked up defensively.
        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def get_dependency(self, worker_ctx):
        """Return a new ``ApiWrapper`` bound to the shared session."""
        return ApiWrapper(self.session)
class Travis:
    """Service ``travis_service`` reporting a repository's build status."""

    name = "travis_service"

    webservice = TravisWebservice()

    @rpc
    def status_message(self, owner, repo):
        """Return a one-line summary of the latest build of ``owner/repo``.

        The status mapping returned by ``repo_status`` must contain the keys
        ``last_build_result``, ``slug`` and ``last_build_finished_at``.
        """
        status = self.webservice.repo_status(owner, repo)
        # ``last_build_result`` is the build's exit status: 0 means the build
        # passed, anything else (including None when there is no finished
        # result) is reported as failing. A plain truthiness test would
        # invert the outcome.
        outcome = "passing" if status['last_build_result'] == 0 else "failing"

        return "Project {repo} {outcome} since {timestamp}.".format(
            repo=status['slug'],
            outcome=outcome,
            timestamp=status['last_build_finished_at']
        )
Event¶
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
class ServiceA:
    """Service that dispatches events on behalf of its rpc callers."""

    name = "service_a"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
        """Dispatch ``payload`` as an event of type ``event_type``."""
        event_type = "event_type"
        self.dispatch(event_type, payload)
class ServiceB:
    """Service that handles ``event_type`` events coming from ``service_a``."""

    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        """Print the payload of the received event."""
        print("service b received:", payload)
Event broadcast¶
from nameko.events import BROADCAST, event_handler
class ListenerService:
    """Service ``listener`` handling ``ping`` events from ``monitor``."""

    name = "listener"

    @event_handler(
        "monitor",
        "ping",
        handler_type=BROADCAST,
        reliable_delivery=False,
    )
    def ping(self, payload):
        """Print a ``pong`` line carrying this service's name."""
        # all running services will respond
        reply = "pong from {}".format(self.name)
        print(reply)
Timer¶
from nameko.timer import timer
class Service:
    """Service whose ``ping`` method is attached to a timer with interval 1."""

    name = "service"

    @timer(interval=1)
    def ping(self):
        """Print ``pong``."""
        # method executed every second
        print("pong")
standalone RPC¶
from nameko.standalone.rpc import ClusterRpcProxy
# Configuration handed to the standalone proxy; only the broker URI is set.
config = dict(
    AMQP_URI=AMQP_URI,  # e.g. "pyamqp://guest:guest@localhost"
)

with ClusterRpcProxy(config) as cluster_rpc:
    # Call made while the proxy context is open.
    cluster_rpc.service_x.remote_method("hellø")  # "hellø-x-y"
standalone events¶
from nameko.standalone.events import event_dispatcher
# Configuration handed to the standalone dispatcher; only the broker URI is set.
config = dict(
    AMQP_URI=AMQP_URI,  # e.g. "pyamqp://guest:guest@localhost"
)

# ``event_dispatcher`` returns the callable used to emit the event below.
dispatch = event_dispatcher(config)
dispatch("service_a", "event_type", "payløad")
Service¶
Runner¶
from nameko.runners import ServiceRunner
from nameko.testing.utils import get_container
class ServiceA:
    """Minimal service definition; it only declares its name."""

    name = "service_a"
class ServiceB:
    """Minimal service definition; it only declares its name."""

    name = "service_b"
# Build one runner (empty config) and register both service classes with it.
runner = ServiceRunner(config={})
runner.add_service(ServiceA)
runner.add_service(ServiceB)

# Look up the container that the runner holds for ServiceA.
container_a = get_container(runner, ServiceA)

# Start every registered service ...
runner.start()

# ... and stop them all again.
runner.stop()
Container¶
from nameko.containers import ServiceContainer
class Service:
    """Minimal service definition; it only declares its name."""

    name = "service"
# Wrap the service class in a container, using an empty config.
container = ServiceContainer(Service, config={})

# Snapshot of everything ``container.extensions`` yields, as a list.
service_extensions = list(container.extensions)

# Start the service ...
container.start()

# ... and stop it again.
container.stop()
http¶
# http.py
import json
from nameko.web.handlers import http
class HttpService:
    """Service ``http_service`` exposing three HTTP handlers."""

    name = "http_service"

    @http('GET', '/get/<int:value>')
    def get_method(self, request, value):
        """Return ``value`` wrapped in a JSON object."""
        body = {'value': value}
        return json.dumps(body)

    @http('POST', '/post')
    def do_post(self, request):
        """Echo the request data, read as text, back to the caller."""
        received = request.get_data(as_text=True)
        return u"received: {}".format(received)

    @http('GET,PUT,POST,DELETE', '/multi')
    def do_multi(self, request):
        """Return the ``method`` attribute of the request."""
        return request.method
exceptions¶
import json
from nameko.web.handlers import HttpRequestHandler
from werkzeug.wrappers import Response
from nameko.exceptions import safe_for_serialization
class HttpError(Exception):
    """Base exception carrying an error code and an HTTP status code."""

    # Defaults; subclasses override whichever value they need.
    status_code = 400
    error_code = 'BAD_REQUEST'
class InvalidArgumentsError(HttpError):
    """``HttpError`` variant that overrides only the error code."""

    error_code = 'INVALID_ARGUMENTS'
class HttpEntrypoint(HttpRequestHandler):
    """Request handler that renders ``HttpError`` instances as JSON."""

    def response_from_exception(self, exc):
        """Build the response for an exception.

        An ``HttpError`` becomes a JSON body holding its error code and the
        serialized exception, sent with the exception's status code. Any
        other exception is delegated to ``HttpRequestHandler``.
        """
        if not isinstance(exc, HttpError):
            return HttpRequestHandler.response_from_exception(self, exc)

        body = json.dumps({
            'error': exc.error_code,
            'message': safe_for_serialization(exc),
        })
        return Response(
            body,
            status=exc.status_code,
            mimetype='application/json'
        )


# Decorator form of the custom entrypoint, bound to the name ``http``.
http = HttpEntrypoint.decorator
class Service:
    """Service ``http_service`` with a handler that always raises."""

    name = "http_service"

    @http('GET', '/custom_exception')
    def custom_exception(self, request):
        """Raise ``InvalidArgumentsError`` unconditionally."""
        message = "Argument `foo` is required."
        raise InvalidArgumentsError(message)
advanced¶
# advanced_http.py
from nameko.web.handlers import http
from werkzeug.wrappers import Response
class Service:
    """Service ``advanced_http_service`` showing three return shapes."""

    name = "advanced_http_service"

    @http('GET', '/privileged')
    def forbidden(self, request):
        """Return a ``(status, body)`` pair."""
        status, body = 403, "Forbidden"
        return status, body

    @http('GET', '/headers')
    def redirect(self, request):
        """Return a ``(status, headers, body)`` triple with an empty body."""
        headers = {'Location': 'https://www.example.com/widget/1'}
        return 201, headers, ""

    @http('GET', '/custom')
    def custom(self, request):
        """Return a ``Response`` object built directly."""
        response = Response("payload")
        return response
自定义 receive¶
sqs receive¶
from nameko.extensions import Entrypoint
from functools import partial
import boto3
class SqsReceive(Entrypoint):
    """Entrypoint that polls an SQS queue and spawns a worker per message."""

    def __init__(self, url, region="eu-west-1", **kwargs):
        """Store the queue ``url`` and ``region``; forward ``kwargs`` upward."""
        self.url = url
        self.region = region
        super(SqsReceive, self).__init__(**kwargs)

    def setup(self):
        """Create the boto3 SQS client for the configured region."""
        self.client = boto3.client('sqs', region_name=self.region)

    def start(self):
        """Hand the polling loop to the container as a managed thread."""
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

    def run(self):
        """Poll the queue in an endless loop, handling each message received."""
        while True:
            reply = self.client.receive_message(
                QueueUrl=self.url,
                WaitTimeSeconds=5,
            )
            # A reply without a 'Messages' key is treated as empty.
            for message in reply.get('Messages', ()):
                self.handle_message(message)

    def handle_message(self, message):
        """Spawn a worker whose only positional argument is the message body."""
        # Bind the message so the result callback knows what to delete.
        on_result = partial(self.handle_result, message)
        self.container.spawn_worker(
            self, (message['Body'],), {}, handle_result=on_result
        )

    def handle_result(self, message, worker_ctx, result, exc_info):
        """Delete ``message`` from the queue and pass the outcome through.

        The delete happens whether or not ``exc_info`` is set.
        """
        # assumes boto client is thread-safe for this action, because it
        # happens inside the worker threads
        self.client.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result, exc_info


# Decorator form of the entrypoint.
receive = SqsReceive.decorator
sqs service¶
from nameko.extensions import Entrypoint
from functools import partial
import boto3
class SqsReceive(Entrypoint):
    """Entrypoint that polls an SQS queue and spawns a worker per message."""

    def __init__(self, url, region="eu-west-1", **kwargs):
        """Store the queue ``url`` and ``region``; forward ``kwargs`` upward."""
        self.url = url
        self.region = region
        super(SqsReceive, self).__init__(**kwargs)

    def setup(self):
        """Create the boto3 SQS client for the configured region."""
        self.client = boto3.client('sqs', region_name=self.region)

    def start(self):
        """Hand the polling loop to the container as a managed thread."""
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

    def run(self):
        """Poll the queue in an endless loop, handling each message received."""
        while True:
            reply = self.client.receive_message(
                QueueUrl=self.url,
                WaitTimeSeconds=5,
            )
            # A reply without a 'Messages' key is treated as empty.
            for message in reply.get('Messages', ()):
                self.handle_message(message)

    def handle_message(self, message):
        """Spawn a worker whose only positional argument is the message body."""
        # Bind the message so the result callback knows what to delete.
        on_result = partial(self.handle_result, message)
        self.container.spawn_worker(
            self, (message['Body'],), {}, handle_result=on_result
        )

    def handle_result(self, message, worker_ctx, result, exc_info):
        """Delete ``message`` from the queue and pass the outcome through.

        The delete happens whether or not ``exc_info`` is set.
        """
        # assumes boto client is thread-safe for this action, because it
        # happens inside the worker threads
        self.client.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result, exc_info


# Decorator form of the entrypoint.
receive = SqsReceive.decorator
sqs send¶
from nameko.extensions import Entrypoint
from functools import partial
import boto3
class SqsReceive(Entrypoint):
    """Entrypoint that polls an SQS queue and spawns a worker per message."""

    def __init__(self, url, region="eu-west-1", **kwargs):
        """Store the queue ``url`` and ``region``; forward ``kwargs`` upward."""
        self.url = url
        self.region = region
        super(SqsReceive, self).__init__(**kwargs)

    def setup(self):
        """Create the boto3 SQS client for the configured region."""
        self.client = boto3.client('sqs', region_name=self.region)

    def start(self):
        """Hand the polling loop to the container as a managed thread."""
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

    def run(self):
        """Poll the queue in an endless loop, handling each message received."""
        while True:
            reply = self.client.receive_message(
                QueueUrl=self.url,
                WaitTimeSeconds=5,
            )
            # A reply without a 'Messages' key is treated as empty.
            for message in reply.get('Messages', ()):
                self.handle_message(message)

    def handle_message(self, message):
        """Spawn a worker whose only positional argument is the message body."""
        # Bind the message so the result callback knows what to delete.
        on_result = partial(self.handle_result, message)
        self.container.spawn_worker(
            self, (message['Body'],), {}, handle_result=on_result
        )

    def handle_result(self, message, worker_ctx, result, exc_info):
        """Delete ``message`` from the queue and pass the outcome through.

        The delete happens whether or not ``exc_info`` is set.
        """
        # assumes boto client is thread-safe for this action, because it
        # happens inside the worker threads
        self.client.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result, exc_info


# Decorator form of the entrypoint.
receive = SqsReceive.decorator