Dyno版本:AWS,Swifter

上一次,我们做了很多设置以开始使用Amazon Web Services的DynamoDB ,包括使用Swift-Python桥,以便我们可以使用官方的AWS接口boto3与DynamoDB进行通信。

但是boto3除了基于Python并因此没有Swift类型安全性之外,还具有一些局限性:它很复杂,难以使用—并且存在同步的主要问题。

要查看该问题,请运行上次到达的代码:

…现在,请尝试关闭计算机的WiFi ,然后重新运行。 怎么了? table.scan()行仅挂起那里30秒钟,直到出现令人讨厌的异常并且程序崩溃(带有不可恢复的致命错误)。 实际上,这不是我们希望库调用或可能具有间歇性网络连接的应用程序(例如移动应用程序¹)所期望的行为。

本文-更好的boto

Dyno库旨在做得更好。在本文中,我们将介绍如何做! 和以前一样,尽管这个想法是产生一个有用的库,但我也希望展示可以在自己的代码中使用的技术。

  • 我们将使boto3调用异步。 这将演示信号量,工作队列和工作项的使用
  • Dyno将利用新的Swift 5 Result类型发布一个Observable结果流。 在这里,我们将演示带有复杂数据流的Observable和Reactive编程
  • 我们将添加一些有用的,类型安全的方法来从DynamoDb(原生于Swift)读取和写入数据。 这说明了 我们数据类型上的 一些出色的功能构造,例如 zip flatMap

在本文的最后,我们将直接从Swift将Dinosaurs(当然是🦕和))写到DynamoDB数据库中,然后以异步方式读回它们,并适当考虑网络延迟。 这将成为我们Dyno库的开始。 和以前一样,该库正在公开开发中,因此您可以在github( swiftify分支)上查看源代码。

有很多事情要做,让我们开始吧!

可观察的流

正如我在上一篇文章中提到的,可观察对象是表示数据流的一种方式。 我们可以将它们连接到Reactive组件,以便能够以功能强大和声明性的方式处理数据流。 这是表示数据操作的一种非常强大的方法-我们将在以后的文章中介绍,但是现在我们将看看如何将DynamoDB交互表示为Observable。

建模数据交互的关键是要注意它们都看起来像这样:

  • 要求DynamoDB做某事(扫描表,更新行等)
  • 等待结果(返回200行,更新成功)…
  • 或出现错误,例如 超时或数据完整性错误。

我们使用DynoActivity数据类型的可观察流对这些阶段进行DynoActivity

它就像一串可观察的事物(橙色和红色的大理石代表可观察的事件)看起来像这样:

现在,我们要做的一件事是假设即使一次大型查询(例如,返回了数百行),我们也一次性获得了所有数据:我们不对输出进行“分页”。 我们将来可能会改变它²。

您可能还会注意到,我们希望我们的可观察流以多线程的方式异步工作:我们可以让多个流同时运行,有的读取数据,有的写入。

为什么我们不将Future用于这种类型的异步数据请求/响应? 使用Observable流可以非常轻松地处理诸如“显示等待图标,直到返回数据或显示错误”之类的交互模式。 对于实际应用而言,这是非常基本的。

现实检查

在创建高级Observable之前,我们需要处理以下事实:通过不可靠的连接与远程数据库进行同步接口,并使用Python接口进行引导。

具体来说,我们需要确保Dyno正在控制AWS连接上的活动,而不是将其留给Boto3的30秒同步,程序终止超时。 那么,如何在不自行控制Boto3代码的情况下使Boto3异步和多线程呢?

我们将使用DispatchSemaphoresDispatchQueuesDispatchWorkItems

DispatchWorkItems允许我们打包工作(在这种情况下,我们perform对Boto3的调用)并将其发送出去以在DispatchQueue上执行。 重要的是, DispatchWorkItem也可以在任何时候终止 -例如,在达到超时之后。 我们将使用它来强制Boto3调用正常停止,而不会在超时时崩溃整个程序。

perform返回Result类型的值,这是Swift 5中的新功能。 Results.success (具有成功值)或.failure (具有失败)。 稍后我们将看到,我们在Dyno的许多地方都使用了Result ,以确保我们以一致的方式报告任何错误。

我们最初对DispatchWorkItem使用如下所示:

我们可以使用DispatchSemaphore等待DispatchWorkItem完成。 信号量是异步计算中的常见概念,实质上是可以在多个执行线程之间设置的标志,用于协调对共享资源的访问。 在这种情况下,共享资源是通过Boto3对AWS连接的访问​​。

因此,我们在DispatchWorkItem要做的是在Boto3调用完成时向信号发出信号:成功时还是失败时。 信号灯会停下来等待信号–或等待直到我们在信号灯上设置的超时时间(默认为5秒)。 然后,这使Dyno重新控制了AWS连接:现在,我们可以根据自己的时间轴启动和终止连接,并在连接超时的情况下发出错误消息-而不是使程序失败。

