踩坑记|FlinkSqlCount还有这种坑!

本文转载自微信公众号「大数据羊说」,作者antigeneral了呀。转载本文请联系大数据羊说公众号。

创新互联是一家专业提供长宁企业网站建设,专注与成都网站制作、网站建设、外贸网站建设H5场景定制、小程序制作等业务。10年已为长宁众多企业、政府机构等服务。创新互联专业网站建设公司优惠进行中。

 1.序篇

通过本文你可了解到

  1. 踩坑场景篇-这个坑是啥样的
  2. 问题排查篇-坑的排查过程
  3. 问题原理解析篇-导致问题的机制是什么
  4. 避坑篇-如何避免这种问题
  5. 展望篇-有什么机制可以根本避免这种情况

先说下结论:在非窗口类 flink sql 任务中,会存在 retract 机制,即上游会向下游发送「撤回消息(做减法)」,**最新的结果消息(做加法)**两条消息来计算结果,保证结果正确性。

而如果我们在上下游中间使用了映射类 udf 改变了**撤回消息(做减法)「的一些字段值时,就可能会导致」撤回消息(做减法)**不能被正常处理,最终导致结果的错误。

2.踩坑场景篇-这个坑是啥样的

在介绍坑之前我们先介绍下我们的需求、实现方案的背景。

2.1.背景

在各类游戏中都会有一种场景,一个用户可以从 A 等级升级到 B 等级,用户可以不断的升级,但是一个用户同一时刻只会在同一个等级。需求指标就是当前分钟各个等级的用户数。

2.2.预期效果

2

2.3.解决思路

获取到当前所有用户的最新等级

一个用户同一时刻只会在一个等级,所以对每一个等级的用户做 count 操作

2.4.解决方案

获取到当前所有用户的最新等级:flink sql row_number() 就可以实现,按照数据的 rowtime 进行逆序排序就可以获取到用户当前最新的等级

对每一个等级的用户做 count 操作:对 row_number() 的后的明细结果进行 count 操作

2.4.1.sql

