首页 > 其他分享 >File Support—文件支持

File Support—文件支持

时间:2022-12-08 13:32:54浏览次数:35  
标签:文件 File Support file 使用 directory true 属性

File Support—文件支持_筛选器

Spring 集成的文件支持扩展了 Spring 集成核心,具有专用词汇来处理读取、写入和转换文件。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.0.0</version>
</dependency>

它提供了一个命名空间,该命名空间支持定义专用于文件的通道适配器的元素,并支持可以将文件内容读入字符串或字节数组的转换器。

本节解释 bean 的工作原理以及如何将它们配置为 bean。 它还讨论了对通过特定于文件的实现来处理文件的支持。 最后,它解释了特定于文件的命名空间。​​FileReadingMessageSource​​​​FileWritingMessageHandler​​​​Transformer​

读取文件

A 可用于使用文件系统中的文件。 这是从文件系统目录创建消息的实现。 以下示例演示如何配置:​​FileReadingMessageSource​​​​MessageSource​​​​FileReadingMessageSource​

<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>

要防止为某些文件创建消息,您可以提供 . 默认情况下,我们使用以下过滤器:​​FileListFilter​

  • ​IgnoreHiddenFileListFilter​
  • ​AcceptOnceFileListFilter​

确保不处理隐藏文件。 请注意,隐藏的确切定义取决于系统。 例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏。 另一方面,Microsoft Windows具有专用的文件属性来指示隐藏文件。​​IgnoreHiddenFileListFilter​


版本 4.2 引入了 . 在以前的版本中,包含隐藏文件。 对于默认配置,首先触发 ,然后触发 .​​IgnoreHiddenFileListFilter​​​​IgnoreHiddenFileListFilter​​​​AcceptOnceFileListFilter​


确保仅从目录中选取一次文件。​​AcceptOnceFileListFilter​


将其状态存储在内存中。 如果希望状态在系统重新启动后继续存在,可以使用 . 此筛选器将接受的文件名存储在实现中(请参阅元数据存储​)。 此过滤器与文件名和修改时间匹配。​​AcceptOnceFileListFilter​​​​FileSystemPersistentAcceptOnceFileListFilter​​​​MetadataStore​



从版本 4.0 开始,此筛选器需要 . 与共享数据存储一起使用(例如与 一起使用)时,它允许在多个应用程序实例之间或跨多个服务器使用的网络文件共享共享筛选器键。​​ConcurrentMetadataStore​​​​Redis​​​​RedisMetadataStore​



从版本 4.1.5 开始,此筛选器具有一个新属性 (),这会导致它在每次更新时刷新元数据存储(如果存储实现)。​​flushOnUpdate​​​​Flushable​


持久文件列表筛选器现在具有布尔属性。 将此属性设置为 也会设置 ,这意味着出站网关 ( 和 ) 上的递归操作现在每次将始终遍历整个目录树。 这是为了解决未检测到目录树深处的更改的问题。 此外,导致将文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件在不同目录中多次出现,过滤器无法正常工作的问题。 重要说明:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。 因此,默认情况下,该属性是;这可能会在将来的版本中更改。​​forRecursion​​​​true​​​​alwaysAcceptDirectories​​​​ls​​​​mget​​​​forRecursion=true​​​​false​

以下示例使用筛选器配置 :​​FileReadingMessageSource​

<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>

读取文件的一个常见问题是,文件可能在准备就绪之前被检测到(即,其他某个进程可能仍在写入文件)。 默认值不会阻止此操作。 在大多数情况下,如果文件写入过程在准备好读取后立即重命名每个文件,则可以防止这种情况。 仅接受准备就绪的文件(可能基于已知后缀)的 or 过滤器,由默认值组成,允许这种情况。 启用组合,如以下示例所示:​​AcceptOnceFileListFilter​​​​filename-pattern​​​​filename-regex​​​​AcceptOnceFileListFilter​​​​CompositeFileListFilter​

<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>

如果无法使用临时名称创建文件并重命名为最终名称,Spring 集成提供了另一种选择。 版本 4.2 添加了 . 可以使用属性配置此筛选器,以便筛选器仅传递早于此值的文件。 年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早获取文件(例如,由于网络故障)。 以下示例演示如何配置:​​LastModifiedFileListFilter​​​​age​​​​LastModifiedFileListFilter​

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>

从版本 4.3.7 开始,引入了 (扩展 ),以允许后续筛选器仅应看到上一个筛选器的结果的情况。 (使用 ,所有筛选器都会看到所有文件,但它仅传递已通过所有筛选器的文件)。 需要新行为的一个示例是 和 的组合,当我们不希望在经过一段时间之前接受文件时。 使用 ,因为 在第一次传递时会看到所有文件,因此当另一个筛选器传递时,它不会传递它。 当模式过滤器与查找辅助文件以指示文件传输已完成的自定义过滤器结合使用时,此方法非常有用。 模式过滤器可能只传递主文件(例如),但“done”过滤器需要查看是否存在(例如)。​​ChainFileListFilter​​​​CompositeFileListFilter​​​​CompositeFileListFilter​​​​LastModifiedFileListFilter​​​​AcceptOnceFileListFilter​​​​CompositeFileListFilter​​​​AcceptOnceFileListFilter​​​​CompositeFileListFilter​​​​something.txt​​​​something.done​

假设我们有文件 、 和 。​​a.txt​​​​a.done​​​​b.txt​

