编写扩展

结构(Structure)

扩展应继承自 nameko.extensions.Extension 。这个基类提供了扩展的基本结构,特别是以下方法,可以重写以添加功能:

Extension.setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

Extension.start()[源代码]

在容器成功启动时调用绑定的扩展。

此方法仅在所有其他扩展成功返回 Extension.setup 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。

Extension.stop()[源代码]

在服务容器开始关闭时调用。

扩展应在此处执行任何优雅的关闭操作。

编写依赖提供者

几乎每个 Nameko 应用程序都需要定义自己的依赖项——可能是为了与没有 社区扩展 的数据库接口,或与 特定网络服务 进行通信。

依赖提供者应继承自 nameko.extensions.DependencyProvider 类,并实现一个 ~nameko.extensions.DependencyProvider.get_dependency 方法,该方法返回要注入到服务工作者中的对象。

推荐的模式是注入依赖项所需的最小接口。这减少了测试的复杂性,并使在测试中更容易执行服务代码。

依赖提供者还可以挂钩到 工作者生命周期。以下三个方法会在每个工作者的所有依赖提供者上被调用:

DependencyProvider.worker_setup(worker_ctx)[源代码]

在服务工作者执行任务之前调用。

依赖项应在此处进行任何预处理,如果失败则引发异常。

Example: ...

Parameters:
worker_ctxWorkerContext

spawn_worker()

DependencyProvider.worker_result(worker_ctx, result=None, exc_info=None)[源代码]

在服务工作者执行结果时调用。

需要处理结果的依赖项应在此处进行处理。此方法在任何工作者完成时会被调用所有 Dependency 实例。

示例:数据库会话依赖项可能会刷新事务。

Parameters:
worker_ctxWorkerContext

spawn_worker()

DependencyProvider.worker_teardown(worker_ctx)[源代码]

在服务工作者执行完任务后调用。

依赖项应在此处进行任何后处理,如果失败则引发异常。

示例:数据库会话依赖项可能会提交会话。

Parameters:
worker_ctxWorkerContext

spawn_worker()

并发与线程安全

~nameko.extensions.DependencyProvider.get_dependency 方法返回的对象应该是线程安全的,因为它可能被多个并发运行的工作者访问。

工作者生命周期 在执行服务方法的同一线程中被调用。这意味着,例如,你可以定义线程局部变量,并从每个方法中访问它们。

示例

一个简单的 DependencyProvider,用于向 SQS 队列发送消息。

from nameko.extensions import DependencyProvider

import boto3

class SqsSend(DependencyProvider):

    def __init__(self, url, region="eu-west-1", **kwargs):
        self.url = url
        self.region = region
        super(SqsSend, self).__init__(**kwargs)

    def setup(self):
        self.client = boto3.client('sqs', region_name=self.region)

    def get_dependency(self, worker_ctx):

        def send_message(payload):
            # assumes boto client is thread-safe for this action, because it
            # happens inside the worker threads
            self.client.send_message(
                QueueUrl=self.url,
                MessageBody=payload
            )
        return send_message

编写入口点

如果你想支持新的传输或启动服务代码的机制,可以实现新的 Entrypoint 扩展。

Entrypoint 的最低要求是:

  1. 继承自 nameko.extensions.Entrypoint

  2. 实现 start() 方法,以便在容器启动时启动入口点。如果需要后台线程,建议使用由服务容器管理的线程(参见 生成后台线程 )。

  3. 在适当的时候调用绑定容器上的 spawn_worker()

示例

一个简单的 Entrypoint,用于从 SQS 队列接收消息。

from nameko.extensions import Entrypoint
from functools import partial

import boto3


class SqsReceive(Entrypoint):

    def __init__(self, url, region="eu-west-1", **kwargs):
        self.url = url
        self.region = region
        super(SqsReceive, self).__init__(**kwargs)

    def setup(self):
        self.client = boto3.client('sqs', region_name=self.region)

    def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

    def run(self):
        while True:
            response = self.client.receive_message(
                QueueUrl=self.url,
                WaitTimeSeconds=5,
            )
            messages = response.get('Messages', ())
            for message in messages:
                self.handle_message(message)

    def handle_message(self, message):
        handle_result = partial(self.handle_result, message)

        args = (message['Body'],)
        kwargs = {}

        self.container.spawn_worker(
            self, args, kwargs, handle_result=handle_result
        )

    def handle_result(self, message, worker_ctx, result, exc_info):
        # assumes boto client is thread-safe for this action, because it
        # happens inside the worker threads
        self.client.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=message['ReceiptHandle']
        )
        return result, exc_info


receive = SqsReceive.decorator

在一个服务中使用:

from .sqs_receive import receive


class SqsService(object):
    name = "sqs-service"

    @receive('https://sqs.eu-west-1.amazonaws.com/123456789012/nameko-sqs')
    def handle_sqs_message(self, body):
        """ This method is called by the `receive` entrypoint whenever
        a message sent to the given SQS queue.
        """
        print(body)
        return body

预期异常

Entrypoint 基类构造函数将接受一个类列表,这些类在被装饰的服务方法中被引发时应被视为“预期的”。这可以用于区分 用户错误 和更根本的执行错误。例如:

class Service:
    name = "service"

    auth = Auth()

    @rpc(expected_exceptions=Unauthorized)
    def update(self, data):
        if not self.auth.has_role("admin"):
            raise Unauthorized()

        # perform update
        raise TypeError("Whoops, genuine error.")

预期异常的列表会保存到 Entrypoint 实例中,以便稍后进行检查,例如通过其他处理异常的扩展,如 nameko-sentry

敏感参数

预期异常 类似,Entrypoint 构造函数允许你将某些参数或参数部分标记为敏感。例如:

class Service:
    name = "service"

    auth = Auth()

    @rpc(sensitive_arguments="password", expected_exceptions=Unauthenticated)
    def login(self, username, password):
        # raises Unauthenticated if username/password do not match
        return self.auth.authenticate(username, password)

这可以与实用函数 nameko.utils.get_redacted_args 结合使用,该函数将返回入口点的调用参数(类似于 inspect.getcallargs ),但敏感元素被遮蔽。

这在记录或保存有关入口点调用信息的扩展中非常有用,例如 nameko-tracer

对于接受嵌套在其他安全参数中的敏感信息的入口点,可以指定部分遮蔽。例如:

# by dictionary key
@entrypoint(sensitive_arguments="foo.a")
def method(self, foo):
    pass

>>> get_redacted_args(method, foo={'a': 1, 'b': 2})
... {'foo': {'a': '******', 'b': 2}}

# list index
@entrypoint(sensitive_arguments="foo.a[1]")
def method(self, foo):
    pass

>>> get_redacted_args(method, foo=[{'a': [1, 2, 3]}])
... {'foo': {'a': [1, '******', 3]}}

不支持切片和相对列表索引。

生成后台线程

需要在线程中执行工作的扩展可以选择通过使用 spawn_managed_thread() 将该线程的管理委托给服务容器。

    def start(self):
        self.container.spawn_managed_thread(
            self.run, identifier="SqsReceiver.run"
        )

建议将线程管理委托给容器,因为:

  • 管理的线程在容器停止或被杀死时会始终被终止。

  • 管理线程中的未处理异常会被容器捕获,并导致其终止并显示适当的消息,这可以防止进程挂起。