首页 > 编程语言 >Java熔断框架:resilience4j

Java熔断框架:resilience4j

时间:2024-08-19 17:28:45浏览次数:14  
标签:Java java System 熔断 resilience4j println import event out

1.文档

中文文档:https://github.com/lmhmhl/Resilience4j-Guides-Chinese/blob/main/index.md

 

2. maven依赖

       <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-spring-boot2</artifactId>
            <version>1.6.1</version>
        </dependency>

3.代码

断路器

package org.example;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

//断路器
public class TestCircuitBreaker {

    public static void main(String[] args) {
        // 自定义配置
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                //滑动窗口
                .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED) //滑动窗口类型:计数
                .slidingWindowSize(100) //滑动窗口的大小 默认100
                .minimumNumberOfCalls(100) //计算失败率或慢调用率之前所需的最小调用数(每个滑动窗口周期) 默认 100

                //开启熔断的阈值
                .failureRateThreshold(50) //失败率阈值  默认50
                .slowCallRateThreshold(100) //慢调用比率阈值 默认100%
                .slowCallDurationThreshold(Duration.ofSeconds(6)) //慢调用时间阈值(毫秒) 默认6秒

                //半开
                .waitDurationInOpenState(Duration.ofMillis(3)) //开启过渡到半开应等待的时间 (默认6秒)
                .permittedNumberOfCallsInHalfOpenState(10) //半开状态允许的调用次数 默认10
                .maxWaitDurationInHalfOpenState(Duration.ofSeconds(10)) //半开状态等待最大时长(毫秒) 默认0   0代表一直半开

                //异常
                .recordExceptions(IOException.class, TimeoutException.class, RuntimeException.class) //指定异常列表
                .ignoreExceptions(IOException.class) //指定被忽略且既不算失败也不算成功的异常列表

                .build();

        // 创建注册器
        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);

        // 注册器上注册事件
        registry.getEventPublisher()
                .onEntryAdded(entryAddedEvent -> {
                    CircuitBreaker addedCircuitBreaker = entryAddedEvent.getAddedEntry();
                    System.out.println("CircuitBreaker " + addedCircuitBreaker.getName() + " added");
                })
                .onEntryRemoved(entryRemovedEvent -> {
                    CircuitBreaker removedCircuitBreaker = entryRemovedEvent.getRemovedEntry();
                    System.out.println("CircuitBreaker " + removedCircuitBreaker.getName() + " removed");
                });

        // 创建断路器
        CircuitBreaker circuitBreaker = registry.circuitBreaker("backendService");

        // 断路器上注册事件
        circuitBreaker.getEventPublisher()
                .onSuccess(event -> System.out.println("success event"))
                .onError(event -> System.out.println("error event"))
                .onIgnoredError(event -> System.out.println("ignoredError event"))
                .onReset(event -> System.out.println("reset event"))
                .onStateTransition(event -> System.out.println("stateTransition event"));

        // 执行装饰函数
        IntStream.range(1, 111).forEach(i -> {
            System.out.println(i + " 开始");
            try {
                if(i>100) {
                    String result = circuitBreaker
                            .executeSupplier(BackendService::doSomething);
                } else if (i % 2 == 0) {
                    String result = circuitBreaker
                            .executeSupplier(BackendService::doSomethingThrowing);
                } else {
                    String result = circuitBreaker
                            .executeSupplier(BackendService::doSomething);
                }
                System.out.println(i + " 正常");
            } catch (Exception e) {
                System.out.println(i + " 异常 ");
                System.out.println(e.getMessage());
            }

            System.out.println("========================================");
        });

    }

    static class BackendService {
        public static String doSomething() {
            return "Hello World";
        }

        public static String doSomethingThrowing() {
            throw new RuntimeException("error");
        }
    }

}
View Code

限流器

package org.example;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;

import java.time.Duration;
import java.util.stream.IntStream;

//限流器
public class TestRateLimiter {

    public static void main(String[] args) {
        // 自定义配置
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofNanos(500)) //每个1秒刷新一次 (默认:500纳秒)
                .limitForPeriod(1)   //一次刷新周期内允许的最大请求数 (默认:50次)
                .timeoutDuration(Duration.ofSeconds(5))  //线程超时时间 (默认:5秒)
                .build();