模式过滤器仅通过 和 ,而 “done” 过滤器看到所有三个文件并仅传递 。 复合过滤器的最终结果是仅释放。​​a.txt​​​​b.txt​​​​a.txt​​​​a.txt​

使用 ,如果链中的任何筛选器返回空列表,则不会调用其余筛选器。​​ChainFileListFilter​

版本 5.0 引入了针对文件执行 SpEL 表达式作为上下文评估根对象。 为此,提供了用于文件处理(本地和远程)的所有 XML 组件以及现有属性,如以下示例所示:​​ExpressionFileListFilter​​​​filter​​​​filter-expression​

<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>

版本 5.0.5 引入了对被拒绝的文件感兴趣的实现。 为此,应通过 提供此类筛选器实现的回调。 在框架中,此功能从 与 结合使用。 与常规不同,它根据目标文件系统上的事件提供文件进行处理。 在用这些文件轮询内部队列的那一刻,可能会丢弃它们,因为它们相对于其配置的 . 因此,我们会丢失文件以备将来考虑。 discard 回调钩子允许我们将文件保留在内部队列中,以便可以在后续轮询中根据 进行检查。 它还实现 a 并填充对其所有委托的放弃回调。​​DiscardAwareFileListFilter​​​​addDiscardCallback(Consumer<File>)​​​​FileReadingMessageSource.WatchServiceDirectoryScanner​​​​LastModifiedFileListFilter​​​​DirectoryScanner​​​​WatchService​​​​LastModifiedFileListFilter​​​​age​​​​age​​​​CompositeFileListFilter​​​​DiscardAwareFileListFilter​​​​DiscardAwareFileListFilter​

由于将文件与所有委托匹配,因此可以对同一文件多次调用 。​​CompositeFileListFilter​​​​discardCallback​

从版本 5.1 开始,不会检查目录是否存在,并且在调用目录之前不会创建它(通常通过包装)。 以前,没有简单的方法来防止在引用目录(例如从测试中引用)或稍后应用权限时出现操作系统权限错误。​​FileReadingMessageSource​​​​start()​​​​SourcePollingChannelAdapter​

邮件头

从版本 5.0 开始,(除了轮询之外)将以下标头填充到出站:​​FileReadingMessageSource​​​​payload​​​​File​​​​Message​

  • ​FileHeaders.FILENAME​​:要发送的文件。 可用于后续重命名或复制逻辑。File.getName()
  • ​FileHeaders.ORIGINAL_FILE​​:对象本身。 通常,当我们丢失原始对象时,此标头由框架组件(例如拆分器或转换器)自动填充。 但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。FileFile
  • ​FileHeaders.RELATIVE_PATH​​:引入的新标头,用于表示相对于扫描根目录的文件路径部分。 当要求在其他位置还原源目录层次结构时,此标头可能很有用。 为此,可以将(请参阅“'生成文件名)配置为使用此标头。DefaultFileNameGenerator

目录扫描和轮询

不会立即为目录中的文件生成消息。 它使用内部队列来存储 返回的“合格文件”。 该选项用于确保在每次轮询中使用最新的输入目录内容刷新内部队列。 默认情况下 (),在再次扫描目录之前清空其队列。 此默认行为对于减少对目录中大量文件的扫描特别有用。 但是,在需要自定义排序的情况下,请务必考虑将此标志设置为 . 处理文件的顺序可能与预期不符。 默认情况下,队列中的文件按其自然 () 顺序处理。 扫描添加的新文件,即使队列已有文件,也会插入到适当的位置以保持该自然顺序。 若要自定义顺序,可以接受 a 作为构造函数参数。 内部 () 使用它根据业务需求对其内容进行重新排序。 因此,要按特定顺序处理文件,应提供比较器,而不是对自定义生成的列表进行排序。​​FileReadingMessageSource​​​​scanner​​​​scanEachPoll​​​​scanEachPoll = false​​​​FileReadingMessageSource​​​​true​​​​path​​​​FileReadingMessageSource​​​​Comparator<File>​​​​PriorityBlockingQueue​​​​FileReadingMessageSource​​​​DirectoryScanner​

引入了版本 5.0 以执行文件树访问。 实现基于功能。 根目录 () 参数从结果中排除。 所有其他子目录包含和排除都基于目标实现。 例如,默认情况下会筛选出目录。 有关更多信息,请参阅 AbstractDirectoryAwareFileListFilter 及其实现。​​RecursiveDirectoryScanner​​​​Files.walk(Path start, int maxDepth, FileVisitOption… options)​​​​DirectoryScanner.listFiles(File)​​​​FileListFilter​​​​SimplePatternFileListFilter​

从版本 5.5 开始,Java DSL 有一个方便的选项,可以在目标中使用 a 而不是默认选项。​​FileInboundChannelAdapterSpec​​​​recursive(boolean)​​​​RecursiveDirectoryScanner​​​​FileReadingMessageSource​

命名空间支持

通过使用特定于文件的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

在此命名空间中,您可以减少 并将其包装在入站通道适配器中,如下所示:​​FileReadingMessageSource​

<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />

第一个通道适配器示例依赖于默认实现:​​FileListFilter​

  • ​IgnoreHiddenFileListFilter​​(不处理隐藏文件)
  • ​AcceptOnceFileListFilter​​(防止重复)

因此,您也可以省略 and 属性,因为它们是默认属性。​​prevent-duplicates​​​​ignore-hidden​​​​true​


Spring Integration 4.2 引入了该属性。 在以前的版本中,包含隐藏文件。​​ignore-hidden​


第二个通道适配器示例使用自定义筛选器,第三个使用属性添加基于筛选器,第四个使用该属性将基于正则表达式模式的筛选器添加到 。 和属性都与常规引用属性互斥。 但是,您可以使用该属性来引用组合任意数量的过滤器(包括一个或多个基于模式的过滤器)以满足特定需求的实例。​​filename-pattern​​​​AntPathMatcher​​​​filename-regex​​​​FileReadingMessageSource​​​​filename-pattern​​​​filename-regex​​​​filter​​​​filter​​​​CompositeFileListFilter​

当多个进程从同一目录读取时,您可能需要锁定文件以防止同时选取它们。 为此,可以使用 . 有一个基于 -的实现可用,但也可以实现自己的锁定方案。 储物柜可以按如下方式注入:​​FileLocker​​​​java.nio​​​​nio​

<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>

您可以按如下方式配置自定义储物柜:

<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>

当文件入站适配器配置了锁时,它负责在允许接收文件之前获取锁。 它不承担解锁文件的责任。 如果您已处理文件并保持锁悬而未决,则会出现内存泄漏。 如果这是一个问题,您应该在适当的时候打电话给自己。​​FileLocker.unlock(File file)​

当筛选和锁定文件不够时,您可能需要完全控制文件的列出方式。 要实现此类要求,可以使用 的实现。 此扫描程序可让您准确确定每个轮询中列出的文件。 这也是 Spring Integration 在内部用来连接实例和 . 您可以将自定义注入到 on 属性中,如以下示例所示:​​DirectoryScanner​​​​FileListFilter​​​​FileLocker​​​​FileReadingMessageSource​​​​DirectoryScanner​​​​<int-file:inbound-channel-adapter/>​​​​scanner​

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>

这样做可以让您完全自由地选择排序、列出和锁定策略。

同样重要的是要了解过滤器(包括、、和其他)和实例实际上由 使用。 适配器上设置的这些属性中的任何一个随后都会注入到内部 . 对于外部 ,禁止在 上使用所有过滤器和锁定器属性。 必须在该自定义 上指定(如果需要)它们。 换句话说,如果你在 中注入 a,你应该提供 和 ,而不是在 .​​patterns​​​​regex​​​​prevent-duplicates​​​​locker​​​​scanner​​​​scanner​​​​scanner​​​​FileReadingMessageSource​​​​DirectoryScanner​​​​scanner​​​​FileReadingMessageSource​​​​filter​​​​locker​​​​scanner​​​​FileReadingMessageSource​

默认情况下,使用 和 . 要防止使用它们,您可以配置自己的过滤器(例如 ),甚至将其设置为 .​​DefaultDirectoryScanner​​​​IgnoreHiddenFileListFilter​​​​AcceptOnceFileListFilter​​​​AcceptAllFileListFilter​​​​null​

​WatchServiceDirectoryScanner​

依赖于将新文件添加到目录时的文件系统事件。 在初始化期间,将注册目录以生成事件。 初始文件列表也是在初始化期间构建的。 在遍历目录树时,还会注册遇到的任何子目录以生成事件。 在第一次轮询时,返回遍历目录的初始文件列表。 在后续轮询中,将返回来自新创建事件的文件。 如果添加了新的子目录,则其创建事件用于遍历新子树以查找现有文件并注册找到的任何新子目录。​​FileReadingMessageSource.WatchServiceDirectoryScanner​

当其内部事件未在目录修改事件发生时被程序清空时,会出现问题。 如果超出队列大小,则会发出 a 以指示某些文件系统事件可能会丢失。 在这种情况下,将完全重新扫描根目录。 为避免重复,请考虑在处理完成后使用适当的(如 )或删除文件。​​WatchKey​​​​queue​​​​StandardWatchEventKinds.OVERFLOW​​​​FileListFilter​​​​AcceptOnceFileListFilter​

可以通过选项启用,该选项与选项互斥。 将为提供的 填充内部实例。​​WatchServiceDirectoryScanner​​​​FileReadingMessageSource.use-watch-service​​​​scanner​​​​FileReadingMessageSource.WatchServiceDirectoryScanner​​​​directory​

此外,现在轮询逻辑可以跟踪和.​​WatchService​​​​StandardWatchEventKinds.ENTRY_MODIFY​​​​StandardWatchEventKinds.ENTRY_DELETE​

如果需要跟踪现有文件和新文件的修改,则应在 中实现事件逻辑。 否则,这些事件中的文件将以相同的方式处理。​​ENTRY_MODIFY​​​​FileListFilter​

实现选取事件。 因此,为操作提供了他们的文件。 启用此事件后,筛选器(如 已删除文件)。 因此,如果出现同名文件,它将通过过滤器并作为消息发送。​​ResettableFileListFilter​​​​ENTRY_DELETE​​​​remove()​​​​AcceptOnceFileListFilter​

为此,引入了属性 ()。 ( 是 中的公共内部枚举。 有了这样的选项,我们可以对新文件使用一个下游流逻辑,对修改的文件使用一些其他逻辑。 以下示例演示如何为同一目录中的创建和修改事件配置不同的逻辑:​​watch-events​​​​FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)​​​​WatchEventType​​​​FileReadingMessageSource​

<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->

限制内存消耗

您可以使用 来限制内存中保留的文件数。 这在扫描大型目录时很有用。 对于 XML 配置,可以通过设置入站通道适配器上的属性来启用此功能。​​HeadDirectoryScanner​​​​queue-size​

在版本 4.2 之前,此设置与任何其他筛选器的使用不兼容。 任何其他筛选器(包括 )都会覆盖用于限制大小的筛选器。​​prevent-duplicates="true"​


使用 与 不兼容。 由于在轮询决策期间会参考所有筛选器,因此不知道其他筛选器可能正在临时筛选文件。 即使以前由 过滤的文件现在可用,也会对其进行筛选。​​HeadDirectoryScanner​​​​AcceptOnceFileListFilter​​​​AcceptOnceFileListFilter​​​​HeadDirectoryScanner.HeadFilter​​​​AcceptOnceFileListFilter​



通常,在这种情况下,您应该删除已处理的文件,以便以前筛选的文件在将来的轮询中可用,而不是在这种情况下使用 。​​AcceptOnceFileListFilter​


使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置出站适配器的示例:

@SpringBootApplication
public class FileReadingJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}

@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}

@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}

}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class FileReadingJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}

}

'尾部文件'

另一个流行的用例是从文件的末尾(或尾部)获取“行”,在添加新行时捕获它们。 提供了两种实现。 第一个 使用 native 命令(在具有本机命令的操作系统上)。 这通常是这些平台上最有效的实现。 对于没有命令的操作系统,第二个实现使用 Apache 类。​​OSDelegatingFileTailingMessageProducer​​​​tail​​​​tail​​​​ApacheCommonsFileTailingMessageProducer​​​​commons-io​​​​Tailer​

在这两种情况下,文件系统事件(例如文件不可用和其他事件)都使用正常的 Spring 事件发布机制作为实例发布。 此类事件的示例包括:​​ApplicationEvent​

[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]

前面示例中所示的事件序列可能会发生,例如,在轮换文件时。

从版本 5.0 开始,如果在 期间文件中没有数据,则会发出 。 以下示例显示了此类事件的外观:​​FileTailingIdleEvent​​​​idleEventInterval​

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]

并非所有支持命令的平台都提供这些状态消息。​​tail​

从这些终结点发出的消息具有以下标头:

  • ​FileHeaders.ORIGINAL_FILE​​:对象File
  • ​FileHeaders.FILENAME​​:文件名 (File.getName())

在 5.0 版之前的版本中,标头包含文件绝对路径的字符串表示形式。 现在,您可以通过调用原始文件头来获取该字符串表示形式。​​FileHeaders.FILENAME​​​​getAbsolutePath()​

以下示例使用默认选项('-F -n 0',表示从当前末尾跟随文件名)创建一个本机适配器。

<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>

以下示例使用“-F -n +0”选项创建本机适配器(表示跟随文件名,发出所有现有行)。

<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>

如果命令失败(在某些平台上,缺少文件会导致失败,即使指定了),则每 10 秒重试一次命令。​​tail​​​​tail​​​​-F​

默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。 它们还从标准错误中捕获以引发事件。 从版本 4.3.6 开始,可以通过将 设置为 来放弃标准错误事件,如以下示例所示:​​enable-status-reader​​​​false​

<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>

在以下示例中,设置为 ,表示如果五秒内未写入任何行,则每五秒触发一次:​​IdleEventInterval​​​​5000​​​​FileTailingIdleEvent​

<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>

当您需要停止适配器时,这可能很有用。

以下示例创建一个 Apache 适配器,该适配器每两秒检查一次文件的新行,每十秒检查是否存在丢失的文件:​​commons-io​​​​Tailer​

<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false"
reopen="true"
file-delay="10000"/>

文件从开头 () 而不是结尾(这是默认值)尾部。​​end="false"​

为每个区块重新打开文件(默认设置是保持文件打开)。

指定 或 属性会强制使用 Apache 适配器,并使该属性不可用。​​delay​​​​end​​​​reopen​​​​commons-io​​​​native-options​

处理不完整的数据

文件传输方案中的一个常见问题是如何确定传输已完成,以便您不会开始读取不完整的文件。 解决此问题的常用技术是使用临时名称写入文件,然后将其原子重命名为最终名称。 此技术与屏蔽临时文件不被使用者拾取的筛选器一起,提供了一个强大的解决方案。 这种技术由写入文件(本地或远程)的 Spring 集成组件使用。 默认情况下,它们会附加到文件名,并在传输完成后将其删除。​​.writing​

另一种常见的技术是编写第二个“标记”文件以指示文件传输已完成。 在这种情况下,您不应考虑(例如)在也存在之前可以使用。 Spring Integration 版本 5.0 引入了新的过滤器来支持这种机制。 为文件系统 ()、FTP 和 SFTP 提供了实现。 它们是可配置的,因此标记文件可以具有任何名称,尽管它通常与正在传输的文件相关。 有关更多信息,请参阅 Javadoc。​​somefile.txt​​​​somefile.txt.complete​​​​FileSystemMarkerFilePresentFileListFilter​

写入文件

若要将消息写入文件系统,可以使用 FileWritingMessageHandler。 此类可以处理以下有效负载类型:

  • ​File​
  • ​String​
  • 字节数组
  • ​InputStream​​(从版本 4.2 开始)

对于字符串有效负载,您可以配置编码和字符集。

为了简化操作,可以使用 XML 命名空间将 配置为出站通道适配器或出站网关的一部分。​​FileWritingMessageHandler​

从版本 4.3 开始,您可以指定写入文件时要使用的缓冲区大小。

