结合前两期 Dapr(一) 基于云原生了解Dapr(Dapr(一) 基于云原生了解Dapr-CSDN博客) Dapr(二) 分布式应用运行时搭建及服务调用(Dapr(二) 分布式应用运行时搭建及服务调用-CSDN博客)
下篇推出dapr服务注册与发现,dapr组件绑定,dapr Actor功能。
目录
1.0 Dapr状态管理
Dapr的状态管理允许应用程序保存和检索键值对数据,具有可插拔的存储、配置的行为和额外的安全特性。以下是主要特点:
可插拔状态存储:Dapr支持多种数据存储,比如MySQL、Redis、Azure CosmosDB等,可以在不修改代码的情况下切换。
配置存储行为:你可以指定并发控制和一致性级别。默认是最终一致性,但也支持强一致性。
并发控制:通过ETags实现乐观并发控制(OCC)。写操作需要匹配当前的ETag值,防止冲突。
自动加密:预览功能,支持应用程序状态的自动加密和密钥轮换。
一致性选项:可以选择强一致性的写入,等待所有副本确认,或者默认的最终一致性。
批量操作:支持一次性处理多条状态记录。
1.1 Dapr状态组件配置文件
Dapr默认使用的Redis进行存储。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
1.2 状态控制器
public class StateController : ControllerBase
{
private readonly ILogger<StateController> _logger;
private readonly DaprClient _daprClient;
public StateController(ILogger<StateController> logger, DaprClient daprClient)
{
_logger = logger;
_daprClient = daprClient;
}
// 获取一个值
[HttpGet]
public async Task<ActionResult> GetAsync()
{
var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
return Ok(result);
}
//保存一个值
[HttpPost]
public async Task<ActionResult> PostAsync()
{
await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
return Ok("done");
}
//删除一个值
[HttpDelete]
public async Task<ActionResult> DeleteAsync()
{
await _daprClient.DeleteStateAsync("statestore", "guid");
return Ok("done");
}
//通过tag防止并发冲突,保存一个值
[HttpPost("withtag")]
public async Task<ActionResult> PostWithTagAsync()
{
var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
await _daprClient.TrySaveStateAsync("statestore", "guid", Guid.NewGuid().ToString(), etag);
return Ok("done");
}
//通过tag防止并发冲突,删除一个值
[HttpDelete("withtag")]
public async Task<ActionResult> DeleteWithTagAsync()
{
var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
}
// 从绑定获取一个值,健值name从路由模板获取
[HttpGet("frombinding/{name}")]
public ActionResult GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
{
return Ok(state.Value);
}
// 根据绑定获取并修改值,健值name从路由模板获取
[HttpPost("withbinding/{name}")]
public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
{
state.Value = Guid.NewGuid().ToString();
return Ok(await state.TrySaveAsync());
}
// 获取多个个值
[HttpGet("list")]
public async Task<ActionResult> GetListAsync()
{
var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
return Ok(result);
}
// 删除多个个值
[HttpDelete("list")]
public async Task<ActionResult> DeleteListAsync()
{
var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
var removeList = new List<BulkDeleteStateItem>();
foreach (var item in data)
{
removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
}
await _daprClient.DeleteBulkStateAsync("statestore", removeList);
return Ok("done");
}
}
1.3 切换其它状态存储
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "root:123456@tcp(192.168.157.157:3306)/?allowNativePasswords=true"
切换状态为MySql进行存储。
1.4 工作原理
应用程序与 Dapr sidecar 交互,以存储和检索键/值数据。 在底层,sidecar API 使用**可配置的状态存储组件**来保存数据。 开发人员可以从不断增长的受支持状态存储集合中选择,其中包括 Azure Cosmos DB、SQL Server 和 Cassandra。
2.0 发布订阅
2.1 什么是发布订阅
发布订阅(Publish-Subscribe)是一种通信模式,允许发布者发送消息到一个中心节点(通常是消息代理或主题),而不关心具体哪些订阅者会接收到这些消息。订阅者则注册他们感兴趣的特定类型的消息,当匹配的消息发布时,他们会收到通知。这种模式的特点在于解耦了发布者和订阅者,提高了系统的灵活性和可扩展性。
关键元素包括:
- 发布者 (Publisher): 生产消息的实体,它向主题或消息代理发送消息,无需了解谁会接收这些消息。
- 订阅者 (Subscriber): 对特定消息感兴趣并希望接收通知的实体,它们通过订阅主题或消息代理来表达兴趣。
- 主题 或 消息代理 (Topic or Message Broker): 中间媒介,接收并分发消息,确保消息从发布者到达正确的订阅者。
一个简单的示例是新闻系统,其中发布者发布新闻到特定类别,而订阅者选择关注他们感兴趣的类别。发布者不直接通知订阅者,而是通过消息代理进行,这样订阅者仅接收与其订阅相匹配的新闻。
发布订阅模式的应用场景通常涉及异步通信、事件驱动的系统或需要解耦组件的场景。
2.2 设置发布订阅组件
Dapr默认使用的Redis进行发布订阅。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
2.3 控制器代码
2.3.1 发布控制器
[ApiController]
[Route("[controller]")]
public class PubsubController : ControllerBase
{
private DaprClient _daprClient;
private ILogger<PubsubController> _logger;
public PubsubController(DaprClient daprClient, ILogger<PubsubController> logger)
{
_daprClient = daprClient;
_logger = logger;
}
/// <summary>
/// 发布消息的方法
/// </summary>
/// <returns></returns>
[HttpPost]
[Route("pub")]
public async Task<IActionResult> PublishMessage()
{
_logger.LogInformation("***发布消息***");
var data = new UserInfo(10001,"操作员",19);
await _daprClient.PublishEventAsync("pubsub", "topic",data);
return Ok("***发布消息成功***");
}
}
2.3.2 订阅控制器
[ApiController]
[Route("[controller]")]
public class SubController : ControllerBase
{
private ILogger<SubController> _logger;
public SubController(ILogger<SubController> logger)
{
_logger = logger;
}
[HttpPost("sub")]
[Topic("pubsub", "topic")]
public IActionResult ConsumerMessage(UserInfo user)
{
_logger.LogInformation("***消费消息***");
Console.WriteLine($"userId:{user.UserId} userName:{user.UserName}");
return Ok();
}
}
2.4 修改文件Program.cs
app.UseCloudEvents();
app.MapSubscribeHandler();
2.5 切换组件
切换为RabbitMQ
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://123:123@192.168.157.157:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: deliveryMode
value: "0"
- name: requeueInFailure
value: "false"
- name: prefetchCount
value: "0"
- name: reconnectWait
value: "0"
- name: concurrencyMode
value: parallel
- name: backOffPolicy
value: "exponential"
- name: backOffInitialInterval
value: "100"
- name: backOffMaxRetries
value: "16"
2.6 工作原理
Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。服务将消息发布到指定主题, 业务服务订阅主题以使用消息。服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。
总结:
Dapr的发布订阅功能使得在分布式系统中实现发布/订阅消息模式变得更加简单。主要解决了不同消息产品之间实施复杂性和功能差异的问题。你可以通过Dapr的Sidecar API使用HTTP或gRPC来发布和订阅消息。以下是关键操作的概述:
发布(Publish)消息:
- 使用
http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>
URL,其中<dapr-port>
是Dapr Sidecar监听的端口,<pub-sub-name>
是选择的发布/订阅组件名,而<topic>
是消息的目标主题。
- 使用
订阅(Subscribe)消息:
- 应用程序在启动时,通过
http://localhost:<appPort>/dapr/subscribe
指定其订阅,其中<appPort>
是应用程序监听的端口。 - 订阅者处理消息后返回非错误响应,Dapr认为消息传递成功。
- 支持订阅者通过响应负载中的状态进行精细化控制,比如指示重试(RETRY)或丢弃(DROP)消息。
- 应用程序在启动时,通过
Dapr的状态管理提供了一种跨服务持久化数据的方法,支持多种存储后端。关键特性包括:
原子性操作:支持原子性的读写操作,保证一致性。
版本控制:允许跟踪状态更改的历史版本,便于回滚。
事件驱动:状态变化可触发回调函数,实现基于状态变化的自动化操作。
过期策略:可设置状态项的过期时间。
备份与恢复:提供状态备份和恢复机制,确保高可用性。