RabbitMQ Request Reply 패턴

2024. 4. 22. 16:20· 자바/Spring Framework
목차
  1. 배경
  2. 환경
  3. 구현
  4. 공통 
  5. A서버
  6. B서버
  7. 실행 결과
  8. A서버
  9. B서버

RabbitMQ 3.13 에서 테스트 진행

배경

Spring 환경에서 RabbitMQ를 이용해 다른 서버에 요청하고 거의 Restful API와 같이 즉각적인 응답을 받아야 하는 경우가 발생했다.

메세지큐의 경우 별도의 쓰레드에서 이를 받아 수행하는 예제가 일반적인데, 내 경우엔 스프링 접속자 쓰레드에서 응답까지 오고 이에 대한 리턴을 줘야 하는 상황이었다. 

비동기식이 아닌 동기식 코드를 원했기에 반대편 응답에 따라 그 순간은 쓰레드가 블로킹될 수 있다는 걸 감안해서라도 꼭 필요한 기능이었다. 

공식홈페이지를 통해 Direct Reply-to 방식을 알게 됐었는데 내용중에 나온 부분을 확인하는 과정에서, 인기가 많던 Request Reply 패턴을 알게 되어 이에 대해 작성해보고자 한다. 

환경

  • RabbitMQ 3.13
  • 스프링 부트가 아닌 스프링 프레임워크 6.1 기반으로 진행
  • A서버 ( Request ) 
  • B서버 ( Reply ) 

구현

공통 

Java Configuration 파일 내부에 설정 

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("{{RABBITMQ_HOST}}");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("{{RABBITMQ_USERNAME}}");
        connectionFactory.setPassword("{{RABBITMQ_PASSWORD}}");
        connectionFactory.setVirtualHost("/");

        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = null;
        try {
            rabbitAdmin = new RabbitAdmin(connectionFactory());
            
            // 시작과 동시에 큐 생성
            rabbitAdmin.declareQueue(testRequestQueue());
         
        }catch(Exception e){
            System.out.println("###################################");
            System.out.println("!!!! RabbitMQ 서버 연결에 실패하였습니다. !!!!");
            System.out.println("###################################");
        }

        return rabbitAdmin;
    }


    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        
        // 아래 코드는 correlativeId를 사용할 때 쓰는 설정, B서버는 필수고 A서버는 안해도 된다.
        rabbitTemplate.setUserCorrelationId(true);
        return rabbitTemplate;
    }

    @Bean
    public Queue testRequestQueue() {
        final boolean isDurable = true;
        final boolean isExclusive = false;
        final boolean autoDelete = false;
        return new Queue("test_request", isDurable, isExclusive, autoDelete);
    }

위 설정에서 가장 중요한 부분은, rabbitTemplate.setUserCorrelationId(true) 부분이다.

위 설정을 하지 않으면 아무리 어떤 correlativeid 값을 넣어도 아마 테스트 시 무조건 "1"이 나올 것이다.

sendAndReceive 함수를 깊게 파보면 나오지만 userCorrelationId가 false로 되어있는 게 default이기 때문에 이로 인해 자동 증가 수로 처리되어 있다. 그렇다 보니 1로 나오는 것.

커스텀하게 correlationId를 주고 싶다면 정의한 rabbitTemplate 인스턴스에서 꼭 userCorrelationId 함수를 통해 true로 바꿔주자.

A서버

테스트코드

어차피 요청만 하면 되는 부분이라.. 그냥 테스트코드상에서 실행했다.

실제로 사용할 땐 call 함수 내부 코드만 참고하자.

@RequiredArgsConstructor
public class RabbitMQTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void call() throws IOException, InterruptedException, ExecutionException {
        String message = "hello";
        final String corrId = "2222";
        System.out.println(corrId);
        // 공유 요청 큐 이름
        String requestQueueName = "test_request";

        String replyQueueName = UUID.randomUUID().toString(); // 임시 대기열 이름 생성

        MessageProperties props = new MessageProperties();
        props.setReplyTo(replyQueueName);
        props.setExpiration("5000");
        props.setCorrelationId(corrId);

        Message requestMessage = new Message(message.getBytes(), props);

        // 요청 메시지 전송 및 응답 수신 (5초 대기)
        Message responseMessage = rabbitTemplate.sendAndReceive("test_request", requestMessage);

        if (responseMessage != null) {
            System.out.println(new String(responseMessage.getBody()));
        } else {
            // 응답 없음 처리
            //throw new TimeoutException("Response timeout");
            System.out.println("타임아웃");
        }
    }
}

B서버

TestRabbitMQService.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.message.TimestampMessage;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Component
@RequiredArgsConstructor
public class TestRabbitMQService  implements ChannelAwareMessageListener {