从版本 5.1 开始,您可以提供一个 如果使用 或 并且必须创建新文件,则会触发 。 此回调接收新创建的文件和触发它的消息。 例如,此回调可用于编写消息标头中定义的 CSV 标头。​​BiConsumer<File, Message<?>>​​​​newFileCallback​​​​FileExistsMode.APPEND​​​​FileExistsMode.APPEND_NO_FLUSH​

生成文件名

在最简单的形式中,只需要一个目标目录来写入文件。 要写入的文件的名称由处理程序的文件名生成器确定。 默认实现查找其键与定义为 FileHeaders.FILENAME的常量匹配的消息标头。​​FileWritingMessageHandler​

或者,您可以指定要根据消息计算的表达式以生成文件名,例如 。 表达式的计算结果必须为 . 为方便起见,还提供了该方法,允许您显式指定其值将用作文件名的消息标头。​​headers['myCustomHeader'] + '.something'​​​​String​​​​DefaultFileNameGenerator​​​​setHeaderName​

设置完成后,将采用以下解析步骤来确定给定消息有效负载的文件名:​​DefaultFileNameGenerator​

  1. 根据消息计算表达式,如果结果为非空,则将其用作文件名。String
  2. 否则,如果有效负载为 ,请使用对象的文件名。java.io.FileFile
  3. 否则,请使用附加 的消息 ID。作为文件名。msg

使用 XML 命名空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥配置属性:

  • ​filename-generator​​(对实现的引用)FileNameGenerator
  • ​filename-generator-expression​​(计算结果为String)

写入文件时,将使用临时文件后缀(缺省值为 )。 在写入文件时,它会追加到文件名中。 若要自定义后缀,可以在文件出站通道适配器和文件出站网关上设置属性。​​.writing​​​​temporary-file-suffix​

使用 file 时,该属性将被忽略,因为数据直接附加到文件中。​​APPEND​​​​mode​​​​temporary-file-suffix​

从 版本 4.2.5 开始,生成的文件名(作为结果或评估)可以表示子路径以及目标文件名。 它像以前一样用作第二个构造函数参数。 但是,在过去,我们不会为子路径创建 () 目录,只假定文件名。 这种方法对于我们需要恢复文件系统树以匹配源目录的情况很有用 - 例如,解压缩存档并按原始顺序保存目标目录中的所有文件时。​​filename-generator​​​​filename-generator-expression​​​​File(File parent, String child)​​​​mkdirs()​

指定输出目录

文件出站通道适配器和文件出站网关都提供两个互斥的配置属性,用于指定输出目录:

  • ​directory​
  • ​directory-expression​

Spring Integration 2.2 引入了该属性。​​directory-expression​

使用属性​​directory​

使用该属性时,输出目录将设置为固定值,该值在初始化 时设置。 如果未指定此属性,则必须使用该属性。​​directory​​​​FileWritingMessageHandler​​​​directory-expression​

使用属性​​directory-expression​

如果要获得完整的 SpEL 支持,可以使用该属性。 此属性接受针对正在处理的每条消息计算的 SpEL 表达式。 因此,当您动态指定输出文件目录时,您可以完全访问消息的有效负载及其标头。​​directory-expression​

