首页 > 编程语言 >ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

时间:2022-08-24 23:55:07浏览次数:53  
标签:ForkJoinPool 源码 ForkJoinWorkerThread 之四 new null ThreadGroup pool

一、类结构及其成员变量

1.1 类结构和注释

类结构代码如下:

public class ForkJoinWorkerThread extends Thread {
    
}

ForkJoinWorkerThread继承了Thread类,ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。

1.2 常量

主要有两个:

final ForkJoinPool pool;                // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。

这是两个final修饰的常量,智能初始化一次。

二、构造函数

2.1 ForkJoinWorkerThread(ForkJoinPool pool)

protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。构造函数中最主要的方法就是registerWorker。

2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)

ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                     AccessControlContext acc) {
    super(threadGroup, null, "aForkJoinWorkerThread");
    U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
    eraseThreadLocals(); // clear before registering
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。
之后调用方法eraseThreadLocals将threadLocals清除。
之后与同用的构造函数一致。
擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。

final void eraseThreadLocals() {
    U.putObject(this, THREADLOCALS, null);
    U.putObject(this, INHERITABLETHREADLOCALS, null);
}

实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。

三、核心方法

3.1 run

做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:

public void run() {
    //如果workQueue为空,则抛出异常
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            //调用onStart方法
            onStart();
            //调用pool的runWorker方法,运行workQueue
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
           //操作完之后处理
            try {
               //调用onTermination方法
                onTermination(exception);
            } catch (Throwable ex) {
               //如果出现异常,则进行异常处理
                if (exception == null)
                    exception = ex;
            } finally {
               //最终需要指向deregister方法
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。
其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。

3.2 定义的可扩展方法

由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:

protected void onStart() {
}

protected void onTermination(Throwable exception) {
}

这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。

四、内部类InnocuousForkJoinWorkerThread

这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。

源码如下:

static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
    /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
    private static final ThreadGroup innocuousThreadGroup =
        createThreadGroup();

    /** An AccessControlContext supporting no privileges */
    private static final AccessControlContext INNOCUOUS_ACC =
        new AccessControlContext(
            new ProtectionDomain[] {
                new ProtectionDomain(null, null)
            });

    InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
    }

    @Override // to erase ThreadLocals
    void afterTopLevelExec() {
        eraseThreadLocals();
    }

    @Override // to always report system loader
    public ClassLoader getContextClassLoader() {
        return ClassLoader.getSystemClassLoader();
    }

    @Override // to silently fail
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    @Override // paranoically
    public void setContextClassLoader(ClassLoader cl) {
        throw new SecurityException("setContextClassLoader");
    }

    private static ThreadGroup createThreadGroup() {
        try {
            sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            Class<?> gk = ThreadGroup.class;
            long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
            long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
            ThreadGroup group = (ThreadGroup)
                u.getObject(Thread.currentThread(), tg);
            while (group != null) {
                ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                if (parent == null)
                    return new ThreadGroup(group,
                                           "InnocuousForkJoinWorkerThreadGroup");
                group = parent;
            }
        } catch (Exception e) {
            throw new Error(e);
        }
        // fall through if null as cannot-happen safeguard
        throw new Error("Cannot create ThreadGroup");
    }
}

AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。

五、ForkJoinPool中创建工作线程的过程

此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。

5.1 makeCommonPool创建过程

再ForkJoinPool重的makeCommPool,有如下代码:

if (factory == null) {
    if (System.getSecurityManager() == null)
        factory = defaultForkJoinWorkerThreadFactory;
    else // use security-managed default
        factory = new InnocuousForkJoinWorkerThreadFactory();
}

这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。

5.2 createWorker创建过程

ForkJoinWorkerThreadFactory fac = factory;
try {
    if (fac != null && (wt = fac.newThread(this)) != null) {
        wt.start();
        return true;
    }
} catch (Throwable rex) {
    ex = rex;
}

也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。

5.3 InnocuousForkJoinWorkerThreadFactory

static final class InnocuousForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {

    private static final AccessControlContext innocuousAcc;
    static {
        Permissions innocuousPerms = new Permissions();
        innocuousPerms.add(modifyThreadPermission);
        innocuousPerms.add(new RuntimePermission(
                               "enableContextClassLoaderOverride"));
        innocuousPerms.add(new RuntimePermission(
                               "modifyThreadGroup"));
        innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                new ProtectionDomain(null, innocuousPerms)
            });
    }

    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
            java.security.AccessController.doPrivileged(
                new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                public ForkJoinWorkerThread run() {
                    return new ForkJoinWorkerThread.
                        InnocuousForkJoinWorkerThread(pool);
                }}, innocuousAcc);
    }
}