    private final RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message, Channel channel) throws JsonProcessingException {
        //예시
        try {
            String replyTo = message.getMessageProperties().getReplyTo();
            String correlationId = message.getMessageProperties().getCorrelationId();

            System.out.println("corRelationId : " + correlationId);
            // 요청 메시지 처리 로직
            String responseMessage = "Response: " + replyTo;
            System.out.println("받은 메세지 : " + responseMessage);

            String reresponse = "BYE";

            // 응답 메시지 전송
            MessageProperties props = new MessageProperties();
            props.setCorrelationId(correlationId); // 상관관계 ID 설정
            Message responseMessageObj = new Message(reresponse.getBytes(), props);
            rabbitTemplate.send(replyTo, responseMessageObj);

        } catch (Exception e) {
            rabbitTemplate.convertAndSend("", "error", new String(message.getBody(), StandardCharsets.UTF_8).trim());
        }


    }
}

JavaConfig 파일 내부 추가

    @Bean
    public SimpleMessageListenerContainer testMessageListenerContainer(CachingConnectionFactory connectionFactory, TestRabbitMQService receiver) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setAutoDeclare(true);
        container.setQueueNames("test_request"); // 리스닝할 큐 이름 지정
        container.setMessageListener(receiver); // 리스너 등록
        container.setConcurrentConsumers(1); // 병렬 소비자 수 지정
        container.setPrefetchCount(50); // 한 번에 받아올 메시지의 최대 수 지정
        container.setDeclarationRetries(2);
        return container;
    }

 

임시로 Reply 받을 큐는 코드상에서와 같이 uuid로 처리했다. 

실행 결과

A서버

각각 correlationId 응답메세지

B서버

B서버에서 받은 CorRelationId / replayTo / 받은 메세지

동시성 이슈가 없도록 correlativeId와 reply QueueName을 uuid로 지정했지만, 실무에서 사용해도 되는지는 테스트가 필요하다.

 

직접 찾아보고 테스트하며 작성했지만, 물론 이 방식이 정답이 아닐 수 있습니다.
더 좋은 방법이 있거나 개선할 점이 있다면 댓글 부탁드립니다.
반응형
저작자표시 비영리 변경금지 (새창열림)

'자바 > Spring Framework' 카테고리의 다른 글

[Spring boot3] SpringDoc - OpenAPI Servers url 수정  (1) 2024.10.02
Spring Framework 6 RabbitMQ no queue in vhost 이슈 해결  (0) 2024.03.13
스프링 6.1 로 업그레이드시 매개변수,빈 인식 불가현상  (0) 2023.12.12
lucy-xss-servlet-filter Spring 6 용 JAR 빌드/적용 (2)  (0) 2023.08.17
  1. 배경
  2. 환경
  3. 구현
  4. 공통 
  5. A서버
  6. B서버
  7. 실행 결과
  8. A서버
  9. B서버
'자바/Spring Framework' 카테고리의 다른 글
  • [Spring boot3] SpringDoc - OpenAPI Servers url 수정
  • Spring Framework 6 RabbitMQ no queue in vhost 이슈 해결
  • 스프링 6.1 로 업그레이드시 매개변수,빈 인식 불가현상
  • lucy-xss-servlet-filter Spring 6 용 JAR 빌드/적용 (2)
신·기록
신·기록
개발 관련 이슈 해결방안, 활용 방법, 제품 리뷰 등을 기록하는 개인 블로그
신·기록
문제해결집
신·기록
전체
오늘
어제
  • 분류 전체보기 (175)
    • 뉴스 (9)
    • 제품 리뷰 (19)
    • 운영체제 및 서버 (53)
      • Linux (27)
      • Mac OS (8)
      • Proxmox (6)
      • Nginx (2)
      • Raspberry Pi (3)
      • Odroid M2 (4)
    • AI 인공지능 (7)
    • 모바일 (4)
      • Flutter (1)
      • 안드로이드 (1)
    • 데이터베이스 (11)
      • Mysql (9)
    • 자바 (11)
      • Spring Framework (9)
    • 자바스크립트 (19)
      • Node JS (4)
      • Electron Framework (10)
      • React & Next.js (4)
    • 클라우드 플랫폼 (4)
    • 학습 (4)
    • 이슈 (21)
    • 비디오게임 에뮬레이터 (2)
    • CPU 성능 순위 (1)
    • 그래픽카드 성능 순위 (1)
    • 기타 (7)

블로그 메뉴

  • 이용 원칙
  • 태그
  • 방명록

인기 글

최근 글

08-03 12:24
관련 문의는 방명록에 부탁드립니다.
본 블로그의 스킨 출처는 아래와 같습니다.
hELLO · Designed By 정상우.v4.2.2
신·기록
RabbitMQ Request Reply 패턴
상단으로

티스토리툴바

개인정보

  • 티스토리 홈
  • 포럼
  • 로그인

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.