Redis Messaging (Pub/Sub)

Spring Data provides dedicated messaging integration for Redis, similar in functionality and naming to the JMS integration in Spring Framework.

Redis messaging can be roughly divided into two areas of functionality:

  • Publication or production of messages

  • Subscription or consumption of messages

This is an example of the pattern often called Publish/Subscribe (Pub/Sub for short). The RedisTemplate class is used for message production. For asynchronous reception similar to Java EE’s message-driven bean style, Spring Data provides a dedicated message listener container that is used to create Message-Driven POJOs (MDPs) and, for synchronous reception, the RedisConnection contract.

The org.springframework.data.redis.connection and org.springframework.data.redis.listener packages provide the core functionality for Redis messaging.

Publishing (Sending Messages)

To publish a message, you can use, as with the other operations, either the low-level RedisConnection or the high-level RedisTemplate. Both entities offer the publish method, which accepts the message and the destination channel as arguments. While RedisConnection requires raw data (array of bytes), the RedisTemplate lets arbitrary objects be passed in as messages, as shown in the following example:

// send message through connection RedisConnection con = ...
byte[] msg = ...
byte[] channel = ...
con.publish(msg, channel); // send message through RedisTemplate
RedisTemplate template = ...
template.convertAndSend("hello!", "world");

Subscribing (Receiving Messages)

On the receiving side, one can subscribe to one or multiple channels either by naming them directly or by using pattern matching. The latter approach is quite useful, as it not only lets multiple subscriptions be created with one command but can also listen on channels not yet created at subscription time (as long as they match the pattern).

At the low-level, RedisConnection offers the subscribe and pSubscribe methods that map the Redis commands for subscribing by channel or by pattern, respectively. Note that multiple channels or patterns can be used as arguments. To change the subscription of a connection or query whether it is listening, RedisConnection provides the getSubscription and isSubscribed methods.

Subscription commands in Spring Data Redis are blocking. That is, calling subscribe on a connection causes the current thread to block as it starts waiting for messages. The thread is released only if the subscription is canceled, which happens when another thread invokes unsubscribe or pUnsubscribe on the same connection. See “redis:pubsub:subscribe:containers” (later in this document) for a solution to this problem.

As mentioned earlier, once subscribed, a connection starts waiting for messages. Only commands that add new subscriptions, modify existing subscriptions, and cancel existing subscriptions are allowed. Invoking anything other than subscribe, pSubscribe, unsubscribe, or pUnsubscribe throws an exception.

In order to subscribe to messages, one needs to implement the MessageListener callback. Each time a new message arrives, the callback gets invoked and the user code gets run by the onMessage method. The interface gives access not only to the actual message but also to the channel it has been received through and the pattern (if any) used by the subscription to match the channel. This information lets the callee differentiate between various messages not just by content but also examining additional details.

Message Listener Containers

Due to its blocking nature, low-level subscription is not attractive, as it requires connection and thread management for every single listener. To alleviate this problem, Spring Data offers RedisMessageListenerContainer, which does all the heavy lifting. If you are familiar with EJB and JMS, you should find the concepts familiar, as it is designed to be as close as possible to the support in Spring Framework and its message-driven POJOs (MDPs).

RedisMessageListenerContainer acts as a message listener container. It is used to receive messages from a Redis channel and drive the MessageListener instances that are injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, resource acquisition and release, exception conversion, and the like. This lets you as an application developer write the (possibly complex) business logic associated with receiving a message (and reacting to it) and delegates boilerplate Redis infrastructure concerns to the framework.

Furthermore, to minimize the application footprint, RedisMessageListenerContainer lets one connection and one thread be shared by multiple listeners even though they do not share a subscription. Thus, no matter how many listeners or channels an application tracks, the runtime cost remains the same throughout its lifetime. Moreover, the container allows runtime configuration changes so that you can add or remove listeners while an application is running without the need for a restart. Additionally, the container uses a lazy subscription approach, using a RedisConnection only when needed. If all the listeners are unsubscribed, cleanup is automatically performed, and the thread is released.

To help with the asynchronous nature of messages, the container requires a java.util.concurrent.Executor (or Spring’s TaskExecutor) for dispatching the messages. Depending on the load, the number of listeners, or the runtime environment, you should change or tweak the executor to better serve your needs. In particular, in managed environments (such as app servers), it is highly recommended to pick a proper TaskExecutor to take advantage of its runtime.

The MessageListenerAdapter

The MessageListenerAdapter class is the final component in Spring’s asynchronous messaging support. In a nutshell, it lets you expose almost any class as a MDP (though there are some constraints).

Consider the following interface definition:

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message); void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

Notice that, although the interface does not extend the MessageListener interface, it can still be used as a MDP by using the MessageListenerAdapter class. Notice also how the various message handling methods are strongly typed according to the contents of the various Message types that they can receive and handle. In addition, the channel or pattern to which a message is sent can be passed in to the method as the second argument of type String:

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}

Notice how the above implementation of the MessageDelegate interface (the above DefaultMessageDelegate class) has no Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:

<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
<beans>
The listener topic can be either a channel (for example, topic="chatroom") or a pattern (for example, topic="*room")

The preceding example uses the Redis namespace to declare the message listener container and automatically register the POJOs as listeners. The full blown beans definition follows:

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom">
        </bean>
      </entry>
    </map>
  </property>
</bean>

Each time a message is received, the adapter automatically and transparently performs translation (using the configured RedisSerializer) between the low-level format and the required object type. Any exception caused by the method invocation is caught and handled by the container (by default, exceptions get logged).