讲协同编辑,先回顾下从BBS、邮件,到IM
信息的异步传播
信息的生产和消费异步发生。
典型的场景如论坛,博客,文档库,邮件。我在写这篇文档的时候,你们看不到。你们看的时候,我早已写完。异步场景下,信息的生产者会谨慎的推敲措辞,以确保自己的意思被准确的传达。表达方式的丰富性很重要,除了文本以外,段落结构,列表,示意图,表格都有利于信息的准确表达。
信息的同步传播
信息被生产的同时被消费。
话出我之口,入你之耳,过了此时此刻,想还原此情此景,麻烦得很,大多时候也不需要。
同步场景下,信息的生产往往不需要深思熟虑,而是通过你来我往的讨论,澄清,逐步勾
勒出话题的全貌。表达的时效性较之方式的丰富性更为重要。典型的场景如即时通讯,语音通话,视频会议等。简单明了,没有太多的格式。
文档的信息表达方式
传统文档的信息表达方式是典型的异步传播。上述的几类异步场景都可见文档的影子。
然而2016年3月,Google上线的Google Docs颠覆了这个结论,这个改变世界的功能就是“多人实时编辑”,或者称作“协同编辑”。引入了协同编辑的在线文档,就像一块儿在线的白板,使得身处世界两端的人可以在上面共同迭代一个内容,通过你来我往的信息反馈,实现信息的同步传播。而编辑的结果又将沉淀下来,成为信息异步传播的载体。
兼具信息同步与异步传播的能力,协同文档的诞生,无疑给基于互联网的沟通协作带来了一场革命。
这场革命爆发于2006年,而它的起源,早在17年前。
协同文档的技术实现
1984 年,MIT 的科学家提出了计算机支持的协同工作(Computer Supported Cooperative Work,缩写为 CSCW),使得人们可以借助计算机和互联网去完成协同工作。
比如利用用于文档编辑的 Wikis 和用于编程的版本控制系统,小组成员可以在世界任意角落去共同编写大型的百科全书或软件系统。
协同文档现在貌似有很多公司陆续实现了,例如最早的 Google、国内的石墨文档、腾讯的腾讯文档等等。
这所谓的实时协同编辑,是指多人同时编辑一个文档,最典型的例子是 Google Docs,你可以实时看到别人做出的修改,不用手动刷新页面。
要实现实时编辑,我们需要解决两个技术点:实时通信问题、编辑冲突问题,其中实时通信问题比较好解决,可以使用 long pull 或 WebSocket,所以这里就不过多讨论了,如何解决编辑冲突问题上,可以查看《实时协同编辑的实现》
支持并发控制的理论型框架
协同分析的这方面最敬仰的还是Google Wave,《协同编辑:Google Wave架构分析》,其架构的核心是操作转换 (Operational Transformation,OT),这是一个支持并发控制的理论型框架。
核心算法被称为 OT 算法。这个算法本身并不复杂,但是协同文档本身涉及更复杂的系统设计,因为它本身就是分布式的,至少客户端和服务端是分布式的。在较高性能的要求下,服务端可能也是分布式的。所以,如何使这些都能很好的协同,是很值得考虑的。
OT算法
1989年,代表着“文档”的Microsoft Office第一次在Macintosh系统上与世人见面,而代表着“协同”的操作变换算法也第一次见诸论文。
Microsoft Office 中所周知,而操作变换算法又是什么呢?
关于OT算法,copy:https://imweb.io/topic/5b3ec8dd4d378e703a4f4450 内容如下:
实时协同编辑的概念和原理
实时协同编辑,通俗来讲,是指多人同时在线编辑一个文档,且当一个参与者编辑文档的某处时,这个修改会立即同步到其他参与者的计算机上。归纳起来,需要下面几个步骤:
-
计算出当前参与者对文档做出的修改,并发送到服务器
-
在服务器端,对所有参与者的修改进行合并以及冲突处理
-
讲合并之后的结果返回到所有参与者的计算机上
-
将光标移动到正确的位置
由于没有锁的机制,当多个参与者在编辑同一处内容时,便可能出现冲突,这个时候就需要通过一定的算法来自动地解决这些冲突。最后,当所有改变都同步后,每个参与者计算机上所看到的文档内容应该是完全一致的。
How does Operational Transformation work?
Here’s the short overview:
Every change to a shared document is represented as an operation. In a text editor, this operation could be the insertion of the character ‘A’ at position 12. An operation can be applied to the current document resulting in a new document state.
To handle concurrent operations, there is a function (usually called transform) that takes two operations that have been applied to the same document state (but on different clients) and computes a new operation that can be applied after the second operation and that preserves the first operation’s intended change. Let’s make this clear with an example: User A inserts the character ‘A’ at position 12 while user B inserts ‘B’ at the beginning at the document. The concurrent operations are therefore
insert(12, 'A')
andinsert(0, 'B')
. If we would simply send B’s operation to client A and applied it there, there is no problem. But if we send A’s operation to B and apply it after B’s operation has been applied, the character ‘A’ would be inserted one character one position left from the correct position. Moreover, after these operations, A’s document state and B’s document state wouldn’t be the same. Therefore, A’s operationinsert(12, 'A')
has to be transformed against B’s operation to take into account that B inserted a character before position 12 producing the operationinsert(13, 'A')
. This new operation can be applied on client B after B’s operation.This function can be used to build a client-server protocol that handles collaboration between any number of clients. This is explained in Daniel Spiewak’s excellent article Understanding and Applying Operational Transformation.
However, you don’t have to understand the details of Operational Transformation to use it with this library in your own project.
Changeset
一个文档可以被抽象为一系列操作的集合,这个集合便是 changeset。
changeset 具有如下的特征:
-
changeset 是对文档一系列操作的集合
-
这些操作必须是指定的一些操作其中的一种或多种
-
changeset 只有它基于某个特定的版本的文档时才是有意义的
-
一个文档可以表示为一系列的 changeset 依次应用于 空文档 之后得到的
-
定义运算 $AB$,意为将 changeset $B$ 应用到 $A$ 上
-
定义 $C = AB$,意为 changeset $C$ 产生的效果等等价于依次应用 $A$, $B$ 产生的效果
-
changeset 一般表示为 $C_v$, 意为一个基于版本号 $v$ 的 changeset
对于 changeset,通常可以使用 json 的形式表示。
interface Action { type: string; // ...}interface Changeset { version: number; actions: Action[]; }
例如,下面的 changeset 是在协同表格的第 15 行后面添加一行,并删掉第 5 行。
{ "version": 0, "actions": [ { "type": "insertRowAfter", "index": 15 }, { "type": "deleteRow", "index": 5 } ]}
注意:
-
changeset 中 action 的顺序必须保留,因为 index 的位置可能会被改变。
-
一般每 $500ms$ 收集一次 action (变更动作),并生成一个 changeset
Follow
上面说到过,changeset 只有基于某个版本才是有意义的。
假设, 有一个 A 客户端和一个 B 客户端,他们在某时刻具有一样的文档 $X$, 这时,A 做了 $A$ 操作,B 做了 $B$ 操作,他们本地的文档看上去已经不再一样,这时,我们便需要进行 协同
我们可能会采用 merge 的思路。意思是,将 A 的操作和 B 的操作在服务端进行 merge,然后分别应用到 X 上,即
$X ← Xmerge(A, B)$
但是,这显然不可取,因为无论在 A 还是 B 端,都已经分别是 $XA$, $XB$ 了
我们采用 follow 算法
follow 具有如下特征
-
一致性,$XAfollow(A,B)=XBfollow(B,A)$
-
合法性,由 follow 得到的 $follow(A,B)$ 或 $follow(B,A)$ 必须符合业务逻辑
-
follow 必须是数学上的纯函数,也即,对于确定的自变量 $A$,$B$,$follow(A,B)$ 的函数值一定
follow 的以上特性使其很适合作为协同编辑的运算单元。
链式反应法则
定义 $V_n=C_1C_2...C_n$ 为第 $n$ 版本的服务端的文档。
假设服务端的数据库存储了形如 $V_0→V_1→V_2→V_3→...→V_m→ ...→V_H$ 版本信息的某文档,则若有某 changeset $C_m'(m<n)$ 的变更需要应用到该文档,显然,$C_m'$ 不能直接应用到 $V_H$(版本不兼容)。这时,我们根据 follow 的特性,容易想到使用 follow 来做变换。
由于
$$ V{m+1}=V_mC{m+1} $$
即
$$ V{m+1} follow(C{m+1},Cm') = V_m C{m+1} follow(C{m+1},C_m') = V_m C_m' follow(C_m',C{m+1}) $$
由此我们可以得到一个
$$ C{m+1}' = follow(C{m+1},C_m') $$
同理
$$ C{m+2}' = follow(C{m+2},follow(C_{m+1},C_m')) $$
重复以上过程,可以得到一个相对于 $C_H$ 的 $C_H'$。在实现的时候,可以使用数组的 reduce
来进行。
得到该 $V_H'$ 之后,这个 changeset 可以应用到最新的文档 $V_H$ 上,这样便可以完成此次编辑。
客户端的行为定义
客户端负责收集新的变更,生成 changeset 并发送给服务端, 客户端因此需要 维护一些状态、存在一定的生命周期。
$A$: 本地最新的版本,类比服务端的 $V_H$
$X$: 发送给服务端的 changeset,但是还没有得到服务端的确认
$Y$: 用户做的变更生成的 changeset,但是还没有发送给服务端
容易知道,本地文档看上去的样子显然应该是 $V=AXY$
当收到服务端推送过来的 changeset $B$ 时,客户端应该
-
确认是否是可以应用到 A 上的版本的 changeset
-
处理 changeset
-
进行运算:
-
赋值 $A ← A'$, $X ← X'$, $Y ← Y'$
-
应用 $D$ 到文档上
-
$A' ← AB$
-
$X' ← follow(B, X)$
-
$Y' ← follow(follow(X, B),Y)$
-
$D ← follow(X,follow(X, B))$
-
如果可以应用到 A 上
-
如果不能,对 $B$ 根据链式反应法则进行处理, 得到的 $B'$ 应用上面的计算过
证明:
$$ A'X'Y'= ABfollow(B,X)follow(follow(X,B),Y) $$
$$ A'X'Y'= AXfollow(X,B)follow(follow(X,B),Y) $$
$$ A'X'Y'= AXYfollow(Y,follow(X,B)) $$
$$ A'X'Y'= AXYD $$
$$ A'X'Y'=VD $$
当发送出去一个 changeset 的后,等待服务端的 ACK。当收到 ACK 的时候
-
$A ← AX$
-
$X ← null$
服务端的行为定义
这里暂时只举例只有一台服务器的情况
服务端在数据库中维护一个形如 ${V_n} = V_0→V_1→V_2→V_3→...→V_m→ ...→V_H$ 版本信息列表
当有活跃用户进入这个文档时,读入内存中
当一个 changeset $C$ 从客户端发送过来的时候
-
服务端确认是否可以应用到 $V_H$ 上
-
处理这个 changeset
-
根据链式反应法则对 $C$ 进行处理
-
将得到的 $C'$ 按照上面的流程处理
-
直接将这个 changeset 推入记录,并以某种频率保持和数据库同步。
-
将该 changeset 推送到其余所有进入该文档的客户端
-
回应该客户端 ACK 并附带服务端最新的版本号(或 ACK 的 changeset 的版本号)
-
如果可以应用到 $V_H$ 上
-
如果不能应用到 $V_H$ 上
接口和模块定义
公共
interface Change { type: string; [k: string]: any; }type Changeset = { baseVersion: number; changes: Change[]; } | null;type FollowFunc = (cs1: Changeset, cs2: Changeset) => Changeset;
客户端
定义 client:
-
client 是一个客户端
-
client 的初始化需要指定一个 websocket 实例,该实例需要是正在连接或者 OPEN 的状态
-
client 的实现不需要覆盖用户身份、权限等和协同无关的逻辑
-
client 的实例应该暴露创建一个协同文档的接口
-
可以在一个 client 上创建多个协同文档
定义 client 协同文档:
-
协同文档是一个父类
-
协同文档的方法
-
进入文档
-
编辑(传入变更)
-
离开文档
-
协同文档需要定义的生命周期钩子:
-
已经进入文档
-
已经离开文档
-
连接重新建立
-
被拒绝进入文档
-
新的变更需要应用到文档(传出变更)
-
需要清空文档
/// <reference path="common.d.ts" />export = CoSyncClient;interface DocumentLifecycle {}declare class Document{ constructor(followFunc: FollowFunc, documentId: string); connected?(): void; reconnected?(ws: WebSocket): void; connectionLost?(reason: string): void; connectionClosed?(reason: string): void; connectionRejected?(reason: string): void; applyChanges?(changes: Change[]): void; makeEmpty?(): void; edit(change: Change): void; leaveDocument(): Promise<void>; enterDocument(): Promise<void>; } declare namespace CoSyncClient { function createClient( websocket: WebSocket ): { Document: new (followFunc: FollowFunc, documentId: string) => Document; getAllDocuments: () => Document[]; websocket: WebSocket; }; }
服务端
定义 server:
-
server 实际上是与客户端对立的部分服务端
-
server 的初始化需要指定一个 websocket 实例,该实例需要是正在连接或者 OPEN 的状态
-
server 的实现不需要覆盖用户身份、权限等和协同无关的逻辑
-
当 client 和 server 的稳定、互信的连接建立,客户端每创建并进入一个文档,服务端创建相应的 server 协同文档实例
-
server 的实例应该暴露一个类似
onEnterRequest
的回调注册函数,该回调函数
定义 server 协同文档:
-
协同文档是一个父类
-
协同文档的方法
-
允许进入文档
-
拒绝进入文档
-
关闭文档
-
协同文档需要定义的生命周期钩子:
-
获取全部 changeset(从数据库)
-
changeset 将要被处理
-
changeset 将要被接受
-
changeset 将要被广播
/// <reference path="common.d.ts" />import WebSocket from "ws";declare class Document{ constructor(followFunc: FollowFunc, docuemntId: string); getInitialDocument(): Promise<Set<Changeset>>; saveChangeset(cs: Changeset): Promise<void>; changesetWillBeHandled(cs: Changeset): void; changesetWillBeAccepted(cs: Changeset): boolean | void; changesetWillBeBroadcast(cs: Changeset): boolean | void; private broadcast(cs: Changeset): void; private sendInitialDocument(document: Set<Changeset>): void; acceptEnter(): void; rejectEnter(reason: string): void; closeDocument(reason: string): void; } export = CoSyncServer; declare namespace CoSyncServer { function createServer( websocket: WebSocket ): { getAllDocuments: () => Document[]; websocket: WebSocket; onEnterRequest: ( cb: ( websocket: WebSocket, documentId: string, Document: new (followFunc: FollowFunc) => Document) => void, once: boolean ) => void; }; }
客户端和服务端消息结构
-
client -> server
-
documentId
-
versions, Set\
-
documentId, string
-
changeset, Changeset
-
documentId, string
-
documentId, string
-
EnterDocument
-
LeaveDocument
-
Edit
-
FetchVersions
-
server -> client
-
documentId, string
-
changesets, Set\
-
documentId, string
-
version, number
-
documentId, string
-
changeset, Changeset
-
documentId, string
-
changesets, Changeset[]
-
documentId, string
-
RejectEnter
-
InitialDocument
-
OthersEdit
-
EditACK
-
PushVersions
CRDT
解决文本文档的协同编辑有两种方案,一种是 Google Doc 使用的 Operational transformation (OT),还有一种就是 Atom teletype 使用的 Conflict-free replicated data type (CRDT)。
CRDT 有两种形式:
-
基于状态:即将各个节点之间的CRDT数据直接进行合并,所有节点都能最终合并到同一个状态,数据合并的顺序不会影响到最终的结果。
-
基于操作:将每一次对数据的操作通知给其他节点。只要节点知道了对数据的所有操作(收到操作的顺序可以是任意的),就能合并到同一个状态。
CRDT 必须符合可交换性,结合性,还有幂等性,所以 CRDT 数据类型合并最终会收敛到相同状态。
为什么要符合可交换性,结合性,还有幂等性三个特性呢?因为可以解决分布式达到最终一致会遇到的问题:
-
网络问题导致发送接收顺序不一致(幂等性)
-
以及多次发送(可交换性)
OT与CRDT的区别于联系
OT主要用於文本,CRDT更通用
CRDT 不仅仅应用在协同编辑,还有分布式系统的最终一致性上也有应用。
-
OT操作必须通过服务器的转换才可以合并,
-
CRDT 由于其数据结构特性,不通过服务器也可以合并。
CRDT 实现协同编辑
-
OT通过改变操作来实现。操作是通过连线发送的,而并发操作在接收到它们后会进行转换。
-
CRDTS通过改变状态来实现。对本地crdt进行操作。它的状态通过连线发送,并与副本的状态合并。合并的次数和顺序无关紧要—所有副本都会聚合。
因为 OT 中的 transformation 流程太复杂,OT 概念不是很清楚,而 CRDT 很好理解,实现起来也不难。
Lamport timeStamp
在因果关系的事件需要知道事件的先后顺序,并且能够按照正确的顺序处理这些事件,所以需要 Lamport timeStamp 来确定事件发生的事件,Lamport timeStamp 只需要保证两个规则就好了。
newTimeStamp[local] = Max(timeStamp[local], timeStamp[receive]) newTimeStamp[local] = timeStamp[local] + 1
-
本地 Lamport timeStamp 为收到事件的 Lamport timeStamp 和本地 Lamport timeStamp 中最大值
-
生成事件 Lamport timeStamp 时候加一就可以了。
UUUID
每个客户端都有一个唯一 UUUID,再加上 Lamport timeStamp 就可以为每个操作添加唯一可排序的 ID。
因果树
每个操作都有唯一的 ID,接下来就是定义操作的数据结构,并且符合 CRDT 的特性,ID的唯一性可以保证操作的幂等性,操作可以排序保证了交换性,接下来只要保证每个操作都可以被合并就可以了。
id U0@T1 insert 'a' at index 0 id U0@T2 insert 'b' at index 1 id U0@T3 insert 'c' at index 2 id U0@T4 delete 'c' at index 2 id U0@T5 delete 'd' at index 2
一般会这样定义操作,但是这样的操作是线性依赖的,每个操作都是依赖前一个操作的结果,并发的时候,必须确保执行的顺序是一致的,有些操作可能合并会得到不一致的结果。
id U0@T1 insert 'a' at id null id U0@T2 insert 'b' at id U0@T1 id U0@T3 insert 'c' at id U0@T2 id U0@T4 delete 'c' at id U0@T3 id U0@T5 insert 'd' at id U0@T2
前端实现
-
前端接受服务器生成的初始 UUUID , Lamport timeStamp,actions,初始化前端状态。
-
前端接受用户输入,产生初始操作,比如 index 10 insert 'a'在位置10插入字母a,之后通过位置 10 深度遍历因果树,找到操作依赖字母的 id,并且生成操作 id,生成操作,然后发送到服务器。
-
前端接受服务器广播操作,更新前端 timestamp,插入因果树,生成最新字符串,更新前端状态。
服务端实现
服务端实现就比较简单了,只要提供 UUUID 还有 Lamport timeStamp 生成,还有就是接受客户端的操作,并且广播给其他客户端,因为后端使用 node 写的,还可以和前端公用一部分代码,实现就更方便。
参考文章:
Google Wave的架构 https://www.infoq.cn/article/2009/06/wave/,查看英文原文: Google Wave’s Architecture
协同文档的技术实现 https://imweb.io/topic/5b3ec8dd4d378e703a4f4450
实时协同编辑的实现 https://fex.baidu.com/blog/2014/04/realtime-collaboration/
钉钉文档协同编辑背后的核心技术原理 https://developer.aliyun.com/article/738238
文献:
Context-based Operational Transformation in Distributed Collaborative Editing Systems——COT算法论文
Google Wave Operational Transformation——G-Suite协同引擎的协议白皮书
Achieving convergence,causality-preservation, and intention-preservation in real-time cooperative editing systems——GOT算法及一维数据操作变换算法论文
Operational Transformation Frequently Asked Questions and Answers——南洋理工大学教授Chengzheng Sun的Survey,覆盖了OT领域绝大多数研究成果
转载本站文章《协同文档:OT与CRDT实现协同编辑笔记》,
请注明出处:https://www.zhoulujun.cn/html/webfront/engineer/Architecture/8564.html