首页 > 编程语言 >Striped64源码阅读

Striped64源码阅读

时间:2024-02-01 16:23:21浏览次数:30  
标签:cells probe long cell 源码 线程 阅读 Striped64

目录

本人的源码阅读主要聚焦于类的使用场景,一般只在java层面进行分析,没有深入到一些native方法的实现。并且由于知识储备不完整,很可能出现疏漏甚至是谬误,欢迎指出共同学习

本文基于corretto-17.0.9源码,参考本文时请打开相应的源码对照,否则你会不知道我在说什么

简介

Striped64是JUC用于实现Accumulator、Adder高性能计数器的基类,比如LongAdder,在高并发的情况下性能优于AtomicLong,原因是 Striped64使用了分段的技术,减少了高并发下的竞争。

Striped64对外的语义是一个数字,在内部将数字的“值”拆成了好几部分:一个base变量和一个 cells数组,当线程尝试修改数字(增减)时,会先尝试对base进行修改,如果成功则退出,如果失败则说明当前存在竞争,会根据线程的哈希值,对cells中的某个元素进行修改。外部需要获取数值时,需要累加base和cells中的所有元素。

相比于 Atomic 变量中所有线程竞争同一个变量,Striped64通过隔离线程的并发,让多个线程分别竞争数组中的某个元素,从而降低了竞争,减少了自旋的时间,最终提高了性能。分段并发是十分重要的减少竞争的手段,在 ConcurrentHashMap、ForkJoinPool 中也有体现。

模型

如简介所说,Striped64对外表现为一个数字,内部base就存储的是这个数字。当并发量高的时候就会导致大量对base的CAS更新失败。因此基于分段并发的思想,使用cells数组,从「每个线程都对base进行更新」,变成「每个线程对cells中对应的元素进行更新」。当外部需要获取数值时,再把cells中的每个更新结果合并到base上并返回。

cells是懒初始化的,当base更新发生竞争的时候,才会初始化cells。当cells上发生竞争,又会进一步扩容cells数组,数组的大小最多差不多为机器的CPU数量,不会无限扩容下去。cells的元素也是懒初始化的,当访问的时候才初始化。线程通过生成hashcode找到自己要访问的那个cell。当cells扩容到最大值,线程在cell上发生竞争的话,就会尝试rehash,理想状态是线程均匀hash分布到各个cell中。

我认为Striped64的思想也可以称为fork-join思想、map-reduce思想,先将一个大问题分解成各个小问题,再合并各个小问题的解得到原来大问题的解。Striped64只是在数值领域的fork-join,理解好Striped64,有利于以后理解更加通用和复杂的fork-join模型。

代码分析

成员变量

cells数组的元素类型Cell:

@jdk.internal.vm.annotation.Contended static final class Cell {
  volatile long value;
  Cell(long x) { value = x; }
  final boolean cas(long cmp, long val) {
    return VALUE.weakCompareAndSetRelease(this, cmp, val);
  }
  final void reset() {
    VALUE.setVolatile(this, 0L);
  }
  final void reset(long identity) {
    VALUE.setVolatile(this, identity);
  }
  final long getAndSet(long val) {
    return (long)VALUE.getAndSet(this, val);
  }

  // 获取value的VarHandle,以支持weakCompareAndSetRelease
  private static final VarHandle VALUE;
  static {
    try {
      MethodHandles.Lookup l = MethodHandles.lookup();
      VALUE = l.findVarHandle(Striped64.Cell.class, "value", long.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }
}

Cell可以看成是简易版的AtomicLong,只支持volatile读和release版本的CAS(即weakCompareAndSetRelease,关于release的含义可看这篇)。

// CPU个数,可视为最大并行数
static final int NCPU = Runtime.getRuntime().availableProcessors();
// cells本身和元素都是懒初始化的。cells大小为2的幂
transient volatile Cell[] cells;
transient volatile long base;
// 基于CAS的锁,0为不加锁,1为加锁,当cells扩容或者创建Cell时加锁
transient volatile int cellsBusy;

另外还涉及到Thread类的成员变量:

// 本线程对于ThreadLocalRandom的探针哈希值
@jdk.internal.vm.annotation.Contended("tlr")
int threadLocalRandomProbe;

首先在这可以提前告诉你,threadLocalRandomProbe就是Striped64中线程要映射到cells所使用的hashcode,与Object.hashcode不同,这个hashcode其实是个随机数(为了与Object.hashcode区分开,在下文称其为probe),在Striped64中通过以下两个方法读取 或 更新(rehash):

// 获取probe
static final int getProbe() {
    return (int) THREAD_PROBE.get(Thread.currentThread());
}

// rehash,通过传入的probe产生新的probe
// 如果传入的probe是0,rehash结果也为0,因此必须先初始化线程的probe
static final int advanceProbe(int probe) {
  probe ^= probe << 13;   // xorshift
  probe ^= probe >>> 17;
  probe ^= probe << 5;
  THREAD_PROBE.set(Thread.currentThread(), probe);
  return probe;
}

目前了解到这里就够了,主要还是先分析Striped64的实现思路,在文末再做相关的补充。

方法

Striped64分别对long和double类型各有一套方法处理,由于其base、cell等都是默认用long,因此double值以IEEE 754双精度浮点数的方式存储在long类型的变量中(因为double和long都是64位所以可以这样存),不过流程大致相同,看long版本的longAccumulate就行

这个函数虽然叫xxxAccumulate,意味着将x加到base上,默认是"+"运算,其实还可以传入fn,自定义x和base之间的运算:

// wasUncontended:false表示调用之前对cell的CAS失败了
// index:线程probe在cells中对应的下标
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended, int index) {
  // 如果下下标为0,可能是probe还没初始化(为0)
  // 因此用ThreadLocalRandom.current()为probe初始化
  if (index == 0) {
    ThreadLocalRandom.current();
    index = getProbe();
    wasUncontended = true;
  }
  
  // 使用无限循环方便CAS重试。collide=true表示多线程竞争同一个cell
  for (boolean collide = false;;) {
    // 变量说明:
    // cs: cells
    // c: 本线程对应的cell
    // n: cells容量
    // v: cell.value或base
    Striped64.Cell[] cs; Striped64.Cell c; int n; long v;
    
    // 分支1. 使用cells进行操作
    if ((cs = cells) != null && (n = cs.length) > 0) {
      // 如果发现没有cell,那么为线程创建cell
      if ((c = cs[(n - 1) & index]) == null) {
        if (cellsBusy == 0) {
          Striped64.Cell r = new Striped64.Cell(x);
          // 加锁
          if (cellsBusy == 0 && casCellsBusy()) {
            try {
              Striped64.Cell[] rs; int m, j;
              if ((rs = cells) != null &&
                (m = rs.length) > 0 &&
                rs[j = (m - 1) & index] == null) { // double-check
                rs[j] = r;
                break;
              }
            } finally {
              cellsBusy = 0;
            }
            continue;
          }
        }
        collide = false;
      }
      
      // 否则,线程有对应cell,如果曾经CAS失败过,那么就不急着对该cell进行CAS
      // 而是进行下面的double hashing,让线程映射到别的cell
      else if (!wasUncontended)
        wasUncontended = true;
      
      // 否则,没有进行过CAS,尝试对cell进行CAS
      else if (c.cas(v = c.value,
                     (fn == null) ? v + x : fn.applyAsLong(v, x)))
        break; // 成功,退出
      
      // 否则,CAS失败,说明有其他线程更新同一个cell。
      // 如果已经达到最大容量或者cells已过时(被其他线程扩容或丢弃),
      // 那么不视为冲突(因为冲突的话,本线程会扩容cells),重新开始循环
      else if (n >= NCPU || cells != cs)
        collide = false;
      
      // 否则,标记为冲突,由下一轮循环来扩容
      else if (!collide)
        collide = true;
      
      // 加锁扩容
      else if (cellsBusy == 0 && casCellsBusy()) {
        try {
          if (cells == cs) // double-check是否过时cells
            cells = Arrays.copyOf(cs, n << 1); // 扩容为原来的两倍
        } finally {
          cellsBusy = 0;
        }
        collide = false;
        continue;
      }
      
      // double hashing: 每次retry都生成新probe,可能为了增加随机性啥的吧
      index = advanceProbe(index);
    }
    
    // 分支2. 初始化cells并为线程创建一个cell
    else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
      try {
        if (cells == cs) { // double-check
          Striped64.Cell[] rs = new Striped64.Cell[2]; // 初始化容量为2
          rs[index & 1] = new Striped64.Cell(x); // 创建cell
          cells = rs;
          break;
        }
      } finally {
        cellsBusy = 0;
      }
    }
    
    // 分支3. 加锁失败,说明cells正在被初始化/修改,只能对base动手了
    else if (casBase(v = base, (fn == null) ? v + x : fn.applyAsLong(v, x)))
      break;
  }
}

