Spring remoting AMQP

In this post I’ll show how to implement a simple RPC-based Java application with Spring Remoting over AMQP. When building complex enterprise systems, you usually need your application to be distributed rather than monolithic.

Imagine a web application with three modules:

  • Front-end module, which serves user requests and renders the resulting HTML pages.
  • REST API module, which provides a common interface for interacting with your application.
  • Back-end module, where all business logic is performed.

In this example, the back-end acts as a server, and the front-end and REST API are both clients. As a Java developer, you can choose from several remoting technologies:

  • RMI
  • Hessian/Burlap
  • JAX-WS (JAX-RPC)
  • JMS
  • AMQP

You can see all the remoting technologies supported by the Spring Framework here: Remoting and web services using Spring.

JMS/AMQP

I’d like to single out the last two options: JMS and AMQP. These technologies use a message broker as an intermediary, which gives them some advantages over the others. In RMI, for instance, your clients have to know the address of the RMI registry. If you have three RMI registry servers, all your clients have to know all three addresses.

By contrast, with a message-based technology, every participant needs to know only the address of the message broker. They don’t need to know about each other at all! The message broker itself handles all the routing.

The difference between JMS and AMQP is that JMS is a standard Java messaging API, while AMQP is a standard messaging protocol. I won’t go into the details here; there’s a good article on the subject: Understanding the Differences between AMQP & JMS.

In this article I’ll show how to implement an RPC-based application using Spring and AMQP. We will use the RabbitMQ message broker, running in a Docker container.

RabbitMQ RPC overview

RabbitMQ RPC diagram

In our example we will have Server and Client modules. We can have as many clients and servers as we want! Again, the message broker handles all the routing. Adding a new client or server to the system is as simple as starting a new instance of the respective module. No additional configuration is required.
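
To make the "as many servers as we want" point concrete: when several consumers read from the same queue, RabbitMQ by default hands messages to them in turn (round-robin). The following is a minimal in-memory sketch of that idea in plain Java. It is not RabbitMQ code; the class name RoundRobinDispatchSketch and the node names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory illustration only; the real dispatching happens inside the broker.
final class RoundRobinDispatchSketch {
    // Consumers (server nodes) currently subscribed to the queue.
    private final List<String> consumers = new ArrayList<>();
    private int next = 0;

    void addConsumer(String nodeName) {
        consumers.add(nodeName);
    }

    // Returns the consumer that would receive the next message.
    String dispatch() {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumers subscribed");
        }
        String chosen = consumers.get(next % consumers.size());
        next++;
        return chosen;
    }

    public static void main(String[] args) {
        RoundRobinDispatchSketch queue = new RoundRobinDispatchSketch();
        queue.addConsumer("node-1");
        queue.addConsumer("node-2");
        // Requests alternate between node-1 and node-2.
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " -> " + queue.dispatch());
        }
    }
}
```

Starting a second server instance corresponds to one more addConsumer call here: nothing on the client side changes.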

We have a Java interface with only one method:

public interface SimpleInterface {

    /**
     * Returns UUID generated on server side
     * @param clientId client ID
     */
    String doJob(int clientId);
}

When a client calls this method, the server generates a new UUID and returns it to the client. Simple.

The server runs a message listener that serves SimpleInterface method invocations and consumes from the “SimpleInterface” queue. This queue, in turn, is bound to the rpc exchange with the SimpleInterface routing key.

The client sends a message to the rpc exchange with the SimpleInterface routing key, and the RabbitMQ message broker routes this message to the “SimpleInterface” queue according to our binding.
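
The routing rule described above can be sketched in a few lines of plain Java. This is only an in-memory model of how a direct exchange matches a routing key against bindings, not RabbitMQ client code; the class DirectExchangeSketch and its methods are hypothetical names used for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// In-memory model of a direct exchange; the real routing is done by the broker.
final class DirectExchangeSketch {
    // Binding key -> queues bound to the exchange with that key.
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    // Delivers the message to every queue whose binding key equals the routing key.
    // Returns the number of queues that received the message.
    int publish(String routingKey, String message) {
        List<Queue<String>> targets =
                bindings.getOrDefault(routingKey, Collections.<Queue<String>>emptyList());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch rpc = new DirectExchangeSketch();
        Queue<String> simpleInterfaceQueue = new ArrayDeque<>();
        rpc.bind(simpleInterfaceQueue, "SimpleInterface");

        rpc.publish("SimpleInterface", "doJob(1)"); // matches the binding
        rpc.publish("Unknown", "ignored");          // no matching binding, not delivered
        System.out.println(simpleInterfaceQueue);   // prints [doJob(1)]
    }
}
```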

Okay, let’s start writing our server module. I’ll use Gradle as the build system.

Add these libraries to your dependencies:

dependencies {
    compile group: 'org.springframework', name: 'spring-context', version: '4.3.7.RELEASE'
    compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.7.0.RELEASE'
    compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.1'
    compile 'org.slf4j:jcl-over-slf4j:1.7.24'
}

Writing server application

Here’s our SimpleInterface implementation:

package me.molchanoff.amqp.server;

import me.molchanoff.amqp.common.SimpleInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class SimpleInterfaceImpl implements SimpleInterface {
    private static final Logger log = LoggerFactory.getLogger(SimpleInterfaceImpl.class);

    private final int nodeId;

    public SimpleInterfaceImpl(@Value(SpringConfig.NODE_ID_PROPERTY_EL) int nodeId) {
        log.info("Node with ID: {} started", nodeId);
        this.nodeId = nodeId;
    }

    @Override
    public String doJob(int clientId) {
        String uuid = UUID.randomUUID().toString();
        log.info("UUID {} generated on node {} upon request from client {}", uuid, nodeId, clientId);
        return uuid;
    }
}

And here’s our Spring Java configuration:

package me.molchanoff.amqp.server;

import me.molchanoff.amqp.common.SimpleInterface;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan(basePackages = "me.molchanoff.amqp.server")
public class SpringConfig {

    public static final String NODE_ID_PROPERTY = "node";
    public static final String NODE_ID_PROPERTY_EL = "#{systemProperties[" + NODE_ID_PROPERTY + "]}";

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost", 5672);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("rpc");
    }

    @Bean
    public Queue queue() {
        return new Queue(SimpleInterface.class.getSimpleName());
    }

    @Bean
    public Binding binding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(SimpleInterface.class.getSimpleName()).noargs();
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory factory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);

        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate template(ConnectionFactory factory) {
        RabbitTemplate template = new RabbitTemplate(factory);

        return template;
    }

    @Bean
    public AmqpInvokerServiceExporter exporter(RabbitTemplate template, SimpleInterface service) {
        AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter();
        exporter.setAmqpTemplate(template);
        exporter.setService(service);
        exporter.setServiceInterface(SimpleInterface.class);

        return exporter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Queue queue, AmqpInvokerServiceExporter exporter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(exporter);

        return container;
    }
}

As you can see, RabbitMQ has to be running on localhost on the standard port 5672. We use the node JVM system property to pass the node number as an argument when starting our server app. In this config we also declare our exchange, queue and binding. Note that we have to declare a RabbitAdmin bean in order to have the exchange, queue and binding created automatically. The key beans here are AmqpInvokerServiceExporter and SimpleMessageListenerContainer. You need to declare one of each for every interface you want to export with Spring.
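
Conceptually, the exporter turns an incoming message into a method call on the service and sends the return value back as the reply. The sketch below imitates only that dispatch step with plain Java reflection. It is not Spring’s implementation: the Invocation class and the handle method are hypothetical names introduced for illustration, and message conversion and reply delivery are left out.

```java
import java.lang.reflect.Method;
import java.util.UUID;

final class ExporterSketch {
    interface SimpleInterface {
        String doJob(int clientId);
    }

    // Hypothetical stand-in for the invocation data carried in a request message.
    static final class Invocation {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] arguments;

        Invocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.arguments = arguments;
        }
    }

    // Looks up the requested method on the exported interface and calls it on the service.
    static Object handle(Object service, Class<?> serviceInterface, Invocation invocation) {
        try {
            Method method = serviceInterface.getMethod(invocation.methodName, invocation.parameterTypes);
            return method.invoke(service, invocation.arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Invocation failed: " + invocation.methodName, e);
        }
    }

    public static void main(String[] args) {
        // The "server-side" service, here a lambda that generates a UUID.
        SimpleInterface service = clientId -> UUID.randomUUID().toString();
        Invocation request = new Invocation("doJob", new Class<?>[] {int.class}, new Object[] {1});
        Object reply = handle(service, SimpleInterface.class, request);
        System.out.println("reply: " + reply);
    }
}
```

Restricting the lookup to the service interface, rather than the implementation class, mirrors the idea that only the exported interface’s methods are callable remotely.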

And finally, our main class:

package me.molchanoff.amqp.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class ServerMain {

    private static final Logger log = LoggerFactory.getLogger(ServerMain.class);

    public static void main(String[] args) {
        log.info("Started");

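        if (System.getProperty(SpringConfig.NODE_ID_PROPERTY) != null) {
            // Creating the context starts the message listener container.
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class);
            // Close the context cleanly when the JVM shuts down.
            context.registerShutdownHook();
        } else {
            log.error("node property is missing. Pass -Dnode=[node ID] argument");
        }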
    }
}

Writing client application

The Spring Java configuration for the client app is much simpler:

package me.molchanoff.amqp.client;

import me.molchanoff.amqp.common.SimpleInterface;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.remoting.client.AmqpProxyFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost", 5672);
    }

    @Bean
    public RabbitTemplate template() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setExchange("rpc");

        return template;
    }

    @Bean
    public AmqpProxyFactoryBean proxy(RabbitTemplate template) {
        AmqpProxyFactoryBean proxy = new AmqpProxyFactoryBean();
        proxy.setAmqpTemplate(template);
        proxy.setServiceInterface(SimpleInterface.class);
        proxy.setRoutingKey(SimpleInterface.class.getSimpleName());

        return proxy;
    }
}

Here AmqpProxyFactoryBean is used to create a proxy object for the exported interface. You have to declare such a bean for every exported interface you want to use.
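
To give an idea of what such a proxy does, here is a minimal sketch built on the JDK’s java.lang.reflect.Proxy. It is not how AmqpProxyFactoryBean is implemented: the createProxy method, the string message format and the transport function are assumptions made for illustration, with the transport standing in for "send a request through the broker and wait for the reply".

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

final class ProxySketch {
    interface SimpleInterface {
        String doJob(int clientId);
    }

    // Creates an object implementing serviceInterface whose method calls are
    // turned into messages and passed to the given transport function.
    static <T> T createProxy(Class<T> serviceInterface, Function<String, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // equals, hashCode and toString are answered locally, not sent remotely.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":   return proxy == args[0];
                    case "hashCode": return System.identityHashCode(proxy);
                    default:         return "proxy for " + serviceInterface.getName();
                }
            }
            // Encode the call as a simple string, e.g. "doJob[1]".
            return transport.apply(method.getName() + Arrays.toString(args));
        };
        return serviceInterface.cast(Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] {serviceInterface},
                handler));
    }

    public static void main(String[] args) {
        // Fake transport: echoes the request instead of talking to a broker.
        SimpleInterface proxy = createProxy(SimpleInterface.class, message -> "reply to " + message);
        System.out.println(proxy.doJob(1)); // prints "reply to doJob[1]"
    }
}
```

The calling code only sees SimpleInterface, which is why the client in this article can call doJob as if it were a local method.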

Finally, our main class:

package me.molchanoff.amqp.client;

import me.molchanoff.amqp.common.SimpleInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class ClientMain {

    private final static Logger log = LoggerFactory.getLogger(ClientMain.class);

    public static void main(String[] args) throws InterruptedException {
        log.info("Started");

        if (System.getProperty("client") != null) {
            int clientId = Integer.parseInt(System.getProperty("client"));
            // try-with-resources closes the context (and its RabbitMQ connection) when the loop is done.
            try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class)) {
                SimpleInterface proxy = context.getBean(SimpleInterface.class);

                // Make ten remote calls, two seconds apart.
                for (int i = 0; i < 10; i++) {
                    String payload = proxy.doJob(clientId);
                    log.info("Client {} received payload: {}", clientId, payload);
                    Thread.sleep(2000);
                }
            }
        } else {
            log.error("client property is missing. Pass -Dclient=[client ID] argument");
        }
    }
}

Launching applications

You can get the full example project on GitHub here. In this example we will run RabbitMQ in a Docker container, using the official image: RabbitMQ Docker image.

If you have Docker installed, type docker-compose up to start RabbitMQ. You can then build the project by typing ./gradlew jar.

Once RabbitMQ is running, you can launch as many client and server applications as you need. Just pass the -Dclient={client#} JVM argument to the client and -Dnode={node#} to the server. These arguments are used to identify which client sent a request and which server processed it.
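
Values passed with -D end up as JVM system properties. As a small sketch of reading such an ID defensively, the helper below returns an empty result when the property is missing or is not a number. The class IdPropertySketch and the method readId are hypothetical and not part of the example project.

```java
import java.util.OptionalInt;

final class IdPropertySketch {
    // Reads an integer ID from a JVM system property such as -Dclient=1.
    // Returns an empty OptionalInt if the property is absent or not a valid integer.
    static OptionalInt readId(String propertyName) {
        String raw = System.getProperty(propertyName);
        if (raw == null) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        OptionalInt clientId = readId("client");
        if (clientId.isPresent()) {
            System.out.println("Client ID: " + clientId.getAsInt());
        } else {
            System.err.println("client property is missing or not a number. Pass -Dclient=[client ID]");
        }
    }
}
```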

Launch the server and client applications:

java -Dnode=1 -jar server/build/libs/server-1.0.jar

java -Dclient=1 -jar client/build/libs/client-1.0.jar

Server application output:

14:36:47.100 [main] INFO  me.molchanoff.amqp.server.ServerMain - Started
14:36:47.251 [main] INFO  o.s.c.a.AnnotationConfigApplicationContext - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@47f37ef1: startup date [Mon Mar 13 14:36:47 MSK 2017]; root of context hierarchy
14:36:47.585 [main] INFO  m.m.amqp.server.SimpleInterfaceImpl - Node with ID: 1 started
14:36:47.817 [main] INFO  o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase -2147482648
14:36:47.818 [main] INFO  o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647
14:36:47.907 [container-1] INFO  o.s.a.r.c.CachingConnectionFactory - Created new connection: SimpleConnection@479759bd [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55058]
14:37:10.050 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 6dbf93bf-621d-4e7c-b703-908295f4507d generated on node 1 upon request from client 1
14:37:12.137 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 48f1d401-42fa-44a0-af1e-5e189930514f generated on node 1 upon request from client 1
14:37:14.155 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID cbdbbd5c-6747-47bf-a15a-267066a40a16 generated on node 1 upon request from client 1
14:37:16.169 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 3cb93958-d670-4a53-bf36-c1d663a771c3 generated on node 1 upon request from client 1
14:37:18.186 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 75dc7c0b-2771-4305-ad8f-42c343f3e033 generated on node 1 upon request from client 1
14:37:20.205 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 42688ce1-a51a-4647-80e0-477195347599 generated on node 1 upon request from client 1
14:37:22.222 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 9a223770-b0fa-4f24-b3f9-b722b25364f0 generated on node 1 upon request from client 1
14:37:24.238 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID c23afd04-56bb-42ef-bd0d-6f95f9ba93ca generated on node 1 upon request from client 1
14:37:26.249 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID 4d917176-910d-4e2c-b53d-e65d61067c3d generated on node 1 upon request from client 1
14:37:28.262 [container-1] INFO  m.m.amqp.server.SimpleInterfaceImpl - UUID a59f7637-a557-42db-ab72-851a38e8e2a4 generated on node 1 upon request from client 1

Client application output:

14:37:09.182 [main] INFO  me.molchanoff.amqp.client.ClientMain - Started
14:37:09.370 [main] INFO  o.s.c.a.AnnotationConfigApplicationContext - Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@47f37ef1: startup date [Mon Mar 13 14:37:09 MSK 2017]; root of context hierarchy
14:37:09.771 [main] INFO  o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase -2147482648
14:37:09.875 [main] INFO  o.s.a.r.c.CachingConnectionFactory - Created new connection: SimpleConnection@3c3d9b6b [delegate=amqp://guest@0:0:0:0:0:0:0:1:5672/, localPort= 55061]
14:37:10.126 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 6dbf93bf-621d-4e7c-b703-908295f4507d
14:37:12.142 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 48f1d401-42fa-44a0-af1e-5e189930514f
14:37:14.160 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: cbdbbd5c-6747-47bf-a15a-267066a40a16
14:37:16.173 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 3cb93958-d670-4a53-bf36-c1d663a771c3
14:37:18.191 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 75dc7c0b-2771-4305-ad8f-42c343f3e033
14:37:20.210 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 42688ce1-a51a-4647-80e0-477195347599
14:37:22.226 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 9a223770-b0fa-4f24-b3f9-b722b25364f0
14:37:24.242 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: c23afd04-56bb-42ef-bd0d-6f95f9ba93ca
14:37:26.254 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: 4d917176-910d-4e2c-b53d-e65d61067c3d
14:37:28.267 [main] INFO  me.molchanoff.amqp.client.ClientMain - Client 1 received payload: a59f7637-a557-42db-ab72-851a38e8e2a4

This project is available in my GitHub repository. Try it yourself with more clients and servers and see the difference.