添加信号量可以得到如下代码( 略有简化

此外,通过将整个DispatchWorkItem / DispatchSemaphore放置在DispatchQueue.global().async工作队列中以DispatchQueue.global().async ,我们可以衍生出独立的AWS连接线程,每个线程都有自己的超时,并且彼此独立运行…

…几乎:有一个最后的陷阱让我们绊倒:Swift的超安全内存模型不会让我们同时对同一个Python对象(boto3连接)运行多个调用,因为它无法证明它们不会干扰彼此。 幸运的是,有一个简单的解决方法:我们只是在自己DispatchWorkItem上运行DispatchQueue ,而不是每次都产生一个全新的线程。 这强制了对基础AWS连接的串行访问,这实际上不是一件坏事,因为它可以在数据库方面增强事务安全性。

将来如果这成为性能瓶颈,我们可以研究通过多个boto3连接进行的线程池化。

您可以在主要Dyno结构的perform函数中看到最终代码。

回到功能世界

因此,现在我们有了Perform函数,该函数将返回Observable流–让我们对DynamoDb进行一些调用!

为此,我们使用一种协议,该协议允许我们抽象地表示数据库上的操作:

实际上,上面的DispatchWorkItem代码中的perform函数是相同的:最终被调用以影响数据库。

当前有3个实现此功能的结构,并提供了Dyno的3个扩展:

  • ActionGetItem ,根据键获取单个项目
  • ActionPutItem将带有键的项放入数据库中(或使用该键更新现有项)
  • ActionScanAll会扫描表中的所有行,查找过滤器,然后返回其余部分(名称的所有部分都意味着DynamoDB将查看整个表,即使您指定了仅返回一些项目的过滤器,或没有项目)

使Dyno栩栩如生

让我们详细介绍一个Action 。 下面是ActionGetItem.perform实现(略有简化,并ActionGetItem.perform注释):

首先,您可能会注意到该函数返回DynoResult :这只是常规Result类型的类型DynoError ,但是在失败的情况下始终返回DynoError

完成每个步骤:

  • 第1步中,我们使用一个辅助函数来调用Python Boto3库。 我们不直接调用函数get_item ,而是使用boto3Call来实现,而是分别传递参数["Key":[keyField: keyValue ]] 。 我们为什么要做这个? 在boto3Call这使我们能够捕获Python抛出的所有异常,因此我们可以将它们转换为.failure值,而不会导致程序崩溃! 您可以查看boto3Call看看如何完成。
  • 注意步骤1的结果是DynoResult值。 如果结果是.success ,那么我们要继续操作返回的值。 我们在步骤2通过flatMap做到这一点。 flatMap将获取Boto 3调用的结果(名为lookup ),并确保我们确实获得了正确的返回值。

但是请注意,如果Boto 3调用返回了.failure ,那么将不会执行查找检查,而我们将从整个函数中返回.failure 。 正是这种返回处理的一致性,使得使用Result类型非常好。 我以前的文章的读者可能会注意到ResultMonad

  • 第3步中,我们称为builder 。 这是由库的用户在调用getItem函数时提供的。 在Boto3库中,返回值仅是一个字典。 这在Python中很常见,但是在Swift中我们更喜欢类型安全,因此builder允许我们将字典转换为T类型的值。 如果类型无法转换, builder可以再次返回.failure

建筑商和一个老朋友

如果您查看main功能,则会看到一个示例示例Builder,它创建了Dinosaur对象:

目前,这有点笨拙:我们将PythonObject暴露给我们的Swift代码–当然,在Swift中执行此操作的正确方法是通过Codeable对象。 我们稍后将解决此问题!

getStr是一个辅助函数,用于检查给定的字典中是否确实存在给定的键,如果存在,则将其作为.success返回; 否则将返回.failure

但是zip3zip3 ? 我也为Dyno库添加了许多zipX函数,特别是针对Result类型。 zip3有点像常规zip ,但是采用4个参数而不是2:第一个参数( with )是一个函数, 仅当其余3个参数的求值.success :如果它们中的任何一个均为.failureDinosaur.init将不会被调用。

这意味着我们可以将Result视为可应用的函子:尽管zipX提供了将Result用作可应用的直接方法,但请阅读链接的文章以获取更多信息。

侏罗纪公园🦕🦖

我们要做的最后一件事是创建一个包装对象Dyno ,该包装对象用于抽象数据库连接。 然后,我们可以在其中添加诸如getItem类的辅助函数,以启动Actions

因此,让我们旋转一下。 因为我们返回了Observable ,所以mergesubscribedispose这看起来有点复杂……但是,如果过去,您会看到我们的Dyno库被调用为setItem ,它将一个项目写入Dinosaurs数据库,然后进行scan以检索写的项目。

那些反应式Observable看起来很复杂,但实际上它们给了我们很多。 我们想同时 《恐龙》。 但是在两个写入都成功之前,我们不希望阅读Observable.merge允许我们并行运行setItem Observable流。 然后.concat等待合并的流完成,然后再运行scan

在以后的文章中,我们将看到如何将这些Observable插入UI组件中。 但是现在,我们只让log方法显示写入和读取的内容。

下次

ew,这是很多工作! 但是图书馆正在形成。 我没有涉及到几件事-例如,查看用于scan的过滤器。 但是,我们还有更多工作要做:

  • 测试! 我们到底如何测试依赖远程数据库的库?
  • 可编码让我们把最后的几只甩掉……

直到下一次!

更新-本系列的下一篇文章 现在已经发布