首页 > 其他分享 >深入理解 Swift Combine

深入理解 Swift Combine

时间:2024-04-30 22:11:07浏览次数:27  
标签:Publisher publisher receive index subscriber Combine 深入 Swift

Combine

文中写一些 Swift 方法签名时,会带上 label,如 subscribe(_ subscriber:),正常作为 Selector 的写法时会忽略掉 label,只写作 subscribe(_:) ,本文特意带上 label 以使含义更清晰。

Combine Framework

Overview

在 App 运行过程中会发生各种各样的异步事件,如网络请求的返回,Notification 的发送等。在处理这些异步事件时,我们经常会使用异步回调、代理方法等。Combine 框架提供了一种声明式的 Swift API,可以将一个异步事件的处理逻辑表示成单独的一个处理链,链上的每个节点接收上一个节点的处理结果,执行自己的处理逻辑,然后传递给下一个节点。

Combine 框架采用 publisher-subscriber 模式:

  • 协议 Publisher 表示一种能够随时间产生一系列值的类型。Combine 还为这些 Publishers 提供了许多 operators 来处理从上游接收的值,然后再重新发送到下游。
  • 在这个处理链的最末端,是由协议 Subscriber 表示的订阅者类型,接收并处理发送给它的值。
  • 当一个 Subscriber 订阅到一个 Publisher 上时,会接收到一个新生成的由协议 Subscription 表示的类型对象,Subscriber 通过该对象来向 Publisher 请求值,而 Publisher 也只有在接收到 Subscriber 的显式请求时才会分发值。

Publisher 和 Subscriber 的交互流程

  • 首先,subscriber 调用 publisher 的 subscribe(_ subscriber:) 方法,将自己作为参数传过去;subscribe(_ subscriber:) 会接着调用 publisher 的 receive(subscriber:) 方法。
  • 方法 receive(subscriber:)Publisher 协议的要求方法,所有遵循 Publisher 的类型都必须实现该方法,在这个方法里处理 subscriber 的订阅逻辑。但是使用方又不能直接调用该方法,而必须通过调用扩展方法 subscribe(_ subscriber:) 来发起订阅。
  • Publisher 接受订阅之后,会创建一个新的 Subscription 对象,然后调用 subscriber 的 receive(subscription:) 方法。协议 Subscription 约束了一个方法 request(_ demand:) ,subscriber 通过调用该方法来说明自己需要请求多少的值。只有 request 之后,publisher 才会向该 subscriber 发布值。
  • Publisher 通过调用 subscriber 的 receive(_ input:) 方法来向其发布值,并在结束时调用 subscriber 的 receive(completion:) 来进行通知。注意这里面说的调用并不一定指 publisher 对象持有 subscriber 对象,然后直接调用 subscriber 对象的上述方法,具体是否持有是具体实现细节,也有可能通过某些中间对象间接调用。Anyway,实现 Subscriber 协议的类型必须实现这两个方法(以及开头的 receive(subscription:) 方法),在这些方法中处理从 publisher 那里接收到的值。

Combine 中的 Publishers

内置 publishers

Combine 框架提供了许多内置的 publisher 类型供我们使用,如:

  • 为 Sequence 类型实现的 publisher 扩展;
  • 为 NotificationCenter 实现的 publisher(for:object:) 扩展;
  • URLSession 的 dataTaskPublisher(for:) 扩展;

Subject

Subject 给我们提供了一种向流中插入值的方式,为我们在存量命令式编程的代码中引入 Combine 提供了一个强大的工具。Subject 本身即是一个 publisher,下游可以正常去 subscribe 它,然后使用方通过调用它的 send(_ value:) 方法来发布一个值。Combine 提供了两种 subject:

  • CurrentValueSubject:如其名,会维护一个当前值,初始化时需要传入一个初始值作为当前值,后续通过调用 send(_ value:) 来更新当前值。当一个新的 subscriber 订阅时,会马上收到一次最新的当前值。
  • PassthroughSubject:不同于 CurrentValueSubject,内部没有缓存状态,每次调用 send(_ value:) 时才会向下游发布值。

@Published

