介绍使用Nameko的Python微服务

本文概述

  • 介绍
  • Python微服务
  • 本文总结
介绍 考虑到微服务的灵活性和弹性, 这种微服务架构模式是一种越来越流行的架构风格。结合Kubernetes等技术, 使用微服务架构来引导应用程序变得前所未有的容易。
根据Martin Fowler博客的一篇经典文章, 微服务架构风格可以概括为:
简而言之, 微服务架构风格是一种将单个应用程序开发为一组小服务的方法, 每个小服务都在自己的进程中运行并与轻量级机制(通常是HTTP资源API)进行通信。这些服务围绕业务功能构建, 并且可以由全自动部署机制独立部署。
换句话说, 遵循微服务架构的应用程序由几个独立的动态服务组成, 这些服务使用通信协议相互通信。通常会使用HTTP(和REST), 但是正如我们将看到的, 我们可以使用其他类型的通信协议, 例如基于AMQP(高级消息队列协议)的RPC(远程过程调用)。
可以将微服务模式视为SOA(面向服务的体系结构)的特定情况。但是, 在SOA中, 通常使用ESB(企业服务总线)来管理服务之间的通信。 ESB通常是高度复杂的, 并且包含用于复杂消息路由和业务规则应用程序的功能。在微服务中, 通常采用一种替代方法:” 智能端点和哑管道” , 这意味着服务本身应包含所有业务逻辑和复杂性(高内聚性), 但是服务之间的连接应尽可能简单。可能(高解耦), 这意味着服务不一定需要知道哪些其他服务将与其通信。这是在体系结构级别应用的关注点分离。
微服务的另一个方面是, 对于每个服务中应使用哪些技术没有强制性规定。你应该能够使用可以与其他服务进行通信的任何软件堆栈来编写服务。每个服务也都有自己的生命周期管理。所有这些都意味着在公司中, 可以让团队使用不同的技术甚至管理方法来研究单独的服务。每个团队都将关注业务能力, 以帮助建立一个更加敏捷的组织。
Python微服务 考虑到这些概念, 在本文中, 我们将重点介绍使用Python构建概念验证微服务应用程序。为此, 我们将使用Python微服务框架Nameko。它内置RPC over AMQP, 使你可以轻松地在服务之间进行通信。它还具有用于HTTP查询的简单界面, 我们将在本教程中使用该界面。但是, 要编写公开HTTP端点的微服务, 建议你使用其他框架, 例如Flask。要使用Flask通过RPC调用Nameko方法, 可以使用flask_nameko, 这是一个包装程序, 仅用于将Flask与Nameko互操作。
设置基本环境
让我们先运行从Nameko网站提取的最简单的示例, 然后将其扩展以达到我们的目的。首先, 你将需要安装Docker。我们将在示例中使用Python 3, 因此请确保还安装了它。然后, 创建一个python virtualenv并运行$ pip install nameko。
要运行Nameko, 我们需要RabbitMQ消息代理。它将负责我们Nameko服务之间的通信。不过请放心, 因为你不需要在计算机上再安装一个依赖项。使用Docker, 我们可以简单地下载一个预配置的映像, 然后运行它, 完成后, 只需停止该容器即可。没有守护程序, apt-get或dnf安装。
介绍使用Nameko的Python微服务

文章图片
通过运行$ docker run -p 5672:5672 – hostname nameko-rabbitmq rabbitmq:3启动RabbitMQ容器(你可能需要sudo来做到这一点)。这将使用最新版本的3 RabbitMQ启动Docker容器, 并将其通过默认端口5672公开。
带有微服务的Hello World
继续创建一个名为hello.py的文件, 其内容如下:
from nameko.rpc import rpcclass GreetingService: name = "greeting_service"@rpc def hello(self, name): return "Hello, {}!".format(name)

Nameko服务是类。这些类公开了实现为扩展的入口点。内置扩展包括创建代表RPC方法, 事件侦听器, HTTP端点或计时器的入口点的能力。还有一些社区扩展可用于与PostgreSQL数据库, Redis等进行交互……可以编写自己的扩展。
让我们继续运行示例。如果你在默认端口上运行RabbitMQ, 只需运行$ nameko run hello。它将找到RabbitMQ并自动连接到它。然后, 要测试我们的服务, 请在另一个终端中运行$ nameko shell。这将创建一个交互式外壳, 它将连接到同一RabbitMQ实例。很棒的事情是, 通过使用AMQP上的RPC, Nameko实现了自动服务发现。调用RPC方法时, nameko将尝试查找相应的运行服务。
介绍使用Nameko的Python微服务