        // 创建注册器
        RateLimiterRegistry registry = RateLimiterRegistry.of(config);

        // 注册器上注册事件
        registry.getEventPublisher()
                .onEntryAdded(event -> {
                    RateLimiter addedRateLimiter = event.getAddedEntry();
                    System.out.println("RateLimiter " + addedRateLimiter.getName() + " added");
                })
                .onEntryRemoved(event -> {
                    RateLimiter removedRateLimiter = event.getRemovedEntry();
                    System.out.println("RateLimiter " + removedRateLimiter.getName() + " removed");
                });

        // 创建限流器
        RateLimiter rateLimiter = registry.rateLimiter("backendService2");

        // 限流器上注册事件
        rateLimiter.getEventPublisher()
                .onSuccess(event -> System.out.println("success event"))
                .onFailure(event -> System.out.println("failure event"));

        // 执行装饰函数
        IntStream.range(1, 10).forEach(i -> {
            String result = rateLimiter.executeSupplier(BackendService2::doSomething);
            System.out.println("========================================" + result);
        });

    }

    static class BackendService2 {
        public static String doSomething()  {
            System.out.println(Thread.currentThread().getId());
            return "Hello World";
        }
    }

}
View Code

隔离器

package org.example;

import io.github.resilience4j.bulkhead.*;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.IntStream;

//隔离器 : Resilience4j 隔板
// 通过隔离的形式,让不稳定因素限制在某一个小范围内,不会导致整个系统崩溃。
//防止下游依赖被并发请求冲击
//防止发生连环故障
public class TestBulkhead {

    //并发调用隔离测试
    public static void main(String[] args) throws IOException {
        //基于信号量隔板 : 信号量 即 限制线程数量
        //semaphoreBulkheadTest();

        //基于线程池的隔板
        threadPoolBulkheadTest();

        //main线程阻塞
        System.in.read();
    }

    private static void threadPoolBulkheadTest() {
        ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
                .maxThreadPoolSize(2)
                .coreThreadPoolSize(1)
                .queueCapacity(1)
                .build();

        ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);

        registry.getEventPublisher()
                .onEntryAdded(entryAddedEvent -> {
                    ThreadPoolBulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
                    System.out.println("Bulkhead " + addedBulkhead.getName() + " added");
                })
                .onEntryRemoved(entryRemovedEvent -> {
                    ThreadPoolBulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
                    System.out.println("Bulkhead " + removedBulkhead.getName() + " removed");
                });

        ThreadPoolBulkhead bulkhead = registry.bulkhead("backendService3");

        bulkhead.getEventPublisher()
                .onCallPermitted(event -> System.out.println("permit event"))
                .onCallRejected(event -> System.out.println("rejected event"))
                .onCallFinished(event -> System.out.println("finish event"));

        Supplier<String> supplier = BackendService3::doSomething;

        Supplier<CompletionStage<String>> decoratedSupplier = ThreadPoolBulkhead.decorateSupplier(bulkhead, supplier);