该 property wrapper 修饰 Class 的某个属性,为其生成一个 publisher,使用方通过 $ 加上属性名来访问该 publisher。当属性值变化时,该 publisher 会在属性的 willSet 里发布新值,因此需要留意属性值本身尚未被更新仍是旧值,传给 subscriber 的值是新值。

Combine 中的 Subscribers

Combine 提供了两个内置的 subscribers:Subscribers.Sink 和 Subscribers.Assign,但一般不直接创建这两个的实例,而是通过 Publisher 的两种扩展方法 sink 和 assign 来获取类型抹除后的 AnyCancellable 对象。

Sink

Sink subscriber 创建的时候会立即调用 subscription 对象的 request(.unlimited) ,后面会详细介绍 Demand 的用法,这里需要留意的是一旦请求了 .unlimited 的 demand 之后,便无法再调整了,也就是说只要 publisher 不断地产生新的值,Sink 就会持续地接收到新值,直至被 cancel。

Publisher 有两个 sink 扩展方法:

  • sink(receiveCompletion:receiveValue:) :两个闭包的含义和用法不必多解释;
  • sink(receiveValue:) :只有当 Publisher 的 Failure associated type 是 Never 时才可以使用该方法。

Assign

Assign subscriber 会将接收到的值赋值给一个类对象的属性或者一个另一个 Published publisher 上,它对 publisher 的 demand 也是 .unlimited。

  • assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)
    • 因为 keyPath 类型是 ReferenceWritableKeyPath,所以 object 只能是一个类实例;
    • 注意该 Assign subscriber 会强持有 object 对象,除非上游发布了一个 completion。
  • assign(to published: inout Published<Self.Output>.Publisher)
    • 使用该 subscriber 可以将上游的值通过一个 @Published 修饰的属性重新发布;
    • 该方法没有返回值,当关联的 Published 实例析构时会自动地 cancel 掉订阅。

Publisher 的 operators

Combine 还为 publisher 添加了许多扩展方法,称为 operators,它们返回的也是一个 Publisher,因此可以进行链式调用。每个 operator 接收一个上游 publisher,处理转换上游发布的值,然后重新发布到下游。

具体的 operator 及其用法详见文档

Connectable

常用的 Subject 如 sink(receiveValue:) 会在订阅到一个 publisher 时立即发起一个 unlimited demand,对应的 publisher 如果此时有值则会马上发布,但这时使用方并不一定准备好接收并处理数据。另一种情况是如果一个 publisher 期望有多个 subscribers,但由于每个 subscriber 订阅的时机不一样,有可能当第一个 subscriber 订阅时,publisher 就已经把值给发布出去了,这样当第二个 subscriber 订阅时,只会收到一个 completion。

Combine 提供一个 ConnectablePublisher 协议来支持手动控制开始发布值的时机,遵循该协议的 Publisher 只有在显示调用 connect() 方法之后,才会开始值发布的过程,在这之前,即使满足 publisher 发布值的条件,也不会进行发布。

使用 Publisher 的 makeConnectable() operator 来将一个已有的 publisher 包装成一个 Publishers.MakeConnectable 实例,该实例便是一个 ConnectablePublisher,之后使用方便可在合适的时机调用其 connect() 方法来开启值的发布。

Combine 中有些 publisher 已经实现了 ConnectablePublisher 协议,如 Publishers.MulticastTimer.TimerPublisher 等,有时在一些使用这些 publisher 的简单场景中,显式调用 connect() 反而显得繁琐,因此 ConnectablePublisher 又提供了一个 autoconnect() operator,该操作符会在一个 subscriber 订阅它时立刻调用 connect 方法。

Demand and Back Pressure

在 Combine 中,一个 Publisher 只有在被 Subscriber 订阅并且发起要求的时候才会产生值。Subscriber 有下面两种方式来发起要求:

  • 通过调用 Subscription 对象的 request(_ demand:) 方法,Subscription 对象在发起订阅时由 receive(subscription:) 方法传入;
  • 每次 receive(_ input:) 调用时,可以返回一个新的 Demand。

