Materialized View Simple Demo | TiDB
Ref What is a materialized view?: A materialized view is a pre-computed data set derived from a query specification (the SELECT in the view definition) and stored for later use. Because the data is pre-computed, querying a materialized view is faster than executing a query against the base table of the view. This performance difference can be significant when a query is run frequently or is sufficiently complex. As a result, materialized views can speed up expensive aggregation, projection, and selection operations, especially those that run frequently and that run on large data sets.
本文记录一项基于 Flink / Hudi / Kafka / TiCDC 以及 TiDB 构建 Materialized View
(物化视图,aka MV
)的简易 demo。
相对于普通视图,MV 需要解决几个问题:
- 数据一致性:TiDB 可以凭借全局唯一
Time Stamp Oracle
(akaTSO
) 来保障事务线性一致性。如果将 TiDB 作为数据源接入到第三方系统,需保障外部应用何种级别的一致性(无一致性,最终一致,线性一致)?如何解决多元异构数据源带来的一致性问题? - 数据存储:如何支持可更新数据的存储?如何构建 MV 并维护状态?
- 数据加工:
ETL
(Extract Transform Load)流程是 OLAP 领域必不可少的一环,较常见的有拼宽表,字段过滤,类型转换,数据分区等。如何令 MV 支持可定制化的 ETL?如何以SQL
(Structured Query Language)形式支持外部定义?
Implementation
Flink
Flink 是目前业内广泛应用的流批一体处理平台,支持 SQL 的形式定义流式计算任务,为了方便搭建 demo,文中借助 Flink 来实现 ETL,构建并维护 MV。
- Flink 状态(state)是被定义在算子(operator/task)中的空间,通过存储过去事件中的信息来影响未来事件的处理,即主要用于保存中间计算结果和缓存数据。
- Flink 有 2 种基本类型的状态:
- 托管状态(Managed State)
- 原生状态(Raw State):需要开发者自己管理,序列化/反序列化;
- Flink 的分布式状态流(stateful stream)容错机制(通过 Checkpointing 实现)较为完善,并可保证端到端
exactly-once
的状态转换(详见 Flink Consistency)
Flink Checkpointing
- Flink 中的 checkpoint 表示应用(FlinkJob)的一致性快照,主要由 2 部分组成:应用的当前状态(相当于内部所有算子状态快照集合);输入数据流中的偏移量;
- Flink 按照规则和配置周期性生成 checkpoint,然后将 checkpoint 写入到可靠的存储中(通常为分布式文件系统);将 checkpoint 写入存储为异步操作,也就意味着 Flink 应用的处理流程不会被 checkpointing 机制阻塞;
- 当异常发生时,Flink 使用最近一次完成的 checkpoint 作为状态的初始点来重启应用;
- Flink 1.3 开始支持在
rocksdb
状态后端中使用增量快照(incremental checkpoint)
,可优化大状态(GB,TB 级状态)的场景; - 参考配置
state.backend
:hashmap 或 rocksdbstate.checkpoints.dir
execution.checkpointing.interval
execution.checkpointing.mode
state.backend.incremental
state.backend.rocksdb.localdir
:参考 state-location-in-rocksdb,建议使用本地 SSD 磁盘来提升吞吐量;
Checkpointing 实现
Flink 使用 Chandy-Lamport 算法的一种变体,称为异步屏障快照(asynchronous barrier snapshotting
)。
当 task manager 接收到 job manager 的命令开启一个 checkpoint,它会让所有数据源记录自身的偏移量,并在数据流中插入按序编号的
checkpoint barriers
,这些 barrier 流过作业图,用于指示 checkpoint 的边界;作业图中的每个算子接收到 barrier 时,会记录自身的状态;checkpoint
n
包含每个算子处理完 barriern
之前所有数据和事件后的状态;对于有多个输入流的算子,则会进行
barrier alignment
操作来对齐 barrier- 对齐 barrier 可以减少状态快照的数据量;如图中所示,输入流在 barrier
n
位置前均无待处理的数据,算子状态快照也就无需保存这类信息 - 对齐操作缺点:
- 增大延迟(快的 barrier 需阻塞等待),拖慢 checkpointing 流程
- 面向类似双流
equal-join
的场景,可能因为输入流延迟不同而导致行为不符合预期- 例如预期被处理的数据对分别为
(1,a)
(2,b)
(3,c)
。对齐 barrier 之后,则变成(1,e)
(2,f)
(3,g)
。
- 例如预期被处理的数据对分别为
- 对齐 barrier 可以减少状态快照的数据量;如图中所示,输入流在 barrier
barrier 这类抽象可以引申为 checkpoint 的边界;明确的边界可以有效地避免冗余;面向
unaligned barrier
这类无明确边界的场景,则需要在状态快照中额外保存冗余数据:- 在 barrier 对齐的场景中,barrier 必须遵循其在数据流中的位置,算子需要等待 barrier 被实际处理才开始快照;对于 barrier 不对齐的场景,当任一数据流的 barrier 到达输入缓存,算子立即保存 3 部分数据至 checkpoint;然后将 barrier 优先放到输出缓存的队首;需保存的数据如下(例如图中绿色部分):
- 所有数据流中 barrier 之前的全部未处理数据;
- 如果 barrier 尚未出现,需阻塞等待,因为此时数据流已持久化的 checkpoint 序号一定小于 barrier,异常后无法恢复;如果要处理该场景,则需要自定义应用的 checkpoint 机制,令数据流自身的 checkpoint 可独立控制;
- 算子输入缓存中 barrier 之前的全部未处理数据;
- 算子输出缓存中的数据;
- 所有数据流中 barrier 之前的全部未处理数据;
- 不对齐 barrier 可以避免对齐造成的阻塞等待
- 从 checkpoint 恢复时,不对齐的数据会被 checkpoint 恢复到相应的数据队列中,所以依然能提供
exactly-once
保证 - 不对齐 barrier 的缺点:
- 由于要持久化冗余数据,加重存储负载
- 随着 checkpoint 增大,可能增大作业恢复时间
- 适合容易产生高反压的复杂作业,但是对于像数据 ETL 同步等简单作业,可以选择轻量级的对齐处理。
- 反压是在实时数据处理中,数据管道某个节点上游产生数据的速度大于该节点处理数据速度的一种现象。
- 在 barrier 对齐的场景中,barrier 必须遵循其在数据流中的位置,算子需要等待 barrier 被实际处理才开始快照;对于 barrier 不对齐的场景,当任一数据流的 barrier 到达输入缓存,算子立即保存 3 部分数据至 checkpoint;然后将 barrier 优先放到输出缓存的队首;需保存的数据如下(例如图中绿色部分):
Flink Consistency
An Overview of End-to-End Exactly-Once Processing in Apache Flink
Flink 的一致性可以分为 3
个级别,由弱到强依次为:
at-most-once
:无一致性保障;at-least-once
:数据或事件至少被处理一次(不丢失,有重复);下游可实现幂等机制来保障一致性;exactly-once
:数据或事件只会被处理一次(不丢失,不重复);
每个组件都保证其自身的一致性,端到端的一致性级别取决于所有组件中一致性最弱的。倘若要实现 exactly-once
一致性,以最简单的端到端处理部分为例:
- 总体分为 3 个部分,数据流向:
source -> internal -> sink
source
:需要外部源可指定数据读取的位置(例如 Kafka 消息队列)- 位置信息作为状态的一部分,异常恢复时可重新开始消费;
internal
:Checkpointing 机制;sink
:需要保证从故障恢复时,数据不会重复写入外部系统;sink 端有 2 种具体实现方式:幂等(Idempotent)
写入:操作可以重复执行很多次,但只导致一次结果更改事务性(Transactional)
写入:一个事务中的操作要么全部成功,要么全部失败- 应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销;
sink 端事务性写入实现:
- 外部 sink 系统必须支持事务性写入(构建的事务需要与 checkpoint 对应),等 checkpoint 完成时,才把所有对应的结果写入外部系统;
- 对于事务性写入,具体有 2 种实现方式:
预写日志(WAL)
- 把结果数据保存到状态中作为 WAL,等 checkpoint 完成时再写入下游;
- 具体的写入行为可视作基于 WAL 的幂等回放;
- 该方式易于实现,但存在几个缺点:
- 导致状态数据量增大,增加存储负载;
- 写入需要等到 checkpoint 完成,会导致数据延迟;
- 把结果数据保存到状态中作为 WAL,等 checkpoint 完成时再写入下游;
两阶段提交(2PC)
- checkpoint 启动时开启 sink 系统的事务
- 结果数据写入该事务,但不提交
- 等 checkpoint 完成时,提交该事务,实现真正写入
2PC 实现:
- 事务启动:外部 sink 系统开启一个事务,并与 checkpoint
n
相关联 - 事务预写:事务在 checkpoint
n
范围内接受数据预写入 - 事务提交:
- sink 任务处理 barrier
n
时,等事务可变成可提交
的状态后,再保存状态快照。 - 所有子任务状态快照保存完毕,表示 checkpoint
n
已完成。- 出现异常后应用则恢复至该 checkpoint,sink 任务需要恢复该事务并提交。
- 上游广播通知 checkpoint
n
完成,sink 任务正式提交该事务。 - 提交事务的行为必须是幂等操作。
- Flink 从 checkpoint 故障恢复时总会尝试提交相关事务(通过冗余来实现容错,否则需要存储额外的信息表示事务状态)
- sink 任务处理 barrier
- 事务撤回
Flink Incremental Materialized View
增量物化视图(Incremental Materialized View
,aks IMV
) 本质上等同于 Flink 中的一个有状态算子,以 Example4 为例:
- ETL 定义的处理流程为
select b, sum(a) from ... group by b
- 输入 source 为 Kafka 消息队列,输出 sink 系统为 Hudi
- 算子状态包含以
b
字段为 Key,sum(a)
为 Value 的基本元素,状态数据量与 Key 数量呈正相关 sum()
聚合可通过累加中间结果得到最终值,因而能够增量维护- 输入数据为 upsert 类型,则累加
a
字段新旧值的差值 - 输入数据为 delete 类型,则减去旧值
- 输入数据为 upsert 类型,则累加
- 增量维护
GROUP BY()
等价于增加一个COUNT()
聚合算子,值为 0 时表示无数据。 - 状态的实际存储后端可选:内存/文件/rocksdb
- 建议配置为 rocksdb 后段,增量快照面对大状态 checkpointing 较为高效
- 输入/输出系统满足
exactly-once
的基本要求,整体上该应用可保证exactly-once
一致性
min()
/ max()
函数不支持直接累加,其状态机维护类似于 select col_?, count(*) from ? group by col_? order by col_? [desc] limit 1
,当 count 为 0 时则删除相关元素。
avg()
函数则类似 select sum(col_?), count(col_?) from ?
,对外输出时再执行算数除法。
distinct()
函数则类似 select count(*) from (select col_?, count(*) from ? group by col_?)
。
Join 场景下的 IMV 维护与上面类似,主要不同体现在数据源的构造和处理流程上。Flink 本身提供的 Join 类型大致分为:
- Regular join
- left join
- right join
- inner join(默认类型)
- outer join
- Interval Join
- Temporal Join
- Lookup Join(在 Example5 被使用)
- Window Join
Interval Join
和 Window Join
属于比较典型的流式计算场景下的功能,需要外部约束来保证数据一致性。
Storage
Hudi 作为下一代数据湖产品,加强了对于数据更新的支持。其不仅兼容 Hadoop 系的数仓生态,而且与 Flink 的衔接也较为完善。文中以 Hudi 和 HDFS 作为 MV 相关的数据存储系统。
write.tasks
:写任务的并行度write.bucket_assign.tasks
:当index.type
为FLINK_STATE
时,覆盖write.tasks
Bucket Index: Flink 默认使用状态后端来存储文件索引(primary key -> fileId
)。当输入数据量较大时,状态相关的开销容易成为性能瓶颈。BUCKET
的方式通过确定性 hash 算法来讲数据映射到 buckets 中,从而避免了建立索引过程中的存储/查询相关的开销。
index.type
:FLINK_STATE
(default),BUCKET
hoodie.bucket.index.hash.field
hoodie.bucket.index.num.buckets
Consistency
TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。
- 借助 Kafka 实现存量数据和增量数据的线性一致。该方式会引入额外的数据同步开销,实际场景中则可以通过存量数据构建 MV Base 部分,利用增量数据构建 MV Delta 部分。详见 Example3,Example4,Example5
- 先选取构建 TiCDC 任务时的 TSO 作为快照点
S
,令导出增量数据到 TopicI
; - 利用 Dumpling 导出
S
对应的存量数据并同步到新 TopicB
,该步骤完成后再提交异步任务,将I
的数据持续同步到B
; - 下游 Flink 则只消费
B
; I
和B
均只有1
个 Partition 以保证线性一致性;
- 先选取构建 TiCDC 任务时的 TSO 作为快照点
- Option:理论上 TiCDC 还可支持将
DDL
和DML
的commit-ts
通过 ticdc-canal-json#tidb-extension-field 字段导出,令下游根据 ts 还原事务上下文。这类流程需要自定义 Flink 插件实现,不在本 demo 的展示范围。
假如可以保证单表数据源的事务线性一致性,且写入当量可控,则线性一致的单表 IMV 实现并不复杂。但如果涉及到多表 Join,则关联数据造成的写入放大就很难控制(极端情况例如笛卡尔积),对于事务的相关处理也需要考虑多方场景。通常的 OLAP 场景都存有基本假设:
- 或维度表不更新
- 或维度表写入模式为
Insert Only
- 或关联数据写放大可控
- 或数据源保证线性一致性
- 或数据源严格依照事务顺序排序
- 或数据源可根据事务相关序列标识(例如 TiDB 体系的
commit-ts
)回溯上下文
- 或仅考虑最终一致性
- 或不考虑一致性
本文中给出的 Join 实现较为简单,仅令维度表写入模式为 Insert Only
,数据关联时则令下游直接回溯上游 TiDB 原始数据。
Usage
ENV
install basic components
- install
mysql
,python3
,git
,java-1.8.0-openjdk
,maven
,docker
,golang
pip3 install requests mysql-connector-python minio
- disable
firewalld
sudo root
prepare source code
1 | mkdir ${self_deploy_path} |
prepare hudi
- set
JAVA_HOME
to jdk 1.8 like/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-2.el9.x86_64
1 | cd ${self_demo_scripts_path} |
prepare flink
1 | flink_repo_path=${self_deploy_path}/flink |
prepare mysql
1 | curl https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar -o ${self_demo_env_libs}/mysql-connector-java-8.0.20.jar |
CMD
deploy
- use
12345
as the start port for different components - use
release-7.1
as the version of tidb cluster - set a
compose_project_name
for docker compose env. otherwisepingcap
will be the default value
1 | cd ${self_demo_scripts_path} |
show env vars info
1 | cd ${self_demo_scripts_path} |
list etl jobs
1 | ./setup-demo.py --cmd list_etl_jobs |
list ticdc jobs
1 | ./setup-demo.py --cmd list_ticdc_jobs |
list flink jobs
1 | ./setup-demo.py --cmd list_flink_jobs |
list kafka topics
1 | ./setup-demo.py --cmd list_kafka_topics |
remove etl job
1 | ./setup-demo.py --cmd rm_etl_job --etl_job_id etl1 |
remove ticdc job
1 | ./setup-demo.py --cmd rm_ticdc_job --cdc_changefeed_id etl1-sink-1 |
remove hdfs dir
1 | ./setup-demo.py --cmd rm_hdfs_dir --hdfs_url=hdfs://namenode:8020/a/b/c |
remove kafka topic
1 | ./setup-demo.py --cmd rm_kafka_topic --kafka_topic etl3-sink-3 |
remove flink job
1 | ./setup-demo.py --cmd rm_flink_job --flink_job_id 8f488441a8c155eca62655927cb1c02e |
remove all jobs
1 | ./setup-demo.py --cmd rm_all_jobs |
list all hobs
1 | ./setup-demo.py --cmd list_all_jobs |
down demo cluster
1 | cd ${self_demo_scripts_path} |
clean all
1 | ./setup-demo.py --cmd=clean |
display cluster
1 | ./setup-demo.py --cmd list_cluster_env |
Run With External TiDB Cluster
Required argv
- cdc_bin_path
- ticdc_addr
- tidb_addr
Optional argv
- dumpling_bin_path
1 | ./setup-demo.py --cmd sink_task --sink_task_desc="etl3.3.demo.t3" --sink_task_flink_schema_path ./example3/flink.sql.template --cdc_bin_path=/.../pingcap-lakehouse-demo/cdc --ticdc_addr=10.2.12.124:12354 --tidb_addr=10.2.12.124:12355 |
Tips
SET table.dml-sync=true;
is necessary if you need to execute DML statements synchronously. It is useful to load data from external files.create table ? with('connector'='kafka', ..., 'scan.startup.mode'='earliest-offset');
use earliest-offset mode to avoid losing data.SET sql-client.execution.result-mode=TABLEAU;
- The tableau mode is more like a traditional way which will display the results in the screen directly with a tableau format. The displaying content will be influenced by the query execution type (execution.type).
SET execution.runtime-mode='batch';
use batch mode to make flink agg sqlselect count() | min() | max()
only show the final result
kafka
- To run kafka without zookeeper, it can be using with Kafka Raft metadata mode: KRaft
Kafka 3.3.1
: Mark KRaft as Production Ready
Example1: Sink Incremental Data
create an example tidb table
1 | cd ${self_demo_scripts_path} |
create a template for flink and hudi like example/flink.sql.template
- necessary fields:
- ${kafka_topic}
- ${kafka_address}
- ${hdfs_address}
create sink table task
sink_task_desc
:format: etl_uid.table_id.db_name.table_name
. sink tidb table:ticdc -> kafka -> flink -> hudi
sink_task_flink_schema_path
: path to sql file include table schema for flink and hudi
1 | cd ${self_demo_scripts_path} |
setup-demo.py
自动填上相关字段并生成文件.tmp.flink.sink-{etl_uid}-{table_id}-{db}.{table_name}.sql
1 | create database demo_flink; |
- get new files:
.tmp.flink.sink-etl1-1-demo.t1.sql
;.tmp.ticdc-config-etl1-1-demo.t1.toml
insert record to tidb
1 | mysql -h 0.0.0.0 -P 12355 -u root -e "insert into demo.t1 values(9,9),(10,10)" |
use flink client to query demo_hudi.t1
, use flink client and get 2 incremental records (9,9),(10,10)
1 | cd ${self_demo_scripts_path} |
update demo.t1
1 | mysql -h 0.0.0.0 -P 12355 -u root -e "update demo.t1 set b=a*10;" |
use flink client and disable read.streaming.enabled
1 | Flink SQL> drop table demo_hudi.t1; |
Example2: ETL
sink tables and store parts of columns by partition
example2/tidb.sql
example2/flink.sql.template
- partition by
DATE_FORMAT(b, 'yyyyMMdd')
as partition INSERT INTO demo_hudi.t2(a,b,partition) (select a, b, DATE_FORMAT(b, 'yyyyMMdd') as partition from demo_flink.t2 where c is not null);
- partition by
1 | mysql -h 0.0.0.0 -P 12355 -u root < example2/tidb.sql |
create sink table etl job
1 | ./setup-demo.py --cmd sink_task --sink_task_desc="etl2.2.demo.t2" --sink_task_flink_schema_path ./example2/flink.sql.template |
insert 2 records (4,’4’) and (5, null)
1 | mysql -h 0.0.0.0 -P 12355 -u root -e "insert into demo.t2 (a,c) values (4,'4'),(5, null)" |
query by flink client
- ref
.tmp.flink.sink-etl2-2-demo.t2.sql
- ref content above
1 | ./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh |
Example3: Consistency
sink historical and incremental data of table to flink then hudi
example3/tidb.sql
example3/flink.sql.template
- use filesystem of flink to upsert data into base kafka topic
- upsert incremental data from self-defined kafka into base kafka topic
- create
hoodie_stream_write
and upsert incremental data from base kafka topic
1 | mysql -h 0.0.0.0 -P 12355 -u root < example3/tidb.sql |
1 | ./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh |
Example4: Consistency Aggregation Materialized View
sink tables and calculate incremental aggregation result
example4/tidb.sql
example4/flink.sql.template
- dump data of table
demo.t4
into csv file - load csv file into kafka topic
etl4-sink-4_base
- sink incremental data to
etl4-sink-4
- sink topic
etl4-sink-4
into topicetl4-sink-4_base
- ticdc -> kafka -> flink streaming aggregation -> hudi:
insert into demo_hudi.t4_agg (c,d) (select b, sum(a) from demo_flink.t4 group by b);
- dump data of table
create sink table etl job
1 | ./setup-demo.py --cmd sink_task --sink_task_desc="etl4.4.demo.t4" --sink_task_flink_schema_path ./example4/flink.sql.template |
run tidb sql
1 | mysql -h 0.0.0.0 -P 12355 -u root < example4/tidb.sql |
1 | ./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh |
Example5: Consistency Inner Join & Aggregation Materialized View
join & aggregation
example5/tidb.sql
example5/flink.sql.template
- create
build
table(INSERT ONLY
) andprob
table - use flink to maintain materialized view about
select agg(...) from build inner join prob on ... group by ...
- add filed
proctime as PROCTIME()
to the schema ofprob
table prob inner join ... FOR SYSTEM_TIME AS OF prob.proctime as build on prob.b = build.b ...)
: makeprob
table always use the latestbuild
table by jdbc connector.
- add filed
1 | mysql -h 0.0.0.0 -P 12355 -u root < example5/tidb.sql |
1 | ./run-flink-bash.sh |
Example6: Use S3 As Backend
Use s3 as storage backend for example3
example3-s3/tidb.sql
example3-s3/flink.sql.template
Use s3 as storage backend for example4
example4-s3/tidb.sql
example4-s3/flink.sql.template
Set flink state dir and hudi storage path in template file
1 | ... |
Test ticdc --> s3(minio) --> inner join two modifiable table
- Run
mysql -h ? -P ? -u root < example5/tidb.sql
- Let TiCDC sink table to S3 storage with option
--cdc_sink_s3
:./setup-demo.py --cmd sink_task --sink_task_desc="etl5.5.demo.t5" --sink_task_flink_schema_path ./example5/flink.sql.template --cdc_sink_s3 --cdc_config_file=?
- Run
ticdc/storage-consumer/main.go
:./ticdc/storage-consumer/bin/storage-consumer -upstream-uri s3://etl5-sink-5/ticdc-sink?protocol=canal-json&endpoint=http://${s3_address}&access-Key=minioadmin&secret-Access-Key=minioadmin&enable-tidb-extension=true -flush-interval=500ms -log-level=debug -etl-tables=demo.t5,demo.t5_build -test-str demo.t5.b = demo.t5_build.b
- check
checkpoint-ts
by interval500ms
- select related data by condition
demo.t5.b = demo.t5_build.b
when any record of tabledemo.t5
ordemo.t5_build
is modified.
- check
Integration Test
Process
- create a table and start to update it continuously
- wait for a few seconds and create the etl task to sink the table data into hudi
- check data consistency between tidb and hudi
Incremental Consistency Test
1 | ./test-consistency.py |
Incremental Aggregation Consistency Test
1 | ./test-consistency-agg.py |
1 | ./test-consistency-agg-s3.py |
Incremental Inner Join Aggregation Consistency Test
1 | ./test-consistency-join.py |
Further Implementation
数据源 exactly-once
一致性:
- 目前 TiCDC 仅提供
at-least-once
级别的数据源输入,以下 2 种典型场景需建立额外的机制来提供exactly-once
语义:- 同步数据至 Kafka:开启
enable-tidb-extension
,下游算子将 TIDB_WATERMARK::watermarkTs 纳入数据源状态,过滤重复数据。- 涉及到多数据流 Join 场景,需要参照 watermarkTs 构建 barrier 并对齐。
- 同步数据至外部存储:开启
enable-tidb-extension
,下游将 metadata/checkpoint-ts 纳入数据源状态,过滤重复数据。- 涉及到多表关联的场景,建议把相关的表配置在同个 ticdc 同步任务中以便于共享 metadata,需根据 checkpoint-ts 构建 barrier 并对齐。
- 同步数据至 Kafka:开启
目前 Demo 应用依赖外部组件:Flink,Kafka(Zookeeper),Hudi(Hadoop 相关各组件)。需要优化整体架构:
- Flink 作为一个通用且成熟的流处理平台,外部依赖以及内部运行机制均较为繁重,产品化时需舍弃 Flink
- Java 系产品与其他语言亲和度较弱,不利于当前的 TiDB 社区生态
- 需要人力去维护和排查 Flink 自身可能带来的问题
- TiSpark 之类项目前车之鉴,非与 TiDB 体系紧密耦合的项目终陷入死局
- Hudi 依赖 Hadoop 和 Spark 生态体系,产品化时是否需要令外部感知到?是否需要舍弃 Hudi 而是用 TiDB 生态内部的存储系统?
- TiDB 体系可结合自身需求,实现轻量级的流式处理框架:
- 参考 Flink 的 Checkpointing 机制实现分布式容错
- 类似 Flink 流式状态存储和管理体系,可由 TiKV 提供的分布式 KV 存储和事务体系实现
- 参考 Flink 实现端到端 exactly-once 一致性
- 构建维护 增量物化视图 的基本原理与上文类似
- 存量数据和增量数据可以直接在物化视图中分别处理再合并,保证最终结果线性一致
- 参考 Flink 的 Checkpointing 机制实现分布式容错
- 实现轻量级的数据同步流程,去掉对于 Kafka 的依赖
- 类 cdc 模块将增量数据持续写入到外部可信任的存储系统中,由使用方控制消费过程(同步数据到存储服务)
- ticdc 中 Canal-JSON 和 Open Protocol 两种协议支持当开启 Old Value 功能时 (
enable-old-value = true
),输出更新事件的旧值。
- ticdc 中 Canal-JSON 和 Open Protocol 两种协议支持当开启 Old Value 功能时 (
- 其他可考虑的方案:侵入式改造 TiDB 的事务模型,按需将 MV 相关的数据和事件纳入到完整的事务流程中,去掉对于 TiCDC,Kafka,Flink 等外部组件的依赖
- 类 cdc 模块将增量数据持续写入到外部可信任的存储系统中,由使用方控制消费过程(同步数据到存储服务)
IMV 一致性
- 增量方式维护状态机的关键步骤是感知新旧数据的变化,对于 TiDB 这类满足全局线性一致性事务的系统而言,就是不同 timestamp 下的数据差异
- resolved-ts 或者 ticdc 提供的 watermark-ts 可以很自然地作为推进 checkpoint 的 barrier
- 处理多表或者单表多分区数据时,可以对齐 barrier,并对同个 checkpoint 内的增量数据按照 commit-ts 排序后依次处理
- 处理多表 JOIN 时,任一表的改动均需要找出与之关联的其他表数据,并还原出 JOIN 前后的数据变化
- 例如
t1 inner join t2 on t1.a = t2.a
,t1
的某一行(假设 primary key 为k1
) 在ts_n
时刻字段a
发生改动从a1
变成a2
。- 找出
t2
在ts_n - 1
时刻字段a
为a1
的所有数据,找出t1
在ts_n - 1
时刻 primary key 为k1
的数据,执行 JOIN 操作后得到结果集j1
- 找出
t2
在ts_n
时刻字段a
为a2
的所有数据,找出t1
在ts_n
时刻 primary key 为k1
的数据,执行 JOIN 操作后得到结果集j2
j1
和j2
之间的数据差异才能用于维护增量状态机
- 找出
- 例如
t1 inner join t2 on t1.a = t2.a
,t1
的某一行(假设 primary key 为k1
) 在ts_n
时刻字段a
发生改动从a1
变成a2
,t2
的某一行(假设 primary key 为k2
) 在ts_n
时刻字段a
发生改动从a3
变成a4
- 找出
t2
在ts_n - 1
时刻字段a
为a1
的所有数据,找出t1
在ts_n - 1
时刻 primary key 为k1
的数据,执行 JOIN 操作后得到结果集j1_1
- 找出
t1
在ts_n - 1
时刻字段a
为a3
的所有数据,找出t2
在ts_n - 1
时刻 primary key 为k2
的数据,执行 JOIN 操作后得到结果集j1_2
j1_1
和j1_2
可能存在重复数据,按照t1
,t2
组合的 primary key 去重合并后得到j1
- 同理找出
ts_n
时刻的结果集j2
,j1
和j2
之间的数据差异用于维护增量状态机;
- 找出
- 例如
- 处理多表 JOIN 时,任一表的改动均需要找出与之关联的其他表数据,并还原出 JOIN 前后的数据变化
数据存储
- 为了便于兼容云原生架构,需要用兼容云上存储体系的存储系统替换 Hadoop HDFS
- MinIO 是一个轻量级的高性能对象存储系统,兼容 Amazon S3 API
- RELEASE.2021-05-11T23-27-41Z 开始使用 AGPL-3.0 协议,小于等于 RELEASE.2021-04-22T15-44-28Z 版本使用 Apache-2.0 协议
- 可以令云下用户自己部署维护 minio,TiDB 内部仅使用 S3 协议接口
- MinIO 是一个轻量级的高性能对象存储系统,兼容 Amazon S3 API