在大数据行业内,尤其是数仓建设中,一直有一个绕不开的难题,就是大表的分析计算(这里的大表指亿级以上)。特别是大表之间的 Join 分析,对任何公司数据部门都是一个挑战!
我们提供的服务有:成都网站制作、做网站、微信公众号开发、网站优化、网站认证、嘉陵ssl等。为成百上千家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的嘉陵网站制作公司
主要有以下挑战:
优点:简单粗暴,对业务和数据开发人员友好,不用调整。
缺点:费钱,看你公司是否有钱。
优点:可以在不大幅增加计算集群成本的情况下,完成日常计算任务。
缺点:对数据和业务都有一定要求,数据一般要求是日志类数据。或者具有一定的生命周期数据(历史数据可归档)。
Spark 经典算法 SortMergeJoin(以大表间的 Join 分析为例)。
该算法也可以简化流程为: Map 一> Shuffle 一> Sort 一> Merge 一> Reduce
该算法的性能瓶颈主要在 Sort Merge Shuffle 阶段(红色流程部分),数据量越大,资源要求越高,性能越低。
大数据计算优化思路,核心无非就三条:增加计算资源;减少被计算数据量;优化计算算法。其中前两条是我们普通人最常用的方法。
两个大表的 Join ,是不是真的每天都有大量的数据有变更呢?如果是的话,那我们的业务就应该思考一下是否合理了。
其实在我们的日常实践场景中,大部分是两个表里面的数据每天只有少量(十万百万至千万级)数据随机变化,大部分数据是不变的。
说到这里,很多人的第一想法是,我们增加分区,按数据是否有变化进行区分,计算有变化的(今日有更新的业务数据),合并未变化的(昨日计算完成的历史数据),不就可以解决问题了。其实这个想法存在以下问题:
图片
问题读到这里,如果我们分别把表 A、表 B 的有变化记录的关联主键取出来合并在一起,形成一个数组变量。计算的时候用这个变量分别从表 A 和表 B 中过滤出有变化的数据进行计算,并从未变化的表(昨日计算完成的历史数据)中过滤出不存在的(即未变化历史结果数据)。这样两份数据简单合并到一起,不就是表 A 和表 B 全量 Join 计算的结果了吗!
也许这里有人会有疑惑,不是说布隆过滤器是命中并不代表一定存在,不命中才代表一定不存在!其实这个命中不代表一定存在,是一个极少量概率问题,即极少量没有更新的数据也会命中布隆过滤器,从而参与了接下来的数据计算,实际上只要所有变化的数据能命中即可。这个不影响它已经帮我买过滤了绝大部分不需要计算的数据。
大家可以根据需要参考、修改和优化,有更好的实现方式欢迎大家分享交流。
程序流程图
图片
Spark 函数 Java 代码实现。
package org.example;
import org.apache.curator.shaded.com.google.common.hash.BloomFilter;
import org.apache.curator.shaded.com.google.common.hash.Funnels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.SparkConf;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.RamUsageEstimator;
/**
* add by chengwansheng
*/
class MyBloomFilter {
private BloomFilter bloomFilter;
public MyBloomFilter(BloomFilter b) {
bloomFilter = b;
}
public BloomFilter getBloomFilter() {
return bloomFilter;
}
}
public class BloomUdf implements UDF2
表信息和数据准备。
--建表数据
create table default.A (
item_id bigint comment '商品ID',
item_name string comment '商品名称',
item_price bigint comment '商品价格',
create_time timestamp comment '创建时间',
update_time timestamp comment '创建时间'
)
create table default.B (
item_id bigint comment '商品ID',
sku_id bigint comment 'skuID',
sku_price bigint comment '商品价格',
create_time timestamp comment '创建时间',
update_time timestamp comment '创建时间'
)
create table default.ot (
item_id bigint comment '商品ID',
sku_id bigint comment 'skuID',
sku_price bigint comment '商品价格',
item_price bigint comment '商品价格'
) PARTITIONED BY (pt string COMMENT '分区字段')
--准备数据
insert overwrite table default.A
values
(1,'测试1',101,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,'测试2',102,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(3,'测试2',103,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,'测试2',104,'2023-03-25 08:00:00','2023-04-22 08:00:00'),
(5,'测试2',105,'2023-03-25 08:00:00','2023-04-22 08:00:00');
insert overwrite table default.B
values
(1,11,201,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,12,202,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,13,203,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(2,21,211,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,22,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,42,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,51,251,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,52,252,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(5,53,253,'2023-04-22 08:00:00','2023-04-22 08:00:00');
insert overwrite table default.ot partition(pt='20230421')
values
(1,11,201,101),
(1,12,202,101),
(2,21,211,102),
(2,22,212,102),
(4,42,212,114),
(5,51,251,110);
原来处理的 SQL 语句。
insert overwrite table default.ot partition(pt='20230422')
select B.item_id
,B.sku_id
,B.sku_price
,A.item_price
from B
left join A on(A.item_id=B.item_id)
使用布隆过滤器的 SQL(Java 函数导入 Spark,函数名为 “bloom_filter”)。
--构建布隆过滤器
drop table if exists tmp.tmp_primary_key;
create table tmp.tmp_primary_key stored as TEXTFILE as
select item_id
from (
select item_id
from default.A
where update_time>='2023-04-22'
union all
select item_id
from default.B
where update_time>='2023-04-22'
) where length(item_id)>0
group by item_id;
--增量数据计算
insert overwrite table default.ot partition(pt='20230422')
select B.item_id
,B.sku_id
,B.sku_price
,A.item_price
from default.B
left join default.A on(A.item_id=B.item_id and bloom_filter(A.item_id, "tmp.tmp_primary_key"))
where bloom_filter(B.item_id, "tmp.tmp_primary_key")
union all
--合并历史未变更数据
select item_id
,sku_id
,sku_price
,item_price
from default.ot
where not bloom_filter(item_id, "tmp.tmp_primary_key")
and pt='20230421'
从上面代码可以看出,使用布隆过滤器的 SQL,核心业务逻辑代码只是在原来全量计算的逻辑中增加了过滤条件而已,使用起来还是比较方便的。
以我司的 “dim.dim_itm_sku_info_detail_d” 和 “dim.dim_itm_info_detail_d” 任务为例,使用引擎 Spark2。
图片
从理论分析和实测效果来看,使用布隆过滤器的解决方案可以大幅提升任务的性能,并减少集群资源的使用。
该方案不仅适用大表间 Join 分析计算,也适用大表相关的其它分析计算需求,核心思想就是计算有必要的数据,排除没必要数据,减小无效的计算损耗。
网站栏目:一种基于布隆过滤器的大表计算优化方法
当前网址:http://www.mswzjz.cn/qtweb/news8/156458.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能