package org.example.file.mult; //函数值接口 @FunctionalInterface public interface FuncationCallback { void callback(String param); }
回调接收
package org.example.file.mult; import java.util.ArrayList; public class FuncationCallbackImpl { //函数式 回调参数处理 public FuncationCallbackImpl(ArrayList arrayList, FuncationCallback funcationCallback) { arrayList.forEach(ele->{ funcationCallback.callback(ele+"456789"); }); } }
队列业务实现
package org.example.file.mult; import org.apache.tomcat.util.threads.TaskThreadFactory; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Securite { //有界队列,根据实际业务设置即可 public static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); //静态线程池,一会多线程执行能用到,根据自己的机器性能配置即可 public static Executor executor = new ThreadPoolExecutor(3, 10, 2000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TaskThreadFactory("测试队列", false, 7)); public Securite() { } public void exec(Integer ele) { queue.offer(ele); } //全局静态 内存可见性常量,空值任务暂停使用 public static volatile int a = 0;//刷回主内存 //静态内部类,有利于在主程序空值进度 public static class MultTask implements Runnable { private ArrayBlockingQueue<Integer> arrayBlockingQueue1; //线程-队列构造器 便于每个线程都能冲全局队列取值 public MultTask(ArrayBlockingQueue<Integer> arrayBlockingQueue1) { this.arrayBlockingQueue1 = arrayBlockingQueue1; } @Override public void run() { //循环,这里要注意和arrayBlockingQueue1.take()配合使用,避免空悬打满cpu while (true) { try { //当参数等于8时,后面的线程停止取队列的元素进行操作,来达到外界可控的目的 if (a == 8) { System.out.println("开始终端了"); Thread.sleep(5000); System.out.println("5秒后继续"); // a = 51; return; } Integer take = arrayBlockingQueue1.take(); String name = Thread.currentThread().getName(); ArrayList arrayList = new ArrayList(); arrayList.add(take); //队列每次取值后再回调函数里处理后的值 new FuncationCallbackImpl(arrayList, new FuncationCallback() { @Override public void callback(String param) { System.out.println("返回param:" + param); } }); //TODO 根据自己的业务进行后续处理 System.out.println(">>>>>>>>>>>>>>>>>>>>>:" + take + "<><><><><><><>:" + name); } catch (InterruptedException e) { } } } } public static void main(String[] args) throws InterruptedException { Securite securite = new Securite(); for (int i = 0; i < 10; i++) { if (i == 8) { a = 8; } securite.exec(i); executor.execute(new MultTask(queue)); } System.out.println("10s后在运行一次"); Thread.sleep(1500); securite.exec(10); executor.execute(new MultTask(queue)); } }
标签:接收,队列,util,import,ArrayBlockingQueue,new,多线程,public From: https://www.cnblogs.com/wangbiaohistory/p/17274823.html