聊聊流式数据湖Paimon(三)

概述

如果表没有定义主键,则默认情况下它是仅追加 表类型(Append Only Table)。 根据桶(Bucket)的定义,我们有两种不同的仅追加模式:"Append For Scalable Table"和"Append For Queue";两种模式支持不同的场景,提供不同的功能。
只能向表中插入一条完整的记录。 不支持删除或更新,并且不能定义主键。 此类表适合 不需要更新的用例(例如日志数据同步)。

Append 场景特指"无主键"的场景,比如日志数据的记录,不具有直接Upsert更新的能力。

Append For Scalable Table

其支持的功能如下:

  1. 支持批读批写 INSERT OVERWRITE
  2. 支持流读流写 自动合并小文件
  3. 支持湖存储特性 ACID、Time Travel
  4. order与z-order排序

Definition

通过在表属性中定义 'bucket' = '-1',可以为此表分配特殊模式(我们称之为"unaware-bucket 模式")。 在这种模式下,一切都不同了。 我们已经没有了桶的概念,也不保证流式读取的顺序。 我们将此表视为批量离线表(尽管我们仍然可以流式读写)。 所有记录都会进入一个目录(为了兼容性,我们将它们放在bucket-0中),并且我们不再维护顺序。 由于我们没有桶的概念,所以我们不会再按桶对输入记录进行混洗,这将加快插入速度。
使用此模式,可以将 Hive 表替换为 Lake 表。

Compaction

在unaware-bucket模式下,我们不在writer中进行压缩,而是使用Compact Coordinator扫描小文件并将压缩任务提交给Compact Worker。 这样,我们就可以轻松地对一个简单的数据目录进行并行压缩。 在流模式下,如果在flink中运行insert sql,拓扑将是这样的:


它会尽力压缩小文件,但是当一个分区中的单个小文件长时间保留并且没有新文件添加到该分区时,压缩协调器会将其从内存中删除以减少内存使用。 重新启动作业后,它将扫描小文件并将其再次添加到内存中。 控制紧凑行为的选项与 Append For Qeueue 完全相同。 如果将 write-only 设置为 true,Compact Coordinator 和 Compact Worker 将从拓扑中删除。
自动压缩仅在 Flink 引擎流模式下支持。还可以通过 paimon 中的 flink 操作在 flink 中启动压缩作业,并通过 set write-only 禁用所有其他压缩。

Sort Compact

每个分区中的数据乱序会导致选择缓慢,压缩可能会减慢插入速度。 将插入作业设置为只写是一个不错的选择,并且在每个分区数据完成后,触发分区排序压缩操作。

Streaming Source

Unaware-bucket模式 Append Only Table 支持流式读写,但不再保证顺序。 你不能把它看作一个队列,而是一个有bin的湖。每次提交都会生成一个新的binbin存储记录 来读取增量,但是一个 bin 中的记录会流向它们想要的任何地方,并且我们以任何可能的顺序获取它们。 在Append For Queue模式下,记录不存储在bin中,而是存储在record pipe中。 记录 存储,我们可以通过读取新的存储记录 来读取增量,但是一个 bin 中的记录会流向它们想要的任何地方,并且我们以任何可能的顺序获取它们。 在Append For Queue模式下,记录不存储在bin中,而是存储在record pipe中。

bin:储物箱

Streaming Multiple Partitions Write

由于Paimon-sink需要处理的写入任务数量为:数据写入的分区数量 * 每个分区的桶数量。 因此,我们需要尽量控制每个paimon-sink任务的写任务数量,使其分布在合理的范围内。 如果每个sink-task处理过多的写任务,不仅会导致小文件过多的问题,还可能导致内存不足的错误。
另外,写入失败会引入孤儿文件,这无疑增加了维护paimon的成本。 我们需要尽可能避免这个问题。
对于启用自动合并的 flink-jobs,我们建议尝试按照以下公式来调整 paimon-sink 的并行度(这不仅仅适用于append-only-tables,它实际上适用于大多数场景):

(N*B)/P < 100   (This value needs to be adjusted according to the actual situation)
N(the number of partitions to which the data is written)
B(bucket number)
P(parallelism of paimon-sink)
100 (This is an empirically derived threshold,For flink-jobs with auto-merge disabled, this value can be reduced.
However, please note that you are only transferring part of the work to the user-compaction-job, you still have to deal with the problem in essence,
the amount of work you have to deal with has not been reduced, and the user-compaction-job still needs to be adjusted according to the above formula.)

还可以将 write-buffer-spillable 设置为 true,writer 可以将记录溢出到磁盘。 这可以尽可能地减少小文件。要使用此选项,的 flink 集群需要有一定大小的本地磁盘。 这对于那些在 k8s 上使用 flink 的人来说尤其重要。
对于仅追加表,您可以为仅追加表设置 write-buffer-for-append 选项。 将此参数设置为true,writer将使用Segment Pool缓存记录以避免OOM。

Example

以下是创建Append-Only表并指定bucket key的示例。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT
) WITH (
  'bucket' = '-1'
);

Append For Queue

其支持的功能如下:

  1. 严格保证顺序,可以带消息队列
  2. 支持Watermark且对齐
  3. 自动合并小文件
  4. 支持Consumer-ID (类似Group-ID)

Definition

在这种模式下,可以将append-only table看成是一个由bucket分隔的队列。 同一个桶中的每条记录都是严格排序的,流式读取会严格按照写入的顺序将记录传输到下游。 使用此模式,不需要进行特殊配置,所有数据都会以队列的形式放入一个桶中。还可以定义bucketbucket-key以实现更大的并行性和分散数据。

Compaction

默认情况下,sink节点会自动进行compaction来控制文件数量。 以下选项控制压缩策略:

Streaming Source

目前仅 Flink 引擎支持流式源行为。

Streaming Read Order

对于流式读取,记录按以下顺序生成:

  • 对于来自两个不同分区的任意两条记录
    • 如果 scan.plan-sort-partition 设置为 true,则首先生成分区值较小的记录。
    • 否则,将先产生分区创建时间较早的记录。
  • 对于来自同一分区、同一桶的任意两条记录,将首先产生第一条写入的记录。
  • 对于来自同一分区但两个不同桶的任意两条记录,不同的桶由不同的任务处理,它们之间没有顺序保证。
Watermark Definition

定义读取 Paimon 表的watermark:

CREATE TABLE T (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

-- launch a bounded streaming job to read paimon_table
SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
 TUMBLE(TABLE T, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;

还可以启用 Flink Watermark 对齐,这将确保没有源/拆分/分片/分区将其 Watermark 增加得远远超出其他部分:

Bounded Stream

Streaming Source 也可以是有界的,指定 scan.bounded.watermark 来定义有界流模式的结束条件,流读取将结束,直到遇到更大的 watermark 快照。
快照中的watermark 是由writer生成的,例如,指定kafka源并声明watermark 的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的watermark,以便流式读取此Paimon表时可以使用有界watermark的功能。

CREATE TABLE kafka_table (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka'...);

-- launch a streaming insert job
INSERT INTO paimon_table SELECT * FROM kakfa_table;

-- launch a bounded streaming job to read paimon_table
SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;

Example

以下是创建Append-Only表并指定bucket key的示例。

CREATE TABLE MyTable (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT
) WITH (
    'bucket' = '8',
    'bucket-key' = 'product_id'
);

参考

基于 Apache Paimon 的 Append 表处理
Apache Paimon 实时数据湖 Streaming Lakehouse 的存储底座

热门相关:我有一张均富卡   惊世第一妃:魔帝,宠上身!   一等狂后:绝色驭兽师   诛天至极   我有一个进化点