CRDT 是什么意思?CRDT是Conflict-Free Replicated Data Types的缩写,直译的话即“无冲突可复制数据类型”。翻译过来还是一脸懵逼!
用稍微通俗一点的话说:研究分布式系统,尤其是研究最终一致性分布式系统的过程中,一个最基本的问题就是,应该采用什么样的数据结构来保证最终一致性?CRDT即是理论界目前对于这个问题的答案!
当然理论界的思路现在不是所有人能跟上的,需要更加简单的解释。
一致性的难题
分布式系统的好处,不用说了,这是人类目前实际可行的构建超大规模系统的唯二办法之一(另一种就是超级计算机)。如果考虑上经济成本这个因素,那么分布式系统就是唯一可行之法。
构建分布式系统,抛开效率问题不谈,首先是如何保持其正确性?简单的讲,要构建一个跑得飞快的分布式系统并不难,难的是构建一个运作起来正确性与单机程序完全无二的分布式系统。
这就要说到 CAP定理 。
CAP定理
CAP定理中的CAP三个字母是以下三个单词的缩写:
- Consistency(一致性,所有节点在同一时间的数据完全一致)
- Availability(可用性,读写服务一直可用)
- Partition tolerance(分区容错性,部分节点故障系统仍能提供一致可用的服务)
CAP定理告诉我们,在构建分布式系统的时候,这三者只可以同时选择两样。即,就算给我们再多的钱,在目前的计算机体系结构下,三样同时选择,理论上无可能的。如果谁做到了?就是不科学。
怎么选?
分区容错性P 是分布式系统所必需的,因为我们无法保证所有的节点都不出现故障。因此,目前分布式系统都在一致性C 和 可用性A 中取舍。
CP系统
:
- 为了保持一致性,系统可能出现暂时不可用的情况,比如节点之间正在同步数据,这时候进来的读请求就要排队等一会。
- 分区故障发生时,为了保持一致性,系统只能暂停等待分区故障恢复。
- 适合对一致性要求高,可以在性能(可用性)上做出妥协,比如电商的秒杀功能(有时写,一直读)。
AP系统
- 为了保持可用性,在 强一致性, 弱一致性, 最终一致性的一致性等级中,
仅保证最终一致性
,所谓最终一致性,是指所有节点上的数据合并后得到的结果是一致的 - 分区故障发生时,为了保证系统可用,放弃一致性。
- 适合对一致性要求不高,但对性能(可用性)要求高的场景,比如日志系统(有时读,一直写),实时聊天,多人协同编辑。
需要注意的是最终一致性的系统不是不保证一致性,
而是不在保证可用性和分区容错性的同时保证一致性,最终我们还是要在最终一致性的各节点之间处理数据,使他们达到一致。
CA系统
- 单机系统
理想的分布式
CP系统的启发
- Raft等强一致性算法,选举leader统一受理写请求。
- 写请求过多时,不堪重负。
- 集中写,分布读,分布的不够彻底。
理想中的分布式
- 分布式写请求,极大提高吞吐量。
- 任何节点受理写请求后,通知给系统中所有其他节点更新信息。
- 分布写,分布读,彻底的分布。
该架构的问题
假设该分布式系统中存有1个变量count。
-
到达顺序问题
- 1号请求A节点count=100,2号请求B节点count=99,最终应该count=99
- 但count=99先到达C节点,count=100后到达C节点,导致C节点的count=100,产生错误
-
幂等问题
- 假设count=100
- 1号请求A节点count+=1,以为更新失败了(实际节点处理成功了),重新请求count+=1,最终应该count=101
- 由于+=不满足幂等律,count=102,产生错误
问题解决
如果我们找到一种数据结构,它满足交换、结合、幂等律,那么上述顺序问题,幂等问题将不复存在。
- 交换律 a∨b = b∨a
- 结合律 (a∨b)∨c = a∨(b∨c)
- 幂等律 a∨a = a
这就是CRDT产生的思想。
一个CRDT的简单例子
假设系统中存有count = 100,表示100块钱,在某个时刻t1:
- 1号节点存入10块,在它看来,count=110
- 2号节点取出10块,在它看来,count=90
- 3号节点,count=100
在这个时刻t1,这三者都是对的,因为对于最终一致的系统,允许存在不一致的时刻。那么经过一段时间之后,假设是t2吧,我们需要使得A B和C系统看来,T都有100块钱,即保证最终一致。这中间肯定需要做一些操作,例如A B和C系统之间交换一些必要的信息数据。
1号和2号的更新信息发送给3号节点,但3号节点并不知道110和90究竟哪个正确 !
换一种思路:改变数据结构,不存count最终值,存操作
- 1号节点存入10块,存count+=10
- 2号节点取出10块,存count-=10
结果:
- 3号节点收到其他节点的通知时,即可通过count = 100+10-10 = 100,计算出最终正确结果,而且不同节点的到达顺序不影响最终结果,满足交换律和结合律
- 进一步优化,比如说给每个操作加一个时间戳,包含相同时间戳和操作的通知只处理一次,满足幂等律
此时,同时满足交换律,结合律和幂等律,这就是CRDT的思想。
CRDT的几种结构
有了这样的概念,现在我们可以回头看CRDT。CRDT就是这样一些适应于不同场景的可以保持最终一致性的数据结构的统称,说了这么多,CRDT其实就是一些数据结构而已。
G-Counter(Grow-only Counter)
G-Counter (Grow-only Counter) 是一种用于分布式系统的CRDT(Conflict-free Replicated Data Type),主要用于实现计数器。这种计数器只能递增,不能递减。
该数据结构实现了+运算,其核心思想如下:
- n个node
- payload是长度为n的数组
- update函数负责+1
- merge函数负责合并两个node的payload
- compare函数负责merge时选择哪个node的P[i]
- query函数负责计算v值,sum(P[i])
下面对上面的每一条进行详细讲解。
1. node(节点)
在分布式系统中,有多个节点(node),每个节点都有一个独立的计数器。
2. payload(负载)
每个节点的计数器状态存储在一个长度为n的数组中,其中n是节点的数量。数组中的每个元素代表对应节点的计数值。
3. update function(更新函数)
更新函数负责在节点的计数器上加1。具体来说,某个节点调用更新函数时,只增加该节点对应的数组元素的值。例如,节点A更新计数器时,只增加数组中A位置的值。
public void update() {
payload[nodeId] += 1;
}
4. merge function(合并函数)
合并函数负责合并两个节点的负载(即两个长度为n的数组)。合并的方式是对每个位置取两个节点对应元素的最大值。这样做是为了保证合并后的结果包含所有节点的最大计数值。
// Static merge function
public static int[] merge(int[] P1, int[] P2) {
int length = P1.length;
int[] result = new int[length];
for (int i = 0; i < length; i++) {
result[i] = Math.max(P1[i], P2[i]);
}
return result;
}
5. compare function(比较函数)
在合并时,比较函数负责选择每个位置上哪个节点的值。实际上,合并函数已经隐含了比较的过程,因为它取的是两个节点对应元素的最大值。
6. query function(查询函数)
查询函数负责计算计数器的总值,即数组中所有元素的和。这个值表示所有节点更新操作的总次数。
// Static query function
public static int query(int[] P) {
int sum = 0;
for (int value : P) {
sum += value;
}
return sum;
}
举个例子
:假设有3个节点:A、B和C。初始时,每个节点的计数器数组都为[0, 0, 0]。
-
节点A更新计数器:
A调用update函数:update([0, 0, 0], 0) -> [1, 0, 0] -
节点B更新计数器:
B调用update函数:update([0, 0, 0], 1) -> [0, 1, 0] -
节点A和B合并计数器:
A和B的计数器分别为[1, 0, 0]和[0, 1, 0],调用merge函数:merge([1, 0, 0], [0, 1, 0]) -> [1, 1, 0] -
节点C更新计数器:
C调用update函数:update([0, 0, 0], 2) -> [0, 0, 1] -
节点A和C合并计数器:
A和C的计数器分别为[1, 1, 0]和[0, 0, 1],调用merge函数:merge([1, 1, 0], [0, 0, 1]) -> [1, 1, 1] -
查询总计数值:
调用query函数:query([1, 1, 1]) -> 3
代码
1. 创建一个远程接口
定义了G-Counter的远程接口,包括更新、获取负载和合并负载的方法。
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface GCounterRemote extends Remote {
void update() throws RemoteException;
int[] getPayload() throws RemoteException;
void merge(int[] otherPayload) throws RemoteException;
}
2. 实现远程接口
GCounter类:实现了GCounterRemote接口,并提供了更新、获取负载和合并负载的方法。
main方法创建了三个节点的实例,并将它们绑定到RMI注册表中。
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Arrays;
public class GCounter extends UnicastRemoteObject implements GCounterRemote {
private int[] payload;
private int nodeId;
protected GCounter(int numNodes, int nodeId) throws RemoteException {
this.payload = new int[numNodes];
this.nodeId = nodeId;
}
@Override
public void update() throws RemoteException {
payload[nodeId] += 1;
}
@Override
public int[] getPayload() throws RemoteException {
return payload;
}
@Override
public void merge(int[] otherPayload) throws RemoteException {
for (int i = 0; i < payload.length; i++) {
payload[i] = Math.max(payload[i], otherPayload[i]);
}
}
public static int query(int[] P) {
int sum = 0;
for (int value : P) {
sum += value;
}
return sum;
}
public static void main(String[] args) {
try {
int numNodes = 3;
GCounter nodeA = new GCounter(numNodes, 0);
GCounter nodeB = new GCounter(numNodes, 1);
GCounter nodeC = new GCounter(numNodes, 2);
java.rmi.registry.LocateRegistry.createRegistry(1099);
java.rmi.Naming.rebind("rmi://localhost/NodeA", nodeA);
java.rmi.Naming.rebind("rmi://localhost/NodeB", nodeB);
java.rmi.Naming.rebind("rmi://localhost/NodeC", nodeC);
System.out.println("Nodes are ready");
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 创建客户端代码以模拟RPC调用
GCounterClient类:模拟客户端,通过RMI调用远程节点的方法,进行更新、合并和查询操作。
import java.rmi.Naming;
import java.util.Arrays;
public class GCounterClient {
public static void main(String[] args) {
try {
GCounterRemote nodeA = (GCounterRemote) Naming.lookup("rmi://localhost/NodeA");
GCounterRemote nodeB = (GCounterRemote) Naming.lookup("rmi://localhost/NodeB");
GCounterRemote nodeC = (GCounterRemote) Naming.lookup("rmi://localhost/NodeC");
// Node A updates
nodeA.update();
System.out.println("Node A Payload: " + Arrays.toString(nodeA.getPayload()));
// Node B updates
nodeB.update();
System.out.println("Node B Payload: " + Arrays.toString(nodeB.getPayload()));
// Merge Node A and Node B
nodeA.merge(nodeB.getPayload());
nodeB.merge(nodeA.getPayload());
System.out.println("Merged A and B Payload A: " + Arrays.toString(nodeA.getPayload()));
System.out.println("Merged A and B Payload B: " + Arrays.toString(nodeB.getPayload()));
// Node C updates
nodeC.update();
System.out.println("Node C Payload: " + Arrays.toString(nodeC.getPayload()));
// Merge Node A and Node C
nodeA.merge(nodeC.getPayload());
nodeC.merge(nodeA.getPayload());
System.out.println("Merged A and C Payload A: " + Arrays.toString(nodeA.getPayload()));
System.out.println("Merged A and C Payload C: " + Arrays.toString(nodeC.getPayload()));
// Query total value from Node A
int totalValue = GCounter.query(nodeA.getPayload());
System.out.println("Total Value from Node A: " + totalValue);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//运行结果:
Node A Payload: [1, 0, 0]
Node B Payload: [0, 1, 0]
Merged A and B Payload A: [1, 1, 0]
Merged A and B Payload B: [1, 1, 0]
Node C Payload: [0, 0, 1]
Merged A and C Payload A: [1, 1, 1]
Merged A and C Payload C: [1, 1, 1]
Total Value from Node A: 3
其它CRDT的几种结构
除了G -Counter,还有几种常见的 CRDT结构,如下:
PN-Counter (Positive-Negative Counter)
该数据结构实现了+/-运算,字符串运算怎么办?
G-Set (Grow-only Set)
实现了字符串add()操作
- merge函数直接取两个set的并集,只能添加不能删除
- OrbitDB底层基于G-Set,其将常用的数据结构转化成字符串记录在G-Set中,比如说Orbit支持的Key-Value结构本质上也是G-Set实现的
字符串删除怎么办?
2P-Set (Two-Phase Set)
实现了字符串的add()和remove()操作,再加一个Set,记录被删除的字符串,query的时候取差集
删了又想加回来怎么办?
LWW-Element-Set (Last-Write-Wins-Element-Set)
与2P-Set很像,add()和remove()操作加时间戳,字符串在add和remove集合中都存在时,究竟是否存在取决于最晚的时间戳
出现相同时间戳怎么办? 可以设置bias,bias toward add or remove
OR-Set (Observed-Removed Set)
与LWW-Element-Set很像,时间戳替换为唯一标识(tags),query时add tag set 减去 remove tag set,仍然非空,则该数据存在
工业界应用
OrbitDB如何实现在线聊天
- OrbitDB是一个基于ipfs的新型分布式数据库。
- OrbitDB实现log, feed, keyvalue, docs, counter等常用数据结构。
- ipfs-log实现G-Set。
- ipfs的Pubsub目前是实验性功能,实现publish-subscribe发布订阅模式。
其它
- SoundCloud open-sourced Roshi, implemented a LWW-element-set CRDT on top of Redis.
- Riak is a distributed NoSQL key-value data store based on CRDTs.
- League of Legends uses the Riak CRDT implementation for its in-game chat system,7.5 million concurrent users and 11,000 messages per second
总结
- 空间换时间。
- 一个优美的,有特定适用场景的分布式同步算法。
- 并不是银弹。