GridFS Support

MongoDB supports storing binary files inside its filesystem, GridFS. Spring Data MongoDB provides a GridFsOperations interface as well as the corresponding implementation, GridFsTemplate, to let you interact with the filesystem. You can set up a GridFsTemplate instance by handing it a MongoDbFactory as well as a MongoConverter, as the following example shows:

Example 1. JavaConfig setup for a GridFsTemplate
class GridFsConfiguration extends AbstractMongoConfiguration {

  // … further configuration omitted

  @Bean
  public GridFsTemplate gridFsTemplate() {
    return new GridFsTemplate(mongoDbFactory(), mappingMongoConverter());
  }
}

The corresponding XML configuration follows:

Example 2. XML configuration for a GridFsTemplate
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:mongo="http://www.springframework.org/schema/data/mongo"
  xsi:schemaLocation="http://www.springframework.org/schema/data/mongo
                      http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
                      http://www.springframework.org/schema/beans
                      http://www.springframework.org/schema/beans/spring-beans.xsd">

  <mongo:db-factory id="mongoDbFactory" dbname="database" />
  <mongo:mapping-converter id="converter" />

  <bean class="org.springframework.data.mongodb.gridfs.GridFsTemplate">
    <constructor-arg ref="mongoDbFactory" />
    <constructor-arg ref="converter" />
  </bean>

</beans>

The template can now be injected and used to perform storage and retrieval operations, as the following example shows:

Example 3. Using GridFsTemplate to store files
class GridFsClient {

  @Autowired
  GridFsOperations operations;

  @Test
  public void storeFileToGridFs() {

    FileMetadata metadata = new FileMetadata();
    // populate metadata
    Resource file = … // lookup File or Resource

    operations.store(file.getInputStream(), "filename.txt", metadata);
  }
}

The store(…) operations take an InputStream, a filename, and (optionally) metadata information about the file to store. The metadata can be an arbitrary object, which will be marshaled by the MongoConverter configured with the GridFsTemplate. Alternatively, you can also provide a Document.

You can read files from the filesystem through either the find(…) or the getResources(…) methods. Let’s have a look at the find(…) methods first. You can either find a single file or multiple files that match a Query. You can use the GridFsCriteria helper class to define queries. It provides static factory methods to encapsulate default metadata fields (such as whereFilename() and whereContentType()) or a custom one through whereMetaData(). The following example shows how to use GridFsTemplate to query for files:

Example 4. Using GridFsTemplate to query for files
class GridFsClient {

  @Autowired
  GridFsOperations operations;

  @Test
  public void findFilesInGridFs() {
    GridFSFindIterable result = operations.find(query(whereFilename().is("filename.txt")));
  }
}
Currently, MongoDB does not support defining sort criteria when retrieving files from GridFS. For this reason, any sort criteria defined on the Query instance handed into the find(…) method are disregarded.

The other option to read files from GridFS is to use the methods introduced by the ResourcePatternResolver interface. They accept an Ant path and can thus retrieve files matching the given pattern. The following example shows how to use GridFsTemplate to read files:

Example 5. Using GridFsTemplate to read files
class GridFsClient {

  @Autowired
  GridFsOperations operations;

  @Test
  public void readFilesFromGridFs() {
    GridFsResource[] txtFiles = operations.getResources("*.txt");
  }
}

GridFsOperations extends ResourcePatternResolver, which lets the GridFsTemplate (for example) be plugged into an ApplicationContext to read Spring configuration files from a MongoDB database.
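
As a rough illustration of what a filename pattern such as "*.txt" selects, the following plain-JDK sketch translates the * and ? wildcards into a regular expression and filters a list of filenames. It does not show how GridFsTemplate is implemented: the class FilenamePatternSketch and its methods are hypothetical, the filenames are made up, and the ** directory wildcard of Ant paths is deliberately not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Plain-JDK sketch only; this class is hypothetical and not part of Spring Data MongoDB.
class FilenamePatternSketch {

  // Translates a simple wildcard pattern into a regular expression.
  // '*' matches any run of characters except '/', '?' matches one such character.
  // Assumption: the Ant-style "**" wildcard is out of scope for this sketch.
  static Pattern toRegex(String pattern) {
    StringBuilder regex = new StringBuilder();
    for (char c : pattern.toCharArray()) {
      if (c == '*') {
        regex.append("[^/]*");
      } else if (c == '?') {
        regex.append("[^/]");
      } else {
        regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(regex.toString());
  }

  // Returns the filenames that match the whole pattern, in their original order.
  static List<String> select(List<String> filenames, String pattern) {
    Pattern regex = toRegex(pattern);
    List<String> matches = new ArrayList<>();
    for (String filename : filenames) {
      if (regex.matcher(filename).matches()) {
        matches.add(filename);
      }
    }
    return matches;
  }
}
```

With the filenames "a.txt", "b.pdf", and "dir/c.txt", the pattern "*.txt" selects only "a.txt" in this sketch, because * does not cross a path separator.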

Infinite Streams with Tailable Cursors

By default, MongoDB automatically closes a cursor when the client exhausts all results supplied by the cursor. Closing a cursor on exhaustion turns a stream into a finite stream. For capped collections, you can use a Tailable Cursor that remains open after the client has consumed all initially returned data.

Capped collections can be created with MongoOperations.createCollection. To do so, provide the required CollectionOptions.empty().capped()….

Tailable cursors can be consumed with both the imperative and the reactive MongoDB API. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still use a messaging concept that is already prevalent in the Spring ecosystem.
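
The difference between a regular cursor and a tailable cursor can be modeled with a few lines of plain JDK code. The sketch below is only a conceptual model under simplifying assumptions: CappedCollectionModel and its methods are hypothetical names, no MongoDB driver is involved, and a timeout stands in for closing the cursor externally.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model only; this class is hypothetical and not part of MongoDB or Spring Data.
class CappedCollectionModel {

  // Append-only list standing in for a capped collection.
  private final List<String> documents = new ArrayList<>();

  synchronized void insert(String document) {
    documents.add(document);
    notifyAll(); // wake up readers that wait for new documents
  }

  // Regular cursor: returns what exists right now and is then exhausted.
  synchronized List<String> find() {
    return new ArrayList<>(documents);
  }

  // Tailable read: waits until a document exists at the given position.
  // Returns null if nothing arrives within the timeout or the thread is interrupted.
  synchronized String awaitAt(int position, long timeoutMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    try {
      while (documents.size() <= position) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          return null;
        }
        wait(remaining);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
    return documents.get(position);
  }
}
```

A reader that calls find() sees a finite result, whereas a reader that keeps calling awaitAt(…) with an increasing position keeps receiving documents as writers insert them, which is the behavior a tailable cursor provides.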

Tailable Cursors with MessageListener

Listening to a capped collection by using a synchronous driver creates a long-running, blocking task that needs to be delegated to a separate component. In this case, we first need to create a MessageListenerContainer, which is the main entry point for running the specific SubscriptionRequest. Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate and is capable of creating and executing Task instances for a TailableCursorRequest.

The following example shows how to use tailable cursors with MessageListener instances:

Example 6. Tailable Cursors with MessageListener instances
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                  (1)

MessageListener<Document, User> listener = System.out::println;                     (2)

TailableCursorRequest request = TailableCursorRequest.builder()
  .collection("orders")                                                             (3)
  .filter(query(where("value").lt(100)))                                            (4)
  .publishTo(listener)                                                              (5)
  .build();

container.register(request, User.class);                                            (6)

// ...

container.stop();                                                                   (7)
1 Starting the container initializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are run immediately.
2 Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.
3 Set the collection to listen to.
4 Provide an optional filter for documents to receive.
5 Set the message listener to publish incoming Messages to.
6 Register the request. The returned Subscription can be used to check the current Task state and cancel its execution to free resources.
7 Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container.

Reactive Tailable Cursors

Using tailable cursors with reactive data types allows construction of infinite streams. A tailable cursor remains open until it is closed externally. It emits data as new documents arrive in a capped collection.

Tailable cursors may become dead, or invalid, if either the query returns no match or the cursor returns the document at the “end” of the collection and the application then deletes that document. The following example shows how to create and use an infinite stream query:

Example 7. Infinite Stream queries with ReactiveMongoOperations
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);

Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();

Spring Data MongoDB Reactive repositories support infinite streams by annotating a query method with @Tailable. This works for methods that return Flux and other reactive types capable of emitting multiple elements, as the following example shows:

Example 8. Infinite Stream queries with ReactiveMongoRepository
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {

  @Tailable
  Flux<Person> findByFirstname(String firstname);

}

Flux<Person> stream = repository.findByFirstname("Joe");

Disposable subscription = stream.doOnNext(System.out::println).subscribe();

// …

// Later: Dispose the subscription to close the stream
subscription.dispose();

Change Streams

As of MongoDB 3.6, Change Streams let applications get notified about changes without having to tail the oplog.

Change Stream support is only possible for replica sets or for a sharded cluster.

Change Streams can be consumed with both the imperative and the reactive MongoDB Java driver. It is highly recommended to use the reactive variant, as it is less resource-intensive. However, if you cannot use the reactive API, you can still obtain change events by using the messaging concept that is already prevalent in the Spring ecosystem.

You can watch at both the collection level and the database level. The database-level variant publishes changes from all collections within the database. When subscribing to a database change stream, make sure to use a suitable type for the event type, as conversion might not apply correctly across different entity types. When in doubt, use Document.

Change Streams with MessageListener

Listening to a Change Stream by using a synchronous driver creates a long-running, blocking task that needs to be delegated to a separate component. In this case, we first need to create a MessageListenerContainer, which is the main entry point for running the specific SubscriptionRequest tasks. Spring Data MongoDB already ships with a default implementation that operates on MongoTemplate and is capable of creating and executing Task instances for a ChangeStreamRequest.

The following example shows how to use Change Streams with MessageListener instances:

Example 9. Change Streams with MessageListener instances
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                        (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                     (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); (4)

// ...

container.stop();                                                                                         (5)
1 Starting the container initializes the resources and starts Task instances for already registered SubscriptionRequest instances. Requests added after startup are run immediately.
2 Define the listener called when a Message is received. The Message#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.
3 Set the collection to listen to and provide additional options through ChangeStreamOptions.
4 Register the request. The returned Subscription can be used to check the current Task state and cancel its execution to free resources.
5 Do not forget to stop the container once you are sure you no longer need it. Doing so stops all running Task instances within the container.

Reactive Change Streams

Subscribing to Change Streams with the reactive API is a more natural approach to working with streams. Still, the essential building blocks, such as ChangeStreamOptions, remain the same. The following example shows how to use Change Streams emitting ChangeStreamEvents:

Example 10. Change Streams emitting ChangeStreamEvent
ChangeStreamOptions options = ChangeStreamOptions.builder()
    .filter(newAggregation(User.class, match(where("age").gte(38))))                             (1)
    .build();

Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream("user", options, User.class); (2)
1 Use an aggregation pipeline to filter events.
2 Obtain a Flux of change stream events. The ChangeStreamEvent#getBody() is converted to the requested domain type. Use Document to receive raw results without conversion.

Resuming Change Streams

Change Streams can be resumed and then continue emitting events from where you left off. To resume the stream, you need to supply either a resume token or the last known server time (in UTC). Use ChangeStreamOptions to set the value accordingly.

The following example shows how to set the resume offset using server time:

Example 11. Resume a Change Stream
ChangeStreamOptions options = ChangeStreamOptions.builder()
    .resumeAt(Instant.now().minusSeconds(1))        (1)
    .build();

Flux<ChangeStreamEvent<Person>> resumed = template.changeStream("person", options, Person.class);
1 You may obtain the server time of a ChangeStreamEvent through the getTimestamp method or use the resumeToken exposed through getResumeToken.
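
To make the idea of a resume offset more concrete, the following plain-JDK sketch replays a small in-memory event log from a given server time. It is a conceptual model only: ResumeOffsetModel and its Event type are hypothetical, the payloads are made up, and the sketch assumes that the offset is inclusive (events at or after the given time are delivered).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK model only; these types are hypothetical and not part of Spring Data MongoDB.
class ResumeOffsetModel {

  // Stand-in for a change event: a server timestamp plus a payload.
  static final class Event {
    final Instant timestamp;
    final String payload;

    Event(Instant timestamp, String payload) {
      this.timestamp = timestamp;
      this.payload = payload;
    }
  }

  // Returns the payloads of all events at or after the offset, in log order.
  // Assumption: the offset is treated as inclusive.
  static List<String> resumeAt(List<Event> log, Instant offset) {
    List<String> replayed = new ArrayList<>();
    for (Event event : log) {
      if (!event.timestamp.isBefore(offset)) {
        replayed.add(event.payload);
      }
    }
    return replayed;
  }
}
```

A consumer that remembers the timestamp of the last event it processed can pass that value as the offset after a restart. With an inclusive offset, that last event is delivered again, so the consumer should either tolerate the duplicate or skip it.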