文章图片
运行Nameko Shell时, 你将获得一个名为n的特殊对象, 该对象添加到名称空间中。该对象允许调度事件和进行RPC调用。要对我们的服务进行RPC调用, 请运行:
> > > n.rpc.greetingservice.hello(name='world') 'Hello, world!'

并发通话
这些服务类在呼叫时实例化, 并在呼叫完成后销毁。因此, 它们本质上应该是无状态的, 这意味着你不应尝试在调用之间保持对象或类中的任何状态。这意味着服务本身必须是无状态的。假设所有服务都是无状态的, Nameko可以通过使用事件let greenthreads来利用并发性。实例化的服务称为” 工人” , 并且可以有配置的同时运行的最大工人数量。
要在实践中验证Nameko并发性, 请在返回响应之前通过在过程调用中添加睡眠来修改源代码:
from time import sleepfrom nameko.rpc import rpcclass GreetingService: name = "greeting_service"@rpc def hello(self, name): sleep(5) return "Hello, {}!".format(name)

我们正在使用来自时间模块的睡眠, 该模块未启用异步功能。但是, 当使用nameko run来运行我们的服务时, 它将自动通过阻止诸如sleep(5)之类的调用来修补触发收益。
现在, 预计来自过程调用的响应时间大约需要5秒。但是, 当我们从nameko shell运行该代码时, 以下代码段的行为是什么?
res = [] for i in range(5): hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) res.append(hello_res)for hello_res in res: print(hello_res.result())

Nameko为每个RPC入口点提供一个非阻塞的call_async方法, 返回一个代理答复对象, 然后可以查询其结果。在回复代理上调用result方法时, 它将被阻塞, 直到返回响应为止。
不出所料, 此示例仅需五秒钟即可运行。每个工作人员都将被阻止等待睡眠呼叫完成, 但这不会阻止其他工作人员开始。例如, 用一个有用的阻塞I / O数据库调用替换该睡眠调用, 你将获得非常快的并发服务。
如前所述, Nameko在调用方法时创建工作程序。最大工人数是可配置的。默认情况下, 该数字设置为10。你可以测试将上面代码段中的range(5)更改为例如range(20)。这将调用hello方法20次, 现在需要十秒钟才能运行:
> > > res = [] > > > for i in range(20): ...hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ...res.append(hello_res) > > > for hellores in res: ...print(hello_res.result()) Hello, 0! Hello, 1! Hello, 2! Hello, 3! Hello, 4! Hello, 5! Hello, 6! Hello, 7! Hello, 8! Hello, 9! Hello, 10! Hello, 11! Hello, 12! Hello, 13! Hello, 14! Hello, 15! Hello, 16! Hello, 17! Hello, 18! Hello, 19!

现在, 假设你调用该hello方法的并发用户太多(超过10个)。有些用户将挂起, 等待时间超过预期的五秒钟。一种解决方案是通过使用例如配置文件覆盖默认设置来增加工作量。但是, 如果由于调用的方法依赖于一些繁重的数据库查询而使服务器已受这十个工作线程的限制, 则增加工作线程数可能会导致响应时间增加更多。
扩展我们的服务
更好的解决方案是使用Nameko Microservices功能。到目前为止, 我们仅使用一台服务器(你的计算机), 运行一台RabbitMQ实例和一台服务实例。在生产环境中, 你将希望任意增加运行服务的节点数量过多的节点的数量。如果你希望消息代理更可靠, 则也可以构建RabbitMQ集群。
为了模拟服务扩展, 我们可以简单地打开另一个终端并使用$ nameko run hello像以前一样运行服务。这将启动另一个服务实例, 并有可能再运行十个工人。现在, 尝试再次使用range(20)运行该代码段。现在应该再次需要五秒钟才能运行。当有多个服务实例在运行时, Nameko将在可用实例之间轮循RPC请求。
Nameko的构建是为了可靠地处理集群中的这些方法调用。要进行测试, 请尝试运行片段, 然后在完成之前, 转到运行Nameko服务的终端之一, 然后按两次Ctrl + C。这将关闭主机, 而无需等待工人完成操作。 Nameko会将呼叫重新分配给另一个可用的服务实例。
在实践中, 你将使用Docker来对服务进行容器化(如稍后所述), 并使用诸如Kubernetes之类的编排工具来管理运行该服务以及其他依赖项(例如消息代理)的节点。如果使用Kubernetes正确完成, 你将可以在健壮的分布式系统中有效地转换应用程序, 而不会受到意外峰值的影响。此外, Kubernetes允许零停机时间部署。因此, 部署服务的新版本不会影响系统的可用性。
考虑到向后兼容性, 构建服务很重要, 因为在生产环境中, 同一服务的多个不同版本可能会同时运行, 尤其是在部署期间。如果你使用Kubernetes, 则在部署期间它将仅在有足够的新容器运行时杀死所有旧版本容器。
对于Nameko而言, 可以同时运行同一服务的多个不同版本, 这不是问题。由于它以循环方式分配呼叫, 因此呼叫可能会通过旧版本或新版本。要进行测试, 请使我们的服务在一个终端上运行旧版本, 然后将服务模块编辑为如下所示:
from time import sleepfrom nameko.rpc import rpcclass GreetingService: name = "greeting_service"@rpc def hello(self, name): sleep(5) return "Hello, {}! (version 2)".format(name)