看过JUC其他类的源码的小伙伴知道,longAccumulator的代码结构类似于AQS中的acquire,外面套一个无限循环,循环里面if..else if多个分支,运行流程可以看成一个有限状态机:根据当前某些变量的值选择进入对应的分支,然后进入下一个状态或终态...

循环内分为三个大分支,可以看出longAccumulate是优先使用操作cells而不是base的,至于为什么,可以到LongAdder.add方法看一下,他是对base更新失败才调用longAccumulate的,甚至cell存在的话还会对cell尝试更新一下(这就是为什么要传一个wasUncontended的原因)

结合代码中的注释,实现流程如下:

  • 分支1. 如果cells存在,那么使用cells进行操作
    • 如果没有cell则创建cell
    • 如果曾经对cell的CAS失败过,就取消这个失败的标记
    • 对cell进行CAS
    • 如果CAS失败则表示cell发生冲突,扩容(可能会由于过时cells或已达最大容量无法扩容)
  • 分支2. 否则cells不存在,那么加锁并初始化cells
  • 分支3. 否则加锁失败,此时也不会傻傻空转,而是尝试去直接更新base

分支1的最后会对probe进行double hashing。

补充

ThreadLocalRandom

之前说到线程有一个“探针哈希值”(Probe hash value),每个线程都拥有自己的探针哈希值,并用他作为hashcode定位到cells的下标。线程的探针哈希值是作为Thread成员变量存储的:

// 本线程的探针哈希值
@jdk.internal.vm.annotation.Contended("tlr")
int threadLocalRandomProbe;

那么这个哈希值threadLocalRandomProbe(以下简称为probe)是如何计算得到的?这与类ThreadLocalRandom有关。首先这个类是用于生成随机数,而同样能生成随机数的java.util.Random虽然是线程安全的,但由于多线程共用会导致性能降低,因此ThreadLocalRandom通过实现类似ThreadLocal那样的线程隔离机制(内部并没有使用ThreadLocal)来提高多线程下并发生成随机数的性能。

更进一步地,对Random的竞争其实是对随机数种子发生的竞争,因此ThreadLocalRandom实现方式是为每个线程维护各自的随机数种子,这样就能用各自的种子生成随机数互不干涉。

首先这个类的一般使用方式是:

// 单例模式
// 必须在需要获取随机数的那个线程内调用current获取单例
ThreadLocalRandom.current().nextInt();

为什么要求在线程内获取单例呢?看一下current的实现:

public class ThreadLocalRandom extends Random {
  
  private static final ThreadLocalRandom instance = new ThreadLocalRandom();
  
  // 获取ThreadLocalRandom单例
  public static ThreadLocalRandom current() {
    // 判断线程的threadLocalRandomProbe是否为0,如果为0的话表示还没为线程初始化种子
    // 调用localInit进行初始化
    if (U.getInt(Thread.currentThread(), PROBE) == 0)
      localInit();
    return instance;
  }
  
  // 初始化线程的种子和probe
  static final void localInit() {
    // 生成probe,如果生成的probe恰好为0的话则设为1
    int p = probeGenerator.addAndGet(PROBE_INCREMENT);
    int probe = (p == 0) ? 1 : p;
    // 生成随机数种子
    long seed = RandomSupport.mixMurmur64(seeder.getAndAdd(SEEDER_INCREMENT));
    
    // 设置线程的种子和probe
    Thread t = Thread.currentThread();
    U.putLong(t, SEED, seed);
    U.putInt(t, PROBE, probe);
  }
}

哦,原来设置线程的随机数种子是在获取单例时进行的,并且有没有设置种子是通过probe是否为0来判断的。线程的随机数种子与probe一样,也是Thread类的成员变量:

@jdk.internal.vm.annotation.Contended("tlr")
long threadLocalRandomSeed;

获取ThreadLocalRandom单例后,调用nextInt生成随机数:

public int nextInt() {
  return mix32(nextSeed());
}

final long nextSeed() {
  Thread t; long r; // read and update per-thread seed
  U.putLong(t = Thread.currentThread(), SEED,
            r = U.getLong(t, SEED) + (t.getId() << 1) + GOLDEN_GAMMA);
  return r;
}

nextInt通过操作这个线程的种子来生成随机数,由于线程操作的是各自的种子,因此多线程生成随机数不发生竞争。

这样一看,probe好像也就用来判断一下有没有初始化种子而已。其实它的另一个作用早已说了:用作线程的探针哈希值。比如在Striped64中,用作线程的哈希值,定位到其在cells中的下标。(在ForkJoin、ConcurrentHashMap中也有类似的作用)。什么叫探针哈希值?而且既然是哈希值为什么不直接用Object.hashcode方法?说直白点,探针哈希值就是可以动态改变的哈希值,当多线程在同一个数据单元发生冲突的时候,可以通过更新probe使其定位到其他的数据单元,避免冲突,即通过rehash避免冲突。

