首页 > 系统相关 >Disruptor内存消息队列简单使用

Disruptor内存消息队列简单使用

时间:2023-05-29 10:33:52浏览次数:38  
标签:Disruptor seq 队列 logId logEvent 内存 date content public


Disruptor内存消息队列

最近在做一个有关使用内存消息队列到功能,比如将日志信息或点击统计信息持久化等操作,开始想着用java到内存队列作为缓冲区,后来在网上搜到Disruptor这个东西,神乎其神到,就简单了解了一下,做了一个demo,感觉还不错,可以用用,有关概念可以自行搜索,下面就简单介绍一下开发过程。

新建测试工程

新建maven工程listen-disruptor

工程目录结构如下

Disruptor内存消息队列简单使用_Disruptor

pom.xml文件内容

1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3. <modelVersion>4.0.0</modelVersion>
4. <groupId>listen</groupId>
5. <artifactId>listen-disruptor</artifactId>
6. <packaging>war</packaging>
7. <version>0.0.1-SNAPSHOT</version>
8. <name>listen-disruptor Maven Webapp</name>
9. <url>http://maven.apache.org</url>
10. <dependencies>
11. <dependency>
12. <groupId>junit</groupId>
13. <artifactId>junit</artifactId>
14. <version>3.8.1</version>
15. <scope>test</scope>
16. </dependency>
17.  
18. <!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
19. <dependency>
20. <groupId>com.lmax</groupId>
21. <artifactId>disruptor</artifactId>
22. <version>3.3.6</version>
23. </dependency>
24. <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
25. <dependency>
26. <groupId>com.alibaba</groupId>
27. <artifactId>fastjson</artifactId>
28. <version>1.2.20</version>
29. </dependency>
30. </dependencies>
31. <build>
32. <finalName>listen-disruptor</finalName>
33. </build>
34. </project>

在Disruptor中,所有到生产和消费是基于事件驱动到,即事件对象。

编码

新建事件对象类LogEvent

1. package com.listen.disruptor;
2.  
3. import java.io.Serializable;
4. import java.util.Date;
5.  
6. import com.alibaba.fastjson.JSONObject;
7.  
8. /**
9. * 事件对象(日志事件)
10. * @author Administrator
11. *
12. */
13. public class LogEvent implements Serializable {
14.  
15. private long logId;
16. private String content;
17. private Date date;
18.  
19. public LogEvent(){
20.  
21. }
22.  
23. public LogEvent(long logId, String content, Date date){
24. this.logId = logId;
25. this.content = content;
26. this.date = date;
27. }
28.  
29. public long getLogId() {
30. return logId;
31. }
32.  
33. public void setLogId(long logId) {
34. this.logId = logId;
35. }
36.  
37. public String getContent() {
38. return content;
39. }
40.  
41. public void setContent(String content) {
42. this.content = content;
43. }
44.  
45. public Date getDate() {
46. return date;
47. }
48.  
49. public void setDate(Date date) {
50. this.date = date;
51. }
52.  
53. public String toString(){
54. return JSONObject.toJSONString(this);
55. }
56.  
57. }

事件工厂类,用来初始化预分配空到事件对象

1. package com.listen.disruptor;
2.  
3. import com.lmax.disruptor.EventFactory;
4.  
5. /**
6. * 事件生成工厂(用来初始化预分配事件对象)
7. * @author Administrator
8. *
9. */
10. public class LogEventFactory implements EventFactory<LogEvent> {
11.  
12. public LogEvent newInstance() {
13. return new LogEvent();
14. }
15.  
16. }

新建事件到消费者LogEventConsumer,这里演示只是做简单到打印显示

1. package com.listen.disruptor;
2.  
3. import com.lmax.disruptor.EventHandler;
4.  
5. /**
6. * 事件到消费者
7. * @author Administrator
8. *
9. */
10. public class LogEventConsumer implements EventHandler<LogEvent> {
11.  
12. public void onEvent(LogEvent logEvent, long seq, boolean bool) throws Exception {
13. System.out.println("seq:" + seq + ",bool:" + bool + ",logEvent:" + logEvent.toString());
14. }
15.  
16. }

新建事件的生产者LogEventProducer


1. package com.listen.disruptor;
2.  
3. import java.util.Date;
4.  
5. import com.lmax.disruptor.RingBuffer;
6.  
7. public class LogEventProducer {
8.  
9. private final RingBuffer<LogEvent> ringBuffer;
10.  
11. public LogEventProducer(RingBuffer<LogEvent> ringBuffer){
12. this.ringBuffer = ringBuffer;
13. }
14.  
15. public void onData(long logId, String content, Date date){
16. //ringBuffer类似一个队列,next就是下一个槽
17. long seq = ringBuffer.next();
18. try{
19. //用seq索引取出一个空到事件用于填充
20. LogEvent logEvent = ringBuffer.get(seq);
21. logEvent.setLogId(logId);
22. logEvent.setContent(content);
23. logEvent.setDate(date);
24.  
25. }
26. finally{
27. //最终发布事件,很重要
28. ringBuffer.publish(seq);
29. }
30. }
31.  
32. }