具体实现 sql 如下,非常简单:

 
 
 
 
  1. WITH detail_tmp AS ( 
  2.   SELECT 
  3.     等级, 
  4.     id, 
  5.     `timestamp` 
  6.   FROM 
  7.     ( 
  8.       SELECT 
  9.         等级, 
  10.         id, 
  11.         `timestamp`, 
  12.         -- row_number 获取最新状态 
  13.         row_number() over( 
  14.           PARTITION by id 
  15.           ORDER BY 
  16.             `timestamp` DESC 
  17.         ) AS rn 
  18.       FROM 
  19.         source_db.source_table 
  20.     ) 
  21.   WHERE 
  22.     rn = 1 
  23. SELECT 
  24.   DIM.中文等级 as 等级, 
  25.   sum(part_uv) as uv 
  26. FROM 
  27.   ( 
  28.     SELECT 
  29.       等级, 
  30.       count(id) as part_uv 
  31.     FROM 
  32.       detail_tmp 
  33.     GROUP BY 
  34.       等级, 
  35.       mod(id, 1024) 
  36.   ) 
  37. -- 上游数据的等级名称是数字,需求方要求给转换成中文,所以这里加了一个 udf 映射 
  38. LEFT JOIN LATERAL TABLE(等级中文映射_UDF(等级)) AS DIM(中文等级) ON TRUE 
  39. GROUP BY 
  40.   DIM.中文等级 

2.4.2.参数配置

使用 minibatch 参数方式控制数据输出频率。

 
 
 
 
  1. table.exec.mini-batch.enabled : true 
  2. -- 设定 60s 的触发间隔 
  3. table.exec.mini-batch.allow-latency : 60s 
  4. table.exec.mini-batch.size : 10000000000 

任务 plan。

1

2.5.问题场景

这段 SQL 跑了 n 年都没有问题,但是有一天运营在配置【等级中文映射_UDF】时,不小心将一个等级的中文名给映射错了,虽然马上恢复了,但是当天的实时数据和离线数据对比后却发现,实时产出的数值比离线大很多!!!而之前都是保持一致的。

3.问题排查篇-坑的排查过程

首先我们想一下,这个指标是算 uv 的,运营将等级中文名配置错了,也应该是把原有等级的最终结果算少啊,怎么会算多呢???

然后我们将场景复现了下,来看看代码:

任务代码,大家可以直接 copy 到本地运行:

 
 
 
 
  1. public class Test { 
  2.  
  3.     public static void main(String[] args) throws Exception { 
  4.  
  5.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  6.  
  7.         env.setParallelism(1); 
  8.  
  9.         EnvironmentSettings settings = EnvironmentSettings 
  10.                 .newInstance() 
  11.                 .useBlinkPlanner() 
  12.                 .inStreamingMode().build(); 
  13.  
  14.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); 
  15.  
  16.         // 模拟输入 
  17.         DataStream> tuple3DataStream = 
  18.                 env.fromCollection(Arrays.asList( 
  19.                         Tuple3.of("2", 1L, 1627218000000L), 
  20.                         Tuple3.of("2", 101L, 1627218000000L + 6000L), 
  21.                         Tuple3.of("2", 201L, 1627218000000L + 7000L), 
  22.                         Tuple3.of("2", 301L, 1627218000000L + 7000L))); 
  23.         // 分桶取模 udf 
  24.         tEnv.registerFunction("mod", new Mod_UDF()); 
  25.  
  26.         // 中文映射 udf 
  27.         tEnv.registerFunction("status_mapper", new StatusMapper_UDF()); 
  28.  
  29.         tEnv.createTemporaryView("source_db.source_table", tuple3DataStream, 
  30.                 "status, id, timestamp"); 
  31.  
  32.         String sql = "WITH detail_tmp AS (\n" 
  33.                 + "  SELECT\n" 
  34.                 + "    status,\n" 
  35.                 + "    id,\n" 
  36.                 + "    `timestamp`\n" 
  37.                 + "  FROM\n" 
  38.                 + "    (\n" 
  39.                 + "      SELECT\n" 
  40.                 + "        status,\n" 
  41.                 + "        id,\n" 
  42.                 + "        `timestamp`,\n" 
  43.                 + "        row_number() over(\n" 
  44.                 + "          PARTITION by id\n" 
  45.                 + "          ORDER BY\n" 
  46.                 + "            `timestamp` DESC\n" 
  47.                 + "        ) AS rn\n" 
  48.                 + "      FROM source_db.source_table" 
  49.                 + "    )\n" 
  50.                 + "  WHERE\n" 
  51.                 + "    rn = 1\n" 
  52.                 + ")\n" 
  53.                 + "SELECT\n" 
  54.                 + "  DIM.status_new as status,\n" 
  55.                 + "  sum(part_uv) as uv\n" 
  56.                 + "FROM\n" 
  57.                 + "  (\n" 
  58.                 + "    SELECT\n" 
  59.                 + "      status,\n" 
  60.                 + "      count(distinct id) as part_uv\n" 
  61.                 + "    FROM\n" 
  62.                 + "      detail_tmp\n" 
  63.                 + "    GROUP BY\n" 
  64.                 + "      status,\n" 
  65.                 + "      mod(id, 100)\n" 
  66.                 + "  )\n" 
  67.                 + "LEFT JOIN LATERAL TABLE(status_mapper(status)) AS DIM(status_new) ON TRUE\n" 
  68.                 + "GROUP BY\n" 
  69.                 + "  DIM.status_new"; 
  70.  
  71.         Table result = tEnv.sqlQuery(sql); 
  72.  
  73.         tEnv.toRetractStream(result, Row.class).print(); 
  74.  
  75.         env.execute(); 
  76.     } 
  77.  

UDF 代码:

 
 
 
 
  1. public class StatusMapper_UDF extends TableFunction { 
  2.  
  3.     public void eval(String status) { 
  4.         if (status.equals("1")) { 
  5.             collector.collect("等级1"); 
  6.         } else if (status.equals("2")) { 
  7.             collector.collect("等级2"); 
  8.         } else if (status.equals("3")) { 
  9.             collector.collect("等级3"); 
  10.         } 
  11.     } 
  12.  

在正确情况(模拟 UDF 没有任何变动的情况下)的输出结果:

 
 
 
 
  1. (true,等级2,1) 
  2. (false,等级2,1) 
  3. (true,等级2,2) 
  4. (false,等级2,2) 
  5. (true,等级2,3) 
  6. (false,等级2,3) 
  7. (true,等级2,4) 

最终等级2 的 uv 数为 4,结果复合预期?。

模拟下用户修改了 udf 配置之后,UDF 代码如下:

 
 
 
 
  1. public class StatusMapper_UDF extends TableFunction { 
  2.  
  3.     private int i = 0; 
  4.  
  5.     public void eval(String status) { 
  6.  
  7.         if (i == 5) { 
  8.             collect("等级4"); 
  9.         } else { 
  10.             if ("1".equals(status)) { 
  11.                 collector.collect("等级1"); 
  12.             } else if ("2".equals(status)) { 
  13.                 collector.collect("等级2"); 
  14.             } else if ("3".equals(status)) { 
  15.                 collector.collect("等级3"); 
  16.             } 
  17.         } 
  18.         i++; 
  19.     } 
  20.  

得到的结果如下:

 
 
 
 
  1. (true,等级2,1) 
  2. (false,等级2,1) 
  3. (true,等级2,2) 
  4. (false,等级2,2) 
  5. (true,等级2,3) 
  6. (false,等级2,3) 
  7. (true,等级2,7) 

最终等级2 的 uv 数为 7,很明显这是错误结果?。

因此可以确定是由于这个 UDF 的处理逻辑变换而导致的结果出现错误。

下文就让我们来分析下其中缘由。

问题原理解析篇-导致问题的机制是什么

我们首先来分析下上述 SQL,可以发现整个 flink sql 任务是使用了 unbounded + minibatch 实现的,在 minibatch 触发条件触发时,上游算子会将之前的结果撤回,然后将最新的结果发出。

这个任务的 execution plan 如图所示。

7

可以从算子图中的一些计算逻辑可以看到,整个任务都是基于 retract 机制运行(count_retract、sum_retract 等)。

而涉及到 udf 的核心逻辑是在 Operator(ID = 7),和 Operator(ID = 12) 之间。当 Operator(ID = 7) GroupAggregate 结果发生改变之后,会发一条「撤回消息(做减法)」,一条**最新的结果消息(做加法)**到 Operator(ID = 12) GroupAggregate。

5

Notes:简单解释下上面说的「撤回消息(做减法)」,「最新的结果消息(做加法)」。举个算 count 的例子:当整个任务的第一条数据来之后,之前没有数据,所以不用撤回,结果就是 0(没有数据) + 1(第一条数据) = 1(结果),当第二条结果来之后,就要将上次发的 1 消息(可以理解为是整个任务的一个中间结果)撤回,将最新的结果 2 发下去。那么计算方法就是 1(上次的结果) - 1(撤回) + 2(当前最新的结果消息)= 2(结果)。

通过算子图可以发现,【中文名称映射】UDF 是处于两个 GroupAggregate 之间的。也就是说 Operator(ID = 7) GroupAggregate 发出的「撤回消息(做减法)」,**最新的结果消息(做加法)「都会执行这个 UDF,那么就有可能」撤回消息(做减法)「中的某个作为下游 GroupAggregate 算子 key 的字段会被更改成其他值,那么这条消息就不会发到原来下游 GroupAggregate 算子的原始 key 中,那么原来的 key 的历史结果就撤回不了了。。。但是」最新的结果消息(做加法)**的字段没有被更改时,那么这个消息依然被发到了下游 GroupAggregate 算子,这就会导致没做减法,却做了加法,就会导致结果增加,如下图所示。

从这个角度出发,我们来分析下上面的 case,从内层发给外层的消息一条一条来分析。

内层消息怎么来看呢?其实就是将上面的 SQL 中的 left join 删除,重新跑一遍就可以得到结果,结果如下:

 
 
 
 
  1. (true,等级2,1) 
  2. (false,等级2,1) 
  3. (true,等级2,2) 
  4. (false,等级2,2) 
  5. (true,等级2,3) 
  6. (false,等级4,3) 
  7. (true,等级2,4) 

来分析下内层消息发出之后对应到外层消息的操作:

内层 外层
(true,等级2,1)(true,等级2,1)
(false,等级2,1)(false,等级2,1)
(true,等级2,2)(true,等级2,2)
(false,等级2,2)(false,等级2,2)
(true,等级2,3)(true,等级2,3)

前五条消息不会导致错误,不用详细说明。

内层 外层
(true,等级2,1)(true,等级2,1)
(false,等级2,1)(false,等级2,1)
(true,等级2,2)(true,等级2,2)
(false,等级2,2)(false,等级2,2)
(true,等级2,3)(true,等级2,3)
(false,等级4,3) 

第六条消息发出之后,经过 udf 的处理之后,中文名被映射成了【等级4】,而其再通过 hash partition 策略向下发送消息时,就不能将这条撤回消息发到原本 key 为【等级2】的算子中了,这条撤回消息也无法被处理了。

内层 外层
(true,等级2,1)(true,等级2,1)
(false,等级2,1)(false,等级2,1)
(true,等级2,2)(true,等级2,2)
(false,等级2,2)(false,等级2,2)
(true,等级2,3)(true,等级2,3)
(false,等级4,3) 
(true,等级2,4)(false,等级2,3) (true,等级2,7)

第七条消息 (true,等级2,4) 发出后,外层 GroupAggregate 算子首先会将上次发出的记过撤回,即(false,等级2,3),然后将(true,等级2,4)累加到当前的记过上,即 3(上次结果)+ 4(这次最新的结果)= 7(结果)。就导致了上述的错误结果。

定位到问题原因之后,我们来看看怎么避免上述错误。

6.避坑篇-如何避免这种问题

6.1.从源头避免

udf 这种映射维度的 udf 尽量在上线前就固定下来,避免后续更改造成的数据错误。

6.2.替换为 ScalarFunction 进行映射

 
 
 
 
  1. WITH detail_tmp AS ( 
  2.   SELECT 
  3.     status, 
  4.     id, 
  5.     `timestamp` 
  6.   FROM 
  7.     ( 
  8.       SELECT 
  9.         status, 
  10.         id, 
  11.         `timestamp`, 
  12.         row_number() over( 
  13.           PARTITION by id 
  14.           ORDER BY 
  15.             `timestamp` DESC 
  16.         ) AS rn 
  17.       FROM 
  18.         ( 
  19.           SELECT 
  20.             status, 
  21.             id, 
  22.             `timestamp` 
  23.           FROM 
  24.             source_db.source_table 
  25.         ) t1 
  26.     ) t2 
  27.   WHERE 
  28.     rn = 1 
  29. SELECT 
  30.   -- 在此处进行中文名称映射 
  31.   等级中文映射_UDF(status) as status, 
  32.   sum(part_uv) as uv 
  33. FROM 
  34.   ( 
  35.     SELECT 
  36.       status, 
  37.       count(distinct id) as part_uv 
  38.     FROM 
  39.       detail_tmp 
  40.     GROUP BY 
  41.       status, 
  42.       mod(id, 100) 
  43.   ) 
  44. GROUP BY 
  45.   status 

还是刚刚的逻辑,刚刚的配方,我们先来看一下结果。

 
 
 
 
  1. public class StatusMapper_UDF extends ScalarFunction { 
  2.  
  3.     private int i = 0; 
  4.  
  5.     public String eval(String status) { 
  6.  
  7.         if (i == 5) { 
  8.             i++; 
  9.             return "等级4"; 
  10.         } else { 
  11.             i++; 
  12.             if ("1".equals(status)) { 
  13.                 return "等级1"; 
  14.             } else if ("2".equals(status)) { 
  15.                 return "等级2"; 
  16.             } else if ("3".equals(status)) { 
  17.                 return "等级3"; 
  18.             } 
  19.         } 
  20.         return "未知"; 
  21.     } 
  22.  

发现虽然依然会有 (false,等级4,3) 这样的错误撤回数据(这是 udf 决定的,没法避免),但是我们可以发现最终的结果是 (true,等级2,4),结果依然是正确的。

再来分析下问什么这种方式可以解决,如图 plan。

6

发现映射 udf 算子所处的位置不在两个 GroupAggregrate 之间了,因此在 retract 消息发送之后,不会被映射到错误其他 key 中,因此所有的 retract 消息都会正常处理。

7.展望篇-有什么机制可以根本避免这种情况

可以将「撤回消息(做减法)」,**最新的结果消息(做加法)**做成一个原子消息从上游发给下游,下游统一进行原子性处理,关联 udf 时,也只对 group key 关联一次即可。

分享题目:踩坑记|FlinkSqlCount还有这种坑!
当前路径:http://www.mswzjz.cn/qtweb/news26/98026.html

攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能