得益于jvm对系统api的封装,本文的方法实际是对jvm封装后的方法的再次封装。
在linux上,对于的api为inotify,在windows上,对于的api则为ReadDirectoryChangesW。
本文应用的jdk版本为8。
业务字段:
@Data
public class FileMessageDto {
private LocalDateTime createTime;
private String hostName;
private String channel;
private String directory;
private String fileName;
private String filePath;
}
主类:
@Slf4j
@Service
public class ListenerService {
@Value("#{${listenerservice.paths}}")
private Map<String, String> watchPaths;
@Value("#{${listenerservice.directory}}")
private Map<String, String> directoryMap;
private static final BlockingQueue<FileMessageDto> QUEUE = new LinkedBlockingQueue<>();
private static final int CONCURRENT_SUBMIT_THREADS = Runtime.getRuntime().availableProcessors() * 2;
private static final String REDIS_DUPLICATION_REMOVE_KEY = "FILENAME:DUPLICATION_REMOVE:";
@Value("${listenerservice.duplicationRemoveTime:15}")
private int redisDuplicationRemoveTime;
@Value("#{'${listenerservice.fastChannels:}'.split(',')}")
private Set<String> fastChannels;
@Autowired
private KafkaTemplate<String, KafkaMessage<FileMessageDto>> kafkaTemplate;
@Autowired
private RedisLockService redisLockService;
public int getQueueSize() {
return QUEUE.size();
}
@PostConstruct
public void init() {
Runnable kafkaPusher = () -> {
while (true) {
FileMessageDto dto;
try {
dto = QUEUE.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;//不可达
}
boolean obtained = true;
try {
obtained = redisLockService.obtainDistributedLock(
REDIS_DUPLICATION_REMOVE_KEY + dto.getChannel() + ':' + dto.getFileName(),
TimeUnit.MINUTES.toMillis(redisDuplicationRemoveTime), false);
} catch (Exception e) {
log.error("文件名判重失败", e);
}
if (!obtained) {
log.error("重复的文件名:{}-{}", dto.getChannel(), dto.getFileName());
continue;
}
KafkaMessage<FileMessageDto> msg = new KafkaMessage<>();
msg.setData(dto);
String topic;
if (fastChannels.contains(dto.getChannel())) {
topic = KafkaMsgTopics.FILE_NAME_FAST;
} else {
topic = KafkaMsgTopics.FILE_NAME_SLOW;
}
try {
kafkaTemplate.send(topic, msg);
log.info(topic + "发送" + msg);
} catch (Exception e) {
log.error("", e);
}
}
};
ExecutorService committerPool = getThreadsPool(CONCURRENT_SUBMIT_THREADS, "_COMMITTER_");
for (int i = 0; i < CONCURRENT_SUBMIT_THREADS; i++) {
committerPool.execute(kafkaPusher);
}
ExecutorService listenerPool = getThreadsPool(watchPaths.size(), "_LISTENER_");
for (Map.Entry<String, String> e : watchPaths.entrySet()) {
String directory = directoryMap.get(e.getKey());
if (directory == null) {
throw new IllegalArgumentException("未正确配置路径");
}
Task task = new Task(e.getKey(), directory, e.getValue());
listenerPool.execute(task.listener);
}
}
private ExecutorService getThreadsPool(int concurrentSubmitThreads, String threadNamePrefix) {
return new ThreadPoolExecutor(
concurrentSubmitThreads, concurrentSubmitThreads, 60L,
TimeUnit.SECONDS, new SynchronousQueue<>(),
new CustomizableThreadFactory(this.getClass().getSimpleName() + threadNamePrefix));
}
@RequiredArgsConstructor
@EqualsAndHashCode(of = {"channel"})
static class Task {
@NonNull
private final String channel;
@NonNull
private final String directory;
@NonNull
private final String pathStr;
private final Runnable listener;
@SuppressWarnings("all")
public Task(@NonNull String channel, @NonNull String directory, @NonNull String pathStr) {
this.channel = channel;
this.directory = directory;
this.pathStr = pathStr;
this.listener = () -> {
while (true) {
try {
watcherWrap();
} catch (Exception e) {
log.error(this.channel + "监控出错,重启监控", e);
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new BusinessException(e.getMessage());
}
}
};
}
private void watcherWrap() {
try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
Path path = Paths.get(this.pathStr);
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
log.info("监控开始,渠道:{},路径:{}", channel, pathStr);
while (true) {
WatchKey key = watchService.take();
List<WatchEvent<?>> watchEvents = key.pollEvents();
for (WatchEvent<?> event : watchEvents) {
FileMessageDto dto = new FileMessageDto();
dto.setCreateTime(LocalDateTime.now());
dto.setHostName(HostNameUtil.getHostName());
dto.setChannel(this.channel);
dto.setDirectory(this.directory);
dto.setFileName(event.context().toString());
dto.setFilePath(path.toString());
QUEUE.put(dto);
}
boolean valid = key.reset();
if (!valid) {
throw new SystemException(String.format("监控中断,渠道:%s,路径:%s", channel, pathStr));
}
}
} catch (IOException e) {
log.error("监控失败,渠道:{},路径:{}", channel, pathStr);
log.error("", e);
} catch (InterruptedException e) {
log.error("监控中断,渠道:{},路径:{}", channel, pathStr);
log.error("", e);
Thread.currentThread().interrupt();
}
}
}
实际上,BlockingQueue<FileMessageDto> QUEUE = new LinkedBlockingQueue<>()这个队列应该可以不要或者设置一个比较小的容量,此处是为了能察觉kafka推送是否顺畅。
标签:dto,String,private,api,监控,new,Java,channel,log From: https://www.cnblogs.com/JackieJK/p/18238790