至此有关消费和生产到代码已经码完,但是目前推荐另一种生产者到写法,两种都可以使用

新建LogEventProducerWithTranslator生产者

1. package com.listen.disruptor;
2.  
3. import java.util.Date;
4.  
5. import com.lmax.disruptor.EventTranslatorVararg;
6. import com.lmax.disruptor.RingBuffer;
7.  
8. /**
9. * 使用translator方式到事件生产者发布事件
10. * @author Administrator
11. *
12. */
13. public class LogEventProducerWithTranslator {
14.  
15. private final static EventTranslatorVararg<LogEvent> translator = new EventTranslatorVararg<LogEvent>() {
16.  
17. public void translateTo(LogEvent logEvent, long seq, Object... objs) {
18. logEvent.setLogId((Long) objs[0]);
19. logEvent.setContent((String) objs[1]);
20. logEvent.setDate((Date) objs[2]);
21. }
22. };
23.  
24. private final RingBuffer<LogEvent> ringBuffer;
25. public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer){
26. this.ringBuffer = ringBuffer;
27. }
28.  
29. public void onData(long logId, String content, Date date){
30. this.ringBuffer.publishEvent(translator, logId, content, date);
31. }
32. }

测试类LogEventMain

1. package com.listen.disruptor;
2.  
3. import java.util.Date;
4. import java.util.concurrent.Executor;
5. import java.util.concurrent.Executors;
6. import java.util.concurrent.ThreadFactory;
7.  
8. import com.lmax.disruptor.RingBuffer;
9. import com.lmax.disruptor.dsl.Disruptor;
10.  
11. public class LogEventMain {
12.  
13. public static void main(String[] args) {
14. // producer();
15. producerWithTranslator();
16. }
17.  
18. public static void producer(){
19. Executor executor = Executors.newCachedThreadPool();
20. LogEventFactory factory = new LogEventFactory();
21. int ringBufferSize = 1024;
22. Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, executor);
23. disruptor.handleEventsWith(new LogEventConsumer());
24. disruptor.start();
25. RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
26.  
27. LogEventProducer producer = new LogEventProducer(ringBuffer);
28. for(int i = 0; true; i++){
29. producer.onData(i, "c" + i, new Date());
30. }
31. }
32.  
33. public static void producerWithTranslator(){
34. LogEventFactory factory = new LogEventFactory();
35. int ringBufferSize = 1024;
36. ThreadFactory threadFactory = new ThreadFactory() {
37.  
38. public Thread newThread(Runnable r) {
39. return new Thread(r);
40. }
41. };
42. Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, threadFactory);
43. disruptor.handleEventsWith(new LogEventConsumer());
44. disruptor.start();
45. RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
46. LogEventProducerWithTranslator producer2 = new LogEventProducerWithTranslator(ringBuffer);
47. for(int i = 0; true; i++){
48. producer2.onData(i, "c" + i, new Date());
49. }
50. }
51.  
52. }

测试类分别写类两种生产者到测试,其中LogEventProducerWithTranslator生产者中到translator变量还有其他几种写法,适用于其他到场景,用法都是类似到,可以自行研究一下

测试

测试结果如下

1. seq:57804,bool:false,logEvent:{"content":"c57804","date":1477899755475,"logId":57804}
2. seq:57805,bool:false,logEvent:{"content":"c57805","date":1477899755475,"logId":57805}
3. seq:57806,bool:false,logEvent:{"content":"c57806","date":1477899755475,"logId":57806}
4. seq:57807,bool:false,logEvent:{"content":"c57807","date":1477899755475,"logId":57807}
5. seq:57808,bool:false,logEvent:{"content":"c57808","date":1477899755475,"logId":57808}
6. seq:57809,bool:false,logEvent:{"content":"c57809","date":1477899755475,"logId":57809}
7. seq:57810,bool:false,logEvent:{"content":"c57810","date":1477899755475,"logId":57810}
8. seq:57811,bool:false,logEvent:{"content":"c57811","date":1477899755475,"logId":57811}
9. seq:57812,bool:false,logEvent:{"content":"c57812","date":1477899755475,"logId":57812}
10. seq:57813,bool:false,logEvent:{"content":"c57813","date":1477899755475,"logId":57813}
11. seq:57814,bool:false,logEvent:{"content":"c57814","date":1477899755475,"logId":57814}
12. seq:57815,bool:false,logEvent:{"content":"c57815","date":1477899755475,"logId":57815}
13. seq:57816,bool:false,logEvent:{"content":"c57816","date":1477899755475,"logId":57816}
14. seq:57817,bool:false,logEvent:{"content":"c57817","date":1477899755475,"logId":57817}
15. seq:57818,bool:false,logEvent:{"content":"c57818","date":1477899755475,"logId":57818}
16. seq:57819,bool:false,logEvent:{"content":"c57819","date":1477899755475,"logId":57819}
17. seq:57820,bool:false,logEvent:{"content":"c57820","date":1477899755475,"logId":57820}
18. seq:57821,bool:false,logEvent:{"content":"c57821","date":1477899755475,"logId":57821}
19. seq:57822,bool:false,logEvent:{"content":"c57822","date":1477899755475,"logId":57822}
20. seq:57823,bool:false,logEvent:{"content":"c57823","date":1477899755475,"logId":57823}
21. seq:57824,bool:false,logEvent:{"content":"c57824","date":1477899755475,"logId":57824}
22. seq:57825,bool:false,logEvent:{"content":"c57825","date":1477899755475,"logId":57825}

