问题
有时,我们需要通过变量的值来同步代码块。
为了解这个问题,我们将考虑一个简单的银行应用程序,它对客户的每次转账进行以下操作:
通过此外部Web服务转移评估现金返还金额(CashBackService)
在数据库中执行汇款(AccountService)
更新现金返还评估系统中的数据(CashBackService)
汇款操作如下:
public void withdrawMoney(UUID userId,int amountOfMoney){
synchronized(userId){
结果 result = externalCashBackService。evaluateCashBack(userId,amountOfMoney);
accountService。转移(用户id,amountOfMoney + 结果。getCashBackAmount());
externalCashBackService。cashBackComplete(用户id,结果。getCashBackAmount());
}
}
应用程序的基本组件如下图所示:
我试图尽可能清楚地做出一个例子。支付服务中的资金转移取决于其他两项服务:
第一个是CashBackService与REST协议下的另一个(外部)Web应用程序交互。而且,为了计算实际的现金返还,我们需要与此应用程序同步事务。这是因为下一笔现金返还金额可能取决于用户付款总额。
第二个是AccountService与内部数据库通信并存储与其用户帐户相关的数据。在此服务中,我们可以使用JPA事务在数据库中将某些操作作为原子操作。
在现实生活中,我强烈建议重构这样的系统,以避免这种情况,如果可能的话。但在我们的例子中,想象一下我们别无选择。
我们来看看这个应用程序的草案代码:
@服务
公共 类 PaymentService {
@Autowired
private ExternalCashBackService externalCashBackService ;
@Autowired
私人 AccountService 帐户服务 ;
public void withdrawMoney(UUID userId,int amountOfMoney){
synchronized(userId){
结果 result = externalCashBackService。evaluateCashBack(userId,amountOfMoney);
accountService。转移(用户id,amountOfMoney + 结果。getCashBackAmount());
externalCashBackService。cashBackComplete(用户id,结果。getCashBackAmount());
}
}
}
@服务
公共 类 ExternalCashBackService {
@Autowired
私人 RestTemplate restTemplate ;
public Result evaluateCashBack(UUID userId,int amountOfMoney){
return sendRestRequest(“evaluate”,userId,amountOfMoney);
}
public Result cashBackComplete(UUID userId,int cashBackAmount){
return sendRestRequest(“complete”,userId,cashBackAmount);
}
private Result sendRestRequest(String action,UUID userId,int value){
URI externalCashBackSystemUrl =
URI。create(“http://cash-back-system.org/api/” + action);
HttpHeaders headers = new HttpHeaders();
标题。集(“接受”,的MediaType。APPLICATION_JSON_VALUE);
RequestDto requestDto = new RequestDto(userId,value);
HttpEntity <?> request = new HttpEntity <>(requestDto,headers);
ResponseDto responseDto = restTemplate。exchange(externalCashBackSystemUrl,
HttpMethod。GET,
要求,
ResponseDto。课程)
。getBody();
返回 新的 结果(responseDto。的getStatus(),responseDto。的getValue());
}
}
@服务
公共 类 AccountService {
@Autowired
private AccountRepository accountRepository ;
@Transactional(隔离 = REPEATABLE_READ)
public void transfer(UUID userId,int amountOfMoney){
帐户 account = accountRepository。getOne(userId);
帐户。的setBalance(帐户。所以getBalance()- amountOfMoney);
accountRepository。保存(帐户);
}
}
但是,您可以拥有多个具有相同值的对象(userId 在此示例中),但同步适用于对象的实例而不是其值。
下面的代码不能很好地工作。因为它不正确同步; 静态工厂方法UUID.fromString(..)在每次调用时都会生成UUID类的新实例,即使您传递了相等的字符串参数。
因此,我们得到UUID了相同键的不同实例。如果我们从多个线程运行此代码,那么我们很有可能遇到同步问题:
public void threadA(){
paymentService。方法withdrawMoney(UUID。fromString(“ea051187-bb4b-4b07-9150-700000000000” ),1000);
}
public void threadB(){
paymentService。方法withdrawMoney(UUID。fromString(“ea051187-bb4b-4b07-9150-700000000000” ),5000);
}
在这种情况下,您需要为equals对象获取相同的引用以在其上进行同步。
解决这个问题的错误方法
同步方法
你可以移动synchronized一个方法:
public synchronized void withdrawMoney(UUID userId,int amountOfMoney){
..
}
该解决方案性能不佳。您将阻止绝对所有用户的资金转账。如果您需要使用相同的密钥同步不同类中的不同操作,则此解决方案根本不会对您有所帮助。
字符串实习生
为了确保包含用户ID的类的实例在所有同步块中都是相同的,我们可以将它序列化为String并使用它String.intern()来获取equals字符串的相同链接。
String.intern使用全局池来存储被拦截的字符串。当您在字符串上请求实习生时,如果此类字符串存在,则从此池中获取引用,否则此字符串将放入池中。
您可以String.intern在The Java Language Specification - 3.10.5 String Literals或有关String.intern的Oracle Java文档中找到更多详细信息。
public void withdrawMoney(UUID userId,int amountOfMoney){
同步(用户id。的toString()。实习生()){
..
}
}
使用实习生不是一个好习惯,因为使用GC很难清理字符串池。并且,您的应用程序可以通过主动使用来消耗太多资源 String.intern。
此外,外部代码有可能在与应用程序相同的字符串实例上同步。这可能导致死锁。
一般来说,实习生的使用最好留给JDK的内部库; Aleksey Shipilev有关于这个概念的好文章。
我们如何才能正确解决这个问题?
创建自己的同步原语
我们需要实现描述下一个图的行为:
首先,我们需要创建一个新的同步原语 - 自定义互斥锁。这将由变量的值起作用,而不是由对象的引用起作用。
它会像一个“命名的互斥体,” 但有点宽,与使用任何物品的价值鉴定,而不仅仅是一个字符串的值的能力。您可以找到同步原语的示例,以便通过其他语言(C ++,C#)中的名称进行锁定。现在,我们将用Java解决这个问题。
解决方案看起来像这样:
public void withdrawMoney(UUID userId,int amountOfMoney){
同步(XMutex。的(用户id)){
..
}
}
为了确保获得相同的变量值相同的互斥锁,我们将创建互斥锁工厂。
public void withdrawMoney(UUID userId,int amountOfMoney){
同步(XMutexFactory。得到(用户id)){
..
}
}
public void purchase(UUID userId,int amountOfMoney,VendorDescription 供应商){
同步(XMutexFactory。得到(用户id)){
..
}
}
为了使用相等的键在每个请求上返回相同的互斥锁实例,我们需要存储创建的互斥锁。如果我们将这些互斥锁存储在简单中HashMap,那么当新键出现时,地图的大小将会增加。我们没有工具来评估互斥锁在任何地方都没有使用的时间。
在这种情况下,我们可以使用它WeakReference来保存地图中互斥锁的引用,就在它使用时。为了实现这种行为,我们可以使用WeakHashMap数据结构。几个月前我写了一篇关于这类参考文章的文章; 你可以在这里更详细地考虑它:Java中的Soft,Weak,Phantom References
我们的互斥工厂将以此为基础WeakHashMap。互斥锁工厂会创建一个新的互斥锁,如果value(key) 找不到 互斥锁的话 HashMap。然后,将创建的互斥锁添加到 HashMap。使用它WeakHashMap允许我们HashMap 在存在任何对它的任何引用的同时存储互斥 。并且,HashMap 当释放所有对它的引用时,互斥锁将自动从中删除 。
我们需要使用同步版本WeakHashMap; 让我们看看文档中描述的内容:
此类未同步。可以构造同步的WeakHashMap
使用Collections.synchronizedMap方法。
这很难过,不久之后,我们会仔细研究一下原因。但是现在,让我们考虑一下实现的例子,这是官方文档提出的(我的意思是使用 Collections.synchronizedMap):
public final Map < XMutex < KeyT >,WeakReference < XMutex < KeyT >>> weakHashMap =
收藏。synchronizedMap(new WeakHashMap < XMutex < KeyT >,
WeakReference < XMutex < KeyT >>>());
public XMutex < KeyT > getMutex(KeyT key){
validateKey(key);
return getExist(key)
。orElseGet(()- > saveNewReference(key));
}
private 可选< XMutex < KeyT >> getExist(KeyT key){
return 可选。ofNullable(WeakHashMap中,得到(XMutex。的(关键)))
。map(WeakReference :: get);
}
private XMutex < KeyT > saveNewReference(KeyT key){
XMutex < KeyT > 互斥锁 = XMutex。的(键);
WeakReference < XMutex < KeyT >> res = weakHashMap。put(互斥,新的 WeakReference <>(互斥));
如果(RES != 空 && 资源。获得()!= 空){
返回 资源。get();
}
返回 互斥 ;
}
性能怎么样?
如果我们查看代码Collections.synchronizedMap,那么我们会在全局互斥上找到很多同步,这是与SynchronizedMap 实例配对创建的 。
SynchronizedMap(Map < K,V > m){
这个。m = 物体。requireNonNull(m);
互斥 = 这个 ;
}
并且所有其他方法 SynchronizedMap 都在互斥锁上同步:
public int size(){
synchronized(互斥){ return m。size();}
}
public boolean containsKey(Object key){
synchronized(互斥){ return m。containsKey(key);}
}
public V get(Object key){
synchronized(互斥){ return m。得到(关键);}
}
public V put(K 键,V 值){
synchronized(互斥){ return m。put(key,value);}
}
public V remove(Object key){
synchronized(互斥){ return m。删除(键);}
}
...
此解决方案没有最佳性能。所有这些同步都会导致我们使用互斥锁工厂对每个操作进行永久锁定。
将WeakReference作为键的ConcurrentHashMap
我们需要看一下使用的ConcurrentHashMap。它具有比Collections.synchronizedMap更好的性能。但我们有一个问题 - ConcurrentHashMap不允许使用弱引用。这意味着垃圾收集器无法删除未使用的互斥锁。
我找到了两种方法来解决这个问题:
首先是创建我自己的ConcurrentMap实现。这是正确的决定,但需要很长时间。
第二个是使用ConcurrentReferenceHashMapSpring Framework 中的实现。这是一个很好的实现,但它有一些细微差别。我们将在下面考虑它们。
让我们改变 XMutexFactory 实现来使用ConcurrentReferenceHashMap:
公共 类 XMutexFactory < KeyT > {
/ **
*使用默认设置创建互斥锁工厂
* /
public XMutexFactory(){
这个。map = new ConcurrentReferenceHashMap <>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL,
DEFAULT_REFERENCE_TYPE);
}
/ **
*通过键创建并返回互斥锁。
*如果此键的互斥锁已存在于弱映射中,
*然后返回互斥锁的相同引用。
* /
public XMutex < KeyT > getMutex(KeyT key){
归还 这个。地图。compute(key,(k,v)- >(v == null)? new XMutex <>(k):v);
}
}
这很酷!
代码少,但性能比以前更多。我们试着检查一下这个解决方案的性能。
创建一个简单的基准
为了选择实现,我做了一个小基准测试。
Map 测试中涉及三种实现 :
Collections.synchronizedMap 基于 WeakHashMap
ConcurrentHashMap
ConcurrentReferenceHashMap
我使用 ConcurrentHashMap in基准测试来比较测量。此实现不适合在互斥锁的工厂中使用,因为它不支持使用弱引用或软引用。
所有基准测试都是使用JMH库编写的。
# 运行 完成。总 时间:00:04:39
基准 模式 Cnt 评分 误差 单位
ConcurrentMap。ConcurrentHashMap的 thrpt 5 0,015 ? 0,004 OPS / 纳秒
ConcurrentMap。ConcurrentReferenceHashMap thrpt 5 0,008 ? 0,001 OPS / 纳秒
ConcurrentMap。SynchronizedMap thrpt 5 0,005 ? 0,001 OPS / 纳秒
ConcurrentMap。ConcurrentHashMap的 avgt 5 565,515 ? 23,638 纳秒/ 运算
ConcurrentMap。ConcurrentReferenceHashMap avgt 5 1098,939 ? 28,828 纳秒/ 运算
ConcurrentMap。SynchronizedMap avgt 5 1503,593 ? 150,552 纳秒/ 运算
ConcurrentMap。ConcurrentHashMap的 样品 301796 663,330 ? 11,708 纳秒/ 运算
ConcurrentMap。ConcurrentReferenceHashMap 样品 180062 1110,882 ? 6,928 纳秒/ 运算
ConcurrentMap。SynchronizedMap 样品 136290 1465,543 ? 5,150 纳秒/ 运算
ConcurrentMap。的ConcurrentHashMap SS 5 336419,150 ? 617549,053 纳秒/ 运算
ConcurrentMap。ConcurrentReferenceHashMap SS 5 922844,750 ? 468380,489 纳秒/ 运算
ConcurrentMap。SynchronizedMap SS 5 1199159,700 ? 4339391,394 纳秒/ 运算
在这个微基准测试中,我创建了一个情况,当几个线程计算地图中的值。您可以在Concurrent Map基准测试中更详细地考虑此基准测试的源代码
把它放在图表上:
因此,ConcurrentReferenceHashMap 在这种情况下使用它是 正确的。
XSync库入门
我将此代码打包到XSync库中,您可以将其用作变量值同步的现成解决方案。
为此,您需要添加下一个依赖项:
< 依赖>
< groupId > com.antkorwin </ groupId >
< artifactId > xsync </ artifactId >
< version > 1.1 </ version >
</ dependency >
然后,您可以创建XSync类的实例,以便在需要的类型上进行同步。对于Spring Framework,您可以将它们作为bean:
@豆
public XSync < UUID > xSync(){
返回 新的 XSync <>();
}
现在,您可以使用它:
@Autowired
私有 XSync < UUID > xSync ;
public void withdrawMoney(UUID userId,int amountOfMoney){
xSync。execute(userId,()- > {
结果 result = externalPolicySystem。validateTransfer(userId,amountOfMoney,WITHDRAW);
accountService。转移(userId,amountOfMoney,WITHDRAW);
});
}
public void purchase(UUID userId,int amountOfMoney,VendorDescription 供应商){
xSync。execute(userId,()- > {
..
});
}
并发测试
为了确保这段代码运行良好,我写了几个并发测试。
有一个这样的测试的例子:
public void testSyncBySingleKeyInConcurrency(){
//安排
XSync < UUID > xsync = new XSync <>();
String id = UUID。randomUUID()。toString();
NonAtomicInt var = new NonAtomicInt(0);
//这里有一个魔力:
//我们创建了一个并行流并尝试增加
//每个流中的相同非原子整数变量
IntStream。范围(0,THREAD_CNT)
。盒装()
。并行()
。的forEach(Ĵ - > XSYNC。执行(UUID。fromString(ID),VAR :: 增量));
//断言
等待()。atMost(5,TIMEUNIT。SECONDS)
。直到(var :: getValue,equalTo(THREAD_CNT));
断言。assertThat(VAR。的getValue())。isEqualTo(THREAD_CNT);
}
/ **
*执行不是线程安全的整数变量:
* /
@Getter
@AllArgsConstructor
私有 类 NonAtomicInt {
私有 int 值 ;
public int increment(){
返回 值++ ;
}
}
让我们看看这个测试的结果: