首页 > 其他分享 >分布式场景怎么Join | 京东云技术团队

分布式场景怎么Join | 京东云技术团队

时间:2024-02-20 16:47:45浏览次数:20  
标签:排序 Join 内存 京东 联接 数据 id 分布式

背景

最近在阅读查询优化器的论文,发现System R中对于Join操作的定义一般分为了两种,即嵌套循环、排序-合并联接。在原文中,更倾向使用排序-合并联接逻辑。

考虑到我的领域是在处理分库分表或者其他的分区模式,这让我开始不由得联想我们怎么在分布式场景应用这个Join逻辑,对于两个不同库里面的不同表我们是没有办法直接进行Join操作的。查阅资料后发现原来早有定义,即分布式联接算法。

分布式联接算法

跨界点处理数据即分布式联接算法,常见的有四种模型:Shuffle Join(洗牌联接)、Broadcast Join(广播联接)、MapReduce Join(MapReduce联接)、Sort-Merge Join(排序-合并联接)。

接下来将进行逐一了解与分析,以便后续开发的应用。

Shuffle Join(洗牌联接)

先上原理解释:

Shuffle Join的核心思想是将来自不同节点的数据重新分发(洗牌),使得可以联接的数据行最终位于同一个节点上。 通常,对于要联接的两个表,会对联接键应用相同的哈希函数,哈希函数的结果决定了数据行应该被发送到哪个节点。这样,所有具有相同哈希值的行都会被送到同一个节点,然后在该节点上执行联接操作。

可能解释完还是有点模糊,举个例子,有两张表,分别以id字段进行分库操作,且哈希算法相同(为了简单,这里只介绍分库场景,分库分表同理。算法有很多种,这里举例是hash算法),那么这两张表的分片或许可以在同一个物理库中,这样我们不需要做大表维度的处理,我们可以直接下推Join操作到对应的物理库操作即可。

在ShardingSphere中,这种场景类似于绑定表的定义,如果两张表的算法相同,可以直接配置绑定表的关系,进行相同算法的连接查询,避免复杂的笛卡尔积。

这样做的好处是可以尽量下推到数据库操作,在中间件层面我们可以做并行处理,适合大规模的数据操作。

但是,这很理想,有多少表会采用相同算法处理呢。

Broadcast Join(广播联接)

先上原理解释:

当一个表的大小相对较小时,可以将这个小表的全部数据广播到所有包含另一个表数据的节点上。 每个节点上都有小表的完整副本,因此可以独立地与本地的大表数据进行联接操作,而不需要跨节点通信。

举个例子,有一张非常小的表A,还有一张按照ID分片的表B,我们可以在每一个物理库中复制一份表A,这样我们的Join操作就可以直接下推到每一个数据库操作了。

这种情况比Shuffle Join甚至还有性能高效,这种类似于ShardingSphere中的广播表的定义,其存在类似于字典表,在每一个数据库都同时存在一份,每次写入会同步到多个节点。

这种操作的好处显而易见,不仅支持并行操作而且性能极佳。

但是缺点也显而易见,如果小表不够小数据冗余不说,广播可能会消耗大量的网络带宽和资源。

MapReduce Join(MapReduce联接)

先上原理解释:

MapReduce是一种编程模型,用于处理和生成大数据集,其中的联接操作可以分为两个阶段:Map阶段和Reduce阶段。 Map阶段:每个节点读取其数据分片,并对需要联接的键值对应用一个映射函数,生成中间键值对。 Reduce阶段: 中间键值对会根据键进行排序(在某些实现中排序发生在Shuffle阶段)和分组,然后发送到Reduce节点。 在Reduce节点上,具有相同键的所有值都会聚集在一起,这时就可以执行联接操作。

MapReduce Join不直接应用于传统数据库逻辑,而是适用于Hadoop这样的分布式处理系统中。但是为了方便理解,还是用SQL语言来分析,例如一条SQL:

SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;

会被转换为两个SQL:

SELECT customer_id, order_id, date FROM orders;
SELECT customer_id, name FROM customers;

这个过程就是Map阶段,即读取orderscustomers表的数据,并为每条记录输出键值对,键是customer_id,值是记录的其余部分。

