import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; public class SeqManager { private final Seq seq; private final ExecutorService e = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy()); private final Function<Seq, Long> refreshSeqMax; public SeqManager(String key, Seq seq, Function<Seq, Long> refreshSeqMax, boolean load) { this.seq = seq; seq.key = key; this.refreshSeqMax = refreshSeqMax; if(load) { loadSeq(); } } public long getSeq(long perWaitTime, TimeUnit tu) { long r = getSeq(); while(r == -1) { try { tu.sleep(perWaitTime); } catch (InterruptedException ignored) { } r = getSeq(); } return r; } public long getSeq() { //try load not load success will getSel() return -1; //not do block get, because may be effect response speed; loadSeq(); long nextSeq = seq.trySeq.incrementAndGet(); if(nextSeq > seq.maxSeq.get()) { seq.fullSeq.incrementAndGet(); return -1; } return seq.nowSeq.incrementAndGet(); } private void loadSeq() { // 大部分时间这里都会返回false if(seq.needLoad()) { //在并发的时候,只会提交成功一个。 e.execute(() -> { //加锁进行seq加载,保证只会有一个线程在加载seq。 synchronized (this) { //二次检测seq是否需要加载,即使遇到刚加载完之后,但是线程感知延后,做了加载seq提交也能保证不会多次加载。 if (seq.needLoad()) { long max = refreshSeqMax.apply(seq); seq.maxSeq.addAndGet(max); //获取超出来未获得的seq避免浪费seq。 long now = seq.fullSeq.get(); //还原Seq。 seq.trySeq.addAndGet(-now); //清空被还原的数量。 seq.fullSeq.getAndAdd(-now); } } }); } } public static class Seq { private String key; private AtomicLong trySeq;//避免超消耗 private AtomicLong fullSeq;//存储未被消耗的seq private AtomicLong nowSeq;//实际最新消耗的Seq private AtomicLong maxSeq;//当前加载的最大Seq private int loadNum;//剩下多少的时候加载 private int step;//每次加载多少 public Seq(AtomicLong trySeq, AtomicLong fullSeq, AtomicLong nowSeq, AtomicLong maxSeq, int loadNum, int step) { this.trySeq = trySeq; this.fullSeq = fullSeq; this.nowSeq = nowSeq; this.maxSeq = maxSeq; this.loadNum = loadNum; this.step = step; } public Seq(int loadNum, int step) { this.trySeq = new AtomicLong(); this.fullSeq = new AtomicLong(); this.nowSeq = new AtomicLong(); this.maxSeq = new AtomicLong(); this.loadNum = loadNum; this.step = step; } public boolean needLoad() { return nowSeq == null || maxSeq.get() - trySeq.get() <= loadNum; } public boolean full() { return maxSeq.get() - trySeq.get() < 0; } public String getKey() { return key; } public AtomicLong getTrySeq() { return trySeq; } public AtomicLong getFullSeq() { return fullSeq; } public AtomicLong getNowSeq() { return nowSeq; } public AtomicLong getMaxSeq() { return maxSeq; } public int getLoadNum() { return loadNum; } public int getStep() { return step; } } }
import java.io.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class TestSeq { private static AtomicLong al = new AtomicLong(0); public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { SeqManager.Seq s = new SeqManager.Seq(1000, 100); SeqManager sm = new SeqManager("", s, a -> { return al.addAndGet(a.getStep()); }, true); AtomicInteger ai = new AtomicInteger(0); ConcurrentSkipListMap<Long, Integer> clq = new ConcurrentSkipListMap<>(); CompletableFuture<Void> [] cf = new CompletableFuture[10000]; for(int i = 1;i <= 10000;i++) { int finalI = i; cf[i-1] = CompletableFuture.runAsync(() -> { long seq = sm.getSeq(); if(seq == -1) { ai.incrementAndGet(); return; } clq.put(seq, finalI); }); } CompletableFuture.allOf(cf).get(); new File("seq.txt").delete(); try(FileWriter fw = new FileWriter("seq.txt")) { clq.forEach((k, v)->{ try { fw.write(k + ":" + v + "\n"); } catch (IOException e) { throw new RuntimeException(e); } }); } System.out.println(ai.get()); } }
标签:自增,Seq,seq,生成器,private,public,AtomicLong,new,id From: https://www.cnblogs.com/math-and-it/p/18115613