性能测试我也没有研究,毕竟也没有专业到测试机器,只是在自己到电脑上跑一下,完全满足业务需要。


标签:Disruptor,seq,队列,logId,logEvent,内存,date,content,public
From: https://blog.51cto.com/chengzheng183/6368233

相关文章

  • 内存管理机制
    Python使用自动内存管理机制,具体来说是使用垃圾回收(GarbageCollection)来管理内存。Python中的垃圾回收器负责跟踪不再使用的对象,并在适当的时候释放它们所占用的内存。Python的内存管理机制主要基于引用计数(ReferenceCounting)和循环垃圾收集(CycleGarbageCollection)。引用......
  • 驱动开发:内核解析内存四级页表
    当今操作系统普遍采用64位架构,CPU最大寻址能力虽然达到了64位,但其实仅仅只是用到了48位进行寻址,其内存管理采用了9-9-9-9-12的分页模式,9-9-9-9-12分页表示物理地址拥有四级页表,微软将这四级依次命名为PXE、PPE、PDE、PTE这四项。关于内存管理和分页模式,不同的操作系统和体系结构......
  • ThreadLocal是否存在内存泄漏问题,如何防止内存泄漏
    ThreadLocal还是不能百分百地让程序员避免内存泄露,如果程序员不谨慎就很可能导致内存泄露?那么今天我们就来聊聊什么样的情况ThreadLocal不会出现内存泄露?什么样的情况会出现内存泄露?我们如何防止内存泄露的情况发生呢?我们这节就会为同学们一一详细解答,那我们先来简单回忆一下Thr......
  • 深入理解 Java 虚拟机 —— Java 内存模型与线程
    处理器的效率和一致性(与java内存访问可类比)计算机同时去做几件事情,不仅是因为计算机的运算能力强大了,还有一个很重要的原因是计算机的运算速度与它的存储和通信子系统的速度差距太大,大量的时间都花费在磁盘I/O、网络通信或者数据库访问上。如果不希望处理器在大部分时间里都处......
  • 数据结构之队列
    @TOC前言本文章讲述的是数据结构的特殊线性表——队列一.什么是队列,队列的特点队列是数据结构中的一种特殊的线性表,它与栈不同,队列的基本图例如上图:显然,队列的特点就是:先进先出FirstInFirstOut那么我们使用什么样的方式来实现队列呢?基于队列的特点,使用链表而不是数组来实现队......
  • 代码随想录Day11|栈和队列
    20.有效的括号经典的利用栈的题目这里选择用java来写,注意我们的java中的泛型不能用基本数据类型,而是应该使用包装类注意!java一定是定义后需要声明,然后才能使用1047.删除字符串中的所有相邻重复项 略比较简单150.逆波兰表达式求值注意:leetcode内置jdk的问题,不能使......
  • 优先级队列的实现详解( Java 实现)
    前言优先级队列是在队列的基础上,每个元素都带有一个优先级,可以实现按照优先级高低进行存储和访问。Java提供了许多实现优先级队列的方法,例如使用堆来实现。在本篇博客中,我将介绍Java实现优先级队列实现的具体方法,以及如何使用它来解决实际问题。一、优先级队列的概念优先级队列......
  • Linux为什么要有大页内存
    Linux为什么要有大页内存?为什么DPDK要求必须要设置大页内存?这都是由系统架构决定的,系统架构发展到现在,又是在原来的基础上一点点演变的。一开始为了解决一个问题,大家设计了一个很好的方案,随着事物的发展,发现无法满足需求,就在原来的基础上改进,慢慢的变成了现在的样子。不过技术革新......
  • 记一次redis数据库RDB内存事故排查处理
    事故表现:redis状态正常,但客户端不能使用,定位日志结论,redis内存申请不通过,导致中断用户操作解决办法1.解锁相关配置(不能解决根本问题,根本原因来源于开发使用姿势不对)两种解决办法一.打开系统层始终同意分配内存(不建议)编辑文件/etc/sysctl.conf添加vm.overcommit_memory=1内核参......
  • java面试(9)内存泄露
    1:Java中也存在栈内存泄露的情况?  在Java中,栈内存主要用于存储方法调用和本地变量。与堆内存不同,栈内存的分配和释放是由编译器和虚拟机自动处理的,通常不需要手动释放。  然而,如果在编写代码时出现一些问题,可能会导致栈内存泄露。以下是一些可能引起栈内存泄露的常见情......