SpEL 表达式必须解析为 、 或 。 (后者无论如何都会被评估为 a。 此外,生成的 or 必须指向一个目录。 如果未指定属性,则必须设置该属性。​​String​​​​java.io.File​​​​org.springframework.core.io.Resource​​​​File​​​​String​​​​File​​​​directory-expression​​​​directory​

使用属性​​auto-create-directory​

默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。 要防止这种行为,可以将该属性设置为 。 此属性适用于 和 属性。​​auto-create-directory​​​​false​​​​directory​​​​directory-expression​


使用 属性 和 is 时,从 Spring Integration 2.2 开始进行了以下更改:​​directory​​​​auto-create-directory​​​​false​



现在,对正在处理的每条消息执行此检查,而不是在初始化适配器时检查目标目录是否存在。



此外,如果在处理消息之间删除了目录,则会为正在处理的每条消息重新创建该目录。​​auto-create-directory​​​​true​


处理现有目标文件

当您写入文件并且目标文件已存在时,默认行为是覆盖该目标文件。 您可以通过在相关文件出站组件上设置属性来更改此行为。 存在以下选项:​​mode​

  • ​REPLACE​​(默认)
  • ​REPLACE_IF_MODIFIED​
  • ​APPEND​
  • ​APPEND_NO_FLUSH​
  • ​FAIL​
  • ​IGNORE​

Spring Integration 2.2 引入了属性和 、 和选项。​​mode​​​​APPEND​​​​FAIL​​​​IGNORE​

​REPLACE​

如果目标文件已存在,则会覆盖该文件。 如果未指定该属性,这是写入文件时的默认行为。​​mode​

​REPLACE_IF_MODIFIED​

如果目标文件已存在,则仅当上次修改的时间戳与源文件的时间戳不同时,才会覆盖该文件。 对于有效负载,将有效负载时间与现有文件进行比较。 对于其他有效负载,会将 () 标头与现有文件进行比较。 如果标头丢失或值不是 ,则始终替换该文件。​​File​​​​lastModified​​​​FileHeaders.SET_MODIFIED​​​​file_setModified​​​​Number​

​APPEND​

此模式允许您将邮件内容追加到现有文件,而不是每次都创建新文件。 请注意,此属性与该属性互斥,因为当它将内容追加到现有文件时,适配器不再使用临时文件。 该文件在每条消息后关闭。​​temporary-file-suffix​

​APPEND_NO_FLUSH​

此选项具有与 相同的语义,但不会刷新数据,并且不会在每条消息后关闭文件。 这可以提供显著的性能,但在发生故障时有数据丢失的风险。 有关详细信息,请参阅使用 APPEND_NO_FLUSH 时刷新文件。APPEND

FAIL

如果目标文件存在,则会引发 MessageHandlingException。

​IGNORE​

如果目标文件存在,则以静默方式忽略消息负载。

使用临时文件后缀(默认值为 )时,如果存在最终文件名或临时文件名,则应用该选项。​​.writing​​​​IGNORE​

使用时刷新文件​​APPEND_NO_FLUSH​

该模式是在版本 4.3 中添加的。 使用它可以提高性能,因为文件不会在每封邮件后关闭。 但是,这可能会导致发生故障时数据丢失。​​APPEND_NO_FLUSH​

Spring 集成提供了几种刷新策略来减轻这种数据丢失:

  • 用。 如果文件在此时间段内未写入,则会自动刷新该文件。 这是近似值,可能到现在为止(平均值为 )。flushInterval1.33x1.167x
  • 将包含正则表达式的消息发送到消息处理程序的方法。 具有与模式匹配的绝对路径名的文件将被刷新。trigger
  • 为处理程序提供自定义实现,以修改将消息发送到方法时执行的操作。MessageFlushPredicatetrigger
  • 通过传入自定义或实现来调用处理程序的方法之一。flushIfNeededFileWritingMessageHandler.FlushPredicateFileWritingMessageHandler.MessageFlushPredicate

为每个打开的文件调用谓词。 有关更多信息,请参阅 Javadoc 了解这些接口。 请注意,从版本 5.0 开始,谓词方法提供了另一个参数:如果当前文件是新的或以前关闭的,则首次写入的时间。

使用 时,间隔从上次写入开始。 仅当文件在时间间隔内处于空闲状态时,才会刷新该文件。 从版本 4.3.7 开始,可以将附加属性 () 设置为 ,这意味着间隔从第一次写入以前刷新的(或新)文件开始。​​flushInterval​​​​flushWhenIdle​​​​false​

文件时间戳

默认情况下,目标文件的时间戳是创建文件的时间(除非就地重命名保留当前时间戳)。 从版本 4.3 开始,您现在可以配置(或使用 Java 配置时)。 对于有效负载,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。 对于其他有效负载,如果标头 () 存在,则它用于设置目标文件的时间戳,只要标头为 .​​lastModified​​​​preserve-timestamp​​​​setPreserveTimestamp(true)​​​​File​​​​FileHeaders.SET_MODIFIED​​​​file_setModified​​​​lastModified​​​​Number​

文件权限

从版本 5.0 开始,将文件写入支持 Posix 权限的文件系统时,可以在出站通道适配器或网关上指定这些权限。 该属性是一个整数,通常以熟悉的八进制格式提供,例如,表示所有者具有读/写权限,组具有只读权限,而其他人没有访问权限。​​0640​

文件出站通道适配器

以下示例配置文件出站通道适配器:

<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>

基于命名空间的配置还支持属性。 如果设置为 ,则会在写入目标后触发原始源文件的删除。 该标志的缺省值为 。 下面的示例演示如何将其设置为 :​​delete-source-files​​​​true​​​​false​​​​true​

<int-file:outbound-channel-adapter id="filesOut"
directory="${output.directory}"
delete-source-files="true"/>

仅当入站消息具有有效负载或标头值包含源实例或表示原始文件路径的 时,该属性才有效。​​delete-source-files​​​​File​​​​FileHeaders.ORIGINAL_FILE​​​​File​​​​String​

从版本 4.2 开始,支持一个选项。 如果设置为 ,则在写入消息后,将在文件中追加一个新行。 缺省属性值为 。 以下示例演示如何使用该选项:​​FileWritingMessageHandler​​​​append-new-line​​​​true​​​​false​​​​append-new-line​

<int-file:outbound-channel-adapter id="newlineAdapter"
append-new-line="true"
directory="${output.directory}"/>

出站网关

如果要继续处理基于写入文件的消息,则可以改用 。 它的作用类似于 . 但是,在写入文件后,它还会将其作为消息的有效负载发送到回复通道。​​outbound-gateway​​​​outbound-channel-adapter​

以下示例配置出站网关:

<int-file:outbound-gateway id="mover" request-channel="moveInput"
reply-channel="output"
directory="${output.directory}"
mode="REPLACE" delete-source-files="true"/>

如前所述,您还可以指定属性,该属性定义如何处理目标文件已存在的情况的行为。 有关更多详细信息,请参阅处理现有目标文件。 通常,使用文件出站网关时,结果文件作为回复通道上的消息负载返回。mode

这也适用于指定模式时。 在这种情况下,将返回预先存在的目标文件。 如果请求消息的有效负载是文件,您仍然可以通过消息标头访问该原始文件。 请参阅FileHeaders.ORIGINAL_FILE。​​IGNORE​

“出站网关”在您希望首先移动文件,然后通过处理管道发送文件的情况下非常有效。 在这种情况下,可以将文件命名空间的元素连接到 ,然后将该网关的元素连接到管道的开头。​​inbound-channel-adapter​​​​outbound-gateway​​​​reply-channel​

如果您有更详细的要求或需要支持其他有效负载类型作为要转换为文件内容的输入,则可以扩展 ,但更好的选择是依赖 Transformer。​​FileWritingMessageHandler​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
}

