首页 > 编程语言 >Java基于系统api监控文件新增事件

Java基于系统api监控文件新增事件

时间:2024-06-08 17:34:21浏览次数:27  
标签:dto String private api 监控 new Java channel log

得益于jvm对系统api的封装,本文的方法实际是对jvm封装后的方法的再次封装。

在linux上,对于的api为inotify,在windows上,对于的api则为ReadDirectoryChangesW。

本文应用的jdk版本为8。

业务字段:

@Data
public class FileMessageDto {

  private LocalDateTime createTime;

  private String hostName;

  private String channel;

  private String directory;

  private String fileName;

  private String filePath;

}

主类:

@Slf4j
@Service
public class ListenerService {

  @Value("#{${listenerservice.paths}}")
  private Map<String, String> watchPaths;

  @Value("#{${listenerservice.directory}}")
  private Map<String, String> directoryMap;

  private static final BlockingQueue<FileMessageDto> QUEUE = new LinkedBlockingQueue<>();

  private static final int CONCURRENT_SUBMIT_THREADS = Runtime.getRuntime().availableProcessors() * 2;

  private static final String REDIS_DUPLICATION_REMOVE_KEY = "FILENAME:DUPLICATION_REMOVE:";

  @Value("${listenerservice.duplicationRemoveTime:15}")
  private int redisDuplicationRemoveTime;

  @Value("#{'${listenerservice.fastChannels:}'.split(',')}")
  private Set<String> fastChannels;

  @Autowired
  private KafkaTemplate<String, KafkaMessage<FileMessageDto>> kafkaTemplate;

  @Autowired
  private RedisLockService redisLockService;

  public int getQueueSize() {
    return QUEUE.size();
  }


  @PostConstruct
  public void init() {
    Runnable kafkaPusher = () -> {
      while (true) {
        FileMessageDto dto;
        try {
          dto = QUEUE.take();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;//不可达
        }
        boolean obtained = true;
        try {
          obtained = redisLockService.obtainDistributedLock(
              REDIS_DUPLICATION_REMOVE_KEY + dto.getChannel() + ':' + dto.getFileName(),
              TimeUnit.MINUTES.toMillis(redisDuplicationRemoveTime), false);
        } catch (Exception e) {
          log.error("文件名判重失败", e);
        }
        if (!obtained) {
          log.error("重复的文件名:{}-{}", dto.getChannel(), dto.getFileName());
          continue;
        }
        KafkaMessage<FileMessageDto> msg = new KafkaMessage<>();
        msg.setData(dto);
        String topic;
        if (fastChannels.contains(dto.getChannel())) {
          topic = KafkaMsgTopics.FILE_NAME_FAST;
        } else {
          topic = KafkaMsgTopics.FILE_NAME_SLOW;
        }
        try {
          kafkaTemplate.send(topic, msg);
          log.info(topic + "发送" + msg);
        } catch (Exception e) {
          log.error("", e);
        }
      }
    };
    ExecutorService committerPool = getThreadsPool(CONCURRENT_SUBMIT_THREADS, "_COMMITTER_");
    for (int i = 0; i < CONCURRENT_SUBMIT_THREADS; i++) {
      committerPool.execute(kafkaPusher);
    }
    ExecutorService listenerPool = getThreadsPool(watchPaths.size(), "_LISTENER_");
    for (Map.Entry<String, String> e : watchPaths.entrySet()) {
      String directory = directoryMap.get(e.getKey());
      if (directory == null) {
        throw new IllegalArgumentException("未正确配置路径");
      }
      Task task = new Task(e.getKey(), directory, e.getValue());
      listenerPool.execute(task.listener);
    }
  }

  private ExecutorService getThreadsPool(int concurrentSubmitThreads, String threadNamePrefix) {
    return new ThreadPoolExecutor(
        concurrentSubmitThreads, concurrentSubmitThreads, 60L,
        TimeUnit.SECONDS, new SynchronousQueue<>(),
        new CustomizableThreadFactory(this.getClass().getSimpleName() + threadNamePrefix));
  }


  @RequiredArgsConstructor
  @EqualsAndHashCode(of = {"channel"})
  static class Task {
    @NonNull
    private final String channel;
    @NonNull
    private final String directory;
    @NonNull
    private final String pathStr;
    private final Runnable listener;

    @SuppressWarnings("all")
    public Task(@NonNull String channel, @NonNull String directory, @NonNull String pathStr) {
      this.channel = channel;
      this.directory = directory;
      this.pathStr = pathStr;
      this.listener = () -> {
        while (true) {
          try {
            watcherWrap();
          } catch (Exception e) {
            log.error(this.channel + "监控出错,重启监控", e);
          }
          try {
            Thread.sleep(10000);
          } catch (InterruptedException e) {
            throw new BusinessException(e.getMessage());
          }
        }

      };
    }

