Swift并行框架学习笔记

2022/12/18

Swift 并行框架

这篇文章介绍了Swift最新的并行(Concurrency)框架,针对已有Swift基础的人,许多专业术语不再作单独解释,如果遇到不懂的专业术语可以参考相关资料;如果不断地有不明白的地方,可能你并没有足够的基础来学习async/await,不妨考虑先把基础打牢;如果你的职位已经在用async/await而你还是一头雾水,不妨考虑换个职业 - 以人生的长远来看,这未必是件坏事。

这里不会像有些教程那样,也许是为了增加篇幅,先用一个章节来介绍技术发展历史。如果你真想回顾,可以参考相关资料。我们首先来看一段代码:

func fetchSomething(completion: @escaping (Result<Something, SomeError>) -> Void) {
  UISession.shared
    .dataTask(
      with: URL(string: "SOME URL HERE")!
    ) { data, response, error in
      let result = // decoding
      completion(result)
    }
    .resume()
}

fetchSomething { result
  switch result {
    case .success(let something):
      // …
    case .failure(let error):
      // …
  }
}

以上代码从某处获取某个东西,完成后无论成功还是失败,都会有一个回调,无论是代码中的闭包还是代理方法(delegation),如果你觉得上面的代码还算工整,那么继续看下面这段表达人生追名逐利的代码:

func performLife() {
  fetchFame { fameResult
    switch fameResult {
      case .result(let fame):
        fetchFortune { fortuneResult
          switch fortuneResult {
            case .result(let fortune):
            print("名利双收")
            case .failure(let fortuneError):
            print("只有名没有利")
          }
        }
      case .failure:
        print("无名无利")
    }
  }
}

以上代码先获取“名”,成功后继续获取“利”,并对“名”与“利”的失败各自处理。因为获取名和利的两个闭包嵌在一起,这种梯田似的代码看起来很乱,尤其是屏幕不大的时候,多个镶嵌的闭包会导致断行,让本就不整洁的代码更加凌乱。针对这个问题,用async/await可以达到的效果是:

Task {
  let fame = try await fetchFame()
  let fortune = try await fetchFortune()
}

以上代码依然是先获取“名”再获取“利”,而具体的fetch方法则只要写成:

func fetchFame() async throws -> Fame {
  let (data, _) = try await URLSession.shared.data(
    from: URL("SOMEURL")!
  )
  return decodeFame(from: data)
}

以上代码先后获得“名”与“利”,如果需要同时并行获得两者,可以在let前加上async,并且用到“名”或“利”的时候加上await

async let fame = fetchFame()
async let fortune = fetchFortune()

let life = await [fame, fortune]

下面来看看async/await相关的不同型态

async/await的几种型态

异步函数

定义一个async函数,需要在函数声明的返回值箭头前加上async关键字,如果函数可能抛出错误,则加上throws

async函数允许其内部使用await,调用此函数时需要在函数前加上await,如果函数可能抛出错误,再加上try

func myFunction() async throws -> MyObject {
  await fetchData()
}

let myObject = try await myFunction()

程序运行到await关键字的地方,可能会抛弃当前线程,这是程序的潜在暂停点(Suspend Point)

异步只读属性

定义一个异步属性,在此变量的getter前加async。异步属性必须是只读(read-only),Swift 5.5开始异步属性可以抛出错误,把throws写在async后面。

var thumbnail: UIImage? {
  get async throws {
    try await self.downloadThumbnail()
  }
}

相对于getter来说,setter需要考虑更多的事情,比如inout行为,didSetwillSet应该何时被调用等等。如果只是单纯的需要对某个属性以异步方式进行设置,可以通过一个异步函数:

var value: Int

func updateValue(newValue: Int) async throws {
  await validateValue(newValue)
  try checkShouldWrite(newValue)
  value = newValue
}

函数异步参数

作为函数的参数,需要把async加到参数后面:

func doSomething(worker: (Work) async -> Void) -> Outcome {
  // …
}

doSomething { work in
  return await process(work)
}

协议中的属性

协议中的异步属性:

protocol Patient {
    var isRecovered: Bool { get async throws }
}

异步序列

for try await value in someAsyncSequence {
  process(value)
}

升级回调函数为异步函数

如需要把基于回调(completion callback)的函数转换为异步函数,基本的方式包括下面两步:

  1. 去掉回调参数
  2. 加上async修饰

如最开始的fetchSomething

func fetchSomething(completion: @escaping (Result<Something, SomeError>) -> Void)

转换为异步函数则成为:

func fetchSomething() async throws -> Something

有些函数除了回调参数外,还有一个返回值,如:

class URLSession {
  func dataTask(
    with url: URL,
    completionHandler: @escaping (Data?, URLResponse?, Error?) -> Void
  ) -> URLSessionDataTask
}

Apple在为其创建异步版本时选择忽略返回值:

func data(
  from url: URL,
  delegate: URLSessionTaskDelegate? = nil
) async throws -> (Data, URLResponse)

因为一旦某个函数转变为异步函数,调用它的函数通常也得变成异步函数,Apple的建议是从最底层的函数开始转换,并提供了一些辅助措施,如给非异步回调函数加一个注解,告诉编译器此函数存在一个异步版本。拿之前的fetchSomething为例:

@completionHandlerAsync("fetchSomething()")
func fetchSomething(completion: @escaping (Result<Something, SomeError>) -> Void)

在异步函数中以同步方式调用fetchSomething则会有一个警告,告诉你可以用它的异步版本。

如果回调函数的回调参数并非最后一个参数,还可以在注解中指定回调参数的位置:

@completionHandlerAsync("fetchSomething(for:)", completionHandlerIndex: 0)
func fetchSomething(completion: @escaping (Result<Something, SomeError>), for user: User)

Continuation

如果异步函数中需要用到同步回调函数,可以使用Swift提供的一组全局函数来暂停当前任务并捕获当前的续体(Continuation):

func fetchSomething() -> async throws Something {
  try await withCheckedThrowingContinuation { continuation in
    fetchSomething { result in
      switch result {
        case .success(let something):
          continuation.resume(returning: something)
        case .failure(let error):
          continuation.resume(throwing: error)
      }
    }
  }
}

以上代码把对同步函数fetchSomething的调用包装在try await withCheckedThrowingContinuation当中,拿到结果后根据成功或者失败,调用不同的continuation.resume方法回传结果。

以上continuation有不同的版本:

func withUnsafeContinuation<T>(
  _ fn: (UnsafeContinuation<T, Never>) -> Void
) async -> T

func withUnsafeThrowingContinuation<T>(
  _ fn: (UnsafeContinuation<T, Error>) -> Void
) async throws -> T

func withCheckedContinuation<T>(
  function: String = #function,
  _ body: (CheckedContinuation<T, Never>) -> Void
) async -> T

func withCheckedThrowingContinuation<T>(
  function: String = #function,
  _ body: (CheckedContinuation<T, Error>) -> Void
) async throws -> T

传统代码中Delegate协议很常见,如果要转换成异步方式,一种常用的写法是将continuation保存起来:

protocol ValidationDelegate {
  func validationDidSucceed(result: Validation)
  func validationDidFail(error: Error)
}

final class Validator: ValidationDelegate {
  
  private var continuation: CheckedContinuation<Validaiton, Error>?

  func validate(_ input: Input) async throws -> Validation {
    try await withCheckedThrowingContinuation { continuation in
      self.continuation = continuation
      performValidation(for: input, delegate: self)
    }
  }

  func validationDidSucceed(result: Validation) {
    continuation?.resume(returning: result)
    continuation = nil
  }

  func validationDidFail(error: Error) {
    continuation?.resume(throwing: error)
    continuation = nil
  }
}

Task

Task可以用来定义一组并行的工作,类似DispatchQueue,但更简洁且提供了更丰富的功能。Task创立后会立刻执行,不需要你告诉它开始。这就意味着创建Task的时候必须是可以执行的时候。

let downloadTask = Task { () -> MyModel? in
  let dataURL = URL(string: "https://www.mydata.com/random")

  // 开始前检查此任务是否已经被取消
  try Task.checkCancellation()

  let (data, _) = try await URLSession,shared.data(from: dataURL)

  // 下载之后再检查一下此任务是否已被取消
  // 这样可以防止不必要的数据处理
  try Task.checkCancellation()

  let model = try JSONDecoder().decode(MyModel.self, from: data)

  return model
}

异步序列 - Async Sequence

首先要理解序列,我们以一些正经网页里时不时跳出来的黄色小广告为例,往往图片旁边都有一个连结叫做“下一个”,你不停的点,就不停的给你下一张图片。

同步序列,就如同这些图片都存在本地,点“下一个”之后,系统直接从本地硬盘读取下一张图片,没有延时;而异步序列,图片都存在于网络,点“下一个”之后,系统必须从网络上读取下一张图片,根据网速、图片服务器的位置等因素而会有一定的延时,且可能加载失败。

同步序列

public protocal Sequence {
  associatedtype Iterator: IteratorProtocol
  func makeIterator() -> Self.Iterator
}

public protocol IteratorProtocol {
  associatedtype Element
  mutating func next() -> Self.Element?
}

实现本地小广告:

struct LocalPics: Sequence {
  struct Iterator: IteratorProtocol {
    var index = 0

    mutating func next() -> Photo? {
      let photo = loadPhoto(at: index)
      index += 1
      return photo
    }
  }

  func makeIterator() -> Iterator {
    .init()
  }
}

异步序列

public protocol AsyncSequence {
  associatedtype AsyncIterator: AsyncIteratorProtocol
  func makeAsyncIterator() -> Self.AsyncIterator
}

protocol AsyncIteratorProtocol {
  associatedtype Element
  mutating func next() async throws -> Self.Element?
}

实现网络小广告:

struct OnlinePics: AsyncSequence {
  typealias Element = Photo
  struct AsyncIterator: AsyncIteratorProtocol {
    var index = 0
    
    mutating func next() async throws -> Photo? {
      defer { index += 1 }
      return try await loadOnlinePhoto(at: index)
    }
  }

  func makeAsyncIterator() -> AsyncIterator {
    .init()
  }
}

有了基本的理解,我们就可以用遍历序列的常用方法了:

for try await photo in OnlinePics() {
  slideShow(photo)
}

因为序列协议中提供的是返回“下一个”值的方法,以上代码会被编译器翻译为使用next()的版本:

var iterator = OnlinePics().makeAsyncIterator()
while let photo = try await iterator.next() {
  // …
}

许多同步序列的扩展方法对异步序列来说也是存在的,下面代码创建一个新序列,从异步小广告序列中挑出前10张评分至少四星的照片并返回放大后的照片:

let suggestedPhotos = OnlinePics()
  .filter { $0.rating >= 4 }
  .prefix(10)
  .map { $0.zoomed() }

for try await photo in suggestedPhotos {
  slideShow(photo)
}

新序列的类型将会是:

AsyncMapSequence<
  AsyncPrefixSequence<
    AsyncFilterSequence<
      OnlinePics
    >
  >
  , Photo
>

如果需要定义这样一个类型,我们可以简单写成some AsyncSequence

var suggestedPhotos: some AsyncSequence {
  OnlinePics()
  .filter { $0.rating >= 4 }
  .prefix(10)
  .map { $0.zoomed() }
}

Swift具有高度可扩展性,让我们可以把异步序列包装起来并加入自己的方法。下面的代码可以传入一个作为副作用(Side Effect)的参数:

struct AsyncSideEffectSequence<Base: AsyncSequence>: AsyncSequence {
  typealias Element = Base.Element

  private let base: Base
  private let block: (Element) -> ()

  init(_ base: Base, block: @escaping (Element) -> ()) {
    self.base = base
    self.block = block
  }

  func makeAsyncIterator() -> AsyncIterator {
    return AsyncIterator(
      base.makeAsyncIterator(),
      block: block
    )
  }

  struct AsyncIterator: AsyncIteratorProtocol {
    private var base: Base.AsyncIterator
    private let block: (Element) -> ()

    init(
      _ base: Base.AsyncIterator,
      block: @escaping (Element) -> ()
    ) {
      self.base = base
      self.block = block
    }

    mutating func next() async throws -> Base.Element? {
      let value = try await base.next()
      if let value = value {
        block(value)
      }
      return value
    }
  }
}

对异步序列的操作,许多需要调用next(),因此这些方法本身也是异步函数,如:

extension AsyncSequence {
  func contains(
    where predicate: (Self.Element) async throws -> Bool
  ) async rethrows -> Bool

  func first(
    where predicate: (Self.Element) async throws -> Bool
  ) async rethrows -> Self.Element?

  func reduce<Result(
    _ initialResult: Result,
    _ nextPartialResult:
      (_ partialResult: Result, Self.Element) async throws -> Result
  ) async rethrows -> Result
}

比如实现contains方法:

extension AsyncSequence {
  func _contains(
    where predicate: (Self.Element) async throws -> Bool
  ) async rethrows -> Bool {
    for try await v in self {
      if try await predicate(v) {
        return true
      }
    }
    return false
  }
}

AsyncStream

之前提到用continuation的要求是resume必须且仅需被调用一次,但针对AsyncSequence往往需要对序列中的值多次回调,这时候就要用到AsyncStream类型,通过在其提供的Continuation上调用函数来控制异步函数的执行:

struct AsyncStream<Element> {
  init(
    _ elementType: Element.Type = Element.self,
    bufferingPolicy limit: BufferingPolicy = .unbound,
    _ build: (AsyncStream<Element>.Continuation) -> Void
  )

  struct Continuation {
    func yield(_ value: Element) -> YieldResult
    func finish()
    // …
  }

  // …
}

比如我们模仿水滴石穿的效果:

enum Effect {
  case 水滴
  case 石穿
}

var waterStream: AsyncStream<Effect> {
  AsyncStream<Effect> { continuation in
    let start = Date.now

    Task {
      let timer = Timer.scheduledTimer(
        withTimeInterval: 1.0,
        repeats: true
      ) { timer in
        print("yield")
        continuation.yield(.水滴)

        if Date.now.timeIntervalSince(start) > 99 {
          print("yield")
          continuation.yield(.石穿)

          print("finish")
          continuation.finish()
        }
      }

      // 异步序列结束时做一些清理工作
      continuation.onTermination = { @Sendable state in
        timer.invalidate()
      }
    }
  }
}

创建AsyncStream时可以指定几个参数:

enum BufferingPolicy {
  case unbounded
  case bufferingOldest(Int)
  case bufferingNewest(Int)
}

顾名思义,.bufferingOldest(Int)将在达到上限后放弃新来的值,而.bufferingNewest(Int)则在达到上限后放弃最老的值。

为了解决以上问题,AsyncStream还提供了一个直接返回数据元素的初始化方法:

struct AsyncStream<Element> {
  init(
    unfolding produce: @escaping () async -> Element?,
    onCancel: (@Sendable () -> Void)? = nil
  )
}

参数unfolding将被作为序列迭代器(Iterator)的next函数,每当有await请求值时,它就会被调用来产生一个新的值。同样是实现水滴石穿的效果,以下代码不使用Timer计时器:

AsyncStream<Effect> {
  await Task.sleep(NSEC_PER_SEC)
  if counter < THRESHOLD {
    return .水滴
  }
  return .石穿
} onCancel: { @Sendable in
  print("Cancelled")
}

使用异步序列的一个基本原则,是不要在不同的任务上下文中访问同一个序列。,如以下代码就会产生错误:

let someStream = MyStream()

Task.detached {
  for await value in someStream {}
}

Task.detached {
  for await value in someStream {}
}

(看到这里时你可能会问Task.detach是干嘛的,其实我学的时候也不知道……这就是很多教材的通病,即便是针对初学者的教材,也是经验丰富的人来写,难免会把自己的经验展示出来而忘了读者是一无所知的初学者。)

传统API的异步适配

Apple对一些常用的系统框架中基于回调的函数进行了异步适配:

之前:

let task = URLSession.shared.dataTask(with: url) { data, response, error in
  guard let data = data else {
    // error handling
    return
  }
  // data handling
}

task.resume()

更精细的控制则需要一个自定义参数的Session,并指定一个delegate

let session = URLSession
  (configuration: .default,
   delegate: MyDelegate(),
   delegateQueue: nil)
let task = session.dataTask(with: url)
task.resume()

而异步URLSession则简洁许多:

extension URLSession {
  func data(
    from url: URL,
    delegate: URLSessionTaskDelegate? = nil
  ) async throws -> (Data, URLResponse)
}

let (data, response) = try await URLSession.shared.data(from: url)
// 使用 data

URLSession异步方法的好处,除了异步本身带来的简介外,还有以下两点:

  1. 网络请求直接开始,不需要调用.resume()。这样一来,不会像非异步方法创建dataTask时需要占用资源,直到resume()之后才能被释放。
  2. 针对每个task指定delegate,而非针对整个session,可以更加方便的对不同的task进行不同的delegate操作。

另外,对于传统URLSessionTaskDelegate中收到部分数据的回调urlSession(_:dataTask:didReceive:),也有相应的并行方式,把需要接收的数据作为一个异步序列来处理:

extension URLSession {
  func bytes(
    from url: URL,
    delegate: URLSessionTaskDelegate? = nil
  ) async throws -> (URLSession.AsyncBytes, URLRespnose)
}

以上方法并不会等待所有数据收到后才返回,而是直接返回一个AsyncBytes,从名字不难看出,这是一个代表数据每个字节的异步序列。实际应用中往往用于读取网络数据的开头部分,比如根据开头的格式信息来判断网络请求返回的数据是否是需要的格式,而不需要下载完整的数据后再做判断。因此,Apple提供了一系列扩展方法,在字节的基础上,可以读取字符,字符串等等:

extension AsyncSequence where Self.Element == UInt8 {
  var lines: AsyncLineSequence<Self> { get }
  var characters: AsyncCharacterSequence<Self> { get }
  var unicodeScalars: AsyncUnicodeScalarSequence<Self> { get }
}

因为异步序列十分方便,因此URL以及许多IO相关的比如本地文件读取的方法中,也加入了类似的方法:

let url = URL(string: "https://myurl.com")!
for try await line in url.lines {
  print(line)
}

在异步的前提下,URLSessionDataDelegate也有相应的改动,比如此前的非异步delegate方法会给你一个用来回调的闭包:

extension ViewModel: URLSessionDataDelegate {
  func urlSession(
    _ session: URLSession,
    dataTask: URLSessionDataTask,
    didReceive response: URLResponse,
    completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
  ) {
    guard let scheme = response.url?.scheme,
      scheme.starts(with: "https") else {
      completionHandler(.cancel)
      return
    }
    completionHandler(.allow)
  }
}

以上代码的一个问题,是编译器无法确保completionHandler一定会被调用,异步方法的实现则避免了这个问题:

func urlSession(
  _ session: URLSession,
  dataTask: URLSessionDataTask,
  didReceive response: URLResponse
) async -> URLSession.ResponseDisposition {
  // 此处必须返回一个值,否则编译会报错
}

NotificationCenter也有相应的异步适配:

extension NotificationCenter {
  func notifications(
    named name: Notification.Name,
    object: AnyObject? = nil
  ) -> NotificationCenter.Notifications

  class Notifications: AsyncSequence {
    typealias Element = Notification
    // …
  }
}

具体使用时:

Task {
  let backgroundNotifications =
    NotificationCenter.default.notifications(
      named: UIApplication.didEnterBackgroundNotification,
      object: nil
    )
  for await notification in backgroundNotifications {
    // 处理notification
  }
}

以上代码有个需要注意的问题,因为Taskfor await会导致程序暂停,并且把尚未执行的部分作为续体,这会导致其上下文被持续持有,如果这个说法听起来有点别扭,英文的说法是"their context will be captured and retained"。意思是说,self即便没有写出来,但会被捕获而无法得到释放,在UIViewController里使用以上代码,等于是把self交给一个没有明确完结的通知。要解决这个问题,则要看实际需求,比如一个事件的通知我们只需要收到一次进行处理,不需要处理接下来的每一个通知:

for await notification in backgroundNotifications {
  // 处理notification
  break
}

或者明确表示只需要第一个通知:

if let notification = await backgroundNotifications
  .first(where: { _ in true }
) {
  // 处理notification
}

还可以在不需要的时候取消这个Task

let task = Task {
  let backgroundNotification = //…
  for await // …
}

// …

deinit() {
  task.cancel()
}

结构化并发

和同步操作相比,异步操作存在不确定性,在并发的情况下,如何保证代码路径的单一入口和单一出口,便是结构化并发要解决的问题。比如最开始的例子,要获取名和利:

async let fame = fetchFame()
async let fortune = fetchFortune()

let life = await [fame, fortune]

即便是fetch名与利不一定谁先返回,谁比谁快,但可以保证最后的人生包含了二者。

结构化并发需要异步函数,进而需要某个任务上下文(Task Context):

func start() {
  Task {
    // fetchData的上下文环境便是此Task
    await fetchData()
  }
}

func fetchData() async {
  withUnsafeCurrentTask { task in
    if let task = task {
      // 此处可检查task是否已被取消
    }
  }
}

Swift针对当前任务提供了一个简单方法用来查看状态:

extension Task where Success == Never, Failure == Never {
  static var isCancelled: Bool { get }
  static var currentPriority: TaskPriority { get }
}

在异步函数中,可以通过withTaskGroup为当前的任务添加结构化的并发子任务:

struct StudentEnrolment {

    func start() async throws {
        print("Start student enrolment.")
        await withTaskGroup(of: Int.self) { group in
            for studentId in 0..<3 {
                group.addTask {
                    await enrol(studentId)
                }
            }
            print("Students enrolment tasks added.")
            for await result in group {
                print("Enrolment result: \(result)")
            }
            print("Enrolments results received.")
        }
        print("Stop student enrolment.")
    }

    private func enrol(_ studentId: Int) async -> Int {
        print("Registering student ID \(studentId)")
        try! await Task.sleep(nanoseconds: UInt64(1 + studentId) * 1_000_000_000)
        print("Registration done for student ID \(studentId).")
        return studentId
    }
}

Task {
    try await StudentEnrolment().start()
}

输出:

Start student enrolment.
Students enrolment tasks added.
Registering student ID 0
Registering student ID 1
Registering student ID 2
Registration done for student ID 0.
Enrolment result: 0
Registration done for student ID 1.
Enrolment result: 1
Registration done for student ID 2.
Enrolment result: 2
Enrolments results received.
Stop student enrolment.

从以上输出可以看出,通过addTask方法添加的三个子任务,在获得资源后立刻开始执行,而group遵循AsyncSequence,可以使用for await来获取子任务的执行结果;子任务完成后,结果被放在异步序列的缓冲区里,在下一个next时被返回。

三个异步操作并发进行,各自拥有独立的子任务空间,在group作用域结束时汇集在一起。

值得注意的是,以上代码即便没有for await那部分,编译器在检测到结构化并发作用域结束时,会自动加上await来等待所有任务结束后再继续。必入上面代码中的for await部分如果不存在:

struct StudentEnrolment {

    func start() async throws {
        print("Start student enrolment.")
        await withTaskGroup(of: Int.self) { group in
            for studentId in 0..<3 {
                group.addTask {
                    await enrol(studentId)
                }
            }
            print("Students enrolment tasks added.")
//            for await result in group {
//                print("Enrolment result: \(result)")
//            }
            print("Enrolments results received.")
        }
        print("Stop student enrolment.")
    }

    private func enrol(_ studentId: Int) async -> Int {
        print("Registering student ID \(studentId)")
        try! await Task.sleep(nanoseconds: UInt64(1 + studentId) * 1_000_000_000)
        print("Registration done for student ID \(studentId).")
        return studentId
    }
}

Task {
    try await StudentEnrolment().start()
}

运行输出将会是:

Start student enrolment.
Students enrolment tasks added.
Enrolments results received.
Registering student ID 0
Registering student ID 1
Registering student ID 2
Registration done for student ID 0.
Registration done for student ID 1.
Registration done for student ID 2.
Stop student enrolment.

也就是说,以上代码相当于:

struct StudentEnrolment {

    func start() async throws {
        print("Start student enrolment.")
        await withTaskGroup(of: Int.self) { group in
            for studentId in 0..<3 {
                group.addTask {
                    await enrol(studentId)
                }
            }
            print("Students enrolment tasks added.")
//            for await result in group {
//                print("Enrolment result: \(result)")
//            }
            print("Enrolments results received.")

            // 编译器自动加入的代码
            for await _ in group {}
        }
        print("Stop student enrolment.")
    }

    private func enrol(_ studentId: Int) async -> Int {
        print("Registering student ID \(studentId)")
        try! await Task.sleep(nanoseconds: UInt64(1 + studentId) * 1_000_000_000)
        print("Registration done for student ID \(studentId).")
        return studentId
    }
}

Task {
    try await StudentEnrolment().start()
}

