Spring AMQP + RabbitMQ message pub/sub example
목표
- AMQP 프로토콜에 대한 이해
- 메시지 전송 기반 시스템으로 음식점에 주문을 하는 간단한 시스템을 만들기
- message queuing을 위해서 AMQP Broker인 RabbitMQ를 사용
AMQP란?
메시지 전송을 위한 프로토콜로 다음 그림을 보면 쉽게 이해가된다.
Advanced Message Queuing Protocol
- Message
- Exchange : 메시지를 담고 있으며, queue에 바인딩 시키는 라우터 역할을 한다. 종류는 다음과 같다.
- Direct Exchange : routing key를 기반으로 큐에 전달
- Fanout Exchange : 모든 큐에 전달
- Topic Exchange : routing key 패턴으로 바인딩
- Headers Exchange : 해더 속성을 사용하여 바인딩
- Queue : 대기열
- Binding : Exchange가 Queue에 바인딩하기 위한 rule
RabbitMQ 사이트에 재밌는 예시가 있다.
“Queue가 뉴욕의 한 목적지 라면, Exchange는 JFK 공항이고, Bindings는 목적지까지 가는 여러개의 길이다.”
Required
- Java 11
- RabbitMQ 3.8.14
- Spring boot 2.4.3
Setup the RabbitMQ
- RabbitMQ 다운로드
- 다운로드하여 직접 설치해도 되지만 필자는 편의상 Docker로 실행하였다.도커 실행 후 http://localhost:15672로 접속하여 로그인 화면이 나온다면 제대로 구동된 것이다.
1
$> docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
guest 계정으로 접속하면 다음과 같은 Rabbit Management 화면을 볼 수 있다.
Spring boot Application
- Spring Initializr를 사용해 spring boot 프로젝트를 하나 만든다
dependency에 spring-boot-starter-amqp를 추가해준다.
pom.xml :
1 |
|
Message Sender
우선 메시지 송신을 위한 AMQP 관련 객체들을 bean으로 등록한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47package io.timpac.amqpdemo.config;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class OrderAmqpConfig {
public static final String QUEUE_NAME = "ORDER_QUEUE";
public static final String EXCHANGE_NAME = "ORDER_EXCHANGE";
public static final String ROUTING_KEY_PREPEND = "IO.TIMPAC.";
public Queue queue() {
return new Queue(QUEUE_NAME);
}
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_PREPEND + "#");
}
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
return template;
}
}queue() : “ORDER_QUEUE”란 이름을 가진 Queue를 만든다.
exchange() : “ORDER_EXCHANGE”란 이름을 가진 Exchange를 만든다. 패턴으로 바인딩하기 위해 TopicExchange로 만들었다.
binding() : Exchange와 Queue를 route key로 바인딩 시킨다. RabbitMQ는 메시지를 보낼 때 큐에 바인딩 하기 위해 이 객체를 사용한다.
amqpTemplate() : RabbitTemplate을 등록해준다. json형식으로 보내기 위해서 messegeConverter로 Jackson2JsonMessageConverter를 사용하였다.
주문 정보를 담을 ORDER 클래스를 만든다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38package io.timpac.amqpdemo.dto;
public class Order {
private String orderId;
private String menu;
private int quantity;
private int price;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getMenu() {
return menu;
}
public void setMenu(String menu) {
this.menu = menu;
}
public int getQuantity() {
return quantity;
}
public void setQuantity(int quantity) {
this.quantity = quantity;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public String toString() {
return "Order [orderId=" + orderId + ", menu=" + menu + ", quantity=" + quantity + ", price=" + price + "]";
}
}대충 메뉴, 수량, 가격 등 간단하게 만들었다.
주문 메시지를 전송할 Publisher를 만든다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28package io.timpac.amqpdemo.publisher;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.timpac.amqpdemo.config.OrderAmqpConfig;
import io.timpac.amqpdemo.dto.Order;
public class OrderPublisherController {
private final RabbitTemplate template;
public OrderPublisherController(RabbitTemplate template) {
this.template = template;
}
public String order( Order order, String restaurant){
order.setOrderId(UUID.randomUUID().toString());
template.convertAndSend(OrderAmqpConfig.EXCHANGE_NAME, OrderAmqpConfig.ROUTING_KEY_PREPEND + restaurant, order);
return "주문 완료";
}
}RabbitTemplate의 convertAndSend() 메서드를 이용해 message를 Rabbit 서버에 전송한다.
인자로는 Exchange name, Routing Key, Playload가 들어간다.
Exchange name에는 OrderAmqpConfig에서 지정한 EXCHANGE_NAME을 넣는다
Routing Key는 음식점별로 큐를 분리하기 위해서 패턴 뒤에 음식점 이름을 패스인자로 받아서 넣었다
Playload에는 주문정보가 담긴 Order 객체가 들어간다.
Send a Test Message
이제 메시지 전송 테스트를 해보자. POSTMAN을 사용하여 다음과 같이 요청했다.
내가 좋아하는 맘스터치 화이트갈릭버거를 신청했다. (맘스터치 광고 아님 ^^)
신청 후 다시 RabbitMQ 관리자 페이지를 확인해보자
위와 같이 큐에 메시지 하나가 들어가 있는걸 볼 수 있다. 리시버를 아직 등록하지 않았기 때문에 대기 상태에 있다.
Receiver
메시지를 받을 Receiver도 등록해보자
1 | package io.timpac.amqpdemo.receiver; |
@RabbitListener 어노테이션을 쓰면 쉽게 리스너로 등록 가능하다
다시 POST 요청을 보내고 확인해보면 아래와 같이 메시지를 받은 것을 확인할 수 있다.