Demand 表示 subscriber 需要多少值,Combine 提供的 API 里面,有 .none.unlimited , 以及指定具体数目的 .max(Int) 。Demand 是可加的,如一个 subscriber 要求了 2 个值,然后又要求了 .max(3),则其订阅的 publisher 现在共有 5 个未满足的值,每当 publisher 发布一个值,其为满足的 demand 便随之减 1,这也是唯一使其为满足的 demand 数值减少的方式,因为 Demand 不支持负值。而一旦 subscriber 要求了 .unlimited 的 demand,则后续就无法继续再同 publisher 协商了。

自定义 Subscriber

内置的 Subscriber Sink 和 Assign 都是一开始就请求了 .unlimited 的 demand,如果需要精细化地控制 publisher 发送值的 rate,可以实现一个自定义的 Subscriber,如:

class MySubscriber: Subscriber {
    typealias Input = Date
    typealias Failure = Never
    var subscription: Subscription?
    
    func receive(subscription: Subscription) {
        print("published                             received")
        self.subscription = subscription
        DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
            subscription.request(.max(3))
        }
    }
    
    func receive(_ input: Date) -> Subscribers.Demand {
        print("\(input)             \(Date())")
        return Subscribers.Demand.none
    }
    
    func receive(completion: Subscribers.Completion<Never>) {
        print ("--done--")
    }
}

自定义 Subscriber 的要点即是实现协议约束的三个方法,自定义的 subscriber 可以自己持有传入的 Subscription 对象,以实现精细化的控制。这种由 subscriber 来控制流速的行为称为 back pressure。

Back-Pressure 操作符

除了自定义 Subscriber,Combine 也提供了一些操作符给内置的 subscriber 使用以协助控制流速,这些操作符内部实现一些缓存相关的逻辑:

  • buffer:最大缓存一定数目的值,超出后丢弃或抛出错误;

  • debounce:设定一个 dueTime,假设时间为 t0 上游发布一个值,这时 debounce 不会立刻重新发布,而是创建一个重发布任务在 t0 + dueTime 之后执行(注意这里的任务是为了便于理解抽象出的概念,不代表具体实现真正地创建了一个任务,不过有可能确实是这样实现的);但如果在这期间,在时间 t1 时上游又发布了一个值,则之前的延时任务会被丢弃,会重新创建一个任务在 t1 + dueTime 之后才会重新发布,如果在这期间上游又发送了一个值,则以此类推。如:

    let bounces:[(Int,TimeInterval)] = [
        (0, 0),
        (1, 0.25),  // 0.25s interval since last index
        (2, 1),     // 0.75s interval since last index
        (3, 1.25),  // 0.25s interval since last index
        (4, 1.5),   // 0.25s interval since last index
        (5, 2)      // 0.5s interval since last index
    ]
    
    let subject = PassthroughSubject<Int, Never>()
    cancellable = subject
        .debounce(for: .seconds(0.5), scheduler: RunLoop.main)
        .sink { index in
            print ("Received index \(index)")
        }
    
    for bounce in bounces {
        DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
            subject.send(bounce.0)
        }
    }
    
    // Prints:
    //  Received index 1
    //  Received index 4
    //  Received index 5
    
    //  Here is the event flow shown from the perspective of time, showing value delivery through the `debounce()` operator:
    
    //  Time 0: Send index 0. (republish task0 at: 0.5)
    //  Time 0.25: Send index 1. (task0 is discarded, republish task1 at 0.25 + 0.5 = 0.75)
    //  Time 0.75: Debounce period ends, publish index 1. (execute task1)
    //  Time 1: Send index 2. (republish task2 at: 1 + 0.5 = 1.5)
    //  Time 1.25: Send index 3. (task2 is discarded, republish task3 at 1.25 + 0.5 = 1.75)
    //  Time 1.5: Send index 4. (task3 is discarded, republish task4 at 1.5 + 0.5 = 2.0)
    //  Time 2: Debounce period ends, publish index 4. Also, send index 5. (execute task4. republish task5 at: 2 + 0.5 = 2.5)
    //  Time 2.5: Debounce period ends, publish index 5. (execute task5)
    
  • throttle:设定一个 interval,每次达到时间时,会检查这一小段时间内有无值被发布,如果有的话,根据设定的 latest 参数决定将最新或最旧的值发布到下游。

  • collect:从上游接收到值时,先搜集起来,超过给定的数目或者超过给定的时间间隔之后,再把所有搜集到的值发给下游。