        IntStream.range(1, 10).forEach(i -> {
            decoratedSupplier
                    .get()
                    .whenComplete((result, error) -> {
                        if (result != null) {
                            System.out.println(result);
                        }
                        if (error != null) {
                            error.printStackTrace();
                        }
                    });
        });

    }

    private static void semaphoreBulkheadTest() {
        BulkheadConfig config = BulkheadConfig.custom()
                .maxConcurrentCalls(2)       //允许并发数
                //当达到并发调用数量时,新的线程执行时将被阻塞,这个属性表示最长的等待时间
                //如果线程无法在我们指定的 2s maxWaitDuration 内获得许可,则会被拒绝
                .maxWaitDuration(Duration.ofSeconds(1))
                .build();

        BulkheadRegistry registry = BulkheadRegistry.of(config);

        registry.getEventPublisher()
                .onEntryAdded(entryAddedEvent -> {
                    Bulkhead addedBulkhead = entryAddedEvent.getAddedEntry();
                    System.out.println("Bulkhead " + addedBulkhead.getName() + " added");
                })
                .onEntryRemoved(entryRemovedEvent -> {
                    Bulkhead removedBulkhead = entryRemovedEvent.getRemovedEntry();
                    System.out.println("Bulkhead " + removedBulkhead.getName() + " removed");
                });

        Bulkhead bulkhead = registry.bulkhead("backendService3");

        bulkhead.getEventPublisher()
                .onCallPermitted(event -> System.out.println("permit event"))
                .onCallRejected(event -> System.out.println("rejected event"))
                .onCallFinished(event -> System.out.println("finish event"));

        //让我们调用几次装饰操作来了解隔板的工作原理。我们可以使用 CompletableFuture 来模拟来自用户的并发请求
        IntStream.range(1, 5).forEach(i -> {
            CompletableFuture
                    .supplyAsync(Bulkhead.decorateSupplier(bulkhead, BackendService3::doSomething))
                    .thenAccept(result -> System.out.println(result))
            ;
        });

        try {
            System.in.read();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }

    public static class BackendService3 {
        public static String doSomething()  {
//            try {
//                Thread.sleep(3000); // 模拟耗时操作
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
            return "Hello World";
        }
    }
}
View Code

限时器

package org.example;

import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import io.github.resilience4j.timelimiter.TimeLimiterRegistry;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

//限时器
public class TestTimeLimiter {

    public static void main(String[] args) throws Exception {
        TimeLimiterConfig config = TimeLimiterConfig.custom()
                .cancelRunningFuture(true)
                .timeoutDuration(Duration.ofMillis(1000))
                .build();

        TimeLimiterRegistry registry = TimeLimiterRegistry.of(config);

        TimeLimiter timeLimiter = registry.timeLimiter("backendService4");

        //test1(timeLimiter);

        test2(timeLimiter);


        System.in.read();
    }

    private static void test2(TimeLimiter timeLimiter) throws Exception {
        Supplier<String> supplier = BackendService4::doSomething;
        CompletableFuture<String> future = CompletableFuture.supplyAsync(supplier);
        // 阻塞方式,实际上是调用了future.get(timeoutDuration, MILLISECONDS)
        String result = timeLimiter.executeFutureSupplier(() -> future);
        System.out.println(result);
    }

    private static void test1(TimeLimiter timeLimiter) {
        // 需要一个调度器对非阻塞CompletableFuture进行调度,控制超时时间
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

       // 返回CompletableFuture类型的非阻塞变量
        CompletableFuture<String> result = timeLimiter.executeCompletionStage(
                scheduler,
                () -> CompletableFuture.supplyAsync(BackendService4::doSomething)
        ).toCompletableFuture();

        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    static class BackendService4 {
        public static String doSomething() {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello World";
        }
    }
}
View Code

重试

package org.example;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.vavr.CheckedFunction0;
import io.vavr.control.Try;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

//重试
public class TestRetry {
    public static void main(String[] args) throws IOException {
        RetryConfig config = RetryConfig.custom()
                .maxAttempts(3) //最大重试次数
                .waitDuration(Duration.ofMillis(1000)) //两次重试之间的时间间隔
                .retryOnResult(response -> !response.equals("hello"))  //计算结果是否重试
                .retryOnException(e -> e instanceof RuntimeException) //RuntimeException 异常触发重试
                .retryExceptions(IOException.class, TimeoutException.class) //TimeoutException 异常触发重试
                .ignoreExceptions(IOException.class, TimeoutException.class)
                .build();

        RetryRegistry registry = RetryRegistry.of(config);

        Retry retry = registry.retry("backendService5");

        // 装饰
        CheckedFunction0<String> retryableSupplier = Retry.decorateCheckedSupplier(retry, BackendService5::doSomething);

        // 进行方法调用
        Try<String> result = Try.of(retryableSupplier)
                .recover((throwable) -> "Hello world from recovery function");

        System.in.read();
    }

    static class BackendService5 {
        public static String doSomething() {
            System.out.println(1);
            //throw new RuntimeException("hello");
            return "hello";
        }
    }
}
View Code

 

 

 

标签:Java,java,System,熔断,resilience4j,println,import,event,out
From: https://www.cnblogs.com/smileblogs/p/18367753

相关文章

