Reactive Transactions

As with the reactive ClientSession support, ReactiveMongoTemplate offers dedicated methods for operating within a transaction, so you do not have to handle the commit or abort yourself depending on the operation's outcome.

Reactive use of ClientSession and transactions is limited to Template API usage. There’s currently no session or transaction integration with reactive repositories.

Using the plain MongoDB reactive driver API, a delete within a transactional flow may look like this:

Example 1. Native driver support
Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             (1)

    .flatMap(session -> {
        session.startTransaction();                                                          (2)

        return Mono.from(collection.deleteMany(session, ...))                                (3)

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   (4)

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     (5)

            .doFinally(signal -> session.close());                                           (6)
      });
1 First we obviously need to initiate the session.
2 Once we have the ClientSession at hand, start the transaction.
3 Operate within the transaction by passing on the ClientSession to the operation.
4 If the operation completes exceptionally, we need to abort the transaction and preserve the error.
5 On success, commit the changes while still preserving the operation's result.
6 Lastly, we need to make sure to close the session.

The difficulty with the above operation lies in keeping the main flow's DeleteResult instead of the transaction outcome published by either commitTransaction() or abortTransaction(), which leads to a rather complicated setup.
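To make the bookkeeping concrete without a database, the following is a minimal sketch of the same commit-or-abort pattern using only the JDK's CompletableFuture (Java 9 or later) instead of Reactor. RecordingSession and TransactionSketch are hypothetical names introduced for illustration and are not part of the MongoDB driver or Spring Data; the session only records which methods were called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for a driver session: it only records the calls it receives.
final class RecordingSession {
    final List<String> calls = new ArrayList<>();

    void startTransaction() { calls.add("start"); }

    CompletableFuture<Void> commitTransaction() {
        calls.add("commit");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> abortTransaction() {
        calls.add("abort");
        return CompletableFuture.completedFuture(null);
    }

    void close() { calls.add("close"); }
}

final class TransactionSketch {

    // Runs `work` inside a transaction and publishes the result of `work` itself,
    // not the outcome of commitTransaction() or abortTransaction().
    // Assumption: `work` reports failures through the returned future.
    static <T> CompletableFuture<T> inTransaction(
            RecordingSession session,
            Function<RecordingSession, CompletableFuture<T>> work) {
        session.startTransaction();
        return work.apply(session)
            .<CompletableFuture<T>>handle((value, error) -> {
                if (error == null) {
                    // Success: commit, then hand the original value back.
                    return session.commitTransaction().thenApply(ignored -> value);
                }
                // Failure: abort, then re-raise the original error.
                return session.abortTransaction()
                    .thenCompose(ignored -> CompletableFuture.<T>failedFuture(error));
            })
            .thenCompose(stage -> stage)                       // flatten the nested future
            .whenComplete((value, error) -> session.close());  // always close the session
    }

    public static void main(String[] args) {
        RecordingSession session = new RecordingSession();
        long deleted = TransactionSketch.<Long>inTransaction(
                session, s -> CompletableFuture.completedFuture(3L)).join();
        System.out.println(deleted + " " + session.calls); // 3 [start, commit, close]
    }
}
```

The nested future returned from handle(…) is what makes the setup awkward: the commit or abort has to run as its own asynchronous step, yet the caller should still see the value or error of the original operation.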

The inTransaction() method of ReactiveMongoTemplate lets you use the same callback form as the reactive session support: it preserves the flow's outcome and performs the commit or abort accordingly. This allows you to express the above flow simply as follows:

Example 2. ReactiveMongoTemplate Transactions
Mono<DeleteResult> result = template.inTransaction()                                      (1)

    .execute(action -> action.remove(query(where("id").is("step-1")), Step.class));       (2)
1 Initiate the transaction.
2 Operate within the ClientSession. Each execute(…) unit of work callback initiates a new transaction in the scope of the same ClientSession.
If you need access to the ClientSession within the flow, you can use ReactiveMongoContext.getSession() to obtain it from the Reactor Context.

Everything happening inside the transactional callback is executed within a managed transaction. Errors within the reactive flow of execute(…) that are not propagated outside of the callback do not affect the operations within the transaction.

template.inTransaction()                                                            (1)

    .execute(action -> action.find(query(where("state").is("active")), Step.class)
        .flatMap(step -> action.update(Step.class)
            .matching(query(where("id").is(step.id)))
            .apply(update("state", "paused"))
            .all()))                                                                (2)

    .flatMap(updated -> {
        // Exception could happen here                                              (3)
    });
1 Initiate the managed transaction.
2 Operate within the ClientSession. The transaction is committed after this is done or rolled back if an error occurs here.
3 An error outside the transaction flow has no effect on the previous transactional execution.
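The boundary described in the callouts above can be modeled with plain JDK futures. This is only a sketch under stated assumptions: ManagedTransaction and BoundaryDemo are hypothetical names, the "transaction" merely logs whether it would commit or roll back, and CompletableFuture (Java 9 or later) stands in for the reactive types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model of a managed transaction: it only logs the decision it makes.
final class ManagedTransaction {
    final List<String> log = new ArrayList<>();

    // Commits when the callback's future succeeds, rolls back when it fails.
    <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> callback) {
        log.add("begin");
        return callback.get()
            .whenComplete((value, error) -> log.add(error == null ? "commit" : "rollback"));
    }
}

final class BoundaryDemo {
    public static void main(String[] args) {
        ManagedTransaction tx = new ManagedTransaction();

        CompletableFuture<Integer> downstream = tx
            // Inside the callback: the unit of work succeeds.
            .<Integer>execute(() -> CompletableFuture.completedFuture(2))
            // Outside the callback: a later step fails.
            .<Integer>thenApply(updated -> {
                throw new IllegalStateException("after commit");
            });

        System.out.println(tx.log);                                // [begin, commit]
        System.out.println(downstream.isCompletedExceptionally()); // true
    }
}
```

The downstream future fails, but the log still ends with the commit, because the decision was made when the callback's own future completed. A failure inside the callback would be logged as a rollback instead.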