5.4 DefaultForkJoinWorkerThreadFactory

static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new ForkJoinWorkerThread(pool);
    }
}

六、总结

ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。

标签:ForkJoinPool,源码,ForkJoinWorkerThread,之四,new,null,ThreadGroup,pool
From: https://www.cnblogs.com/ciel717/p/16444888.html

相关文章

  • 以太坊 layer2: optimism 源码学习(二) 提现原理
    作者:林冠宏/指尖下的幽灵。转载者,请:务必标明出处。掘金:https://juejin.im/user/1785262612681997博客:http://www.cnblogs.com/linguanh/GitHub:https://git......
  • 《Python源码剖析》PDF高清版试读
      《Python源码剖析》PDF高清版免费下载地址  内容简介  · · · · · ·作为主流的动态语言,Python不仅简单易学、移植性好,而且拥有强大丰富的库的......
  • Spring 源码学习笔记10——Spring AOP
    Spring源码学习笔记10——SpringAOP参考书籍《Spring技术内幕》SpringAOP的实现章节书有点老,但是里面一些概念还是总结比较到位源码基于Spring-aop5.3.22可能和旧......
  • LNMP架构的源码编译
    LNMP架构的源码编译一、LNMP架构的编译安装1.安装nginx服务(1)关闭防火墙[root@localhost~]#systemctlstopfirewalld[root@localhost~]#systemctldisablefirew......
  • 新字符设备驱动原理和框架源码分析
    一、分配和释放设备号动态申请设备号:/*dev:设备号--dev_tdevid;count:是要申请的数量,一般都是一个;name:是设备名字*/intalloc_chrdev_region(dev_t*dev,uns......
  • CBV源码剖析
    你自己不要修改源码除了bug很难找突破口在urls.pyurl(r'^login/',views.MyLogin.as_view())url(r'^login/',views.view)FBV一模一样CBV与FBV在路由匹配上本质是一样......
  • 即时通讯源码(基于websocket即时通讯源码uniapp)+视频搭建教程
    即时通讯系统源码服务器端构架目录:1、构建基本服务器2、用户在线功能3、用户消息广播机制4、用户业务层封装5、在线用户查询6、修改用户名......
  • 姿薇优选源码
    最近新开发了一款姿薇优选系统,其主要功能有预约系统,支付系统,商城系统,会员系统,积分系统,物流系统,赠送系统,实名认证系统等等。姿薇优选系统客服系统源码分享:<?phpnamespac......
  • 自定义频率-权限 频率执行源码分析-全局异常处理-自动生成接口文档-RBAC
    自定义频率类 #自定义的逻辑1)取出访问者ip{192.168.1.12:[访问时间3,访问时间2,访问时间1],192.168.1.12:[],192.168.1.14:[]}2)判断当前ip不在访问字典里,......
  • 数字化培训、知识库、考学一体平台@附源码
    前言随着信息化的进一步推进,目前各行各业都在进行数字化转型,本人从事过医疗、政务等系统的研发,和客户深入交流过日常办公中“知识”的重要性,再加上现在倡导的互联互通、数......