一个常见的错误是在addTask闭包里使用闭包外的变量,因为变量可能在并发操作之外被改变,并发闭包内访问就会导致不安全。可以考虑把变量声明为不可变的let,或者在闭包开始时捕获此变量的值([classroom] in

func start() async throws {
    print("Start student enrolment.")
    await withTaskGroup(of: Int.self) { group in
        var classroom = 1
        for studentId in 0..<3 {
            group.addTask {
                await enrol(studentId, classrom: classrom)
                classroom += 10
            }
        }
        print("Students enrolment tasks added.")
        for await result in group {
          print("Enrolment result: \(result)")
        }
        print("Enrolments results received.")
    }
    print("Stop student enrolment.")
}

为了进一步简化结构化并发,Swift提供了async let用来创建子任务:

func startEnrolment() async {
    async let result1 = enrol(1)
    async let result2 = enrol(2)
    async let result3 = enrol(3)
    let results = await [result1, result2, result3]
    print("students \(results) enrolled.")
}

async let看起来和let相似,同样是定义一个本地常量,通过等号右侧的表达式来初始化此常量,区别在于这个初始化的表达式必须是一个异步函数调用。

TaskGroup一样,如果没写await代码,编译器也会自动生成,来满足结构化并发(可简单理解为有始有终)。以下代码:

func start() async {
  async let result = doSomeWork()
}

相当于:

func start() async {
  async let result = doSomeWork()

  // 以下是自动生成的代码:
  result.task.cancel()
  _ = await result
}

除了TaskGroup.addTaskasync let之外,还可以使用Task.initTask.detached来创建新任务:

func start() async {
  Task {
    await doSomeWork()
  }
  Task.detached {
    await doSomeWork()
  }
}

创建非结构化的任务,可以得到一个具体的值:

extension Task {
  var value: Success { get async throws }
}

extension Task where Failure == Never {
  var value: Success { get async }
}

需要访问这个值则要await

func start() async {
  let task = Task { await doSomeWork() }
  let dtask = Task.detached { await doSomeWork() }

  let value1 = await task.value
  let value2 = await dtask.value
}

这类非结构化并发,外层Task取消并不会传递到内层Task:

let outer = Task {
  let inner = Task {
    await doSomeWork()
  }
  await doSomeWork()
}

outer.cancel()

outer.isCancelled // true
inner.isCancelled // false

Swift并发和任务的取消是基于协作式(coorperative)的,对一个任务调用cancel(),会做以下两件事:

对取消任务的处理需要任务本身来负责:

func count() async throws -> String {
    var s = ""
    for i in 0..<10 {
        guard !Task.isCancelled else {
            return s
        }
        try await Task.sleep(nanoseconds: NSEC_PER_SEC)
        print("Appending \(i)")
        s.append("\(i)")
    }
    return s
}

func start() async throws {
    let t = Task {
        let value = try await count()
        print(value)
    }

    try await Task.sleep(nanoseconds: UInt64(2) * NSEC_PER_SEC)
}

输出:

Appending 0
Appending 1
01

对于被取消的任务,根据需要,可以返回一个空值(nil),也可以返回一个Result,还可以抛出一个CancellationError错误。CancellationError是定义在系统库里的一个特殊的错误类型,只用来表示任务被取消。而相应的,系统也有一个检查任务是否取消,取消则抛出此错误的方法:

func doSomeWork() async throws -> String {
  var s = ""
  for i in 0..<10 {
    try Task.checkCancellation()
    // …
  }
}

早期的Thread.sleep(_:)不支持取消,所以现在基本都用一个新的可以取消的版本:

extension Task where Success == Never, Failure == Never {
  static func sleep(nanoseconds duration: UInt64) async throws
}

被取消时,sleep(nanoseconds:)会直接中断并抛出CancellationError。另外,有时候需要做一些清理工作,一个常见的做法是一开始用defer来写清楚需要清理的东西:

Task {
  defer {
    // cleanup
  }
  async let v = doSomeWork()
}

需要注意的是,defer只有在异步操作返回或者抛出错误时才会被触发,比如用checkCancellation来检查是否当前任务被取消,在检查之前,如果任务已经被取消,则不会立刻触发defer。需要更加及时的处理任务取消,可以用withTaskCancellationHandler

func withTaskCancellationHandler<T>(
  operation: () async throws -> T,
  onCancel handler: @Sendable () -> Void
) async rethrows -> T
func asyncWork() async throws -> String {
  let observer = Observer()
  return try await withTaskCancellationHandler {
    observer.start()
    return try await withUnsafeThrowingContinuation { continuation in
      observer.waitForNextValue { value, error in
        if let value = value {
          continuation.resume(returning: value)
        } else {
          continuation.resume(throwing: error)
        }
      }
    }
  } onCancel: {
    observer.stop()
  }
}

因为结构化并发中存在隐式await,所以建议无论是否需要子任务的返回值,都应该写出对group的等待操作。比如下面代码:

let t = Task {
  do {
    try await withThrowingTaskGroup(of: Int.self) { group in
      group.addTask { try await doSomeWork() }
      group.addTask { try await doSomeWork() }
      // 以下这行代码确保任务t被取消后可以正常捕获错误
      // 否则被取消时,错误只在内部被消化
      try await group.waitForAll()
    }
  } catch {
    // …
  }
}

Actor

Actor要解决的问题,举例来讲,比如多人共用一个本子,每次需要在本子上写圆周率的下一位数字,比如目前的数字是3,下一个应该是小数点后的1,如何保证不会出现两个人同时写而错误的写成3.11。

历史上解决此问题的方法,有给本子上锁,每次写字的人需要拿到钥匙,有钥匙才能写,写完后把钥匙交给下一个人,这样的结果是可能需要写字的人排了长队,等待钥匙,影响了效率。

Actor的思路,顾名思义,指派一个专人负责往本子上写字,人们可以把自己的写字请求交给此人,此人再一件件处理拿到的请求,这样既可以保证不会有两个人同时写出3.11的问题,也确保写字的人不需要等上一个人完成而增加了效率。

actor Pie {
  var value: String = "3.1"

  init() {}

  func add() -> String {
    switch value {
      case "3.1": value.append("4")
      case "3.14": value.append("1")
      case "3.141": value.append("5")
      case "3.1415": value.append("9")
      case "3.14159": value.append("2")
      case "3.141592": value.append("6")
      // …
      default: break
    }
    return value
  }
}

因为有专员负责更新内部状态, 在外部更改内部状态则会出错:

let pie = Pie()
pie.value = "3.1415926"
// Error:
// Actor-isolated property 'value' can not be
// mutated (referenced) from a non-isolated context

从actor外部引用actor隔离域中的生命必须在异步时才可以进行:

func makePie() async {
  let pie = Pie()
  let next = await pie.add()
  print("Next pie value: \(next)")
  print(await pie.value)
}

所有的actor类型都隐式的遵循Actor协议:

protocol Actor: AnyObject, Sendable {
  nonisolated var unownedExecutor: UnownedSerialExecutor { get }
}

因为actor类型默认的声明都被隔离在其域中,需要actor类型满足某个协议时就会出现一个问题,比如:

extension Pie: CustomStringConvertible {
    var description: String {
        value
    }
}

错误:

Actor-isolated property ‘description’ cannot be used to satisfy nonisolated protocol requirement

此时就需要或者协议或者actor类型做出妥协,比如让协议也遵循Actor:

protocol CustomStringConvertibleActor: Actor {
  var descriptionActor: String { get }
}

extension Pie: CustomStringConvertibleActor {
    var descriptionActor: String {
        value
    }
}

或者让协议明确为异步属性:

protocol CustomStringConvertibleAsync {
  var descriptionAsync: String { get async }
}

extension Pie: CustomStringConvertibleAsync {
    var descriptionAsync: String {
        get async {
            value
        }
    }
}

如果不需要访问actor类型隔离域中的成员,或者隔离域中let不可变成员,可以用nonisolated关键字来标记:

actor Pie {
  let name = "Pie"
  init() {}
}

extension Pie: CustomStringConvertible {
  nonisolated var description: String {
    name
  }
}

如果需要将actor外的声明放在隔离域中,可以用isolated关键字:

func compare(pie: isolated Pie, anotherPie: isolated Pie) {
    if pie.value.count > anotherPie.value.count {
      print("The other pie is smaller.")
    } else if pie.value.count < anotherPie.value.count {
      print("The other pie is bigger.")
    } else {
      print("The two pies are the same.")
    }
  }

以上函数,参数类型前加上isolated,就会把这个函数放在该参数actor的隔离域中,因此,函数内调用隔离域内的成员就可以用同步的方式了。不过,如果是从隔离域外使用,则需要await,例如:

actor Pie {
  func compareWithDefault() async {
    await compare(self, Pie())
  }
}

因为actor的数据隔离可以保证其成员的安全,有时候需要把隔离的作用域放大,这时候就有了全局actor。比如必须发生在主线程UI操作,可以将主线程看作是一个特殊的actor隔离区域,这时就有了MainActor,标准库中的一个特殊actor类型:

@globalActor final public actor MainActor: GlobalActor {
  public static let shared: MainActor
  // …
}

MainActor可以作为属性包装(property wrapper)来标注其他类型或方法:

@MainActor class MyClass {
  func myMethod() {}
}

class MyOtherClass {
  @MainActor var value: Int?
  @MainActor func method() {}
  func nonisolatedMethod() {}
}

@MainActor var globalFlag = true

和其他actor一样,使用时需要切换到MainActor的隔离域,可以通过await也可以通过将Task闭包标记为@MainActor来把闭包切换到同样的隔离域:

class Test {
  func makeTasks() {
    Task { await MyClass().method }
    Task { @MainActor in MyClass().method() }
    Task { @MainActor in globalFlag = false }
  }

  func somethingAsync() async {
    await MyClass().method()
  }
}

MainActor的执行器内部做的事情就是调用DispatchQueue.mainasync来把操作派发到主队列中;如此派发所接受的闭包,在Swift并发中其实是被隐式的标记为@MainActor的,所以下面代码时可以执行的:

class Test {
  func asyncTask() {
    DispatchQueue.main.async {
      MyClass().method()
      globalFlag = false
    }
  }
}

UIKit中许多类都已经被标记为@MainActor,比如:

@MainActor class UIViewController: UIResponder {}
@MainActor class UIView: UIResponder {}
@MainActor class UIButton: UIControl {}

因此,UIViewController的子类的运行环境已经在@MainActor域中,通过Task创建新任务,将继承actor的运行环境,其闭包也运行在MainActor的隔离域中:

final class MyViewController: UIViewController {
  override func viewDidLoad() {
    let url = URL(string: "https://request.com")!
    Task {
      let (data, _) = try await URLSession.shared.data(from: url)
      self.updateUI(data)
    }
  }
}

而如果使用的是Task.detached,闭包将忽略原有的隔离域:

Task.detached {
  let (data, _) = try await URLSession.shared.data(from: url)
  await self.updateUI(data)
  // self.updateUI(data)会产生编译错误
}

相应的,如果是发生在UIViewController之外的操作,因为无法确定调用域,而必须使用await

final class Test {
  func test() async {
    let button = UIButton()
    await button.setTitle("Click Me", for: .normal)
    // 这里如果不加await则会报错
  }
}

然而如果以上test()并非异步(async)的话,目前编译器不会报错,主要是为了照顾已有代码不会出错:

final class Test {
  func test() {
    let button = UIButton()
    button.setTitle("Click Me", for: .normal)
  }
}

如果某些成员被标记为@MainActor,编译器则会进行检查:

class ViewController: UIViewController {
  @MainActor func updateUI() {}
}

final class Test {
  func test() {
    // ViewController().updateUI()会报错
    // 必须让它能跳到`MainActor`的隔离域
    Task {
      await ViewController().updateUI()
    }
  }
}

在不同并发域之间传递数据,一个常用的协议是Sendable,它是一个标志协议(marker protocol),没有任何具体要求;声明某个类型满足Sendable会使编译器对它检查,来确认是否能满足要求。

@_marker
public protocol Sendable {}

标准库中的大部分基本类型都是满足Sendable的:

extension Int: Sendable {}
extension Bool: Sendable {}
extension String: Sendable {}

extension Optional: Sendable where Wrapped: Sendable {}
extension Array: Sendable where Element: Sendable {}
extension Dictionary: Sendable where Key: Sendable, Value: Sendable {}

同一模块中,如果一个struct所有成员都是Sendable,编译器会推断出并自动让该协议满足Sendable

如果能确定某个struct不会再改变,可以使用@frozen进行声明,这种struct的成员不会再被修改,编译器也可以直接访问并确定它的内部结构,从而添加隐式Sendable

要使class类型遵守Sendable有如下条件:

  1. 这个class必须是final的,不允许继承
  2. class类型成员必须都遵循Sendable
  3. 所有成员必须使用let声明不会改变

class类型不同,actor类型因为有内部的隔离机制,保证了内部状态的安全,无论在哪个模块,拥有什么类型的存储成员,编译器都会给actor类型加上Sendable

函数也会在并发域之间传递,因此函数也可以被Sendable标记:

func enrol(studentId: String, completion: @Sendable (Bool) -> Void) {}

@Sendable标记广泛存在于Swift的异步API中:

extension Task where Failure == Never {
  init(
    priority: TaskPriority? = nil,
    operation: @escaping @SEndable () async -> Success
  )
}

struct TaskGroup<ChildTaskResult> {
  mutating func addTask(
    priority: TaskPriority? = nil,
    operation: @escaping @Sendable () async -> ChildTaskResult
  )
}

struct AsyncStream<Element> {
  struct Continuation: Sendable {
    var onTermination: (@SEndable (Termination) -> Void)? { get nonmutating set }
  }
}

编译器会对标记为@Sendable的函数做如下检查:

下面这段代码会产生编译错误:

var cat = "Moe"
Task {
  let isMyCat = "noname" == cat
}

Mutation of captured var ’name’ in concurrently-executing code

这是因为虽然String遵循Sendable,但cat可能会在多个不同的任务上下文中被更改。如果只需要知道此变量的内容,可以进行捕获或者使用let来声明:

var cat = "Moe"
Task { [cat] in
  let isMyCat = "noname" == cat
}
let cat = "Moe"
Task {
  let isMyCat = "noname" == cat
}

异步函数出错时往往会抛出错误,因此Swift把Error协议也标记为Sendable

protocol Error: Sendable {}

如果自定义的错误类型包含无法满足Sendable的东西,比如:

class Detail {
  var code: Int
  var message: String
}

struct MyError: Error {
  var detail: Detail
}

为了尽可能的兼容已有代码,Swift 6之前暂时允许这样的错误存在。

@unchecked Sendable

如果你的类中已经有措施保证了数据安全,比如用NSLock来确保同一时间只有最多一个线程改变实例的内部状态,那么可以将此类标记为@unchecked Sendable,编译器则会跳过检查,默认此类型可以满足Sendable

@_unsafeSendable

假如你的代码使用了某个旧模块,还没有迁移到最新的Swift版本标准,那么可能会出现旧模块中符合Sendable却无法被编译器自动推断的类型,比如模块中的代码:

public class Position {
  public let row: Int
  public let column: Int

  public init(_ point: CGPoint) {
    row = Int(point.x)
    column = Int(point.y)
  }
}

而你使用该模块的代码:

struct Board {
  let slots: [[Position]]
}

如果需要让你的Board类型满足Sendable,有两个办法:

  1. 在自己的模块中给Position添加@unchecked Sendable的假设,让编译器可以推断出你的Board类型满足Sendable
extension Position: @unchecked Sendable {}
  1. 直接将你的Board类型标记为@unchecked Sendable,忽略Position的情况:
struct Board: @unchecked Sendable {
  let slots: [[Position]]
}

这里存在一个问题,如果Position所在的模块迁移之后,事实上无法满足Sendable,但由于存在@unchecked Sendable的假设,编译器不会报告问题。为了解决这个问题,Swift采取了一种渐进式的迁移策略,导入模块时如果模块没有完成并发适配,会先假设模块中像Position这样的类型可以进行隐式Sendable推断,这样一来就不需要我们主动来把它标记为@unchecked Sendable,以避免之后出错。