总结一下,这个小节主要说明了threadLocalRandomProbe的由来和用途,顺带提了一下ThreadLocalRandom这个类。threadLocalRandomProbe的用途有两个:

  • 判断线程是否已经初始化随机数种子和probe
  • 用作线程的探针哈希值(专门用于多线程并发)

关于ThreadLocalRandom和Striped64最后还有一些东西额外补充下:

  • Striped64中的getProbeadvanceProbe方法与ThreadLocalRandom的这两个同名方法实现是一样的,但是由于后者位于j.u.c包下并且方法是package-private的,所以位于j.u.c.atomic包的Striped64访问不了,只能复制过来用:

    Duplicated from ThreadLocalRandom because of packaging restrictions.

  • 我发现JDK 17+的ThreadLocalRandom类中有几个生成随机数的方法比如nextInt(int bound)直接调用了super.nextInt(bound),即Random.nextInt,这样岂不是失去了线程隔离的特性,又回到了竞争随机数种子的问题。关于这点我在stackoverflow提了一个问题,并且很快得到了回复:https://stackoverflow.com/questions/77917763/jdk-21-why-threadlocalrandoms-nextint-implement-with-super-nextint-directly

Contended注解 - 解决伪共享问题

细心的小伙伴可能会注意到cells数组的元素类型Cell有一个注解:@jdk.internal.vm.annotation.Contended,这个注解有什么用呢?首先得了解「伪共享」的概念。

我们知道,计算机使用了cache缓存提高访问内存的效率。在SMP架构的处理器上,为每个CPU核单独设置一个cache,以提高读写并行访问效率。而CPU对外需要表现为只有一个cache,即表现为各个cache的数据应该是相同的,因此解决cache一致性问题,即修改了一个cache的数据时,需要同步到其他cache上,否则其他核将读到旧的数据。并且这个同步是按cache行为单位的,cache行大小一般为64B。比如核修改了其cache中某行的几个字节,其他核访问其cache的同一行就会检测到该行已失效,并需要从主存中重新读取最新数据(最新的cache需要先将该写到主存)。

在Java中,不严谨地说,访问变量就是访问该变量所在的cache行。而小于64B的多个变量可能会放在同一个cache行中,比如:

class Obj {
  public volatile long a;
  public volatile long b;
}

Obj实例的a和b两个变量由于在内存上相邻存放,很可能会加载到同一个cache行中。当两个线程分别读写a和b变量,并且假设两个线程绑定在了不同的CPU核上并行运行:

// Thread 1
a = 1; // 在Thread 2读b之前写入

// Thread 2
int x = b;

此时,虽然两个线程访问的是不同变量,在Java的层面上并没有共享变量,但实际上两个变量共享了同一个cache line,并导致:当Thread 2读b的时候,a所在的cache line需要先刷到内存,然后再从内存读到Thread 2所在核对应的cache,这就造成了「伪共享」现象,即两个看起来互不相干的变量实际上共享了cache line。

在JDK 7之前,可以在可能发生伪共享的变量前后加上填充字节(padding),效果上是使该变量占据整个cache line,这样a所在的cache line就永远不会出现b的身影:

class Obj {
  private volatile long p0, p1, p2, p3, p4, p5, p6; // 64b * 7
  public volatile long a;                           // 64b
  private volatile long p0, p1, p2, p3, p4, p5, p6; // 64b * 7
  
  public volatile long b;
}

JDK 8则提供了Contented注解达到类似的效果,使得不用手写padding:

class Obj {
  @jdk.internal.vm.annotation.Contended
  public volatile long a;
  @jdk.internal.vm.annotation.Contended
  public volatile long b;
}

之前说过的probe和随机数种子也是(在JDK 21中去掉了这里的Contented,估计是后来的实现不会出现伪共享):

public class Thread implements Runnable {
  @jdk.internal.vm.annotation.Contended("tlr")
  long threadLocalRandomSeed;

  @jdk.internal.vm.annotation.Contended("tlr")
  int threadLocalRandomProbe;
}

再回到Striped64的Cell类上,cells本来就是让每个线程访问不同的元素减少共享以提高并发下的性能,但数组的元素在内存上是相邻存放的,很容易出现伪共享现象,反而增加了访存次数降低性能,因此将数组元素类型Cell加上Contented注解,使每个元素占据独立的cache line,避免伪共享。