下一个阶段可有可无,即Shuffle阶段。如果不在这里排序可能会在Map阶段执行SQL时候排序/分组或者在接下来的Reduce阶段进行额外排序/分组。在这个阶段主要将收集到的数据按照customer_id排序分组,以确保相同的customer_id的数据达到Reduce阶段。

Reduce阶段将每个对应的customer_id进行联接操作,输出并返回最后的结果。

这种操作普遍应用于两个算法完全不相同的表单,也是一种标准的处理模型,在这个过程中,我们以一张逻辑表的维度进行操作。这种算法可能会消耗大量内存,甚至导致内存溢出,并且在处理大数据量时会相当耗时,因此不适合需要低延迟的场景。

额外补充

内存溢出场景普遍在如下场景:

1.大键值对数量:如果Map阶段产生了大量的键值对,这些数据需要在内存中进行缓存以进行排序和传输,这可能会消耗大量内存。 2.数据倾斜:如果某个键非常常见,而其他键则不那么常见,那么处理这个键的Reducer可能会接收到大量的数据,导致内存不足。这种现象称为数据倾斜。 3.大值列表:在Reduce阶段,如果某个键对应的值列表非常长,处理这些值可能会需要很多内存。 4.不合理的并行度:如果Reduce任务的数量设置得不合适(太少或太多),可能会导致单个任务处理不均匀,从而导致内存问题。

我能想到的相应解决方案:

•内存到磁盘的溢写:当Map任务的输出缓冲区满了,它会将数据溢写到磁盘。这有助于限制内存使用,但会增加I/O开销。 •通过设置合适的Map和Reduce任务数量,可以更有效地分配资源,避免某些任务过载。具体操作可以将Map操作的分段比如1~100,100~200,Reduce阶段开设较少的并发处理。 •优化数据分布,比如使用范围分区(range partitioning)或哈希分区(hash partitioning)来减少数据倾斜。

Sort-Merge Join(排序-合并联接)

先上原理解释:

在分布式环境中,Sort-Merge Join首先在每个节点上对数据进行局部排序,然后将排序后的数据合并起来,最后在合并的数据上执行联接操作。 这通常涉及到多阶段处理,包括局部排序、数据洗牌(重新分发),以及最终的排序和合并。

举个理解,还是上面的SQL。

SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
1.对orders表按customer_id进行排序。 2.对customers表按customer_id进行排序。 3.同时遍历两个已排序的表,将具有相同customer_id的行配对。

这个就有点类似于原生的排序-合并联接了。也是数据库场景的标准处理办法。

对于已经排序的数据集或数据分布均匀的情况,这种方法非常有效。如果数据未预先排序,这种方法可能会非常慢,因为它要求数据在联接之前进行排序。

当然,这个算法也会造成内存溢出的场景,解决方案如下:

1.当数据集太大而无法一次性加载到内存中时,可以使用外部排序算法。外部排序算法会将数据分割成多个批次,每个批次单独排序,然后将排序后的批次合并。这种方法通常涉及到磁盘I/O操作,因此会比内存中操作慢。 2.对于合并步骤,可以使用流式处理技术,一次只处理数据的一小部分,并持续将结果输出到下一个处理步骤或存储系统。这样可以避免一次性加载大量数据到内存中。 3.当内存不足以处理数据时,可以使用磁盘空间作为临时存储。数据库管理系统通常有机制来处理内存溢出,比如创建磁盘上的临时文件来存储过程中的数据。 4.在分布式系统中,可以将数据分散到多个节点上进行处理,这样每个节点只需要处理数据的一部分,从而减少单个节点上的内存压力。​  

作者:京东科技 张俊杰

来源:京东云开发者社区 转载请注明来源

标签:排序,Join,内存,京东,联接,数据,id,分布式
From: https://www.cnblogs.com/Jcloud/p/18023435

