Dart笔记:Isolate及其通信机制

发布于:2024-07-11 ⋅ 阅读:(21) ⋅ 点赞:(0)
Dart笔记
多隔离及其通信机制

- 文章信息 -
Author: 李俊才 (jcLee95)
Visit me at CSDN: https://jclee95.blog.csdn.net
My WebSitehttp://thispage.tech/
Email: 291148484@163.com.
Shenzhen China
Address of this article:https://blog.csdn.net/qq_28550263/article/details/138823881
HuaWei:https://bbs.huaweicloud.com/blogs/XXXXXXXXXX

【介绍】:本文介绍Dart中多线程及其通信机制。

flutter-ljc


1. Isolate基础

Isolate是Dart提供的一种轻量级的并发编程模型,通过Isolate可以方便地编写高效、安全的多线程程序。在Dart中,Isolate是一种独立的执行线程,有自己的内存和事件循环。每个Isolate都在自己的内存堆中运行,不共享任何可变状态,因此Isolate之间的通信必须通过消息传递来完成。

Isolate具有以下特点:

  1. 独立性:每个Isolate都是完全独立的,有自己的内存空间和事件循环,不会被其他Isolate干扰。
  2. 隔离性:Isolate之间不共享任何可变状态,因此不会出现多线程编程中常见的竞态条件和死锁等问题。
  3. 通信方式:Isolate之间通过消息传递进行通信,消息可以是任意的Dart对象,但必须是不可变的。
  4. 并发性:多个Isolate可以并发执行,充分利用多核CPU的计算能力,提高程序的性能。
  5. 异常处理:每个Isolate都有自己的异常处理机制,不会影响其他Isolate的运行。

2. Isolate的创建方式

2.1 通过Isolate.spawn创建隔离

我们可以使用Isolate.spawn方法创建一个新的隔离(Isolate)。该方法的签名如下:

static Future<Isolate> spawn<T>(
    void entryPoint(T message), T message,
    {bool paused = false,
    bool errorsAreFatal = true,
    SendPort? onExit,
    SendPort? onError,
    ("2.3") String? debugName});

各个参数的含义如表所示:

参数名 类型 默认值 描述
entryPoint void Function(T) 必需 新隔离的入口函数,接收一个类型为T的消息参数。
message T 必需 传递给新隔离的入口函数的消息,类型为T。
paused bool false 如果为true,则新隔离在启动时会被暂停。
errorsAreFatal bool true 如果为true,隔离中未捕获的异常会导致隔离终止。
onExit SendPort? null 隔离退出时的回调端口,可以用来接收隔离的退出信号。
onError SendPort? null 隔离中发生未捕获异常时的回调端口,可以用来接收错误信息。
debugName String? null 隔离的调试名称,用于在调试时标识隔离。自Dart 2.3版本引入。

其中,我们使用该方法时主要关注的是entryPointmessage这两个参数,例如:

import 'dart:isolate';

void main() {
  // 创建新隔离,并传递一个字符串消息
  Isolate.spawn(workerIsolate, 'Hello from main isolate');
}

/// 新隔离的入口函数
///
/// [message] 主隔离传递过来的消息
void workerIsolate(String message) {
  print('New isolate received message: $message');
  // 在这里执行新隔离的任务
  // ...
}

在上面的代码中:

  1. 在主隔离(main函数)中,调用Isolate.spawn方法创建了一个新隔离。
  2. workerIsolate函数被指定为新隔离的入口函数,它接收一个字符串类型的消息参数。
  3. 字符串’Hello from main isolate’被作为消息参数传递给新隔离。
  4. 在新隔离中,workerIsolate函数被执行,并打印出接收到的消息。

通过Isolate.spawn方法,可以方便地创建新的隔离,并可以向新隔离传递初始化消息。新隔离将在独立的内存空间中运行,与主隔离相互隔离,从而实现并发执行的效果。

2.2 通过Isolate.spawnUri创建隔离

除了使用Isolate.spawn方法创建隔离外,Dart还提供了Isolate.spawnUri方法,可以通过指定一个URI来创建隔离。这个URI可以是一个Dart文件的路径或者一个包含Dart代码的字符串。

Isolate.spawnUri方法的签名如下:

static Future<Isolate> spawnUri(
    Uri uri, List<String> args, var message,
    {bool paused = false,
    SendPort? onExit,
    SendPort? onError,
    bool errorsAreFatal = true,
    bool? checked,
    Map<String, String>? environment,
    Uri? packageConfig,
    bool automaticPackageResolution = false,
    ("2.3") String? debugName});

各个参数的含义如表所示:

参数名 类型 默认值 描述
uri Uri 必需 包含隔离入口点的URI。可以是一个Dart文件的路径或包含Dart代码的URI。
args List 必需 传递给隔离入口点的参数列表。
message dynamic 必需 传递给隔离的初始消息。可以是任意类型的对象。
paused bool false 如果为true,则新隔离在启动时会被暂停。
onExit SendPort? null 隔离退出时的回调端口,可以用来接收隔离的退出信号。
onError SendPort? null 隔离中发生未捕获异常时的回调端口,可以用来接收错误信息。
errorsAreFatal bool true 如果为true,隔离中未捕获的异常会导致隔离终止。
checked bool? null 表示是否启用运行时类型检查。默认为null,使用与当前隔离相同的设置。
environment Map<String, String>? null 传递给隔离的环境变量映射。
packageConfig Uri? null 包配置文件的URI。
automaticPackageResolution bool false 表示是否自动解析包。默认为false。
debugName String? null 隔离的调试名称,用于在调试时标识隔离。自Dart 2.3版本引入。

相比于Isolate.spawn方法,Isolate.spawnUri允许指定一个URI作为新隔离的入口点。例如:

import 'dart:isolate';

void main() async {
  // 指定包含隔离入口点的URI
  Uri uri = Uri.parse('package:my_package/worker.dart');

  // 创建隔离,并传递初始消息
  Isolate isolate = await Isolate.spawnUri(
    uri,
    ['Hello', 'from', 'main'],
    'Initial message',
    debugName: 'WorkerIsolate',
  );

  print('Isolate created: ${isolate.debugName}');
}

在上面的代码中:

  1. 通过Uri.parse方法指定了包含隔离入口点的URI,这里假设隔离的入口点位于package:my_package/worker.dart文件中。
  2. 调用Isolate.spawnUri方法创建隔离,传递了URI、参数列表和初始消息。
  3. 通过debugName参数指定了隔离的调试名称为 ‘WorkerIsolate’
  4. 创建隔离后,打印出隔离的调试名称。

在worker.dart文件中,需要定义隔离的入口点函数,例如:

// worker.dart
void main(List<String> args, dynamic message) {
  print('Worker isolate started with args: $args');
  print('Received initial message: $message');
  // 在这里执行隔离的任务
  // ...
}

在隔离的入口点函数中,可以接收传递的参数列表args和初始消息message。这里简单地打印出接收到的参数和消息。

通过Isolate.spawnUri方法,可以方便地将隔离的代码放在单独的Dart文件中,使代码结构更加清晰和模块化。同时,还可以向隔离传递参数和初始消息,方便隔离的初始化和配置。

需要注意的是,使用Isolate.spawnUri创建隔离时,需要确保URI指向的Dart文件是可访问的,并且具有正确的入口点函数签名。

Isolate.spawnUri提供了另一种创建隔离的方式,通过指定包含隔离代码的URI,可以将隔离的逻辑与主程序分离,提高代码的可读性和可维护性。同时,还支持传递参数和初始消息,使隔离的创建和配置更加灵活。

2.3 compute函数

Dart提供了一个便捷的函数compute,用于在后台isolate中执行耗时操作,并返回执行结果。compute函数会自动创建一个新的isolate,在其中运行指定的回调函数,并将结果返回给调用方。

Future<R> compute<M, R>(ComputeCallback<M, R> callback, M message, {String? debugLabel}) {
  return isolates.compute<M, R>(callback, message, debugLabel: debugLabel);
}

compute函数接受以下参数:

  • callback: 类型为ComputeCallback<M, R>,表示要在后台isolate中执行的回调函数。该函数接受一个类型为M的参数,并返回一个类型为R的结果。

  • message: 类型为M,表示要传递给回调函数的参数。

  • debugLabel: 可选参数,类型为String,用于为后台isolate指定一个调试标签。当进行性能分析时,该标签会与isolate产生的Timeline事件相关联,方便识别和定位问题。

