Spring AMQP + RabbitMQ message pub/sub example

목표

  • AMQP 프로토콜에 대한 이해
  • 메시지 전송 기반 시스템으로 음식점에 주문을 하는 간단한 시스템을 만들기
  • message queuing을 위해서 AMQP Broker인 RabbitMQ를 사용

AMQP란?

메시지 전송을 위한 프로토콜로 다음 그림을 보면 쉽게 이해가된다.

amqpAdvanced Message Queuing Protocol

  • Message
  • Exchange : 메시지를 담고 있으며, queue에 바인딩 시키는 라우터 역할을 한다. 종류는 다음과 같다.
    • Direct Exchange : routing key를 기반으로 큐에 전달
    • Fanout Exchange : 모든 큐에 전달
    • Topic Exchange : routing key 패턴으로 바인딩
    • Headers Exchange : 해더 속성을 사용하여 바인딩
  • Queue : 대기열
  • Binding : Exchange가 Queue에 바인딩하기 위한 rule

    RabbitMQ 사이트에 재밌는 예시가 있다.
    “Queue가 뉴욕의 한 목적지 라면, Exchange는 JFK 공항이고, Bindings는 목적지까지 가는 여러개의 길이다.”

Required

  • Java 11
  • RabbitMQ 3.8.14
  • Spring boot 2.4.3

Setup the RabbitMQ

  • RabbitMQ 다운로드
  • 다운로드하여 직접 설치해도 되지만 필자는 편의상 Docker로 실행하였다.
    1
    $> docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    도커 실행 후 http://localhost:15672로 접속하여 로그인 화면이 나온다면 제대로 구동된 것이다.
    guest 계정으로 접속하면 다음과 같은 Rabbit Management 화면을 볼 수 있다.

amqp

Spring boot Application

  • Spring Initializr를 사용해 spring boot 프로젝트를 하나 만든다
    dependency에 spring-boot-starter-amqp를 추가해준다.

pom.xml :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>io.timpac</groupId>
<artifactId>amqpdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>amqpdemo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

Message Sender

  • 우선 메시지 송신을 위한 AMQP 관련 객체들을 bean으로 등록한다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package io.timpac.amqpdemo.config;

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class OrderAmqpConfig {
    public static final String QUEUE_NAME = "ORDER_QUEUE";
    public static final String EXCHANGE_NAME = "ORDER_EXCHANGE";
    public static final String ROUTING_KEY_PREPEND = "IO.TIMPAC.";

    @Bean
    public Queue queue() {
    return new Queue(QUEUE_NAME);
    }

    @Bean
    public TopicExchange exchange() {
    return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_PREPEND + "#");
    }

    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(messageConverter());
    return template;
    }
    }

    queue() : “ORDER_QUEUE”란 이름을 가진 Queue를 만든다.
    exchange() : “ORDER_EXCHANGE”란 이름을 가진 Exchange를 만든다. 패턴으로 바인딩하기 위해 TopicExchange로 만들었다.
    binding() : Exchange와 Queue를 route key로 바인딩 시킨다. RabbitMQ는 메시지를 보낼 때 큐에 바인딩 하기 위해 이 객체를 사용한다.
    amqpTemplate() : RabbitTemplate을 등록해준다. json형식으로 보내기 위해서 messegeConverter로 Jackson2JsonMessageConverter를 사용하였다.


  • 주문 정보를 담을 ORDER 클래스를 만든다

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package io.timpac.amqpdemo.dto;

    public class Order {
    private String orderId;
    private String menu;
    private int quantity;
    private int price;

    public String getOrderId() {
    return orderId;
    }
    public void setOrderId(String orderId) {
    this.orderId = orderId;
    }
    public String getMenu() {
    return menu;
    }
    public void setMenu(String menu) {
    this.menu = menu;
    }
    public int getQuantity() {
    return quantity;
    }
    public void setQuantity(int quantity) {
    this.quantity = quantity;
    }
    public int getPrice() {
    return price;
    }
    public void setPrice(int price) {
    this.price = price;
    }

    @Override
    public String toString() {
    return "Order [orderId=" + orderId + ", menu=" + menu + ", quantity=" + quantity + ", price=" + price + "]";
    }
    }

    대충 메뉴, 수량, 가격 등 간단하게 만들었다.


  • 주문 메시지를 전송할 Publisher를 만든다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package io.timpac.amqpdemo.publisher;

    import java.util.UUID;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import io.timpac.amqpdemo.config.OrderAmqpConfig;
    import io.timpac.amqpdemo.dto.Order;

    @RestController
    @RequestMapping("/order")
    public class OrderPublisherController {
    private final RabbitTemplate template;

    public OrderPublisherController(RabbitTemplate template) {
    this.template = template;
    }

    @PostMapping("{restaurant}")
    public String order(@RequestBody Order order, @PathVariable String restaurant) {
    order.setOrderId(UUID.randomUUID().toString());
    template.convertAndSend(OrderAmqpConfig.EXCHANGE_NAME, OrderAmqpConfig.ROUTING_KEY_PREPEND + restaurant, order);
    return "주문 완료";
    }
    }

    RabbitTemplate의 convertAndSend() 메서드를 이용해 message를 Rabbit 서버에 전송한다.
    인자로는 Exchange name, Routing Key, Playload가 들어간다.
    Exchange name에는 OrderAmqpConfig에서 지정한 EXCHANGE_NAME을 넣는다
    Routing Key는 음식점별로 큐를 분리하기 위해서 패턴 뒤에 음식점 이름을 패스인자로 받아서 넣었다
    Playload에는 주문정보가 담긴 Order 객체가 들어간다.

Send a Test Message

이제 메시지 전송 테스트를 해보자. POSTMAN을 사용하여 다음과 같이 요청했다.
amqp

내가 좋아하는 맘스터치 화이트갈릭버거를 신청했다. (맘스터치 광고 아님 ^^)

신청 후 다시 RabbitMQ 관리자 페이지를 확인해보자
amqp
위와 같이 큐에 메시지 하나가 들어가 있는걸 볼 수 있다. 리시버를 아직 등록하지 않았기 때문에 대기 상태에 있다.

Receiver

메시지를 받을 Receiver도 등록해보자

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package io.timpac.amqpdemo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import io.timpac.amqpdemo.config.OrderAmqpConfig;
import io.timpac.amqpdemo.dto.Order;

@Component
public class Restaurant {

@RabbitListener(queues = OrderAmqpConfig.QUEUE_NAME)
public void receiveMessage(Order order) {
System.out.println("배달의 민족 주문~ : " + order.toString());
}
}

@RabbitListener 어노테이션을 쓰면 쉽게 리스너로 등록 가능하다

다시 POST 요청을 보내고 확인해보면 아래와 같이 메시지를 받은 것을 확인할 수 있다.

amqp

참조