如果从另一个终端运行该服务, 则将同时运行两个版本。现在, 再次运行我们的测试代码片段, 你将看到两个版本都显示出来:
> > > res = [] > > > for i in range(5): ...hello_res = n.rpc.greeting_service.hello.call_async(name=str(i)) ...res.append(hello_res) > > > for hellores in res: ...print(hello_res.result()) Hello, 0! Hello, 1! (version 2) Hello, 2! Hello, 3! (version 2) Hello, 4!

处理多个实例
现在, 我们知道了如何与Nameko有效合作以及扩展的工作原理。现在, 让我们更进一步, 使用Docker生态系统中的更多工具:docker-compose。如果你要部署到单个服务器, 这将是可行的, 这绝对不是理想选择, 因为你将无法利用微服务架构的许多优点。同样, 如果你想拥有更合适的基础架构, 则可以使用诸如Kubernetes之类的编排工具来管理容器的分布式系统。因此, 继续安装docker-compose。
同样, 我们要做的就是部署RabbitMQ实例, Nameko将负责其余的工作, 因为所有服务都可以访问该RabbitMQ实例。该示例的完整源代码可在此GitHub存储库中找到。
让我们构建一个简单的旅行应用程序来测试Nameko的功能。该应用程序允许注册机场和旅行。每个机场都被简单地存储为机场名称, 而行程存储了始发和目的地机场的ID。我们系统的体系结构如下所示:
介绍使用Nameko的Python微服务

文章图片
理想情况下, 每个微服务都应具有自己的数据库实例。但是, 为简单起见, 我为Trips和Airports微服务创建了一个共享的Redis数据库。网关微服务将通过类似于REST的简单API接收HTTP请求, 并使用RPC与Airports和Trips进行通信。
让我们从网关微服务开始。它的结构简单明了, 对于像Flask这样的框架的任何人都应该非常熟悉。我们基本上定义了两个端点, 每个端点都允许GET和POST方法:
import jsonfrom nameko.rpc import RpcProxy from nameko.web.handlers import httpclass GatewayService: name = 'gateway'airports_rpc = RpcProxy('airports_service') trips_rpc = RpcProxy('trips_service')@http('GET', '/airport/< string:airport_id> ') def get_airport(self, request, airport_id): airport = self.airports_rpc.get(airport_id) return json.dumps({'airport': airport})@http('POST', '/airport') def post_airport(self, request): data = http://www.srcmini.com/json.loads(request.get_data(as_text=True)) airport_id = self.airports_rpc.create(data['airport'])return airport_id@http('GET', '/trip/< string:trip_id> ') def get_trip(self, request, trip_id): trip = self.trips_rpc.get(trip_id) return json.dumps({'trip': trip})@http('POST', '/trip') def post_trip(self, request): data = http://www.srcmini.com/json.loads(request.get_data(as_text=True)) trip_id = self.trips_rpc.create(data['airport_from'], data['airport_to'])return trip_id

现在让我们来看看机场服务。正如预期的那样, 它公开了两种RPC方法。 get方法将只查询Redis数据库并返回机场的给定ID。 create方法将生成一个随机ID, 存储机场信息并返回ID:
import uuidfrom nameko.rpc import rpc from nameko_redis import Redisclass AirportsService: name = "airports_service"redis = Redis('development')@rpc def get(self, airport_id): airport = self.redis.get(airport_id) return airport@rpc def create(self, airport): airport_id = uuid.uuid4().hex self.redis.set(airport_id, airport) return airport_id

