模板聚合的翻译 【翻译】Flink Table Api & SQL — 性能调优 — 流式聚合
日期:2023-03-11 12:39:36 / 人气: 570 / 发布者:成都翻译公司
在此页面中,我们将介绍一些有用的优化选项以及流聚合的内部原理,这将在某些情况下带来很大的改进。注意:当前,仅对无边界聚合支持流聚合优化。小型批处理聚合的核心思想是将一组输入缓存在聚合运算符内部的缓冲区中。数据流中的记录可能会倾斜,因此聚合运算符的某些实例会比其他实例处理更多的记录,这会导致热点。注意:但是,当前,拆分优化不支持包含用户定义的AggregateFunction的聚合。本文翻译自官网:Streaming Aggregation
Flink Table Api & SQL 翻译目录
SQL 是*广泛使用的数据分析语言。Flink 的 Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。而且,Flink Table API 和 SQL 都得到了有效的优化,集成了很多查询优化和优化算子的实现。但是,并非所有优化都默认启用,因此对于某些工作负载,您可以通过打开某些选项来提高性能。
在这个页面中,我们将介绍一些有用的优化选项和流聚合的内部原理模板聚合的翻译,它们在某些情况下会带来很大的改进。
注意:目前只有 Blink planner 支持本页提到的优化选项。
注意:目前,流聚合优化仅支持无边界聚合。以后会支持窗口聚合的优化。
默认情况下,无界聚合算子对输入的记录进行一条一条的处理,即(1)从状态中读取累加器,(2)将记录累加/收回到累加器中,(3)将累加器写回状态,(4)下一条记录会从(1))再次处理。这种处理方式可能会增加StateBackend的开销(尤其是RocksDB StateBackend)。另外,生产中很常见的数据倾斜会加剧问题并使工作更容易受到背压情况的影响。
小批量聚合
小批量聚合的核心思想是在聚合算子内部的缓冲区中缓存一组输入。当输入被触发进行处理时,每个键只需一个操作即可访问状态。这可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。
下图说明了小批量聚合如何减少状态操作。
默认情况下禁用 MiniBatch 优化。要启用此优化,您应该设置 table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency 和 table.exec.mini-batch.size。请参阅配置页面了解更多详细信息。
以下示例显示了如何启用这些选项。
// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
本地全局聚合
提出将局部聚合分为两个阶段来解决数据倾斜的问题,即先在上游进行局部聚合,然后在下游进行全局聚合,类似于MapReduce中的Combine+Reduce模式. 例如,请考虑以下 SQL:
SELECT color, sum(id)
FROM T
GROUP BY color
数据流中的记录可能会发生倾斜,因此聚合运算符的某些实例将处理比其他实例更多的记录,这可能会导致热点。本地聚合可以帮助将一定数量的具有相同密钥的输入累积到单个累加器中。全局摘要只会接收减少的累加器,而不是大量的原始输入。这样可以大大降低网络重组和状态访问的成本。每个本地聚合累积的输入数量基于*小批处理间隔。这意味着本地-全局聚合取决于启用小批量优化。
下图显示了本地全局聚合如何提高性能。
以下示例显示如何启用本地全局聚合。
// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
拆分不同的聚合
局部和全局优化可以有效消除常规聚合的数据倾斜,如SUM、COUNT、MAX、MIN、AVG。然而,在处理不同的聚合反应时,其性能并不令人满意。
例如,如果我们要分析今天有多少独立用户登录。我们可能有以下查询:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果distinct key(即user_id)的值是稀疏的,COUNT DISTINCT不能减少记录。即使启用了局部和全局优化,也无济于事。因为累加器仍然包含几乎所有的原始记录,全局聚合将成为瓶颈(大部分重累加器由一个任务处理,即同一天)。
这个优化的思路是把不同的聚合(比如COUNT(DISTINCT col))分成两个层次。第一个聚合由组键和其他桶键打乱。用于计算桶键 HASH_CODE(distinct_key)% BUCKET_NUM。BUCKET_NUM 默认为 1024,可以通过 table.optimizer.distinct-agg.split.bucket-num 选项进行配置。第二次聚合由原始组键打乱,用于 SUM 聚合来自不同桶的 COUNT DISTINCT 值。由于相同的唯一键只会在同一个桶中计算,因此转换是等效的。桶键作为额外的组键,分担组键中热点的负担。Bucket 关键字使工作具有可扩展性,以解决不同聚合中的数据倾斜/热点问题。
拆分非重复聚合后,上述查询将自动改写为以下查询:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下图展示了拆分非重复聚合如何提高性能(假设颜色代表日期,字母代表user_id)。
注意:以上是*简单的例子,可以从这个优化中受益。此外,Flink 支持拆分更复杂的聚合查询,例如多个不同key的(如不同的集合COUNT(DISTINCT a)、SUM(DISTINCT b)),以及其他非重复的聚合工作(例如模板聚合的翻译,总和、*大值、*小值、计数)。
注意:但是,目前,拆分优化不支持包含用户定义的 AggregateFunction 的聚合。
以下示例显示了如何启用拆分不同聚合优化。
// instantiate table environment
val tEnv: TableEnvironment = ...
tEnv.getConfig // access high-level configuration
.getConfiguration // set low-level key-value options
.setString("table.optimizer.distinct-agg.split.enabled", "true") // enable distinct agg split
在不同的聚合上使用 FILTER 修饰符
在某些情况下,用户可能需要从不同维度计算 UV(唯一身份访问者)的数量,例如 Android UV、iPhone UV、Web UV 和总 UV。很多用户会选择CASE WHEN来支持这个功能,例如:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
但是,在这种情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为FILTER更符合SQL标准,会得到更多的性能提升。FILTER 是聚合函数中使用的修饰符,用于限制聚合中使用的值。用 FILTER 修饰符替换上面的例子,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL 优化器可以在同一个唯一键上识别不同的过滤参数。例如,在上面的示例中,所有三个 COUNT DISTINCT 都在 user_id 列上。那么 Flink 可以只使用一个共享状态实例而不是三个状态实例来减少状态访问和状态大小。在某些工作负载中,这可以显着提高性能。
相关阅读Relate
热门文章 Recent
- dna复制转录翻译的场所模板产物原料原则 必修二-第4章-基因的表达复习2023-03-11
- 公司流水翻译模板 银行流水怎么翻译成英文2023-03-11
- 翻译工作经历模板 英文翻译专员求职简历2023-03-11
- 翻译实习生证明模板-翻译专业实习自我鉴定2023-03-11
- 翻译实习生证明模板-实习手册样本模板2023-03-11
- 翻译专业资格水平考试报名表模板 2018年度翻译专业资格(水平)考试科目及代码对应表2023-03-11
- 英语六级翻译高分模板 英语六级作文高分要诀:王长喜十二句作文法.doc2023-03-11
- 四六级翻译必背5大篇章模板 沪江:六级翻译必背3大篇章模板2023-03-11
- 翻译公司翻译服务合同模板 翻译服务协议样本2023-03-11
- 西班牙语成绩单翻译模板 dele(西班牙语水平考试)2023-03-11


