Materialized View Simple Demo

Ref: What is a materialized view? A materialized view is a pre-computed data set derived from a query specification (the SELECT in the view definition) and stored for later use. Because the data is pre-computed, querying a materialized view is faster than executing a query against the base table of the view. This performance difference can be significant when a query is run frequently or is sufficiently complex. As a result, materialized views can speed up expensive aggregation, projection, and selection operations, especially those that run frequently and that run on large data sets.

This post documents a simple demo that builds a Materialized View (aka MV) on top of Flink, Hudi, Kafka, TiCDC, and TiDB.

Compared with an ordinary view, an MV has to solve several problems:

  • Data consistency: TiDB relies on a globally unique Timestamp Oracle (aka TSO) to guarantee linearizable transactions. When TiDB feeds a third-party system as a data source, what level of consistency should the external application get (none, eventual, or linearizable)? How do we handle the consistency issues introduced by multiple heterogeneous data sources?
  • Data storage: how do we store updatable data? How do we build the MV and maintain its state?
  • Data processing: ETL (Extract, Transform, Load) is an indispensable part of OLAP; common steps include building wide tables, filtering columns, converting types, and partitioning data. How do we make the MV support customizable ETL, and how do we let it be defined externally in SQL (Structured Query Language)?

Implementation

Flink is a widely adopted unified stream and batch processing platform that lets streaming jobs be defined in SQL. To keep the demo simple, this post uses Flink to implement the ETL and to build and maintain the MV.

Checkpointing

  • A checkpoint in Flink is a consistent snapshot of an application (a Flink job). It consists of two parts: the application's current state (the set of snapshots of all internal operator states) and the offsets in the input streams;
  • Flink generates checkpoints periodically according to its rules and configuration and writes them to reliable storage (usually a distributed file system). The write is asynchronous, so the application's processing is not blocked by checkpointing;
  • When a failure occurs, Flink restarts the application using the most recently completed checkpoint as the initial state;
  • Since Flink 1.3, the RocksDB state backend supports incremental checkpoints, which helps with very large state (GB to TB scale);
  • Reference configuration (a sketch follows this list)
    • state.backend: hashmap or rocksdb
    • state.checkpoints.dir
    • execution.checkpointing.interval
    • execution.checkpointing.mode
    • state.backend.incremental
    • state.backend.rocksdb.localdir: see state-location-in-rocksdb; a local SSD is recommended for better throughput;
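
A minimal sketch of these options as Flink SQL Client SET statements. The values are illustrative only, and in a real deployment some of them (the state backend in particular) are usually set in flink-conf.yaml instead:

set state.backend = rocksdb;
set state.checkpoints.dir = hdfs://namenode:8020/flink-state/checkpoints;
set execution.checkpointing.interval = 2sec;
set execution.checkpointing.mode = EXACTLY_ONCE;
set state.backend.incremental = true;
-- assumed local SSD mount point; adjust to the actual host layout
set state.backend.rocksdb.localdir = /data/ssd/flink-rocksdb;
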
Checkpointing Implementation

(Figure: checkpoint barriers flowing through a data stream; see flink/flink-docs-master/fig/stream_barriers.svg in the Flink documentation)

Flink uses a variant of the Chandy-Lamport algorithm known as asynchronous barrier snapshotting.

  • When a task manager receives the job manager's command to start a checkpoint, it has every data source record its own offset and insert sequentially numbered checkpoint barriers into the stream. The barriers flow through the job graph and mark the boundary of the checkpoint; each operator in the job graph records its own state when it receives a barrier;

  • Checkpoint n contains the state of every operator after it has processed all data and events that precede barrier n;

  • An operator with multiple input streams performs barrier alignment (see the flink_stream_aligning figure in the Flink documentation)

    • Aligning barriers reduces the size of the state snapshot: as the figure shows, once no input stream has pending data before barrier n, the operator snapshot does not need to include such in-flight data
    • Drawbacks of alignment:
      • Higher latency (the faster barrier has to block and wait), which slows down checkpointing
      • For scenarios such as a two-stream equal-join, differing input latencies may make the behavior deviate from expectations
        • For example, the pairs expected to be processed are (1,a) (2,b) (3,c); after barrier alignment they may become (1,e) (2,f) (3,g)
  • A barrier can be seen as the boundary of a checkpoint, and a clear boundary effectively avoids redundancy. With unaligned barriers there is no clear boundary, so redundant data has to be kept in the state snapshot (see the flink_unaligned_barrier figure in the Flink documentation):

    • With aligned barriers, a barrier keeps its position in the stream and the operator waits until the barrier is actually processed before snapshotting. With unaligned barriers, as soon as a barrier from any input stream reaches the input buffer, the operator immediately saves three kinds of data into the checkpoint and then moves the barrier to the head of the output buffer. The data to be saved (the green parts in the figure) is:
      • all unprocessed data before the barrier in every input stream;
        • if the barrier has not arrived yet, the operator has to block and wait, because the checkpoint the stream has persisted so far is necessarily older than the barrier, and recovery after a failure would otherwise be impossible; handling this case would require a custom application-level checkpoint mechanism in which the stream's own checkpoint can be controlled independently;
      • all unprocessed data before the barrier in the operator's input buffers;
      • the data in the operator's output buffers;
    • Unaligned barriers avoid the blocking wait caused by alignment
    • When recovering from a checkpoint, the unaligned data is restored into the corresponding queues, so the exactly-once guarantee still holds
    • Drawbacks of unaligned barriers:
      • the redundant data has to be persisted, which increases storage load
      • larger checkpoints may increase job recovery time
      • they suit complex jobs prone to heavy backpressure; for simple jobs such as ETL-style data synchronization, the lightweight aligned handling is enough (see the sketch after this list).
        • Backpressure is the phenomenon in real-time processing where, at some node in the pipeline, upstream data is produced faster than the node can process it.
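
Whether barriers are aligned is a per-job choice. A hedged sketch of the relevant Flink options, again written as SQL Client SET statements with illustrative values:

set execution.checkpointing.interval = 2sec;
-- unaligned checkpoints help complex jobs under heavy backpressure;
-- simple ETL-style jobs can keep the default aligned behavior
set execution.checkpointing.unaligned = true;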

An Overview of End-to-End Exactly-Once Processing in Apache Flink

Flink's consistency guarantees fall into three levels, from weakest to strongest:

  1. at-most-once: no consistency guarantee;
  2. at-least-once: data or events are processed at least once (nothing lost, duplicates possible); the downstream can implement idempotence to preserve consistency;
  3. exactly-once: data or events are processed exactly once (nothing lost, nothing duplicated);

Each component only guarantees its own consistency; the end-to-end level is determined by the weakest component in the chain. Taking the simplest end-to-end pipeline as an example, exactly-once consistency requires:

  • three parts overall, with data flowing source -> internal -> sink
  • source: the external source must support reading from a specified position (for example a Kafka message queue)
    • the position is kept as part of the state, so consumption can resume from it after failure recovery;
  • internal: the checkpointing mechanism;
  • sink: data must not be written to the external system twice after failure recovery; there are two concrete approaches on the sink side:
    • Idempotent writes: an operation can be executed many times but changes the result only once
    • Transactional writes: the operations in a transaction either all succeed or all fail
      • a tightly coupled series of operations in the application; all of them must complete successfully, otherwise every change made in each operation is rolled back;

Implementing transactional writes on the sink side:

  • The external sink system must support transactional writes (each transaction must correspond to a checkpoint); the results are written to the external system only once the checkpoint completes;
  • There are two concrete ways to implement transactional writes:
    • Write-ahead log (WAL)
      • buffer the result data in state as a WAL and write it downstream once the checkpoint completes;
        • the actual write can then be treated as an idempotent replay of the WAL;
      • this approach is easy to implement but has drawbacks:
        • it grows the state and increases storage load;
        • writes have to wait for checkpoint completion, which adds latency;
    • Two-phase commit (2PC)
      • open a transaction in the sink system when the checkpoint starts
      • write the result data into that transaction without committing it
      • commit the transaction once the checkpoint completes, making the write take effect

How the 2PC implementation works (see the sketch after this list):

  • Transaction start: the external sink system opens a transaction and associates it with checkpoint n
  • Transaction pre-write: the transaction accepts pre-written data within the scope of checkpoint n
  • Transaction commit:
    • when the sink task processes barrier n, it waits until the transaction can reach a committable state and only then saves its state snapshot.
    • once all subtasks have saved their snapshots, checkpoint n is complete.
      • if a failure happens afterwards, the application is restored to this checkpoint and the sink task must recover the transaction and commit it.
    • when completion of checkpoint n is broadcast from upstream, the sink task formally commits the transaction.
    • committing the transaction must be an idempotent operation.
      • when recovering from a checkpoint, Flink always tries to commit the associated transaction (fault tolerance through redundancy; otherwise extra information would have to be stored to track the transaction state)
  • Transaction rollback
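
As a concrete instance of checkpoint-driven 2PC, the Flink Kafka SQL connector can write with Kafka transactions that are pre-written during a checkpoint and committed only when the checkpoint completes. A hedged sketch; the table name and topic below are made up for illustration:

CREATE TABLE tx_sink_example (
  a INT,
  b INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'example-2pc-output',
  'properties.bootstrap.servers' = 'kafkabroker:9092',
  'format' = 'canal-json',
  -- exactly-once opens one Kafka transaction per checkpoint and commits it
  -- only after the corresponding checkpoint completes (two-phase commit)
  'sink.semantic' = 'exactly-once',
  -- must not exceed the broker-side transaction.max.timeout.ms
  'properties.transaction.timeout.ms' = '600000'
);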

An Incremental Materialized View (aka IMV) is essentially a stateful operator in Flink. Take Example4 as an example (a sketch follows this list):

  • the processing defined by the ETL is select b, sum(a) from ... group by b
  • the input source is a Kafka message queue and the output sink system is Hudi
  • the operator state consists of entries keyed by column b with sum(a) as the value; the state size grows with the number of keys
  • the sum() aggregation reaches its final value by accumulating intermediate results, so it can be maintained incrementally
    • for upsert input, add the difference between the new and old values of column a
    • for delete input, subtract the old value
  • incrementally maintaining GROUP BY() is equivalent to adding a COUNT() aggregate, where a value of 0 means the group has no data.
  • the state backend can be memory, files, or RocksDB
    • the RocksDB backend is recommended, since incremental snapshots make checkpointing large state efficient
  • as long as the input and output systems satisfy the basic exactly-once requirements, the application as a whole can guarantee exactly-once consistency
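
A hedged Flink SQL sketch of this pipeline, modelled on the generated statements shown in Example1 and Example4 below. The Kafka source here reads the TiCDC incremental topic directly; Example4 additionally stitches in the historical data as described under Consistency, and the actual template in the repo may differ in details:

create database demo_flink;
create database demo_hudi;
set execution.checkpointing.interval = 2sec;
-- source: TiCDC change log in canal-json format, read from Kafka
create table demo_flink.t4(a int PRIMARY KEY NOT ENFORCED, b int) with(
  'connector'='kafka', 'topic'='etl4-sink-4',
  'properties.bootstrap.servers'='kafkabroker:9092',
  'properties.group.id'='pingcap-demo-group',
  'format'='canal-json', 'scan.startup.mode'='earliest-offset');
-- sink: Hudi MOR table holding the materialized aggregation result
CREATE TABLE demo_hudi.t4_agg(
  c INT PRIMARY KEY NOT ENFORCED,
  d BIGINT
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://namenode:8020/pingcap/demo/etl4-sink-4/agg',
  'table.type' = 'MERGE_ON_READ');
-- the stateful GROUP BY operator below is the incremental materialized view
insert into demo_hudi.t4_agg (c,d) (select b, sum(a) from demo_flink.t4 group by b);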

min() / max() cannot be maintained by direct accumulation; their state machines are maintained like select col_?, count(*) from ? group by col_? order by col_? [desc] limit 1, deleting the corresponding element when its count reaches 0.

avg() is maintained like select sum(col_?), count(col_?) from ?, with the division performed only when the result is emitted (see the sketch below).

distinct() is maintained like select count(*) from (select col_?, count(*) from ? group by col_?).
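
For instance, a hedged sketch of keeping avg() as a sum/count pair over a hypothetical table t(g, v), with the division deferred to read time:

-- hypothetical source table t(g, v); datagen is used only to keep the sketch self-contained
CREATE TABLE t (g INT, v INT) WITH ('connector' = 'datagen', 'rows-per-second' = '5');
-- state: one (sum, count) pair per group, both incrementally maintainable
CREATE VIEW t_avg_state AS
SELECT g, SUM(v) AS sum_v, COUNT(v) AS cnt_v
FROM t
GROUP BY g;
-- read side: perform the division only when the result is queried
SELECT g, sum_v * 1.0 / cnt_v AS avg_v
FROM t_avg_state
WHERE cnt_v > 0;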

Maintaining an IMV over joins is similar to the above; the main differences lie in how the data sources are constructed and processed. The join types provided by Flink itself roughly fall into:

  • Regular join
    • left join
    • right join
    • inner join (the default)
    • outer join
  • Interval Join
  • Temporal Join
  • Lookup Join (used in Example5)
  • Window Join

Interval Join and Window Join are typical streaming-only features and need external constraints to guarantee data consistency.
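
A hedged sketch of the Lookup Join used in Example5: the probe side carries a processing-time attribute and each row looks up the current dimension data through the jdbc connector. The table, column, and connection parameters below follow the Example5 description and are illustrative rather than the exact template:

-- dimension (build) table, looked up on demand through JDBC
CREATE TABLE t5_build (
  b INT,
  c INT,
  PRIMARY KEY (b) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://tidb:4000/demo',  -- assumed TiDB address
  'table-name' = 't5_build',
  'username' = 'root'
);
-- the probe table needs a processing-time attribute for the lookup
CREATE TABLE prob (
  a INT,
  b INT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'etl5-sink-5',
  'properties.bootstrap.servers' = 'kafkabroker:9092',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- each probe row joins against the build table as of its processing time
SELECT prob.b, SUM(build.c) AS v
FROM prob
JOIN t5_build FOR SYSTEM_TIME AS OF prob.proctime AS build
  ON prob.b = build.b
GROUP BY prob.b;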

Storage

Hudi, as a next-generation data lake product, has strong support for data updates. It is compatible with the Hadoop data warehouse ecosystem and integrates well with Flink. This post uses Hudi and HDFS as the storage system for the MV.

Flink-related parameters

  • write.tasks: parallelism of the write tasks
  • write.bucket_assign.tasks: when index.type is FLINK_STATE, this overrides write.tasks

Bucket Index: by default Flink uses the state backend to store the file index (primary key -> fileId). When the input volume is large, the state-related overhead easily becomes a performance bottleneck. The BUCKET approach maps data into buckets with a deterministic hash algorithm, avoiding the storage and lookup overhead of building that index (a sketch follows the option list below).

  • index.type: FLINK_STATE (default) or BUCKET
  • hoodie.bucket.index.hash.field
  • hoodie.bucket.index.num.buckets
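
A hedged sketch of switching a Hudi table to the bucket index; the table name and path are made up, and the bucket count is illustrative:

CREATE TABLE demo_hudi.t_bucket(
  a INT PRIMARY KEY NOT ENFORCED,
  b INT
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://namenode:8020/pingcap/demo/hudi/bucket-demo',
  'table.type' = 'MERGE_ON_READ',
  -- deterministic hash bucketing instead of the Flink-state file index
  'index.type' = 'BUCKET',
  'hoodie.bucket.index.hash.field' = 'a',
  'hoodie.bucket.index.num.buckets' = '16'
);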

Consistency

TiCDC is an incremental data replication tool for TiDB. By pulling change logs from the upstream TiKV nodes, it parses the data into ordered row-level changes and delivers them downstream.

  • Kafka is used to make the historical data and the incremental data linearizable as a whole. This introduces extra replication overhead; in practice the historical data can build the Base part of the MV while the incremental data builds the Delta part. See Example3, Example4, and Example5 (a hedged sketch follows this list).
    • first pick the TSO at which the TiCDC task is created as the snapshot point S, and have the incremental data exported to topic I
    • use Dumpling to export the historical data as of S into a new topic B; only after that step completes, submit an asynchronous task that keeps copying the data from I into B
    • the downstream Flink job consumes only B
    • both I and B have a single partition to preserve linearizability;
  • Option: in theory TiCDC could also export the commit-ts of DDL and DML statements through the ticdc-canal-json#tidb-extension-field field, letting the downstream reconstruct the transaction context from the ts. That flow would require a custom Flink plugin and is outside the scope of this demo.
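
A hedged Flink SQL sketch of how the base topic B and the incremental topic I can be stitched together, following the Example3 description. Connector options, topic names, and the snapshot path are illustrative; the actual template in the repo may differ:

-- snapshot exported by Dumpling, read back through the filesystem connector
CREATE TABLE t3_snapshot (a INT, b INT) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://namenode:8020/pingcap/demo/dump/t3',
  'format' = 'csv'
);
-- base topic B, keyed by the primary key
CREATE TABLE t3_base (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'etl3-sink-3_base',
  'properties.bootstrap.servers' = 'kafkabroker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
-- incremental topic I written by TiCDC in canal-json format
CREATE TABLE t3_inc (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH (
  'connector' = 'kafka',
  'topic' = 'etl3-sink-3',
  'properties.bootstrap.servers' = 'kafkabroker:9092',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
-- 1) load the snapshot into B, then 2) keep folding the increment into B
INSERT INTO t3_base SELECT a, b FROM t3_snapshot;
INSERT INTO t3_base SELECT a, b FROM t3_inc;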

If the single-table data source guarantees linearizable transactions and the write volume is manageable, a linearizable single-table IMV is not hard to implement. Once multi-table joins are involved, however, the write amplification caused by correlated data becomes hard to bound (in the extreme, a Cartesian product), and transaction handling has to cover many more cases. Typical OLAP scenarios therefore rest on basic assumptions such as:

  • the dimension tables are never updated; or
  • the dimension tables are written in Insert Only mode; or
  • the write amplification from correlated data is bounded; or
  • the data sources guarantee linearizability
    • either the sources are strictly ordered by transaction; or
    • the sources can reconstruct the context from a transaction sequence identifier (for example commit-ts in the TiDB ecosystem); or
  • only eventual consistency is required; or
  • consistency is not required at all

The join implementation in this post is deliberately simple: the dimension table is written in Insert Only mode, and when rows need to be correlated, the downstream looks the original data up directly from the upstream TiDB.

Usage

ENV

install basic components

  • install mysql, python3, git, java-1.8.0-openjdk, maven, docker, golang
  • pip3 install requests mysql-connector-python minio
  • disable firewalld
  • sudo root

prepare source code

mkdir ${self_deploy_path}
cd ${self_deploy_path}
git clone https://github.com/apache/hudi.git
cd hudi
git checkout release-0.12.3
cd ${self_deploy_path}
git clone https://github.com/solotzg/__tmp.git
self_demo_scripts_path=${self_deploy_path}/__tmp
cd ${self_deploy_path}
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.13.6

prepare hudi

  • set JAVA_HOME to jdk 1.8 like /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-2.el9.x86_64
cd ${self_demo_scripts_path}
mkdir .tmp.env_libs
export self_demo_env_libs=${self_demo_scripts_path}/.tmp.env_libs
JAVA_HOME="xxxxxx" ./setup-demo.py --cmd compile_hudi --hudi_repo=${self_deploy_path}/hudi
cp ${self_deploy_path}/hudi/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.3.jar ${self_demo_env_libs}

curl https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk/1.10.34/aws-java-sdk-1.10.34.jar -o ${self_demo_env_libs}/aws-java-sdk-1.10.34.jar
curl https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar -o ${self_demo_env_libs}/hadoop-aws-2.7.3.jar

prepare flink

flink_repo_path=${self_deploy_path}/flink
cd ${flink_repo_path}/flink-connectors/flink-sql-connector-kafka
mvn clean package -DskipTests -Drat.skip=true
cp ${flink_repo_path}/flink-connectors/flink-sql-connector-kafka/target/flink-sql-connector-kafka_2.11-1.13.6.jar ${self_demo_env_libs}
cd ${flink_repo_path}/flink-connectors/flink-connector-jdbc
mvn clean package -DskipTests -Drat.skip=true
cp ${flink_repo_path}/flink-connectors/flink-connector-jdbc/target/flink-connector-jdbc_2.11-1.13.6.jar ${self_demo_env_libs}

prepare mysql

curl https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar -o ${self_demo_env_libs}/mysql-connector-java-8.0.20.jar

CMD

deploy

  • use 12345 as the start port for different components
  • use release-7.1 as the version of tidb cluster
  • set a compose_project_name for docker compose env. otherwise pingcap will be the default value
cd ${self_demo_scripts_path}
./setup-demo.py --cmd deploy_hudi_flink_tidb --start_port=12345 --env_libs=${self_demo_env_libs} --hudi_repo=${self_deploy_path}/hudi --tidb_branch=release-7.1 --compose_project_name=${self_project_name}

show env vars info

cd ${self_demo_scripts_path}
./setup-demo.py --cmd show_env_vars_info

{
"flink_jobmanager_port": 12345,
"hadoop_web_port": 12346,
"hdfs_port": 12347,
"historyserver_port": 12348,
"hiveserver_port": 12349,
"kafka_port": 12350,
"spark_master_port": 12351,
"spark_web_port": 12352,
"HUDI_WS": "/.../demo/hudi",
"pingcap_demo_path": "/.../demo/pingcap-lakehouse",
"env_libs": "/.../demo/pingcap-lakehouse/.tmp.env_libs",
"hufi-flink-compose": "/.../demo/pingcap-lakehouse/.tmp.docker-compose_hadoop_hive_spark_flink.yml",
"hudi_flink_running": true,
"pd_port": 12353,
"ticdc_port": 12354,
"tidb_port": 12355,
"tikv_status_port": 12356,
"TIDB_BRANCH": "release-7.1",
"tidb-compose": "/.../demo/pingcap-lakehouse/.tmp.tidb-cluster.yml",
"tidb_running": true,
"demo_host": "10.2.12.125",
"start_port": 12345,
"JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-2.el9.x86_64",
"etl_jobs": {},
"COMPOSE_PROJECT_NAME": "pingcap",
"LAKE_HOUSE_COMPOSE_PROJECT_NAME": "pingcap_lakehouse",
"TIDB_COMPOSE_PROJECT_NAME": "pingcap_tidb",
"hudi_compiled": "2023-07-03 13:49:42.261539",
"FLINK_VERSION": "1.13",
"KAFKA_VERSION": "2.4.0"
}

list etl jobs

./setup-demo.py --cmd list_etl_jobs

list ticdc jobs

./setup-demo.py --cmd list_ticdc_jobs

list flink jobs

./setup-demo.py --cmd list_flink_jobs

list kafka topics

./setup-demo.py --cmd list_kafka_topics

remove etl job

./setup-demo.py --cmd rm_etl_job --etl_job_id etl1

remove ticdc job

./setup-demo.py --cmd rm_ticdc_job --cdc_changefeed_id etl1-sink-1

remove hdfs dir

./setup-demo.py --cmd rm_hdfs_dir --hdfs_url=hdfs://namenode:8020/a/b/c

remove kafka topic

./setup-demo.py --cmd rm_kafka_topic --kafka_topic etl3-sink-3

remove flink job

./setup-demo.py --cmd rm_flink_job --flink_job_id 8f488441a8c155eca62655927cb1c02e

remove all jobs

./setup-demo.py --cmd rm_all_jobs

list all jobs

./setup-demo.py --cmd list_all_jobs

down demo cluster

cd ${self_demo_scripts_path}
./setup-demo.py --cmd=down_tidb
./setup-demo.py --cmd=down_hudi_flink

# If you need to down tidb, hudi, flink at same time, use:
./setup-demo.py --cmd=down

clean all

./setup-demo.py --cmd=clean

display cluster

./setup-demo.py --cmd list_cluster_env

Run With External TiDB Cluster

Required argv

  • cdc_bin_path
  • ticdc_addr
  • tidb_addr

Optional argv

  • dumpling_bin_path
./setup-demo.py --cmd sink_task --sink_task_desc="etl3.3.demo.t3" --sink_task_flink_schema_path ./example3/flink.sql.template --cdc_bin_path=/.../pingcap-lakehouse-demo/cdc --ticdc_addr=10.2.12.124:12354 --tidb_addr=10.2.12.124:12355
./setup-demo.py --cmd rm_all_jobs --cdc_bin_path=/.../pingcap-lakehouse-demo/cdc --ticdc_addr=10.2.12.124:12354 --tidb_addr=10.2.12.124:12355 --dumpling_bin_path=/tmp/bin/dumpling

Tips

Flink SQL Client:

  • SET table.dml-sync=true; is necessary if you need to execute DML statements synchronously. It is useful to load data from external files.
  • create table ? with('connector'='kafka', ..., 'scan.startup.mode'='earliest-offset'); use earliest-offset mode to avoid losing data.
  • SET sql-client.execution.result-mode=TABLEAU;
    • Tableau mode is closer to the traditional way: it displays the results directly on the screen in a tabular format. What is displayed depends on the query execution type (execution.type).
  • SET execution.runtime-mode='batch'; use batch mode so that Flink aggregation SQL such as select count() | min() | max() only shows the final result

kafka

  • To run Kafka without ZooKeeper, use the Kafka Raft metadata mode: KRaft
    • Kafka 3.3.1: Mark KRaft as Production Ready

Example1: Sink Incremental Data

create an example tidb table

cd ${self_demo_scripts_path}
# tidb_port: 12355
mysql -h 0.0.0.0 -P 12355 -u root < example/tidb.sql

mysql -h 0.0.0.0 -P 12355 -u root -e "select * from demo.t1"
+---+------+
| a | b |
+---+------+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
+---+------+

cat example/tidb.sql

create database IF NOT EXISTS demo;
create table IF NOT EXISTS demo.t1(a int PRIMARY KEY, b int);
insert into demo.t1 values(1,1),(2,2),(3,3);
insert into demo.t1 (select max(a)+1,max(a)+1 from demo.t1);

create a template for flink and hudi like example/flink.sql.template

  • necessary fields:
    • ${kafka_topic}
    • ${kafka_address}
    • ${hdfs_address}

create sink table task

  • sink_task_desc: format etl_uid.table_id.db_name.table_name; sinks the tidb table through ticdc -> kafka -> flink -> hudi
  • sink_task_flink_schema_path: path to the sql file that includes the table schemas for flink and hudi
cd ${self_demo_scripts_path}
./setup-demo.py --cmd=sink_task --sink_task_desc="etl1.1.demo.t1" --sink_task_flink_schema_path="${self_demo_scripts_path}/example/flink.sql.template" --env_libs=${self_demo_env_libs}
  • setup-demo.py fills in the relevant fields automatically and generates the file .tmp.flink.sink-{etl_uid}-{table_id}-{db}.{table_name}.sql
create database demo_flink;
set execution.checkpointing.interval = 2sec;
create table demo_flink.t1(a int PRIMARY KEY, b int) with('connector'='kafka', 'topic'='etl1-sink-1', 'properties.bootstrap.servers'='kafkabroker:9092', 'properties.group.id'='pingcap-demo-group', 'format'='canal-json', 'scan.startup.mode'='latest-offset');
create database demo_hudi;
CREATE TABLE demo_hudi.t1(
a INT PRIMARY KEY NOT ENFORCED,
b INT
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://namenode:8020/pingcap/demo/hudi/etl1-sink-1',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1'
);
INSERT INTO demo_hudi.t1(a,b) (select a,b from demo_flink.t1);
  • get new files: .tmp.flink.sink-etl1-1-demo.t1.sql; .tmp.ticdc-config-etl1-1-demo.t1.toml

insert record to tidb

mysql -h 0.0.0.0 -P 12355 -u root -e "insert into demo.t1 values(9,9),(10,10)"
.
..
...

use the flink client to query demo_hudi.t1 and get the 2 incremental records (9,9),(10,10)

cd ${self_demo_scripts_path}
./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh

Flink SQL> create database demo_hudi;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE demo_hudi.t1(
> a INT PRIMARY KEY NOT ENFORCED,
> b INT
> ) WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/hudi/etl1-sink-1',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',
> 'read.streaming.check-interval' = '1'
> );
>
[INFO] Execute statement succeed.
Flink SQL> set sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> select * from demo_hudi.t1;
+----+-------------+-------------+
| op | a | b |
+----+-------------+-------------+
| +I | 9 | 9 |
| +I | 10 | 10 |

update demo.t1

mysql -h 0.0.0.0 -P 12355 -u root -e "update demo.t1 set b=a*10;"
mysql -h 0.0.0.0 -P 12355 -u root -e "delete from demo.t1 where a < 3"
mysql -h 0.0.0.0 -P 12355 -u root -e "select * from demo.t1"
+----+------+
| a | b |
+----+------+
| 3 | 30 |
| 4 | 40 |
| 9 | 90 |
| 10 | 100 |
+----+------+

use flink client and disable read.streaming.enabled

Flink SQL> drop table demo_hudi.t1;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE demo_hudi.t1(
> a INT PRIMARY KEY NOT ENFORCED,
> b INT
> ) WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/hudi/etl1-sink-1',
> 'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from demo_hudi.t1;
+----+-------------+-------------+
| op | a | b |
+----+-------------+-------------+
| +I | 1 | 10 |
| +I | 2 | 20 |
| +I | 3 | 30 |
| +I | 4 | 40 |
| +I | 9 | 90 |
| +I | 10 | 100 |
+----+-------------+-------------+
Received a total of 6 rows

-- after "delete from demo.t1 where a < 3"

Flink SQL> select * from demo_hudi.t1;
+----+-------------+-------------+
| op | a | b |
+----+-------------+-------------+
| +I | 3 | 30 |
| +I | 4 | 40 |
| +I | 9 | 90 |
| +I | 10 | 100 |
+----+-------------+-------------+
Received a total of 4 rows

Example2: ETL

sink the table and store a subset of its columns by partition

  • example2/tidb.sql
  • example2/flink.sql.template
    • partition by DATE_FORMAT(b, 'yyyyMMdd') as partition
    • INSERT INTO demo_hudi.t2(a,b,partition) (select a, b, DATE_FORMAT(b, 'yyyyMMdd') as partition from demo_flink.t2 where c is not null);
mysql -h 0.0.0.0 -P 12355 -u root < example2/tidb.sql
mysql -h 0.0.0.0 -P 12355 -u root -e "select *from demo.t2"
+---+---------------------+------+
| a | b | c |
+---+---------------------+------+
| 1 | 2023-06-13 05:07:40 | 1 |
| 2 | 2023-06-13 05:07:40 | 2 |
| 3 | 2023-06-13 05:07:40 | NULL |
+---+---------------------+------+
mysql -h 0.0.0.0 -P 12355 -u root -e "update demo.t2 set c=a*10"
mysql -h 0.0.0.0 -P 12355 -u root -e "select * from demo.t2"
+---+---------------------+------+
| a | b | c |
+---+---------------------+------+
| 1 | 2023-06-13 05:07:40 | 10 |
| 2 | 2023-06-13 05:07:40 | 20 |
| 3 | 2023-06-13 05:07:40 | 30 |
| 4 | 2023-06-13 05:19:52 | 40 |
| 5 | 2023-06-13 05:19:52 | 50 |
+---+---------------------+------+

create sink table etl job

./setup-demo.py --cmd sink_task --sink_task_desc="etl2.2.demo.t2" --sink_task_flink_schema_path ./example2/flink.sql.template

insert 2 records (4,'4') and (5, null)

mysql -h 0.0.0.0 -P 12355 -u root -e "insert into demo.t2 (a,c) values (4,'4'),(5, null)"

query by flink client

  • ref .tmp.flink.sink-etl2-2-demo.t2.sql
  • ref content above
./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh

Flink SQL> create database demo_hudi;
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE demo_hudi.t2(
> a INT PRIMARY KEY NOT ENFORCED,
> b TIMESTAMP(3),
> `partition` VARCHAR(20)
> )
> PARTITIONED BY (`partition`)
> WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/hudi_t2',
> 'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.

Flink SQL> set sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

-- record with a = 5 and c is null will be filtered

Flink SQL> select * from demo_hudi.t2;
2023-06-13 05:25:38,525 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+----+-------------+-------------------------+--------------------------------+
| op | a | b | partition |
+----+-------------+-------------------------+--------------------------------+
| +I | 4 | 2023-06-13 05:19:52.000 | 20230613 |
+----+-------------+-------------------------+--------------------------------+
Received a total of 1 row

-- after "update demo.t2 set c=a*10"

Flink SQL> select * from demo_hudi.t2;
+----+-------------+-------------------------+--------------------------------+
| op | a | b | partition |
+----+-------------+-------------------------+--------------------------------+
| +I | 1 | 2023-06-13 05:07:40.000 | 20230613 |
| +I | 2 | 2023-06-13 05:07:40.000 | 20230613 |
| +I | 3 | 2023-06-13 05:07:40.000 | 20230613 |
| +I | 4 | 2023-06-13 05:19:52.000 | 20230613 |
| +I | 5 | 2023-06-13 05:19:52.000 | 20230613 |
+----+-------------+-------------------------+--------------------------------+
Received a total of 5 rows

Example3: Consistency

sink the historical and incremental data of the table to flink and then hudi

  • example3/tidb.sql
  • example3/flink.sql.template
    • use the filesystem connector of flink to upsert data into the base kafka topic
    • upsert incremental data from the self-defined kafka topic into the base kafka topic
    • create hoodie_stream_write and upsert incremental data from the base kafka topic
mysql -h 0.0.0.0 -P 12355 -u root < example3/tidb.sql
mysql -h 0.0.0.0 -P 12355 -u root -e "select *from demo.t3"
+---+------+
| a | b |
+---+------+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
+---+------+
mysql -h 0.0.0.0 -P 12355 -u root -e "update demo.t3 set b=a*10"
mysql -h 0.0.0.0 -P 12355 -u root -e "select * from demo.t3"
+---+------+
| a | b |
+---+------+
| 1 | 10 |
| 2 | 20 |
| 3 | 30 |
| 4 | 40 |
+---+------+
mysql -h 0.0.0.0 -P 12355 -u root -e "delete from demo.t3 where a < 3"
mysql -h 0.0.0.0 -P 12355 -u root -e "select* from demo.t3"
+---+------+
| a | b |
+---+------+
| 3 | 30 |
| 4 | 40 |
+---+------+
./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh

Flink SQL> create database demo_hudi;
>
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE demo_hudi.t3(
> a INT PRIMARY KEY NOT ENFORCED,
> b INT
> ) WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/etl3-sink-3',
> 'table.type' = 'MERGE_ON_READ'
> );
>
[INFO] Execute statement succeed.

Flink SQL> set sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

-- read 4 records which are historical data

Flink SQL> select * from demo_hudi.t3;
+----+-------------+-------------+
| op | a | b |
+----+-------------+-------------+
| +I | 1 | 1 |
| +I | 2 | 2 |
| +I | 3 | 3 |
| +I | 4 | 4 |
+----+-------------+-------------+
Received a total of 4 rows

-- after "update demo.t3 set b=a*10"

Flink SQL> select * from demo_hudi.t3;
+----+-------------+-------------+
| op | a | b |
+----+-------------+-------------+
| +I | 1 | 10 |
| +I | 2 | 20 |
| +I | 3 | 30 |
| +I | 4 | 40 |
+----+-------------+-------------+
Received a total of 4 rows

-- after "delete from demo.t3 where a < 3"

Flink SQL> select * from demo_hudi.t3;
+----+-------------+-------------+
| op | a | b |
+----+-------------+-------------+
| +I | 3 | 30 |
| +I | 4 | 40 |
+----+-------------+-------------+
Received a total of 2 rows

Example4: Consistency Aggregation Materialized View

sink tables and calculate incremental aggregation result

  • example4/tidb.sql
  • example4/flink.sql.template
    • dump data of table demo.t4 into csv file
    • load csv file into kafka topic etl4-sink-4_base
    • sink incremental data to etl4-sink-4
    • sink topic etl4-sink-4 into topic etl4-sink-4_base
    • ticdc -> kafka -> flink streaming aggregation -> hudi: insert into demo_hudi.t4_agg (c,d) (select b, sum(a) from demo_flink.t4 group by b);

create sink table etl job

./setup-demo.py --cmd sink_task --sink_task_desc="etl4.4.demo.t4" --sink_task_flink_schema_path ./example4/flink.sql.template

run tidb sql

mysql -h 0.0.0.0 -P 12355 -u root < example4/tidb.sql
mysql -h 0.0.0.0 -P 12355 -u root -e "select * from demo.t4"
+---+------+
| a | b |
+---+------+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
+---+------+
mysql -h 0.0.0.0 -P 12355 -u root -e "select b, sum(a) from demo.t4 group by b"
+------+--------+
| b | sum(a) |
+------+--------+
| 2 | 2 |
| 4 | 4 |
| 1 | 1 |
| 3 | 3 |
+------+--------+
mysql -h 0.0.0.0 -P 12355 -u root -e "delete from demo.t4 where a = 3"
mysql -h 0.0.0.0 -P 12355 -u root -e "insert into demo.t4 (select max(a)+1,max(a)+1 from demo.t4)"
mysql -h 0.0.0.0 -P 12355 -u root -e "update demo.t4 set b = 10"
mysql -h 0.0.0.0 -P 12355 -u root -e "SELECT b, sum(a) from demo.t4 GROUP BY b"
+------+--------+
| b | sum(a) |
+------+--------+
| 10 | 12 |
+------+--------+
./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh

Flink SQL> create database demo_hudi;
>
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE demo_hudi.t4_agg(
> c INT PRIMARY KEY NOT ENFORCED,
> d BIGINT
> ) WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/etl4-sink-4/agg',
> 'table.type' = 'MERGE_ON_READ'
> );
>
[INFO] Execute statement succeed.

Flink SQL> SET sql-client.execution.result-mode=TABLEAU;
[INFO] Session property has been set.

Flink SQL> SET execution.runtime-mode='batch';
[INFO] Session property has been set.

Flink SQL> select * from demo_hudi.t4_agg;
2023-06-29 07:40:24,347 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+---+---+
| c | d |
+---+---+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
+---+---+
4 rows in set

-- after "delete from demo.t4 where a = 3"

Flink SQL> select * from demo_hudi.t4_agg;
+---+---+
| c | d |
+---+---+
| 1 | 1 |
| 2 | 2 |
| 4 | 4 |
+---+---+
3 rows in set

-- after "insert into demo.t4 (select max(a)+1,max(a)+1 from demo.t4)"

Flink SQL> select * from demo_hudi.t4_agg;
+---+---+
| c | d |
+---+---+
| 1 | 1 |
| 2 | 2 |
| 4 | 4 |
| 5 | 5 |
+---+---+
4 rows in set

-- after "update demo.t4 set b = 10"

Flink SQL> select * from demo_hudi.t4_agg;
+----+----+
| c | d |
+----+----+
| 10 | 12 |
+----+----+
1 row in set
Flink SQL> CREATE TABLE demo_hudi.t4_agg_stream(
> c INT PRIMARY KEY NOT ENFORCED,
> d BIGINT
> ) WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/etl4-sink-4/agg',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',
> 'read.streaming.check-interval' = '1'
> );
[INFO] Execute statement succeed.

Flink SQL> SET sql-client.execution.result-mode=TABLEAU;
[INFO] Session property has been set.

-- after "delete from demo.t4 where a = 3"

Flink SQL> select * from demo_hudi.t4_agg_stream;
+----+-------------+----------------------+
| op | c | d |
+----+-------------+----------------------+
| +I | 1 | 1 |
| +I | 2 | 2 |
| +I | 3 | 3 |
| +I | 4 | 4 |
| -D | 3 | (NULL) |

-- after "insert into demo.t4 (select max(a)+1,max(a)+1 from demo.t4)"

Flink SQL> select * from demo_hudi.t4_agg_stream;
+----+-------------+----------------------+
| op | c | d |
+----+-------------+----------------------+
| +I | 1 | 1 |
| +I | 2 | 2 |
| +I | 3 | 3 |
| +I | 4 | 4 |
| -D | 3 | (NULL) |
| +I | 5 | 5 |

-- after "update demo.t4 set b = 10"

Flink SQL> select * from demo_hudi.t4_agg_stream;
+----+-------------+----------------------+
| op | c | d |
+----+-------------+----------------------+
| +I | 1 | 1 |
| +I | 2 | 2 |
| +I | 3 | 3 |
| +I | 4 | 4 |
| -D | 3 | (NULL) |
| +I | 5 | 5 |
| -D | 1 | (NULL) |
| -D | 2 | (NULL) |
| -D | 4 | (NULL) |
| -D | 5 | (NULL) |
| +I | 10 | 12 |

-- after "update demo.t4 set b = 100 where a=4"

Flink SQL> select * from demo_hudi.t4_agg_stream;
+----+-------------+----------------------+
| op | c | d |
+----+-------------+----------------------+
| +I | 1 | 1 |
| +I | 2 | 2 |
| +I | 3 | 3 |
| +I | 4 | 4 |
| -D | 3 | (NULL) |
| +I | 5 | 5 |
| -D | 1 | (NULL) |
| -D | 2 | (NULL) |
| -D | 4 | (NULL) |
| -D | 5 | (NULL) |
| +I | 10 | 12 |
| +I | 100 | 4 |
| +I | 10 | 8 |

Example5: Consistency Inner Join & Aggregation Materialized View

join & aggregation

  • example5/tidb.sql
  • example5/flink.sql.template
  • create the build table (INSERT ONLY) and the prob table
  • use flink to maintain a materialized view of select agg(...) from build inner join prob on ... group by ...
    • add a field proctime AS PROCTIME() to the schema of the prob table
    • prob inner join ... FOR SYSTEM_TIME AS OF prob.proctime as build on prob.b = build.b ...: makes the prob table always look up the latest build table through the jdbc connector.
mysql -h 0.0.0.0 -P 12355 -u root < example5/tidb.sql

./setup-demo.py --cmd sink_task --sink_task_desc="etl5.5.demo.t5" --sink_task_flink_schema_path ./example5/flink.sql.template
./run-flink-bash.sh /pingcap/demo/flink-sql-client.sh

Flink SQL> create database demo_hudi;
[INFO] Execute statement succeed.

Flink SQL> SET sql-client.execution.result-mode=TABLEAU;
[INFO] Session property has been set.

Flink SQL> CREATE TABLE demo_hudi.test(
> b INT PRIMARY KEY NOT ENFORCED,
> v BIGINT
> ) WITH (
> 'connector' = 'hudi',
> 'path' = 'hdfs://namenode:8020/pingcap/demo/etl5-sink-5/join',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',
> 'read.streaming.check-interval' = '1'
> );
[INFO] Execute statement succeed.

Flink SQL> select * from demo_hudi.test;
+----+-------------+----------------------+
| op | b | v |
+----+-------------+----------------------+
| +I | 1 | -1 |
| +I | 2 | -4 |
| +I | 3 | -3 |

-- after "insert into demo.t5 values (5,3)"

Flink SQL> select * from demo_hudi.test;
+----+-------------+----------------------+
| op | b | v |
+----+-------------+----------------------+
| +I | 1 | -1 |
| +I | 2 | -4 |
| +I | 3 | -3 |
| +I | 3 | -6 |

-- after "delete from demo.t5 where a=1"

Flink SQL> select * from demo_hudi.test;
+----+-------------+----------------------+
| op | b | v |
+----+-------------+----------------------+
| +I | 1 | -1 |
| +I | 2 | -4 |
| +I | 3 | -3 |
| +I | 3 | -6 |
| -D | 1 | (NULL) |

-- after "update demo.t5_build set c=100"
-- no change
Flink SQL> select * from demo_hudi.test;
+----+-------------+----------------------+
| op | b | v |
+----+-------------+----------------------+
| +I | 1 | -1 |
| +I | 2 | -4 |
| +I | 3 | -3 |
| +I | 3 | -6 |
| -D | 1 | (NULL) |

-- after "insert into demo.t5 values (6,1)"

Flink SQL> select * from demo_hudi.test;
+----+-------------+----------------------+
| op | b | v |
+----+-------------+----------------------+
| +I | 1 | -1 |
| +I | 2 | -4 |
| +I | 3 | -3 |
| +I | 3 | -6 |
| -D | 1 | (NULL) |
| +I | 1 | 100 |

-- after "insert into demo.t5 values (7,3)"
-- add 100 to -6 --> 94

Flink SQL> select * from demo_hudi.test;
+----+-------------+----------------------+
| op | b | v |
+----+-------------+----------------------+
| +I | 1 | -1 |
| +I | 2 | -4 |
| +I | 3 | -3 |
| +I | 3 | -6 |
| -D | 1 | (NULL) |
| +I | 1 | 100 |
| +I | 3 | 94 |

Example6: Use S3 As Backend

Use s3 as storage backend for example3

  • example3-s3/tidb.sql
  • example3-s3/flink.sql.template

Use s3 as storage backend for example4

  • example4-s3/tidb.sql
  • example4-s3/flink.sql.template

Set flink state dir and hudi storage path in template file

...

set state.checkpoints.dir = ${s3_dir}/flink-state/checkpoint;
set state.savepoints.dir = ${s3_dir}/flink-state/savepoints;

...

CREATE TABLE demo_hudi.t?(
?,
?
) WITH (
'connector' = 'hudi',
'path' = '${s3_dir}/hudi',
...
);

...

Test ticdc --> s3 (minio) --> inner join of two modifiable tables

  • Run mysql -h ? -P ? -u root < example5/tidb.sql
  • Let TiCDC sink table to S3 storage with option --cdc_sink_s3: ./setup-demo.py --cmd sink_task --sink_task_desc="etl5.5.demo.t5" --sink_task_flink_schema_path ./example5/flink.sql.template --cdc_sink_s3 --cdc_config_file=?
  • Run ticdc/storage-consumer/main.go: ./ticdc/storage-consumer/bin/storage-consumer -upstream-uri s3://etl5-sink-5/ticdc-sink?protocol=canal-json&endpoint=http://${s3_address}&access-Key=minioadmin&secret-Access-Key=minioadmin&enable-tidb-extension=true -flush-interval=500ms -log-level=debug -etl-tables=demo.t5,demo.t5_build -test-str demo.t5.b = demo.t5_build.b
    • check checkpoint-ts by interval 500ms
    • select related data by condition demo.t5.b = demo.t5_build.b when any record of table demo.t5 or demo.t5_build is modified.

Integration Test

Process

  • create a table and start to update it continuously
  • wait for a few seconds and create the etl task to sink the table data into hudi
  • check data consistency between tidb and hudi

Incremental Consistency Test

./test-consistency.py

Incremental Aggregation Consistency Test

./test-consistency-agg.py
./test-consistency-agg-s3.py

Incremental Inner Join Aggregation Consistency Test

./test-consistency-join.py

Further Implementation

Exactly-once consistency for the data source:

  • TiCDC currently provides only at-least-once delivery as a data source. The following two typical scenarios need additional mechanisms to provide exactly-once semantics:
    • Replicating data to Kafka: enable enable-tidb-extension; downstream operators include TIDB_WATERMARK::watermarkTs in the source state and filter out duplicate data.
      • For multi-stream join scenarios, barriers need to be built from watermarkTs and aligned.
    • Replicating data to external storage: enable enable-tidb-extension; the downstream includes metadata/checkpoint-ts in the source state and filters out duplicate data.
      • For multi-table correlation, it is recommended to configure the related tables in the same ticdc changefeed so they can share metadata; barriers need to be built from checkpoint-ts and aligned.

The demo currently depends on external components: Flink, Kafka (ZooKeeper), and Hudi (plus the related Hadoop components). The overall architecture needs to be streamlined:

  • Flink is a general-purpose and mature stream processing platform, but its external dependencies and internal machinery are heavyweight; a productized version should drop Flink
    • Java-based products have weak affinity with other languages, which does not fit the current TiDB community ecosystem well
    • extra effort would be needed to maintain Flink and troubleshoot problems it may introduce
    • projects like TiSpark are a cautionary tale: projects that are not tightly coupled with the TiDB ecosystem eventually reach a dead end
  • Hudi depends on the Hadoop and Spark ecosystems. Should a productized version expose that to users at all? Should Hudi be replaced with a storage system inside the TiDB ecosystem?
  • The TiDB ecosystem could implement a lightweight stream processing framework tailored to its own needs:
    • implement distributed fault tolerance following Flink's checkpointing mechanism
      • streaming state storage and management similar to Flink's could be built on the distributed KV storage and transaction system provided by TiKV
    • implement end-to-end exactly-once consistency following Flink's approach
    • the basic principles for building and maintaining incremental materialized views are the same as described above
    • historical data and incremental data can be processed separately inside the materialized view and then merged, keeping the final result linearizable
  • Implement a lightweight data replication flow and remove the dependency on Kafka
    • a CDC-like module keeps writing incremental data into a trusted external storage system, and the consumer controls the consumption process (replicating data to a storage service)
    • another option worth considering: intrusively modify TiDB's transaction model so that MV-related data and events are folded into the full transaction flow on demand, removing the dependencies on TiCDC, Kafka, Flink, and other external components

IMV Consistency

  • The key step in maintaining a state machine incrementally is observing how the data changes between old and new versions; for a system with globally linearizable transactions such as TiDB, that means the data difference between timestamps
  • resolved-ts, or the watermark-ts provided by ticdc, naturally serves as the barrier that advances the checkpoint
  • when handling multiple tables, or multiple partitions of one table, barriers can be aligned, and the incremental data within the same checkpoint can be sorted by commit-ts and processed in order
    • when handling a multi-table JOIN, a change in any table requires finding the correlated data in the other tables and reconstructing the change of the JOIN result before and after the modification (a hedged sketch follows this list)
      • for example, with t1 inner join t2 on t1.a = t2.a, suppose a row of t1 (say with primary key k1) changes column a from a1 to a2 at time ts_n
        • find all rows of t2 at ts_n - 1 whose column a is a1, find the row of t1 at ts_n - 1 whose primary key is k1, and join them to get result set j1
        • find all rows of t2 at ts_n whose column a is a2, find the row of t1 at ts_n whose primary key is k1, and join them to get result set j2
        • only the difference between j1 and j2 can be used to maintain the incremental state machine
      • or, with t1 inner join t2 on t1.a = t2.a, suppose a row of t1 (primary key k1) changes column a from a1 to a2 at ts_n, and a row of t2 (primary key k2) changes column a from a3 to a4 at ts_n
        • find all rows of t2 at ts_n - 1 whose column a is a1, find the row of t1 at ts_n - 1 whose primary key is k1, and join them to get result set j1_1
        • find all rows of t1 at ts_n - 1 whose column a is a3, find the row of t2 at ts_n - 1 whose primary key is k2, and join them to get result set j1_2
        • j1_1 and j1_2 may contain duplicates; deduplicate and merge them by the combined primary key of t1 and t2 to get j1
        • similarly compute the result set j2 at ts_n; the difference between j1 and j2 is then used to maintain the incremental state machine;
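
A hedged SQL sketch of the first case, assuming the upstream is TiDB and using its stale-read syntax (START TRANSACTION READ ONLY AS OF TIMESTAMP) to read the snapshots before and after the change. The timestamps, the column name pk, and the literal 'k1' are placeholders for illustration:

-- j1: the JOIN contribution of t1's row k1 before the change (snapshot at ts_n - 1)
START TRANSACTION READ ONLY AS OF TIMESTAMP '2023-07-03 13:49:41';
SELECT * FROM t1 JOIN t2 ON t1.a = t2.a WHERE t1.pk = 'k1';
COMMIT;
-- j2: the JOIN contribution of the same row after the change (snapshot at ts_n)
START TRANSACTION READ ONLY AS OF TIMESTAMP '2023-07-03 13:49:42';
SELECT * FROM t1 JOIN t2 ON t1.a = t2.a WHERE t1.pk = 'k1';
COMMIT;
-- the difference between j1 and j2 feeds the incremental state machine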

Data Storage

  • to fit cloud-native architectures more easily, Hadoop HDFS needs to be replaced with a storage system compatible with cloud object storage