编写扩展¶
结构(Structure)¶
扩展应继承自 nameko.extensions.Extension
。这个基类提供了扩展的基本结构,特别是以下方法,可以重写以添加功能:
- Extension.setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- Extension.start()[源代码]
在容器成功启动时调用绑定的扩展。
此方法仅在所有其他扩展成功返回 Extension.setup 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。
- Extension.stop()[源代码]
在服务容器开始关闭时调用。
扩展应在此处执行任何优雅的关闭操作。
编写依赖提供者¶
几乎每个 Nameko 应用程序都需要定义自己的依赖项——可能是为了与没有 社区扩展 的数据库接口,或与 特定网络服务 进行通信。
依赖提供者应继承自 nameko.extensions.DependencyProvider 类,并实现一个 ~nameko.extensions.DependencyProvider.get_dependency 方法,该方法返回要注入到服务工作者中的对象。
推荐的模式是注入依赖项所需的最小接口。这减少了测试的复杂性,并使在测试中更容易执行服务代码。
依赖提供者还可以挂钩到 工作者生命周期。以下三个方法会在每个工作者的所有依赖提供者上被调用:
- DependencyProvider.worker_setup(worker_ctx)[源代码]
在服务工作者执行任务之前调用。
依赖项应在此处进行任何预处理,如果失败则引发异常。
Example: ...
- Parameters:
- worker_ctx
WorkerContext
- worker_ctx
- DependencyProvider.worker_result(worker_ctx, result=None, exc_info=None)[源代码]
在服务工作者执行结果时调用。
需要处理结果的依赖项应在此处进行处理。此方法在任何工作者完成时会被调用所有 Dependency 实例。
示例:数据库会话依赖项可能会刷新事务。
- Parameters:
- worker_ctx
WorkerContext
- worker_ctx
- DependencyProvider.worker_teardown(worker_ctx)[源代码]
在服务工作者执行完任务后调用。
依赖项应在此处进行任何后处理,如果失败则引发异常。
示例:数据库会话依赖项可能会提交会话。
- Parameters:
- worker_ctx
WorkerContext
- worker_ctx
并发与线程安全¶
~nameko.extensions.DependencyProvider.get_dependency 方法返回的对象应该是线程安全的,因为它可能被多个并发运行的工作者访问。
工作者生命周期 在执行服务方法的同一线程中被调用。这意味着,例如,你可以定义线程局部变量,并从每个方法中访问它们。
示例¶
一个简单的 DependencyProvider
,用于向 SQS 队列发送消息。
from nameko.extensions import DependencyProvider
import boto3
class SqsSend(DependencyProvider):
def __init__(self, url, region="eu-west-1", **kwargs):
self.url = url
self.region = region
super(SqsSend, self).__init__(**kwargs)
def setup(self):
self.client = boto3.client('sqs', region_name=self.region)
def get_dependency(self, worker_ctx):
def send_message(payload):
# assumes boto client is thread-safe for this action, because it
# happens inside the worker threads
self.client.send_message(
QueueUrl=self.url,
MessageBody=payload
)
return send_message
编写入口点¶
如果你想支持新的传输或启动服务代码的机制,可以实现新的 Entrypoint 扩展。
Entrypoint 的最低要求是:
实现
start()
方法,以便在容器启动时启动入口点。如果需要后台线程,建议使用由服务容器管理的线程(参见 生成后台线程 )。在适当的时候调用绑定容器上的
spawn_worker()
。
示例¶
一个简单的 Entrypoint
,用于从 SQS 队列接收消息。
from nameko.extensions import Entrypoint
from functools import partial
import boto3
class SqsReceive(Entrypoint):
def __init__(self, url, region="eu-west-1", **kwargs):
self.url = url
self.region = region
super(SqsReceive, self).__init__(**kwargs)
def setup(self):
self.client = boto3.client('sqs', region_name=self.region)
def start(self):
self.container.spawn_managed_thread(
self.run, identifier="SqsReceiver.run"
)
def run(self):
while True:
response = self.client.receive_message(
QueueUrl=self.url,
WaitTimeSeconds=5,
)
messages = response.get('Messages', ())
for message in messages:
self.handle_message(message)
def handle_message(self, message):
handle_result = partial(self.handle_result, message)
args = (message['Body'],)
kwargs = {}
self.container.spawn_worker(
self, args, kwargs, handle_result=handle_result
)
def handle_result(self, message, worker_ctx, result, exc_info):
# assumes boto client is thread-safe for this action, because it
# happens inside the worker threads
self.client.delete_message(
QueueUrl=self.url,
ReceiptHandle=message['ReceiptHandle']
)
return result, exc_info
receive = SqsReceive.decorator
在一个服务中使用:
from .sqs_receive import receive
class SqsService(object):
name = "sqs-service"
@receive('https://sqs.eu-west-1.amazonaws.com/123456789012/nameko-sqs')
def handle_sqs_message(self, body):
""" This method is called by the `receive` entrypoint whenever
a message sent to the given SQS queue.
"""
print(body)
return body
预期异常¶
Entrypoint 基类构造函数将接受一个类列表,这些类在被装饰的服务方法中被引发时应被视为“预期的”。这可以用于区分 用户错误 和更根本的执行错误。例如:
class Service:
name = "service"
auth = Auth()
@rpc(expected_exceptions=Unauthorized)
def update(self, data):
if not self.auth.has_role("admin"):
raise Unauthorized()
# perform update
raise TypeError("Whoops, genuine error.")
预期异常的列表会保存到 Entrypoint 实例中,以便稍后进行检查,例如通过其他处理异常的扩展,如 nameko-sentry 。
敏感参数¶
与 预期异常 类似,Entrypoint 构造函数允许你将某些参数或参数部分标记为敏感。例如:
class Service:
name = "service"
auth = Auth()
@rpc(sensitive_arguments="password", expected_exceptions=Unauthenticated)
def login(self, username, password):
# raises Unauthenticated if username/password do not match
return self.auth.authenticate(username, password)
这可以与实用函数 nameko.utils.get_redacted_args 结合使用,该函数将返回入口点的调用参数(类似于 inspect.getcallargs ),但敏感元素被遮蔽。
这在记录或保存有关入口点调用信息的扩展中非常有用,例如 nameko-tracer 。
对于接受嵌套在其他安全参数中的敏感信息的入口点,可以指定部分遮蔽。例如:
# by dictionary key
@entrypoint(sensitive_arguments="foo.a")
def method(self, foo):
pass
>>> get_redacted_args(method, foo={'a': 1, 'b': 2})
... {'foo': {'a': '******', 'b': 2}}
# list index
@entrypoint(sensitive_arguments="foo.a[1]")
def method(self, foo):
pass
>>> get_redacted_args(method, foo=[{'a': [1, 2, 3]}])
... {'foo': {'a': [1, '******', 3]}}
不支持切片和相对列表索引。
生成后台线程¶
需要在线程中执行工作的扩展可以选择通过使用 spawn_managed_thread()
将该线程的管理委托给服务容器。
def start(self):
self.container.spawn_managed_thread(
self.run, identifier="SqsReceiver.run"
)
建议将线程管理委托给容器,因为:
管理的线程在容器停止或被杀死时会始终被终止。
管理线程中的未处理异常会被容器捕获,并导致其终止并显示适当的消息,这可以防止进程挂起。