    private void watcherWrap() {
      try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
        Path path = Paths.get(this.pathStr);
        path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
        log.info("监控开始,渠道:{},路径:{}", channel, pathStr);

        while (true) {
          WatchKey key = watchService.take();
          List<WatchEvent<?>> watchEvents = key.pollEvents();
          for (WatchEvent<?> event : watchEvents) {
            FileMessageDto dto = new FileMessageDto();
            dto.setCreateTime(LocalDateTime.now());
            dto.setHostName(HostNameUtil.getHostName());
            dto.setChannel(this.channel);
            dto.setDirectory(this.directory);
            dto.setFileName(event.context().toString());
            dto.setFilePath(path.toString());

            QUEUE.put(dto);
          }
          boolean valid = key.reset();
          if (!valid) {
            throw new SystemException(String.format("监控中断,渠道:%s,路径:%s", channel, pathStr));
          }
        }

      } catch (IOException e) {
        log.error("监控失败,渠道:{},路径:{}", channel, pathStr);
        log.error("", e);
      } catch (InterruptedException e) {
        log.error("监控中断,渠道:{},路径:{}", channel, pathStr);
        log.error("", e);
        Thread.currentThread().interrupt();
      }
    }


  }

实际上,BlockingQueue<FileMessageDto> QUEUE = new LinkedBlockingQueue<>()这个队列应该可以不要或者设置一个比较小的容量,此处是为了能察觉kafka推送是否顺畅。

标签:dto,String,private,api,监控,new,Java,channel,log
From: https://www.cnblogs.com/JackieJK/p/18238790

相关文章

  • FastAPI-5:Pydantic、类型提示和模型预览
    5Pydantic、类型提示和模型FastAPI主要基于Pydantic。它使用模型(Python对象类)来定义数据结构。这些模型在FastAPI应用程序中被大量使用,是编写大型应用程序时的真正优势。5.1类型提示在许多计算机语言中,变量直接指向内存中的值。这就要求程序员声明它的类型,以便确定值的大小......
  • 还在为线上BUG苦苦找寻?试试IntelliJ IDEA远程调试线上Java程序
    ......
  • Docker中部署nacos报Caused by: java.lang.IllegalStateException: No DataSource set
    在进入nacos日志内部发现再次重启同样如此;其实从上面你也就很容易看出问题所在,没有数据;经过排查发现我在docker部署时之前部署的mysql容器并没有启动,需要启动mysql容器z输入两个命令分别启动和查看dockerstartmysqldps经过之后再次重启nacos进入日志后成功:​​​​​......
  • Java的BigDecimal与数据库的Decimal的应用
    数据库方面Decimal的语法DECIMAL(M,D)M:总位数,包括小数点前和小数点后的数字。D:小数点后的位数。Decimal的实际应用场景货币计算、精度计算Java代码方面add(BigDecimalaugend):加法运算//本月总计==本月拓客+本月服务+行动长计划//BigDecimalthisMo......
  • 另一个Java基于阻塞的定时消费内存队列(依赖guava)
    本文的代码是对一个Java基于阻塞的定时消费内存队列-Jackie_JK-博客园(cnblogs.com)方法的改进,完善了包装以及部分细节,非jdk21可能需要更换线程池类型。消费类型:@Getter@AllArgsConstructorpublicenumPushType{ELASTIC,SQL,;}队列参数枚举:@Getter@AllAr......
  • java小记
    今天学了一些计算机存储数据的原理:文本,图片,声音等存储编码:gb2312,Unicode,gbk等等。数据类型相关遇到了一些问题:解决办法大概是:但是对于我来说是无效的,最后还是照着下面来:成功......
  • 【Selenium+java环境配置】(超详细教程常见问题解决)
    Selenium+java环境配置windows电脑环境搭建-chrome浏览器1.下载chrome浏览器2.查看chrome浏览器版本3.下载chrome浏览器驱动4.配置系统环境变量PATH验证环境是否搭建成功1.创建java项目,添加pom文件中添加依赖2.编写代码运行常见问题&解决办法1.访问失败Theversio......
  • vits-simple-api搭建
    根据vits-simple-api中文文档指南自行搭建后端以下步骤均在windows平台cpu推理搭建为例选择你的vits模型(注意是vits!不是So-VitsBertVits2GptVits)建议去抱脸网搜索或者b站搜素以及自己训练.在vits-simple-api的路径的model目录下新建你下载模型的名字的文件夹将......
  • Java---异常【详细】
     1.异常的简介1.编译错误:基本语法错误,编译器进行语法检查,如果没有通过,程序违背了计算机编程语言的语法。2.运行错误:程序可以执行,在执行过程中发生异常,导致程序提前退出,没有得到预计的执行效果。3.逻辑错误:程序可能执行,结果不对。4.异常体系,保证程序的健壮性。2.java异......
  • 宝塔搭建javaweb_宝塔工具+javaweb+mysql+tomcat部署项目
    1.首先我们得有一个安装了宝塔工具的云服务器首先我们要在服务器安全组开放宝塔常用的端口,看你是什么服务器,然后对应下面官网的教程就行了。腾讯云:https://www.bt.cn/bbs/thread-1229-1-1.html阿里云:https://www.bt.cn/bbs/thread-2897-1-1.html华为云:https://www.bt.cn/bbs/t......