参考链接

「Java并发知识」Striped64

「简书」Java 并发计数组件Striped64详解

「简书」Java8使用@sun.misc.Contended避免伪共享

标签:cells,probe,long,cell,源码,线程,阅读,Striped64
From: https://www.cnblogs.com/nosae/p/18001498

相关文章

  • 基于Java+Neo4j开发的知识图谱+全文检索的知识库管理系统(源码分析)
    在数字化高度普及的时代,企事业机关单位在日常工作中会产生大量的文档,例如医院制度汇编,企业知识共享库等。针对这些文档性的东西,手工纸质化去管理是非常消耗工作量的,并且纸质化查阅难,易损耗,所以电子化管理显得尤为重要。【springboot+elasticsearch+neo4j+vue+activiti】实现数字......
  • Kubernetes:kube-scheduler 源码分析
    0.前言[译]kubernetes:kube-scheduler调度器代码结构概述介绍了kube-scheduler的代码结构。本文围绕代码结构,从源码角度出发,分析kube-scheduler的调度逻辑。1.启动kube-schedulerkube-scheduler使用Cobra框架初始化参数,配置和应用。//kubernetes/cmd/kube-sche......
  • Mybatis 源码系列:领略设计模式在 Mybatis 其中的应用
    目录一、Builder模式二、工厂模式三、单例模式四、代理模式五、组合模式六、模板方式模式七、适配器模式八、装饰器模式九、迭代器模式虽然我们都知道有23种设计模式,但是大多停留在概念层面,真实开发中很少遇到,Mybatis源码中使用了大量的设计模式,阅读源码并观察设计模式在其中的应......
  • 龙蜥8.6 源码安装python3.12
    ​ 闲来无事用虚拟机安装了一下龙蜥系统。[root@localhosthome]#cat/etc/*release*AnolisOSrelease8.6NAME="AnolisOS"VERSION="8.6"ID="anolis"ID_LIKE="rhelfedoracentos"VERSION_ID="8.6"PLATFORM_ID="platform:an......
  • 《程序是怎样跑起来的》阅读笔记 - 第一、二章
    简介:《程序是怎样跑起来的》是一本介绍计算机程序工作原理的畅销书籍。本文将对该书的前两章进行阅读笔记,主要涵盖了计算机基础知识和程序执行过程的基本原理。第一章:计算机基础知识本章主要讲解了计算机的基本组成部分以及它们之间的关系。作者通过引入一个简单的模型,描述了计......
  • 《程序是怎样跑起来的》阅读笔记 - 第三、四章
    简介:继续探索《程序是怎样跑起来的》,本文将对该书的第三、四章进行阅读笔记,重点关注计算机程序的存储和数据处理。第三章:计算机的存储器本章主要讲解了计算机的存储器,包括随机存取存储器(RAM)和只读存储器(ROM)。作者首先介绍了这两种存储器的基本概念和特点,然后深入讨论了它们在计......
  • 标题:《程序是怎样跑起来的》阅读笔记 - 第五、六章
    简介:本文将继续探索《程序是怎样跑起来的》,对该书的第五、六章进行阅读笔记,重点关注计算机程序的运行流程和输入输出操作。第五章:程序的执行本章主要讲解了程序的执行过程,包括指令的抓取、解码和执行等步骤。作者详细介绍了计算机中指令的编码方式和指令集体系结构,并解释了控制......
  • 阅读笔记3
    阅读《程序员的修炼之道:从小工到专家》第八章后,我对团队沟通和协作的重要性有了更深入的理解。这一章节详细探讨了如何建立高效的团队沟通机制,以及如何提高团队协作能力,以达到更好的软件开发效果。首先,作者强调了沟通在团队中的重要性。在软件开发过程中,团队成员之间需要频繁......
  • 阅读笔记
    《人月神话》是软件工程领域的一部经典之作,它以其独特的视角和深刻的洞察力,让我对软件开发有了更加全面和深入的认识。在阅读这本书的过程中,我深深地被作者对软件开发的独到见解所吸引。作者通过自己在IBM公司从事大型软件项目开发的亲身经历,向我们揭示了软件开发过程中的种种困......
  • 阅读笔记2
    阅读完《程序员的修炼之道:从小工到专家》第七章后,我对掌握编程语言的重要性有了更深入的理解。这一章节详细探讨了如何选择适合自己的编程语言,以及如何精通掌握一门或多门编程语言。首先,作者强调了编程语言在程序员职业生涯中的重要性。编程语言是程序员表达思想、解决问题的重要......