How To Stop Polling InboundChannelAdapter

I'm polling files from two different directories on one server using RotatingServerAdvice, and that works fine. The problem is that I can't stop polling once I call inboundtest.start(). The idea is to retrieve all the files in those directories and then send inboundtest.stop(). This is the code:

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
        return factory;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        // The backslash must be doubled inside a Java string literal
        fileSynchronizer.setFilter(new SftpRegexPatternFileListFilter(".*?\\.(txt|TXT?)"));
        return fileSynchronizer;
    }

    @Bean(name = "sftpMessageSource")
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller("fileReadingMessageSourcePollerMetadata"), autoStartup = "false")
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File(sftpLocalDirectoryDownloadUpload));
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return source;
    }

    @Bean
    public DelegatingSessionFactory<LsEntry> sessionFactory() {
        Map<Object, SessionFactory<LsEntry>> factories = new LinkedHashMap<>();
        factories.put("one", sftpSessionFactory());
        // use the first SF as the default
        return new DelegatingSessionFactory<LsEntry>(factories, factories.values().iterator().next());
    }

    @Bean
    public RotatingServerAdvice advice() {
        // Both entries use the same session factory key: one server, two directories
        List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
        keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectory));
        keyDirectories.add(new RotationPolicy.KeyDirectory("one", sftpRemoteDirectoryNonUpload));
        return new RotatingServerAdvice(sessionFactory(), keyDirectories, false);
    }

    @Bean
    MessageChannel controlChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }

    @Bean
    public PollerMetadata fileReadingMessageSourcePollerMetadata() {
        PollerMetadata meta = new PollerMetadata();
        meta.setTrigger(new PeriodicTrigger(1000));
        meta.setErrorHandler(throwable -> new IOException());
        return meta;
    }

It always keeps waiting for a new file in one of the two directories, but that's not the idea. The idea is to stop polling once all the files have been retrieved.

From another class I call inboundtest.start() through the control channel. Here is the code:

    private MessageChannel controlChannel;

    public void startProcessingFiles() throws InterruptedException {
        controlChannel.send(new GenericMessage<>("@inboundtest.start()"));
    }

I tried to stop it with this class, but it doesn't work:

    public class StopPollingAdvice implements ReceiveMessageAdvice {

        private MessageChannel controlChannel;

        @Override
        public Message<?> afterReceive(Message<?> message, Object o) {
            // A null message means the poll found no file; do not touch the payload here
            if (message == null) {
                System.out.println("There are no more files, stopping connection");
                Message<String> operation = MessageBuilder.withPayload("@inboundtest.stop()").build();
                controlChannel.send(operation);
            }
            return message;
        }
    }



OK. Now I see your point. By default, the RotatingServerAdvice moves to the next server only when the current one is exhausted (see the fair option). So when you stop the adapter in the advice, it can no longer move on to the other directory to fetch anything more. You need to think about some other stopping solution: something that is not tied to the advice and this afterReceive(), somewhere downstream in your flow…

Or you can provide a custom RotationPolicy (an extension of StandardRotationPolicy) and, in its overridden afterReceive(), check whether all the directories have been processed and then send the stop command.
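
To make the custom policy idea more concrete, here is a minimal sketch of just the bookkeeping, written in plain Java with no Spring dependency. AllDirectoriesDrainedTracker and its methods are hypothetical names, not Spring Integration API. The assumption is that your StandardRotationPolicy subclass would call it from its overridden afterReceive(), passing the index of the directory that was just polled and whether a file came back, and that the stop action would send "@inboundtest.stop()" to the control channel.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical helper (not part of Spring Integration): remembers which
 * directories have returned an empty poll and runs a stop action once
 * every configured directory has come up empty.
 */
final class AllDirectoriesDrainedTracker {

    private final int directoryCount;
    private final Runnable stopAction;
    private final Set<Integer> drained = new HashSet<>();
    private boolean stopped;

    AllDirectoriesDrainedTracker(int directoryCount, Runnable stopAction) {
        if (directoryCount <= 0) {
            throw new IllegalArgumentException("directoryCount must be positive");
        }
        this.directoryCount = directoryCount;
        this.stopAction = stopAction;
    }

    /**
     * Call once per poll.
     *
     * @param directoryIndex  position of the polled directory in the rotation list (assumed input)
     * @param messageReceived true if the poll produced a file
     */
    void afterReceive(int directoryIndex, boolean messageReceived) {
        if (directoryIndex < 0 || directoryIndex >= directoryCount) {
            throw new IllegalArgumentException("unknown directory index: " + directoryIndex);
        }
        if (stopped) {
            return; // the stop action runs at most once
        }
        if (messageReceived) {
            // This directory still had a file, so it is not drained yet.
            drained.remove(directoryIndex);
            return;
        }
        drained.add(directoryIndex);
        if (drained.size() == directoryCount) {
            stopped = true;
            stopAction.run(); // e.g. send "@inboundtest.stop()" to the control channel
        }
    }

    boolean isStopped() {
        return stopped;
    }
}
```

With two directories, the stop action fires only after both have returned an empty poll, so stopping no longer prevents the second directory from being fetched. Note that a file arriving in an already drained directory just before the stop would not be picked up by this sketch.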
