英文原文:https://www.singularity-data.com/blog/build-with-Redpanda-and-RisingWave/
过去的几年,流数据的规模呈爆炸性增长。很多企业意识到他们需要转向流处理,但他们却很难弄清楚该采取什么路线。大多数流处理框架的设计和实现都过于复杂。
- RisingWave 是一个云原生流数据库,它使用SQL作为接口。其旨在降低实时应用的开发和维护成本。RisingWave会消费流式数据,并进行流式处理,从而动态更新结果。
- Redpanda 是一个兼容Apache Kafka®的流式数据平台。它从一开始就以性能和简单为前提。它不需要Zookeeper®,不需要JVM,也不需要修改代码。
RisingWave能够与Redpanda无缝集成,我们提供了一个实时数据流处理的解决方案,这将使开发和维护实时应用变得格外简单。
概述
在本教程中,你将学习如何使用RisingWave来消费Redpanda数据流并进行数据分析。我们将使用广告曝光和点击事件作为样本数据,并计算广告曝光后一分钟内的点击量。
以下是广告曝光事件和点击事件的数据模型:
{
"user_id": 2926375,
"click_timestamp": "2022-05-11 16:04:06.416369",
"impression_timestamp": "2022-05-11 16:04:06.273401",
"ad_id": 8184596
}
对于不熟悉数字广告的用户来说,曝光即广告在App或网站上的一次展示。impression_timestamp
是广告曝光的时间。在此数据模型中,impression_timestamp
一定会比click_timestamp
更小(更早),以确保只有印象之后的点击被计算在内。
我们专门为Redpanda和RisingWave设置了一个演示集群,这样你就不需要单独安装它们。
运行前提
- 请确保你的环境中安装了 Docker 和 Docker Compose。请注意,Docker Compose包含在Windows和MacOS的Docker Desktop中。如果你使用Docker Desktop,请在启动演示集群之前确保它正在运行。
- 确保 PostgreSQL 的交互式终端psql已安装在你的环境中。
- 要在macOS上安装
psql
,请运行这个命令。brew install postgres
. - 要在Ubuntu上安装
psql
,请运行这个命令。sudo apt-get install postgresql-client
.
- 要在macOS上安装
第1步:启动演示集群
首先,把 risingwave-demo 仓库克隆到你的环境中。
git clone https://github.com/singularity-data/risingwave-demo.git
现在让我们定位到ad-click目录,并用docker compose启动演示集群。
cd ad-click
docker-compose up -d
我们将会启动一个Redpanda实例和必要的RisingWave组件,包括前端节点、计算节点、元数据节点和MinIO。
我们在docker-compose中还打包了一个负载生成器。它将生成一些随机数据并发送至 Redpanda。
第2步:将RisingWave连接到Redpanda流上
现在让我们连接到RisingWave,以便我们能够管理数据流并进行数据分析。
psql -h localhost -p 4566
请注意,RisingWave可以通过psql连接,默认端口为4566,而Redpanda则是监听端口9092。如果你打算直接从Redpanda摄取数据,你应该使用端口9092。
我们将用如下SQL语句来设置与Redpanda的连接。
create source ad_source (
user_id bigint,
ad_id bigint,
click_timestamp timestamp,
impression_timestamp timestamp
) with (
'connector' = 'kafka',
'kafka.topic' = 'ad_clicks',
'kafka.brokers' = 'message_queue:9092',
'kafka.scan.startup.mode' = 'latest'
) row format json;
让我们再深入了解一下 WITH
中的参数。
'connector' = 'kafka'
: 由于Redpanda与Kafka兼容,它可以以与Kafka相同的方式连接。'kafka.topic' = 'user_activities'
: Redpanda的topic。'kafka.brokers' = 'redpanda:9092'
: Redpanda Broker的网络地址。'kafka.scan.startup.mode' = 'earliest'
: 这意味着RisingWave将从流中最早的记录开始消费数据。或者,你也可以将此参数设置为latest
,这意味着RisingWave将从最新的记录开始消耗数据。
第3步:分析数据
我们将定义一个物化视图来计算每个广告在曝光后一分钟内的点击量。
通过物化视图,每次有新的事件出现时,RisingWave将只会进行增量计算。一但新事件的计算完成后,结果就会被持久化。
create materialized view m_click_statistic as
select
ad_id,
count(user_id) as clicks_count
from
ad_source
where
click_timestamp is not null
and impression_timestamp < click_timestamp
and impression_timestamp + interval '1' minute >= click_timestamp
group by
ad_id;
我们希望只算被点击过的广告曝光,因此我们使用click_timestamp is not null
的条件来限制范围。我们希望排除任何在曝光后一分钟内的点击,因此这里有一个impression_timestamp + interval '1' minute >= click_timestamp
条件。
第4步:查询结果
RisingWave旨在通过对数据流进行预聚合,实现秒级实时性和以及低查询延迟。由此一来,下游应用可以在极短瞬间内查询结果。
我们用以下语句来进行查询:
select * from m_click_statistic;
其结果可能是这样的:
ad_id | clicks_count
------+--------------
1 | 356
2 | 340
3 | 319
4 | 356
5 | 333
6 | 368
7 | 355
8 | 349
9 | 359
(9 rows)
如果你多次查询,你将能够看到,随着新事件的出现,结果在不断变化。例如,如果你在10秒后再次运行查询,你可能得到如下结果:
ad_id | clicks_count
------+--------------
1 | 362
2 | 345
3 | 325
4 | 359
5 | 335
6 | 369
7 | 360
8 | 353
9 | 360
(9 rows)
结束后,你可以执行以下命令来删除Docker容器:
docker-compose down
总结
在本教程中,我们将 RisingWave 连接至 Redpanda 流,并进行了基本的广告性能分析。该用例有点简单,旨在提供启发而非提供完整案例。如果你想分享你的RisingWave案例,或者你对某个特定的使用场景感兴趣,请在Slack上的RisingWave Community工作区中告诉我们。请使用此邀请链接加入工作区。