许多人经常搞不清 debounce 和 throttle 的区别,从上面的解释可以很清楚地看出二者的机制和差异,throttle 很稳定地定期发布一次(如有值可发布),而 debounce 如果上游频繁地发布值的话,可能要等好久才会发布一次,这也正是 debounce 的作用,比如在输入框输入文字的场景。

开源实现:OpenCombine

Apple 家的新东西都有一个特点:只能在比较新的操作系统版本上使用,比如 Combine 要求 iOS 13 以上才能使用,而且一些 API 更新可能会要求更新的 OS 版本。然而有位大佬 Sergej Jaskiewicz 开发了 Combine 的开源实现:OpenCombine,完全兼容 Combine 的 API,可以运行在老的 iOS 和 macOS 版本上,甚至支持 Windows、Linux 和 WASM。

我们来简单看一下内置的 Publisher 之一的 PassthroughSubject 的开源实现。

先从订阅流程看起,可以结合上面的图作为参照。首先是 Publisher 的扩展方法 subscribe(_ subscribe) ,其实就是简单地调用了一下具体 Publisher 类型的协议约束方法,传入参数 subscriber: receive(subscriber: subscriber) 。PassthroughSubject 的协议方法实现为:

public func receive<Downstream: Subscriber>(subscriber: Downstream)
    where Output == Downstream.Input, Failure == Downstream.Failure
{
    lock.lock()
    if active {
        let conduit = Conduit(parent: self, downstream: subscriber) // a.
        downstreams.insert(conduit) // b.
        lock.unlock()
        subscriber.receive(subscription: conduit) // c.
    } else {
        let completion = self.completion!
        lock.unlock()
        subscriber.receive(subscription: Subscriptions.empty) // d.
        subscriber.receive(completion: completion) // e.
    }
}

先看 c 处,最终调用了 subscriber 的 receive(subscription:) ,传入的 subscription 对象在 a 处创建,在 b 处被 publisher 插入到自己内部的一个 downstreams 数组中,该对象传给 subscriber 之后也会被 subscriber 对象持有,用于向 publisher 要求 demand、执行 cancel。

我们上面多次介绍到 Subscription 类型,但 Combine 并没有提供具体的实现类型,因为它其实是某个具体 publisher 的实现的一部分,这里的 Conduit 便是 PassthroughSubject 的 Subscription 实现类。我们接着看 PassthroughSubject 的另一个协议约束方法:

// PassthroughSubject 的 send(_ input:) 实现
public func send(_ input: Output) {
    lock.lock()
    guard active else {
        lock.unlock()
        return
    }
    let downstreams = self.downstreams
    lock.unlock()
    for conduit in downstreams {
        conduit.offer(input)
    }
}

// Conduit 的 offer(_ output:) 实现
override func offer(_ output: Output) {
    lock.lock()
    guard demand > 0, let downstream = downstream else {
        lock.unlock()
        return
    }
    demand -= 1 // a.
    lock.unlock()
    downstreamLock.lock()
    let newDemand = downstream.receive(output) // b.
    downstreamLock.unlock()
    guard newDemand > 0 else { return }
    lock.lock()
    demand += newDemand // c.
    lock.unlock()
}

PassthroughSubject 的 send(_ input:) 会调用 Conduit 的 offer(_ output:),在 offer 方法中,a 处将 demand 减 1,b 处调用 subscriber 的 receive(_ input:) 方法,c 处再将返回的 newDemand 加到现有 demand 上面,和前文描述的逻辑完全一致。

其他方法实现、Subscriber 的实现、各种操作符的实现可以直接翻源码,了解各个协议之间的约束之后非常简洁易读。

References

标签:Publisher,publisher,receive,index,subscriber,Combine,深入,Swift
From: https://www.cnblogs.com/jerrywossion/p/18168777

