内置扩展¶
Nameko 包含多个内置的 扩展 。本节介绍这些扩展并给出它们用法的简要示例。
RPC¶
Nameko 包含一个基于 AMQP 的 RPC 实现。它包括 @rpc
入口点,一个服务间通信的代理,以及一个独立的代理,非 Nameko 客户端可以使用它来向集群发起 RPC 调用:
from nameko.rpc import rpc, RpcProxy
class ServiceY:
name = "service_y"
@rpc
def append_identifier(self, value):
return u"{}-y".format(value)
class ServiceX:
name = "service_x"
y = RpcProxy("service_y")
@rpc
def remote_method(self, value):
res = u"{}-x".format(value)
return self.y.append_identifier(res)
from nameko.standalone.rpc import ClusterRpcProxy
config = {
'AMQP_URI': AMQP_URI # e.g. "pyamqp://guest:guest@localhost"
}
with ClusterRpcProxy(config) as cluster_rpc:
cluster_rpc.service_x.remote_method("hellø") # "hellø-x-y"
正常的 RPC 调用会阻塞,直到远程方法完成,但代理也具有异步调用模式,可以将 RPC 调用后台执行或并行化:
with ClusterRpcProxy(config) as cluster_rpc:
hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
world_res = cluster_rpc.service_x.remote_method.call_async("world")
# do work while waiting
hello_res.result() # "hello-x-y"
world_res.result() # "world-x-y"
在具有多个目标服务实例的集群中,RPC 请求会在实例之间进行轮询。请求将由目标服务的一个实例处理。
只有在请求成功处理后,AMQP 消息才会被确认。如果服务未能确认消息并且 AMQP 连接关闭(例如,如果服务进程被终止),则代理会撤销并将消息分配给可用的服务实例。
请求和响应负载会序列化为 JSON 格式,以便通过网络传输。
事件 (发布-订阅)¶
Nameko 事件是一个异步消息系统,实现了发布-订阅模式。服务会分发事件,这些事件可能会被零个或多个其他服务接收:
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
class ServiceA:
""" Event dispatching service. """
name = "service_a"
dispatch = EventDispatcher()
@rpc
def dispatching_method(self, payload):
self.dispatch("event_type", payload)
class ServiceB:
""" Event listening service. """
name = "service_b"
@event_handler("service_a", "event_type")
def handle_event(self, payload):
print("service b received:", payload)
EventHandler
入口点有三种 handler_type
s ,它们决定了事件消息在集群中是如何接收的:
SERVICE_POOL
— 事件处理程序按服务名称进行分组,每个池中的一个实例接收事件,类似于 RPC 入口点的集群行为。这是默认的处理程序类型。BROADCAST
— 每个监听的服务实例都将接收该事件。SINGLETON
— 精确一个监听的服务实例将接收该事件。
使用 BROADCAST
模式的示例:
from nameko.events import BROADCAST, event_handler
class ListenerService:
name = "listener"
@event_handler(
"monitor", "ping", handler_type=BROADCAST, reliable_delivery=False
)
def ping(self, payload):
# all running services will respond
print("pong from {}".format(self.name))
事件会序列化为 JSON 格式,以便通过网络传输。
HTTP¶
HTTP 入口点是基于 werkzeug 构建的,支持所有标准 HTTP 方法(GET/POST/DELETE/PUT 等)。
HTTP 入口点可以为单个 URL 指定多个 HTTP 方法,使用逗号分隔的列表。请参见下面的示例。
服务方法必须返回以下之一:
一个字符串,作为响应体
一个 2 元组
(状态码, 响应体)
一个 3 元组
(状态码, 头部字典, 响应体)
一个
werkzeug.wrappers.Response
的实例
# http.py
import json
from nameko.web.handlers import http
class HttpService:
name = "http_service"
@http('GET', '/get/<int:value>')
def get_method(self, request, value):
return json.dumps({'value': value})
@http('POST', '/post')
def do_post(self, request):
return u"received: {}".format(request.get_data(as_text=True))
@http('GET,PUT,POST,DELETE', '/multi')
def do_multi(self, request):
return request.method
$ nameko run http
starting services: http_service
$ curl -i localhost:8000/get/42
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 13
Date: Fri, 13 Feb 2015 14:51:18 GMT
{'value': 42}
$ curl -i -d "post body" localhost:8000/post
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 19
Date: Fri, 13 Feb 2015 14:55:01 GMT
received: post body
一个高级用法示例:
# advanced_http.py
from nameko.web.handlers import http
from werkzeug.wrappers import Response
class Service:
name = "advanced_http_service"
@http('GET', '/privileged')
def forbidden(self, request):
return 403, "Forbidden"
@http('GET', '/headers')
def redirect(self, request):
return 201, {'Location': 'https://www.example.com/widget/1'}, ""
@http('GET', '/custom')
def custom(self, request):
return Response("payload")
$ nameko run advanced_http
starting services: advanced_http_service
$ curl -i localhost:8000/privileged
HTTP/1.1 403 FORBIDDEN
Content-Type: text/plain; charset=utf-8
Content-Length: 9
Date: Fri, 13 Feb 2015 14:58:02 GMT
curl -i localhost:8000/headers
HTTP/1.1 201 CREATED
Location: https://www.example.com/widget/1
Content-Type: text/plain; charset=utf-8
Content-Length: 0
Date: Fri, 13 Feb 2015 14:58:48 GMT
您可以通过重写 response_from_exception()
来控制从您的服务返回的错误格式:
import json
from nameko.web.handlers import HttpRequestHandler
from werkzeug.wrappers import Response
from nameko.exceptions import safe_for_serialization
class HttpError(Exception):
error_code = 'BAD_REQUEST'
status_code = 400
class InvalidArgumentsError(HttpError):
error_code = 'INVALID_ARGUMENTS'
class HttpEntrypoint(HttpRequestHandler):
def response_from_exception(self, exc):
if isinstance(exc, HttpError):
response = Response(
json.dumps({
'error': exc.error_code,
'message': safe_for_serialization(exc),
}),
status=exc.status_code,
mimetype='application/json'
)
return response
return HttpRequestHandler.response_from_exception(self, exc)
http = HttpEntrypoint.decorator
class Service:
name = "http_service"
@http('GET', '/custom_exception')
def custom_exception(self, request):
raise InvalidArgumentsError("Argument `foo` is required.")
$ nameko run http_exceptions
starting services: http_service
$ curl -i http://localhost:8000/custom_exception
HTTP/1.1 400 BAD REQUEST
Content-Type: application/json
Content-Length: 72
Date: Thu, 06 Aug 2015 09:53:56 GMT
{"message": "Argument `foo` is required.", "error": "INVALID_ARGUMENTS"}
你可以通过配置中的 WEB_SERVER_ADDRESS 来改变 HTTP 的端口和IP地址:
# foobar.yaml
AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
计时器¶
Timer
是一个简单的入口点,它会每隔可配置的秒数触发一次。该计时器不是“集群感知”的,会在所有服务实例上触发。
from nameko.timer import timer
class Service:
name ="service"
@timer(interval=1)
def ping(self):
# method executed every second
print("pong")