@Bean
@ServiceActivator(inputChannel = "writeToFileChannel")
public MessageHandler fileWritingMessageHandler() {
Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileExistsMode(FileExistsMode.APPEND);
return handler;
}

@MessagingGateway(defaultRequestChannel = "writeToFileChannel")
public interface MyGateway {

void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
@Header(FileHeaders.FILENAME) File directory, String data);

}
}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class FileWritingJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
fileWritingInput.send(new GenericMessage<>("foo"));
}

@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlow.from("fileWritingInput")
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
.header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
.handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
.channel(MessageChannels.queue("fileWritingResultChannel"))
.get();
}

}

文件转换器

要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。 与在较小程度上不同,您可能需要自己的机制来完成工作。 为此,您可以实现该接口。 或者,您可以扩展入站消息。 Spring Integration提供了一些明显的实现。​​FileReadingMessageSource​​​​FileWritingMessageHandler​​​​Transformer​​​​AbstractFilePayloadTransformer​

请参阅 Javadoc 的 Transformer 接口,了解哪些 Spring Integration 类实现了它。 类似地,你可以检查Javadoc中的AbstractFilePayloadTransformer类,看看哪些Spring Integration类扩展了它。

​FileToByteArrayTransformer​​使用 Spring 的 扩展对象并将其转换为 . 使用一系列转换器通常比将所有转换放在单个类中更好。 在这种情况下,转换可能是合乎逻辑的第一步。​​AbstractFilePayloadTransformer​​​​File​​​​byte[]​​​​FileCopyUtils​​​​File​​​​byte[]​

​FileToStringTransformer​​扩展 将对象转换为 . 如果没有别的,这对于调试很有用(考虑将其与窃听一起使用)。​​AbstractFilePayloadTransformer​​​​File​​​​String​

若要配置文件特定的转换器,可以使用文件命名空间中的相应元素,如以下示例所示:

<int-file:file-to-bytes-transformer  input-channel="input" output-channel="output"
delete-files="true"/>

<int-file:file-to-string-transformer input-channel="input" output-channel="output"
delete-files="true" charset="UTF-8"/>

该选项向转换器发出信号,指示它应在转换完成后删除入站文件。 这绝不是使用在多线程环境中使用时(例如,当您通常使用 Spring 集成时)使用的替代品。​​delete-files​​​​AcceptOnceFileListFilter​​​​FileReadingMessageSource​

文件拆分器

在版本 4.1.2 中添加了 ,在版本 4.2 中添加了其命名空间支持。 根据 将文本文件拆分为单独的行。 默认情况下,拆分器使用 在从文件中读取行时一次发出一行。 将该属性设置为 使其在将所有行作为消息发出之前读入内存。 一个用例可能是,如果要在发送任何包含行的消息之前检测文件的 I/O 错误。 但是,它仅适用于相对较短的文件。​​FileSplitter​​​​FileSplitter​​​​BufferedReader.readLine()​​​​Iterator​​​​iterator​​​​false​

入站有效负载可以是 、(路径)或 。 其他有效负载类型将保持不变。​​File​​​​String​​​​File​​​​InputStream​​​​Reader​

以下清单显示了配置 :​​FileSplitter​

@SpringBootApplication
public class FileSplitterApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}

}

拆分器的 Bean 名称。

设置为 (默认值) 以使用迭代器或在发送行之前将文件加载到内存中。​​true​​​​false​

设置为 以在文件数据之前和之后发出文件开头和文件结尾标记消息。 标记是具有有效负载(属性中带有 and 值)的消息。 在筛选某些行的下游流中按顺序处理文件时,可以使用标记。 它们使下游处理能够知道文件何时被完全处理。 此外,包含这些消息或添加到这些消息的标头。 标记包括行数。 如果文件为空,则仅以 . 缺省值为 。 当 为 时,缺省情况下为 。 另请参阅(下一个属性)。​​true​​​​FileSplitter.FileMarker​​​​START​​​​END​​​​mark​​​​file_marker​​​​START​​​​END​​​​END​​​​START​​​​END​​​​0​​​​lineCount​​​​false​​​​true​​​​apply-sequence​​​​false​​​​markers-json​

如果为 true,则将其设置为将对象转换为 JSON 字符串。 (使用下面)。​​markers​​​​true​​​​FileMarker​​​​SimpleJsonSerializer​

设置为 以禁用在邮件中包含 和 标头。 默认值为 ,除非为 。 当和是时,标记包含在序列中。 当 和 为 时,标头设置为 ,因为大小未知。​​false​​​​sequenceSize​​​​sequenceNumber​​​​true​​​​markers​​​​true​​​​true​​​​markers​​​​true​​​​true​​​​iterator​​​​true​​​​sequenceSize​​​​0​

设置为 如果文件中没有行,则引发 。 缺省值为 。​​true​​​​RequiresReplyException​​​​false​

设置将文本数据读入有效负载时要使用的字符集名称。 默认值为平台字符集。​​String​

第一行的标头名称,作为标头在为其余行发出的消息中携带。 从版本 5.0 开始。

设置用于将消息发送到拆分器的输入通道。

设置消息发送到的输出通道。