注意我们如何使用nameko_redis扩展名。看一下社区扩展列表。扩展以采用依赖项注入的方式实现。 Nameko负责启动每个工作人员将使用的实际扩展对象。
【介绍使用Nameko的Python微服务】机场和Trips微服务之间没有太大区别。这是Trips微服务的外观:
import uuidfrom nameko.rpc import rpc from nameko_redis import Redisclass AirportsService: name = "trips_service"redis = Redis('development')@rpc def get(self, trip_id): trip = self.redis.get(trip_id) return trip@rpc def create(self, airport_from_id, airport_to_id): trip_id = uuid.uuid4().hex self.redis.set(trip_id, { "from": airport_from_id, "to": airport_to_id }) return trip_id

每个微服务的Dockerfile也非常简单。唯一的依赖关系是nameko, 就机场和旅行服务而言, 也需要安装nameko-redis。这些依赖关系在每个服务的requirements.txt中给出。机场服务的Dockerfile如下所示:
FROM python:3RUN apt-get update & & apt-get -y install netcat & & apt-get cleanWORKDIR /appCOPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txtCOPY config.yml ./ COPY run.sh ./ COPY airports.py ./RUN chmod +x ./run.shCMD ["./run.sh"]

该文件与其他服务的Dockerfile之间的唯一区别是源文件(在本例中为airport.py), 应相应地进行更改。
run.sh脚本负责等待, 直到RabbitMQ为止(对于Airports and Trips服务而言), Redis数据库已准备就绪。以下代码片段显示了机场的run.sh内容。同样, 对于其他服务, 只需从机场更改为网关或相应地跳闸即可:
#!/bin/bashuntil nc -z ${RABBIT_HOST} ${RABBIT_PORT}; do echo "$(date) - waiting for rabbitmq..." sleep 1 doneuntil nc -z ${REDIS_HOST} ${REDIS_PORT}; do echo "$(date) - waiting for redis..." sleep 1 donenameko run --config config.yml airports

我们的服务现在可以运行了:
$ docker-compose up
让我们测试一下我们的系统。运行命令:
$ curl -i -d "{\"airport\": \"first_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:05:53 GMTf2bddf0e506145f6ba0c28c247c54629

最后一行是我们机场生成的ID。要测试它是否正常运行, 请运行:
$curl localhost:8000/airport/f2bddf0e506145f6ba0c28c247c54629 {"airport": "first_airport"}Great, now let’s add another airport: $ curl -i -d "{\"airport\": \"second_airport\"}" localhost:8000/airport HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:06:00 GMT565000adcc774cfda8ca3a806baec6b5

现在我们有两个机场, 足以组成一个旅程。现在创建一个旅程:
$ curl -i -d "{\"airport_from\": \"f2bddf0e506145f6ba0c28c247c54629\", \"airport_to\": \"565000adcc774cfda8ca3a806baec6b5\"}" localhost:8000/trip HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 32 Date: Sun, 27 May 2018 05:09:10 GMT34ca60df07bc42e88501178c0b6b95e4

如前所述, 最后一行代表行程ID。让我们检查一下它是否正确插入:
$ curl localhost:8000/trip/34ca60df07bc42e88501178c0b6b95e4 {"trip": "{'from': 'f2bddf0e506145f6ba0c28c247c54629', 'to': '565000adcc774cfda8ca3a806baec6b5'}"}

本文总结 通过创建在本地运行的RabbitMQ实例, 连接到该实例并执行几次测试, 我们已经了解了Nameko的工作方式。然后, 我们将获得的知识应用到使用微服务架构的简单系统中。
尽管非常简单, 但我们的系统与生产就绪型部署非常相似。你最好使用其他框架来处理HTTP请求, 例如Falcon或Flask。两者都是不错的选择, 例如, 在你想要中断网关服务的情况下, 它们都可以轻松用于创建其他基于HTTP的微服务。 Flask的优点是已经有一个插件可以与Nameko进行交互, 但是你可以直接从任何框架中使用nameko-proxy。
Nameko也非常易于测试。为了简单起见, 我们这里没有介绍测试, 但是请查阅Nameko的测试文档。
由于微服务架构中的所有活动部件都需要确保你拥有强大的日志记录系统。要构建一个, 请参阅srcminier和Python开发人员Son Nguyen Kim编写的Python日志记录:深入教程。

    推荐阅读