Dyno版本:AWS,Swifter
上一次,我们做了很多设置以开始使用Amazon Web Services的DynamoDB ,包括使用Swift-Python桥,以便我们可以使用官方的AWS接口boto3与DynamoDB进行通信。
但是boto3除了基于Python并因此没有Swift类型安全性之外,还具有一些局限性:它很复杂,难以使用—并且存在同步的主要问题。
要查看该问题,请运行上次到达的代码:
…现在,请尝试关闭计算机的WiFi ,然后重新运行。 怎么了? table.scan()
行仅挂起那里30秒钟,直到出现令人讨厌的异常并且程序崩溃(带有不可恢复的致命错误)。 实际上,这不是我们希望库调用或可能具有间歇性网络连接的应用程序(例如移动应用程序¹)所期望的行为。
本文-更好的boto
Dyno库旨在做得更好。在本文中,我们将介绍如何做! 和以前一样,尽管这个想法是产生一个有用的库,但我也希望展示可以在自己的代码中使用的技术。
- 我们将使
boto3
调用异步。 这将演示信号量,工作队列和工作项的使用 - Dyno将利用新的Swift 5
Result
类型发布一个Observable结果流。 在这里,我们将演示带有复杂数据流的Observable和Reactive编程 - 我们将添加一些有用的,类型安全的方法来从DynamoDb(原生于Swift)读取和写入数据。 这说明了 我们数据类型上的 一些出色的功能构造,例如
zip
和flatMap
在本文的最后,我们将直接从Swift将Dinosaurs(当然是🦕和))写到DynamoDB数据库中,然后以异步方式读回它们,并适当考虑网络延迟。 这将成为我们Dyno库的开始。 和以前一样,该库正在公开开发中,因此您可以在github( swiftify分支)上查看源代码。
有很多事情要做,让我们开始吧!
可观察的流
正如我在上一篇文章中提到的,可观察对象是表示数据流的一种方式。 我们可以将它们连接到Reactive组件,以便能够以功能强大和声明性的方式处理数据流。 这是表示数据操作的一种非常强大的方法-我们将在以后的文章中介绍,但是现在我们将看看如何将DynamoDB交互表示为Observable。
建模数据交互的关键是要注意它们都看起来像这样:
- 要求DynamoDB做某事(扫描表,更新行等)
- 等待结果(返回200行,更新成功)…
- 或出现错误,例如 超时或数据完整性错误。
我们使用DynoActivity
数据类型的可观察流对这些阶段进行DynoActivity
:
它就像一串可观察的事物(橙色和红色的大理石代表可观察的事件)看起来像这样:
现在,我们要做的一件事是假设即使一次大型查询(例如,返回了数百行),我们也一次性获得了所有数据:我们不对输出进行“分页”。 我们将来可能会改变它²。
您可能还会注意到,我们希望我们的可观察流以多线程的方式异步工作:我们可以让多个流同时运行,有的读取数据,有的写入。
为什么我们不将Future用于这种类型的异步数据请求/响应? 使用Observable流可以非常轻松地处理诸如“显示等待图标,直到返回数据或显示错误”之类的交互模式。 对于实际应用而言,这是非常基本的。
现实检查
在创建高级Observable之前,我们需要处理以下事实:通过不可靠的连接与远程数据库进行同步接口,并使用Python接口进行引导。
具体来说,我们需要确保Dyno正在控制AWS连接上的活动,而不是将其留给Boto3的30秒同步,程序终止超时。 那么,如何在不自行控制Boto3代码的情况下使Boto3异步和多线程呢?
我们将使用DispatchSemaphores
, DispatchQueues
和DispatchWorkItems
。
DispatchWorkItems
允许我们打包工作(在这种情况下,我们perform
对Boto3的调用)并将其发送出去以在DispatchQueue
上执行。 重要的是, DispatchWorkItem
也可以在任何时候终止 -例如,在达到超时之后。 我们将使用它来强制Boto3调用正常停止,而不会在超时时崩溃整个程序。
perform
返回Result
类型的值,这是Swift 5中的新功能。 Results
是.success
(具有成功值)或.failure
(具有失败)。 稍后我们将看到,我们在Dyno的许多地方都使用了Result
,以确保我们以一致的方式报告任何错误。
我们最初对DispatchWorkItem
使用如下所示:
我们可以使用DispatchSemaphore
等待DispatchWorkItem
完成。 信号量是异步计算中的常见概念,实质上是可以在多个执行线程之间设置的标志,用于协调对共享资源的访问。 在这种情况下,共享资源是通过Boto3对AWS连接的访问。
因此,我们在DispatchWorkItem
要做的是在Boto3调用完成时向信号量发出信号:成功时还是失败时。 信号灯会停下来等待信号–或等待直到我们在信号灯上设置的超时时间(默认为5秒)。 然后,这使Dyno重新控制了AWS连接:现在,我们可以根据自己的时间轴启动和终止连接,并在连接超时的情况下发出错误消息-而不是使程序失败。
添加信号量可以得到如下代码( 略有简化 )
此外,通过将整个DispatchWorkItem
/ DispatchSemaphore
放置在DispatchQueue.global().async
工作队列中以DispatchQueue.global().async
,我们可以衍生出独立的AWS连接线程,每个线程都有自己的超时,并且彼此独立运行…
…几乎:有一个最后的陷阱让我们绊倒:Swift的超安全内存模型不会让我们同时对同一个Python对象(boto3连接)运行多个调用,因为它无法证明它们不会干扰彼此。 幸运的是,有一个简单的解决方法:我们只是在自己的DispatchWorkItem
上运行DispatchQueue
,而不是每次都产生一个全新的线程。 这强制了对基础AWS连接的串行访问,这实际上不是一件坏事,因为它可以在数据库方面增强事务安全性。
将来如果这成为性能瓶颈,我们可以研究通过多个boto3连接进行的线程池化。
您可以在主要Dyno
结构的perform
函数中看到最终代码。
回到功能世界
因此,现在我们有了Perform函数,该函数将返回Observable流–让我们对DynamoDb进行一些调用!
为此,我们使用一种协议,该协议允许我们抽象地表示数据库上的操作:
实际上,上面的
DispatchWorkItem
代码中的perform
函数是相同的:最终被调用以影响数据库。
当前有3个实现此功能的结构,并提供了Dyno的3个扩展:
-
ActionGetItem
,根据键获取单个项目 -
ActionPutItem
将带有键的项放入数据库中(或使用该键更新现有项) -
ActionScanAll
会扫描表中的所有行,查找过滤器,然后返回其余部分(名称的所有部分都意味着DynamoDB将查看整个表,即使您指定了仅返回一些项目的过滤器,或没有项目)
使Dyno栩栩如生
让我们详细介绍一个Action
。 下面是ActionGetItem.perform
实现(略有简化,并ActionGetItem.perform
注释):
首先,您可能会注意到该函数返回DynoResult
:这只是常规Result
类型的类型DynoError
,但是在失败的情况下始终返回DynoError
。
完成每个步骤:
- 在第1步中,我们使用一个辅助函数来调用Python Boto3库。 我们不直接调用函数
get_item
,而是使用boto3Call
来实现,而是分别传递参数["Key":[keyField: keyValue ]]
。 我们为什么要做这个? 在boto3Call
这使我们能够捕获Python抛出的所有异常,因此我们可以将它们转换为.failure
值,而不会导致程序崩溃! 您可以查看boto3Call
看看如何完成。 - 注意步骤1的结果是
DynoResult
值。 如果结果是.success
,那么我们要继续操作返回的值。 我们在步骤2通过flatMap
做到这一点。flatMap
将获取Boto 3调用的结果(名为lookup
),并确保我们确实获得了正确的返回值。
但是请注意,如果Boto 3调用返回了
.failure
,那么将不会执行查找检查,而我们将从整个函数中返回.failure
。 正是这种返回处理的一致性,使得使用Result
类型非常好。 我以前的文章的读者可能会注意到Result
是Monad 。
- 在第3步中,我们称为
builder
。 这是由库的用户在调用getItem
函数时提供的。 在Boto3库中,返回值仅是一个字典。 这在Python中很常见,但是在Swift中我们更喜欢类型安全,因此builder
允许我们将字典转换为T
类型的值。 如果类型无法转换,builder
可以再次返回.failure
。
建筑商和一个老朋友
如果您查看main
功能,则会看到一个示例示例Builder,它创建了Dinosaur
对象:
目前,这有点笨拙:我们将
PythonObject
暴露给我们的Swift代码–当然,在Swift中执行此操作的正确方法是通过Codeable
对象。 我们稍后将解决此问题!
getStr
是一个辅助函数,用于检查给定的字典中是否确实存在给定的键,如果存在,则将其作为.success
返回; 否则将返回.failure
。
但是zip3
是zip3
? 我也为Dyno库添加了许多zipX
函数,特别是针对Result
类型。 zip3
有点像常规zip
,但是采用4个参数而不是2:第一个参数( with
)是一个函数, 仅当其余3个参数的求值.success
:如果它们中的任何一个均为.failure
则Dinosaur.init
将不会被调用。
这意味着我们可以将Result
视为可应用的函子:尽管zipX
提供了将Result用作可应用的直接方法,但请阅读链接的文章以获取更多信息。
侏罗纪公园🦕🦖
我们要做的最后一件事是创建一个包装对象Dyno
,该包装对象用于抽象数据库连接。 然后,我们可以在其中添加诸如getItem
类的辅助函数,以启动Actions
。
因此,让我们旋转一下。 因为我们返回了Observable
,所以merge
, subscribe
和dispose
这看起来有点复杂……但是,如果过去,您会看到我们的Dyno库被调用为setItem
,它将一个项目写入Dinosaurs数据库,然后进行scan
以检索写的项目。
那些反应式Observable
看起来很复杂,但实际上它们给了我们很多。 我们想同时写 《恐龙》。 但是在两个写入都成功之前,我们不希望阅读 。 Observable.merge
允许我们并行运行setItem
Observable流。 然后.concat
等待合并的流完成,然后再运行scan
。
在以后的文章中,我们将看到如何将这些Observable
插入UI组件中。 但是现在,我们只让log
方法显示写入和读取的内容。
下次
ew,这是很多工作! 但是图书馆正在形成。 我没有涉及到几件事-例如,查看用于scan
的过滤器。 但是,我们还有更多工作要做:
- 测试! 我们到底如何测试依赖远程数据库的库?
- 可编码让我们把最后的几只甩掉……
直到下一次!
更新-本系列的下一篇文章 现在已经发布 !