相关文章

  • 分布式系统之性能度量
     系统的性能主要看吞吐量和响应时间(时延),且是针对成功的请求而言的(请求失败了还谈其吞吐量或时延有何意义)系统的性能如果只看吞吐量,不看响应时间是没有意义的。比如系统每秒可以顶10万请求,但是响应时间已经到了5秒钟,这样的系统已经不可用了,这样的吞吐量也是没有意义的。吞吐量......
  • 分布式、集群和微服务之间的区别
    分布式、集群和微服务之间的区别   分布式系统是一种系统范式,而分布式架构是一种系统设计的方法论。它们的含义在某些情况下可能会重叠,但在其他情况下可能会有细微的区别。  分布式系统1.分布式系统概念是指由多个相互独立的节点(或者进程)组成的系统,这些节点通过网络......
  • R语言Apriori关联规则、kmeans聚类、决策树挖掘研究京东商城网络购物用户行为数据可视
    全文链接:http://tecdat.cn/?p=30360最近我们被客户要求撰写关于网络购物用户行为的研究报告,包括一些图形和统计输出。随着网络的迅速发展,依托于网络的购物作为一种新型的消费方式,在全国乃至全球范围内飞速发展电子商务成为越来越多消费者购物的重要途径。我们被客户要求撰写关......
  • sensitive-word-admin v1.3.0 发布 如何支持敏感词控台分布式部署?
    拓展阅读sensitive-word-adminv1.3.0发布如何支持分布式部署?sensitive-word-admin敏感词控台v1.2.0版本开源sensitive-word基于DFA算法实现的高性能敏感词工具介绍更多技术交流业务背景如果我们的敏感词部署之后,不会变化,那么其实不用考虑这个问题。但是......
  • 哎呀,当时怎么没有想到 | 京东云技术团队
    在我们的测试工作中,是不是经常遇到这样的情形,发生了线上问题,产品、研发或者测试同学一拍脑袋:当时怎么没有想到,怎么给漏掉了呢?明明是一个非常简单的事情,用大拇指都能想到的验证场景,为何当时就漏测了呢?但实际情况是,逃逸到线上的缺陷,疑难杂症式的极端异常的问题很少,大部分都不复杂且......
  • 分布式系统---关键技术“消息中间件”
    消息中间件是一种用于构建分布式系统的软件基础设施,提供了一种异步的,可靠的,可伸缩的消息传递机制。 提高系统性能首先考虑的是数据库的优化,但是数据库因为历史原因,横向扩展是一件非常复杂的工程,所有我们一般会尽量把流量都挡在数据库之前。不管是无限的横向扩展服务器,还是纵向......
  • 分布式下的数据拆分,读写分离,分库分离相关问题
    一、为什么要用分库分表当不使用分库分表的情况下,系统的性能瓶颈主要体现在:当面临高并发场景的时候,为了避免Mysql崩溃(MySql性能一般的服务器建议2000/s读写并发以下),只能使用消息队列来削峰。受制于单机限制。数据库磁盘容量吃紧。数据库单表数据量太大,sql越跑越慢而分库分......
  • 分布式缓存应用:Memcache 或 Redis
    为什么要使用分布式缓存高并发环境下,例如典型的淘宝双11秒杀,几分钟内上亿的用户涌入淘宝,这个时候如果访问不加拦截,让大量的读写请求涌向数据库,由于磁盘的处理速度与内存显然不在一个量级,服务器马上就要宕机。缓存可以将经常读取的数据存储在快速的内存中,从而避免了频繁访问慢速......
  • 糟糕,被SimpleDateFormat坑到啦!| 京东云技术团队
    1.问题背景问题的背景是这样的,在最近需求开发中遇到需要将给定目标数据通过某一固定的计量规则进行过滤并打标生成明细数据,其中发现存在一笔目标数据的时间在不符合现有日期规则的条件下,还是通过了规则引擎的匹配打标操作。故而需要对该错误匹配场景进行排查,定位其根本原因所在......
  • 微服务与分布式服务架构
    根据设计期的架构思想和运行期的不同结构分为:面向服务的架构分布式服务架构微服务架构1、面向服务架构。以业务服务的角度和服务总线的方式,一般是webservice与ESB,考虑系统架构和企业IT治理;2、分布式服务架构。基于去中心化的分布式服务框架与技术,考虑系统架构和服务治理;3、......