배경
Spring 환경에서 RabbitMQ를 이용해 다른 서버에 요청하고 거의 Restful API와 같이 즉각적인 응답을 받아야 하는 경우가 발생했다.
메세지큐의 경우 별도의 쓰레드에서 이를 받아 수행하는 예제가 일반적인데, 내 경우엔 스프링 접속자 쓰레드에서 응답까지 오고 이에 대한 리턴을 줘야 하는 상황이었다.
비동기식이 아닌 동기식 코드를 원했기에 반대편 응답에 따라 그 순간은 쓰레드가 블로킹될 수 있다는 걸 감안해서라도 꼭 필요한 기능이었다.
공식홈페이지를 통해 Direct Reply-to 방식을 알게 됐었는데 내용중에 나온 부분을 확인하는 과정에서, 인기가 많던 Request Reply 패턴을 알게 되어 이에 대해 작성해보고자 한다.
환경
- RabbitMQ 3.13
- 스프링 부트가 아닌 스프링 프레임워크 6.1 기반으로 진행
- A서버 ( Request )
- B서버 ( Reply )
구현
공통
Java Configuration 파일 내부에 설정
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("{{RABBITMQ_HOST}}");
connectionFactory.setPort(5672);
connectionFactory.setUsername("{{RABBITMQ_USERNAME}}");
connectionFactory.setPassword("{{RABBITMQ_PASSWORD}}");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = null;
try {
rabbitAdmin = new RabbitAdmin(connectionFactory());
// 시작과 동시에 큐 생성
rabbitAdmin.declareQueue(testRequestQueue());
}catch(Exception e){
System.out.println("###################################");
System.out.println("!!!! RabbitMQ 서버 연결에 실패하였습니다. !!!!");
System.out.println("###################################");
}
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 아래 코드는 correlativeId를 사용할 때 쓰는 설정, B서버는 필수고 A서버는 안해도 된다.
rabbitTemplate.setUserCorrelationId(true);
return rabbitTemplate;
}
@Bean
public Queue testRequestQueue() {
final boolean isDurable = true;
final boolean isExclusive = false;
final boolean autoDelete = false;
return new Queue("test_request", isDurable, isExclusive, autoDelete);
}
위 설정에서 가장 중요한 부분은, rabbitTemplate.setUserCorrelationId(true) 부분이다.
위 설정을 하지 않으면 아무리 어떤 correlativeid 값을 넣어도 아마 테스트 시 무조건 "1"이 나올 것이다.
sendAndReceive 함수를 깊게 파보면 나오지만 userCorrelationId가 false로 되어있는 게 default이기 때문에 이로 인해 자동 증가 수로 처리되어 있다. 그렇다 보니 1로 나오는 것.
커스텀하게 correlationId를 주고 싶다면 정의한 rabbitTemplate 인스턴스에서 꼭 userCorrelationId 함수를 통해 true로 바꿔주자.
A서버
테스트코드
어차피 요청만 하면 되는 부분이라.. 그냥 테스트코드상에서 실행했다.
실제로 사용할 땐 call 함수 내부 코드만 참고하자.
@RequiredArgsConstructor
public class RabbitMQTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void call() throws IOException, InterruptedException, ExecutionException {
String message = "hello";
final String corrId = "2222";
System.out.println(corrId);
// 공유 요청 큐 이름
String requestQueueName = "test_request";
String replyQueueName = UUID.randomUUID().toString(); // 임시 대기열 이름 생성
MessageProperties props = new MessageProperties();
props.setReplyTo(replyQueueName);
props.setExpiration("5000");
props.setCorrelationId(corrId);
Message requestMessage = new Message(message.getBytes(), props);
// 요청 메시지 전송 및 응답 수신 (5초 대기)
Message responseMessage = rabbitTemplate.sendAndReceive("test_request", requestMessage);
if (responseMessage != null) {
System.out.println(new String(responseMessage.getBody()));
} else {
// 응답 없음 처리
//throw new TimeoutException("Response timeout");
System.out.println("타임아웃");
}
}
}
B서버
TestRabbitMQService.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.message.TimestampMessage;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
@RequiredArgsConstructor
public class TestRabbitMQService implements ChannelAwareMessageListener {
private final RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message, Channel channel) throws JsonProcessingException {
//예시
try {
String replyTo = message.getMessageProperties().getReplyTo();
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("corRelationId : " + correlationId);
// 요청 메시지 처리 로직
String responseMessage = "Response: " + replyTo;
System.out.println("받은 메세지 : " + responseMessage);
String reresponse = "BYE";
// 응답 메시지 전송
MessageProperties props = new MessageProperties();
props.setCorrelationId(correlationId); // 상관관계 ID 설정
Message responseMessageObj = new Message(reresponse.getBytes(), props);
rabbitTemplate.send(replyTo, responseMessageObj);
} catch (Exception e) {
rabbitTemplate.convertAndSend("", "error", new String(message.getBody(), StandardCharsets.UTF_8).trim());
}
}
}
JavaConfig 파일 내부 추가
@Bean
public SimpleMessageListenerContainer testMessageListenerContainer(CachingConnectionFactory connectionFactory, TestRabbitMQService receiver) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAutoDeclare(true);
container.setQueueNames("test_request"); // 리스닝할 큐 이름 지정
container.setMessageListener(receiver); // 리스너 등록
container.setConcurrentConsumers(1); // 병렬 소비자 수 지정
container.setPrefetchCount(50); // 한 번에 받아올 메시지의 최대 수 지정
container.setDeclarationRetries(2);
return container;
}
임시로 Reply 받을 큐는 코드상에서와 같이 uuid로 처리했다.
실행 결과
A서버
B서버
동시성 이슈가 없도록 correlativeId와 reply QueueName을 uuid로 지정했지만, 실무에서 사용해도 되는지는 테스트가 필요하다.
직접 찾아보고 테스트하며 작성했지만, 물론 이 방식이 정답이 아닐 수 있습니다.
더 좋은 방법이 있거나 개선할 점이 있다면 댓글 부탁드립니다.
'자바 > Spring Framework' 카테고리의 다른 글
[Spring boot3] SpringDoc - OpenAPI Servers url 수정 (0) | 2024.10.02 |
---|---|
Spring Framework 6 RabbitMQ no queue in vhost 이슈 해결 (0) | 2024.03.13 |
스프링 6.1 로 업그레이드시 매개변수,빈 인식 불가현상 (0) | 2023.12.12 |
lucy-xss-servlet-filter Spring 6 용 JAR 빌드/적용 (2) (0) | 2023.08.17 |