使用compute函数的示例如下:

import 'package:flutter/foundation.dart';

void main() async {
  int result = await compute(fibonacci, 40);
  print('斐波那契数列第40位: $result');
}

/// 计算斐波那契数列的回调函数
///
/// [n] 要计算的斐波那契数列的位置
int fibonacci(int n) {
  if (n <= 0) {
    return 0;
  } else if (n == 1) {
    return 1;
  } else {
    return fibonacci(n - 1) + fibonacci(n - 2);
  }
}

在上面的示例中:

  1. 导入了package:flutter/foundation.dart包,其中包含了compute函数的定义。
  2. 在main函数中,调用了compute函数,传入了fibonacci回调函数和参数40,表示要计算斐波那契数列的第40位。
  3. compute函数会自动创建一个新的isolate,并在其中执行fibonacci函数,计算斐波那契数列的第40位。
  4. 计算完成后,compute函数会将结果返回给调用方,并打印出结果。

fibonacci函数是一个递归函数,用于计算斐波那契数列的指定位置的值。由于斐波那契数列的计算是一个比较耗时的操作,特别是对于较大的位置值,使用compute函数可以将计算任务放到后台isolate中执行,避免阻塞主isolate,提高应用的响应性能。

传递给compute函数的回调函数以及参数都必须是可以在isolate之间传递的对象。大多数对象都可以在isolate之间传递,但是有一些例外情况需要注意,例如包含了不可传递状态的闭包等。

3. Isolate之间通信

3.1 单向通信

在Dart中,Isolate之间的单向通信可以通过SendPort和ReceivePort来实现。发送端通过SendPort将消息发送给接收端,接收端通过ReceivePort接收消息。

下面是一个示例代码,演示了如何在Isolate之间进行单向通信:

import 'dart:isolate';

void main() {
  startSingleDirectionExample();
}

/// 单向通信示例函数
Future<void> startSingleDirectionExample() async {
  print('SingleDirection start----------');
  String mainDebugName = Isolate.current.debugName!;
  print('[$mainDebugName]为主线程');

  // 创建主线程的ReceivePort
  ReceivePort mainReceivePort = ReceivePort();

  // 创建新线程,并将主线程的ReceivePort传递给新线程
  await Isolate.spawn(
    workerThread,
    mainReceivePort.sendPort,
    debugName: 'WorkerIsolate',
  );

  // 监听来自新线程的消息
  await for (var message in mainReceivePort) {
    if (message == null) {
      break;
    }
    print('[$mainDebugName]收到了来自新线程的消息: $message');
  }

  print('SingleDirection end----------');
}

/// 新线程的入口函数
///
/// [mainSendPort] 主线程传递过来的SendPort
void workerThread(SendPort mainSendPort) {
  String workerDebugName = Isolate.current.debugName!;
  print('[$workerDebugName]为新线程');

  // 向主线程发送消息
  mainSendPort.send('Hello from $workerDebugName');
  mainSendPort.send('How are you?');
  mainSendPort.send('Goodbye!');

  // 发送null值,表示不再有新的消息了
  mainSendPort.send(null);

  // 关闭新线程
  Isolate.exit();
}

在上面的代码中:

  1. 在主线程(main函数)中,创建了一个ReceivePort对象mainReceivePort,用于接收来自新线程的消息。
  2. 调用Isolate.spawn方法创建一个新线程,并将主线程的SendPort对象mainReceivePort.sendPort传递给新线程的入口函数workerThread。同时,通过debugName参数指定新线程的调试名称为**‘WorkerIsolate’**。
  3. 在主线程中,使用await for循环监听mainReceivePort上的消息。每当收到新线程发送的非null消息时,就会打印出消息内容。如果收到null消息,则表示新线程不再发送消息,此时跳出循环。
  4. 在新线程(workerThread函数)中,通过传递过来的SendPort对象mainSendPort,向主线程发送了三条非null的消息。
  5. 在发送完非null的消息后,新线程额外发送了一个null值,表示不再有新的消息了。
  6. 最后,新线程调用 **Isolate.exit()**方法关闭自己。
    运行该代码,输出结果如下:
SingleDirection start----------
[main]为主线程
[WorkerIsolate]为新线程
[main]收到了来自新线程的消息: Hello from WorkerIsolate
[main]收到了来自新线程的消息: How are you?
[main]收到了来自新线程的消息: Goodbye!
SingleDirection end----------

从输出结果可以看到,主线程成功接收到了新线程发送的三条非null的消息,并在接收到null消息后跳出了循环,继续执行后面的代码,打印出了'SingleDirection end----------'

这就是Isolate之间单向通信的基本实现。发送端通过SendPort对象将消息发送给接收端,接收端通过ReceivePort对象接收消息。当发送端发送null值时,表示不再有新的消息,接收端可以根据这个信号来结束接收循环。

需要注意的是,在新线程中发送完消息后,需要显式关闭新线程,以释放资源。可以通过调用Isolate.exit()方法来关闭新线程。

3.2 双向通信

import 'dart:isolate';

void main() {
  startMultiThreadExample();
}

/// 多线程示例函数
Future<void> startMultiThreadExample() async {
  print('mutiTheread start----------');
  String debugName = Isolate.current.debugName!;
  print('[$debugName]为当前线程');

  // 创建主线程的ReceivePort和SendPort
  ReceivePort mainReceivePort = ReceivePort();
  SendPort mainSendPort = mainReceivePort.sendPort;

  // 创建新线程,并将主线程的SendPort传递给新线程
  Isolate.spawn(workerThread, mainSendPort);

  // 等待新线程返回其SendPort
  SendPort workerSendPort = await mainReceivePort.first;

  // 向新线程发送消息,并等待回复
  var reply1 = await sendAndReceive<String>(workerSendPort, 'Hello');
  print('[$debugName]接收到:$reply1');

  var reply2 = await sendAndReceive<String>(workerSendPort, 'World');
  print('[$debugName]接收到:$reply2');
  print('mutiTheread end----------');
}

/// 新线程的入口函数
///
/// [mainSendPort] 主线程传递过来的SendPort
workerThread(SendPort mainSendPort) async {
  String debugName = Isolate.current.debugName!;
  print('[$debugName]为当前线程');

  // 创建新线程的ReceivePort和SendPort
  ReceivePort workerReceivePort = ReceivePort();
  SendPort workerSendPort = workerReceivePort.sendPort;

  // 将新线程的SendPort发送给主线程
  mainSendPort.send(workerSendPort);

  // 持续监听新线程的消息
  await for (var message in workerReceivePort) {
    // 检查消息格式是否正确
    if (message is List && message.length == 2) {
      var data = message[0];
      // 检查消息类型是否为字符串
      if (data is String) {
        print('[$debugName]收到了来自主线程的消息:$data');
        SendPort replyPort = message[1];
        // 给主线程回复消息
        replyPort.send(data);
      } else {
        print('[$debugName]收到了无效的消息类型:${data.runtimeType}');
      }
    } else {
      print('[$debugName]收到了无效的消息格式');
    }
  }
}

/// 向指定的SendPort发送消息,并等待回复
///
/// [targetPort] 目标SendPort
/// [message] 要发送的消息
/// 返回: 收到的回复消息
Future<T> sendAndReceive<T>(SendPort targetPort, T message) {
  String debugName = Isolate.current.debugName!;
  print('[$debugName]发送消息给新线程:$message');

  // 创建接收回复消息的ReceivePort
  ReceivePort responsePort = ReceivePort();

  // 发送消息给目标SendPort,并携带接收回复的SendPort
  targetPort.send([message, responsePort.sendPort]);

  // 等待回复消息,并检查类型是否匹配
  return responsePort.first.then((value) {
    if (value is T) {
      return value;
    } else {
      throw Exception('接收到的消息类型与预期不符');
    }
  });
}
mutiTheread start----------
[main]为当前线程
[workerThread]为当前线程
[main]发送消息给新线程:Hello
[workerThread]收到了来自主线程的消息:Hello
[main]接收到:Hello
[main]发送消息给新线程:World
[workerThread]收到了来自主线程的消息:World
[main]接收到:World
mutiTheread end----------