Reading FTP files within a Mule 3 flow
Sometimes it can be very useful to be able to read the contents of an FTP file in the middle of a flow, instead of triggering the flow via an inbound FTP endpoint. In Mule 4, this is supported by the default FTP connector. Unfortunately, in Mule 3 it’s not.
In this blog, I will describe how we were able to read FTP files from a Mule 3 flow.
Using a simple Groovy Expression
The easiest implementation would be to use a Groovy expression like the one below:
muleContext.client.request('ftp://${ftp.username}:${ftp.password}@${ftp.host}:${ftp.port}/${ftp.directory}', 30000L);
This will read a file from the specified directory on the FTP server, with a timeout of 30 seconds. Right after the Groovy step, we use the below expression to log both the filename and the content of the retrieved file:
Finished reading FTP file #[message.inboundProperties['originalFileName']] with content: #[message.payloadAs(java.lang.String)]
When the poller is executed, assuming a file is present at the specified FTP directory, something like the following will be logged:
INFO 2018-12-12 14:01:51,999 [[ftp-in-flow].read-ftp-file-via-groovy.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Finished reading FTP file test-file.txt with content: CONTENT OF TEST FILE
However, when the poller triggers the flow another time, we see that both the filename and the payload are null. This is because the request to the FTP server is handled the same way as a regular FTP inbound endpoint. And that default behavior is to delete the file once it has being read.
Reading the file without deleting it from the source
If we don’t want the file to be deleted, we have to use a little bit more code in the Groovy expression. Instead of letting Mule determine the inbound endpoint only via it’s URL, we are going to build an InboundEndpoint ourselves in the Groovy expression:
org.mule.api.endpoint.EndpointBuilder endpointBuilder = muleContext.getEndpointFactory().getEndpointBuilder("ftp://${ftp.username}:${ftp.password}@${ftp.host}:${ftp.port}/${ftp.directory}");
// Disable auto delete, by "moving" to same directory
org.mule.api.endpoint.InboundEndpoint endpoint = endpointBuilder.buildInboundEndpoint();
endpoint.getConnector().setMoveToDirectory("/${ftp.directory}");
// Execute request
endpoint.request(30000L);
In our case, the getConnector() method on line 4 will retrieve a com.mulesoft.mule.transport.ftp.EEFtpConnector object. This is a subclass of the FtpConnector. The main difference is that the EEFtpConnector object, also has getter and setter methods for its moveToDirectory and moveToPattern fields.
These can be used to specify where to move and/or rename the file to.
In our case we don’t want to move and/or rename the file, but we certainly do also not want the file to be deleted. We can “fake” this behaviour, by “moving” the file to the same directory as it was also read from.
Note: If you are connecting to an SFTP server instead of an FTP server. The getConnector() result will evaluate to an SftpConnector instance. Here you can just set auto delete to false.
Adding a filename filter
Now we can also add a filename filter fairly easily. We can add a MessageFilter to the EndpointBuilder. In our case we will use a FilenameRegexFilter to check if the filename starts with “test-“ and ends with “.txt”.
org.mule.api.endpoint.EndpointBuilder endpointBuilder = muleContext.getEndpointFactory().getEndpointBuilder("ftp://${ftp.username}:${ftp.password}@${ftp.host}:${ftp.port}/${ftp.directory}");
// Filename filter
org.mule.transport.file.filters.FilenameRegexFilter filter = new org.mule.transport.file.filters.FilenameRegexFilter();
filter.setPattern("test-(.*)\\.txt");
endpointBuilder.addMessageProcessor(new org.mule.routing.MessageFilter(filter));
// Disable auto delete, by "moving" to same directory
org.mule.api.endpoint.InboundEndpoint endpoint = endpointBuilder.buildInboundEndpoint();
endpoint.getConnector().setMoveToDirectory("/${ftp.directory}");
// Execute request
endpoint.request(30000L);
Implementing exception handler
The next thing we ran into, was when we were implementing the catch exception handler for the flow. By default, we uncheck “Log Exceptions” on our catch exception handlers, and implement a custom logger within the handler. In most cases, we use the detailedMessage from the exception.
However, when an exception occurs in the Groovy step (for example when we try to read a file, on which we don’t have read permissions), we only see something like the following in the logfile:
ERROR 2018-12-12 18:38:55,391 [[ftp-in-flow].read-ftp-file-via-groovy.stage1.02] org.mule.api.processor.LoggerMessageProcessor: ERROR: Failed to invoke ScriptComponent{read-ftp-file-via-groovy.component.1493390888}. Component that caused exception is: ScriptComponent{read-ftp-file-via-groovy.component.1493390888}.
This does not give us any information about the root cause of the exception. Luckily we are able to retrieve the root cause using the following MEL expression:
#[org.mule.util.ExceptionUtils.getRootCauseMessage(exception)]
Now we are able to see the actual error code and description in the logfile:
ERROR 2018-12-12 18:40:56,548 [[ftp-in-flow].read-ftp-file-via-groovy.stage1.02] org.mule.api.processor.LoggerMessageProcessor: ERROR: IOException: Failed to change working directory to /test. Ftp error: 550
Errors when reading multiple files asynchronously
The last thing we ran into, was when we called the flow from a Flow Reference within a For Each loop. We did this because we needed to read multiple (3) files from the same directory. However, now we saw errors like the following in the log:
ERROR 2018-12-12 18:56:00,693 [[ftp-in-flow].read-ftp-file-via-groovy.stage1.03] org.mule.api.processor.LoggerMessageProcessor: ERROR: LifecycleException: Cannot perform an action on a connector when it is not started. Connector "connector.ftp.mule.default" is currently in lifecycle phase "dispose"
ERROR 2018-12-12 18:56:00,693 [[ftp-in-flow].read-ftp-file-via-groovy.stage1.02] org.mule.api.processor.LoggerMessageProcessor: ERROR: LifecycleException: Cannot perform an action on a connector when it is not started. Connector "connector.ftp.mule.default" is currently in lifecycle phase "dispose"
ERROR 2018-12-12 18:56:00,693 [[ftp-in-flow].read-ftp-file-via-groovy.stage1.04] org.mule.api.processor.LoggerMessageProcessor: ERROR: LifecycleException: Cannot perform an action on a connector when it is not started. Connector "connector.ftp.mule.default" is currently in lifecycle phase "dispose"
Unfortunately I was not (yet) able to determine the exact cause of these exceptions, but it looks like that Mule does not support multiple (FTP) inbound endpoints to be started (via Groovy) on runtime at the same time, or at least very fast after each other.
In our case, it was sufficient to change the Processing Strategy of the flow which contains the Groovy script for reading the FTP files to “synchronous”, so there is more time between creating (and closing) the FTP inbound endpoints.
Good luck reading your FTP files!
Geen reacties
Geef jouw mening
Reactie plaatsenReactie toevoegen