相关文章

  • 深入理解正则表达式:从入门到精通
    title:深入理解正则表达式:从入门到精通date:2024/4/3018:37:21updated:2024/4/3018:37:21tags:正则Python文本分析日志挖掘数据清洗模式匹配工具推荐第一章:正则表达式入门介绍正则表达式的基本概念和语法正则表达式是一种用于描述字符串模式的表达式,由普通......
  • 深入浅出Spring源码,终于把学Spring源码的技巧吃透了!
    前言本人从事Java架构十余年,也曾经在几家一线大厂任职多年,一直认为最难啃的当属Spring源码,为此我自己录制了一套Spring由浅入深的源码教程,根据自己多年来对于Spring源码整理的课纲一步步带你深入学习Spring源码,教程课件都打包好提供给你mian费学习!由于官方限制,对Spring源码感兴......
  • 算法训练优化的经验:深入任务与数据的力量
    引言在算法优化的世界中,理解所面对的任务不仅是起点,也是整个优化过程的核心。在这篇博客中,我将分享我在算法训练和优化中的一些经验,以及一个关于场景流估计的项目中应用的案例。我希望这些经验能帮助你在未来的项目中取得更好的成绩。1.深入理解任务和数据理解算法项目的......
  • 深入理解Python多进程:从基础到实战
    title:深入理解Python多进程:从基础到实战date:2024/4/2920:49:41updated:2024/4/2920:49:41categories:后端开发tags:并发编程多进程管理错误处理资源调度性能优化异步编程Python并发库引言在Python编程中,多进程是一种重要的并发编程方式,可以让我们充分......
  • 深入理解 C++ 中的多态与文件操作
    C++多态多态(Polymorphism)是面向对象编程(OOP)的核心概念之一,它允许对象在相同操作下表现出不同的行为。在C++中,多态通常通过继承和虚函数来实现。理解多态想象一个场景,你有一个动物园,里面有各种动物,如猫、狗、鸟等。每个动物都有自己的叫声。使用面向对象编程,我们可以创建一个......
  • 深入理解分布式共识算法(一)——2pc_3pc
    分布式事务问题通常单节点事务比较简单,Spring提供的@Transaction注解能够实现。但是在分布式场景下,比如ServiceA调用ServiceB、ServiceC,每个服务分别操作各自的数据库,如果某个服务调用成功、另外一个调用失败,就会造成数据的不一致性,这就是分布式事务问题。2PC二阶段提交......
  • 深入理解Python协程:从基础到实战
    title:深入理解Python协程:从基础到实战date:2024/4/2716:48:43updated:2024/4/2716:48:43categories:后端开发tags:协程异步IO并发编程Pythonaiohttpasyncio网络爬虫第1章:协程基础1.1协程概念介绍协程(Coroutines)是一种特殊的软件构造,它允许程序在执......
  • 深入mysql索引
    1.索引索引是对数据库表中一列或多列的值进行排序的一种结构。 MySQL索引的建立对于MySQL的高效运行是很重要的,索引可以大大提高MySQL的检索速度。索引只是提高效率的一个因素,如果你的MySQL有大数据量的表,就需要花时间研究建立最优秀的索引,或优化查询语句。简单类比一下,数据库......
  • 探索 DTD 在 XML 中的作用及解析:深入理解文档类型定义
    DTD是文档类型定义(DocumentTypeDefinition)的缩写。DTD定义了XML文档的结构以及合法的元素和属性。为什么使用DTD通过使用DTD,独立的团体可以就数据交换的标准DTD达成一致。应用程序可以使用DTD来验证XML数据的有效性。内部DTD声明如果DTD在XML文件内声......
  • 【花雕学AI】深入浅出:Kimi的八大功能,让AI不再高不可攀
    Kimi是一款功能强大的AI智能助手,拥有以下八个核心功能,可以帮助您完成各种任务,提升工作效率和生活品质。为了让普通人也能轻松理解,这里会用通俗易懂的语言进行详细的解释并举例说明。 **一.信息检索:**Kimi就像一个无所不知的智者,能够快速准确地帮您找到所需信息。只需输入......