  • Java-人工智能初学者实用手册-全-
    Java人工智能初学者实用手册(全)零、前言在一切都由技术和数据驱动的现代世界中,人工智能变得越来越重要,它是使任何系统或流程自动化的过程,以自动执行复杂的任务和功能,从而实现最佳生产率。面向初学者的Java人工智能实践解释了使用流行的基于Java的库和框架来构建智能应用程......
  • JavaScript-快速语法参考-全-
    JavaScript快速语法参考(全)原文:JavaScriptQuickSyntaxReference协议:CCBY-NC-SA4.0一、使用JavaScript要开始试验JavaScript,您应该安装一个支持这种语言的集成开发环境(IDE)。有很多不错的选择,比如NetBeans、Eclipse、VisualStudio、括号。在本书中,我们将使用NetBe......
  • JavaScript-入门指南-全-
    JavaScript入门指南(全)原文:BeginningJavaScript协议:CCBY-NC-SA4.0一、JavaScript简介这些年来,avaScript发生了很大变化。我们目前正处于一个JavaScript库的时代,你可以构建任何你想构建的东西。JavaScript存在于客户机和服务器上,存在于桌面和移动设备上。这本书的目......
  • 工作一年多,准备重新缕一下Java全流程(JDK8和JDK17,搭建环境)
    在重新学习的过程中哥们会吧一些理解不深的有疑问的记录在此系列中有好兄弟想一起学习,可以一起打卡记录一下一搭建环境今天下载了一下jdk17,因为工作中用8所以配置了一些兼容性的东西给大伙分析一些首先我们可以去官网下载jdk17,下载的话走默认路径就可以JavaDownloads|......
  • 【Java 并发编程】(四) ThreadLocal 源码解读
     介绍每个Thread对象,内部有一个ThreadLocalMapthreadLocals,这是一个哈希表,底层是一个Node[]table;当在某个线程中调用ThreadLocal的set方法时,会使用Thread.currentThread获取当前先线程的thread对象,然后将ThreadLocal对象作为key,将set方法的参数作为value......
  • java学习第八周
    临近开学,本周的任务完成情况不够好,平常乱七八糟的事情比较多,所以放在学习上的心思比较少。平均每天放在JAVA学习的时间约1个小时,放在编程的时间约半小时,解决问题的时间约1小时。下一个星期就要开学了,回看自己暑期的JAVA学习情况感觉比之前的暑期有很大的进步,在家中能拿出大量的时......
  • Java异常处理
    Java异常处理java:Compilationfailed:internaljavacompilererrorjava:Compilationfailed:internaljavacompilererror原因:idea的jdk版本和项目配置的不同。比对idea中三处关于jdk版本配置:setting-Build,Execution,Deployment-Compiler-JavaCompilerProj......
  • Java中的可达性分析算法图解,以及哪些对象可以作为GCRoots
    可达性分析算法图示:解释:因为在GCRoots中存在对于对象A的引用,而A又持有对对象B和对象C的引用,所以这一串都是有用的引用链,需要保留。对于对象D和对象E,他们只是相互进行引用,并没有和GCRoots中的对象有任何的关联,所以可以安全的回收。哪些对象可以作为GCRoots虚拟机栈(栈帧中的......
  • 一个专门用于Java服务端图片合成的工具,支持图片、文本、矩形等多种素材的合成,功能丰富
    前言在数字化营销的当下,企业对于图片处理的需求日益增长。然而,传统的图片处理方式往往需要复杂的操作和专业的技术,这不仅增加了工作量,也提高了时间成本。为了处理这一问题,一款能够简化图片合成流程的软件应运而生。介绍ImageCombiner是一款面向Java服务端的图片合成工具,以......
  • java.lang.IllegalArgumentException: Comparison method violates its general contr
    代码:publicstaticvoidwbsSort(List<SendMessageEntity>sendMessageEntityList){Collections.sort(sendMessageEntityList,(o1,o2)->{StringwbsCode1Temp=o1.getWbsCode();StringwbsCode2Temp=o2.getWbsCode();......