设置发送超时。 仅当 can 阻止(例如完整的 .​​output-channel​​​​QueueChannel​

设置为 以禁用在刷新上下文时自动启动拆分器。 缺省值为 。​​false​​​​true​

如果 是 .​​input-channel​​​​<publish-subscribe-channel/>​

设置拆分器的启动阶段(在 时使用 )。​​auto-startup​​​​true​

还将任何基于文本的行拆分为行。 从版本 4.3 开始,当与 FTP 或 SFTP 流式处理入站通道适配器或使用选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用时自动关闭支持流的会话 有关这些设施的详细信息,请参阅 FTP 流式入站通道适配器和 SFTP 流式入站通道适配器以及 FTP 出站网关和 SFTP 出站网关。​​FileSplitter​​​​InputStream​​​​stream​

使用 Java 配置时,可以使用其他构造函数,如以下示例所示:

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

如果为 true,则标记表示为 JSON 字符串(使用 )。​​markersJson​​​​SimpleJsonSerializer​

版本 5.0 引入了指定内容第一行是标题的选项(例如 CSV 文件中的列名)。 传递给此属性的参数是标头名称,在为其余行发出的消息中,第一行作为标头携带。 此行不包含在序列标头中(如果为 true),也不包含在与 关联的 中。 注意:从版本 5.5 开始,lineCount' 也作为消息的标头包含在内,因为可以序列化为 JSON。 如果文件仅包含标题行,则该文件被视为空,因此在拆分期间仅发出实例(如果启用了标记 - 否则,不会发出任何消息)。 默认情况下(如果未设置标头名称),则第一行被视为数据,并成为发出的第一条消息的有效负载。​​firstLineAsHeader​​​​applySequence​​​​lineCount​​​​FileMarker.END​​​​FileHeaders.LINE_COUNT​​​​FileMarker.END​​​​FileMarker​​​​FileMarker​

如果需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行,不是行的全部内容,不是某个特定标头等),请考虑在 . 请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。​​FileSplitter​

幂等下游处理拆分文件

如果为 true,则拆分器在标题中添加行号(如果为 true,则标记计为行)。 行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。​​apply-sequence​​​​SEQUENCE_NUMBER​​​​markers​

例如:

@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}

文件聚合器

从版本 5.5 开始,当启用 START/END 标记时,引入了 a 以涵盖用例的另一面。 为方便起见,实现了所有三个序列详细信息策略:​​FileAggregator​​​​FileSplitter​​​​FileAggregator​

  • 与属性用于相关键计算。 在 上启用标记时,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。 仍然为发出的每行填充,包括开始/结束标记消息。HeaderAttributeCorrelationStrategyFileHeaders.FILENAMEFileSplitterFileHeaders.FILENAME
  • - 检查组中的消息,然后将标头值与组大小减去 - 实例进行比较。 它还实现了方便的触点,以便在 . 有关详细信息,请参阅消息组条件。FileMarkerReleaseStrategyFileSplitter.FileMarker.Mark.ENDFileHeaders.LINE_COUNT2FileSplitter.FileMarkerGroupConditionProviderconditionSupplierAbstractCorrelatingMessageHandler
  • 只需从组中删除消息,并将其余消息收集到列表有效负载中以生成。FileAggregatingMessageGroupProcessorFileSplitter.FileMarker

以下清单显示了配置 :​​FileAggregator​

@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}

如果 的默认行为不满足目标逻辑,建议使用单个策略配置聚合器终结点。 有关更多信息,请参阅 JavaDocs。​​FileAggregator​​​​FileAggregator​

远程持久文件列表筛选器

入站和流式入站远程文件通道适配器(、 和其他技术)配置有相应的实现,默认情况下配置有内存中 . 要在群集中运行,可以使用共享筛选器将其替换为筛选器(有关详细信息,请参阅元数据存储)。 这些过滤器用于防止多次获取同一文件(除非修改时间发生变化)。 从版本 5.2 开始,文件会在获取文件之前立即添加到过滤器中(如果获取失败,则会撤消)。​​FTP​​​​SFTP​​​​AbstractPersistentFileListFilter​​​​MetadataStore​​​​MetadataStore​

如果发生灾难性故障(例如断电),当前正在提取的文件可能会保留在筛选器中,并且在重新启动应用程序时不会重新提取。 在这种情况下,您需要手动从 .​​MetadataStore​

在以前的版本中,在获取任何文件之前会过滤这些文件,这意味着在发生灾难性故障后,多个文件可能处于此状态。

为了促进这种新行为,向 中添加了两种新方法。​​FileListFilter​

boolean accept(F file);

boolean supportsSingleFileFiltering();

如果筛选器返回 ,则必须实现 。​true​​​​supportsSingleFileFiltering​​​​accept()​

如果远程筛选器不支持单个文件筛选(如 ),适配器将恢复到以前的行为。​​AbstractMarkerFilePresentFileListFilter​

如果使用了多个筛选器(使用 或 ),则所有委托筛选器都必须支持单个文件筛选,复合筛选器才能支持它。​​CompositeFileListFilter​​​​ChainFileListFilter​

持久文件列表筛选器现在具有布尔属性。 将此属性设置为 也会设置 ,这意味着出站网关 ( 和 ) 上的递归操作现在每次将始终遍历整个目录树。 这是为了解决未检测到目录树深处的更改的问题。 此外,导致将文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件在不同目录中多次出现,过滤器无法正常工作的问题。 重要说明:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。 因此,默认情况下,该属性是;这可能会在将来的版本中更改。​​forRecursion​​​​true​​​​alwaysAcceptDirectories​​​​ls​​​​mget​​​​forRecursion=true​​​​false​


标签:文件,File,Support,file,使用,directory,true,属性
From: https://blog.51cto.com/u_15326439/5921028

相关文章