웹 개발/Back End

RabbitMQ 개념

L3m0n S0ju 2025. 4. 20. 21:00

 

 

 

메시지 송신 과정

🟢 1단계: 메시지 발송 메서드 (sendScheduleEventMessage)

 

public void sendScheduleEventMessage(RabbitMqEventBaseDto rabbitMqEventDto) {
    try {
        // 메시지 본문을 JSON으로 직렬화
        String jsonMessage = JsonUtils.writeValue(rabbitMqEventDto);

        // 지연 시간 (ms), 설정값에 따라 0 또는 지정된 값
        Long expirationMs = noWaitAlarmDelayedQueue ? 0 : rabbitMqEventDto.getEventInfo().getDelayMilliSeconds();

        // 메시지 발행 시간 기록 (ISO 형식)
        String eventIssuedTime = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

        // 메시지 생성 (본문, 타입, 회사 UUID, 발행 시간, 지연 시간 포함)
        Message message = MessageBuilder.withBody(jsonMessage.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setHeader(HEADER_MQ_MESSAGE_TYPE, rabbitMqEventDto.getMessageType())
                .setHeader(HEADER_MQ_COMPANY_UUID, rabbitMqEventDto.getCompanyUuid())
                .setHeader(HEADER_MQ_EVENT_ISSUED_TIME, eventIssuedTime)
                .setExpiration(String.format("%d", expirationMs))
                .build();

        // 라우팅 키 생성 (발생 시간 기반)
        String routingKey = makeRandomRoutingKey(rabbitMqEventDto.getEventInfo().getMqEventTriggerTime());

        // 메시지 전송 (지연 익스체인지에 라우팅 키와 함께)
        rabbitTemplate.send(SCHEDULE_DELAYED_EXCHANGE_NAME, routingKey, message);

        log.debug("Message sent: {}", jsonMessage);
    } catch (Exception e) {
        // 예외 발생 시 로그 남기고 커스텀 예외 발생
        throw new DopBatchException(RABBITMQ_PUBLISH_ERROR, e);
    }
}

 

RabbitMq로 메시지를 보내는 방법은 간단합니다. 보낼 메시지를 직렬화 하고 메시지를 생성합니다. 필요한 속성들을 정의해주고 라우팅 키를 생성 후 전송하면 됩니다. rabbitTemplate는 스프링에서 제공됩니다.

 

 

 

 

 

 

 


메시지 수신 과정

🟢 1단계: 내 전용 큐 만들기

@Bean(name = "leaveEventQueue")
public Queue leaveEventQueue() {
    return QueueBuilder
        .durable(LEAVE_QUEUE_NAME_PREFIX + "_" + UUID.randomUUID()) // 서버 재시작에도 큐 유지 (하지만 아래 설정들과 함께 사용 시 의미 제한적)
        .withArgument("x-expires", 10000) // 큐를 사용하지 않으면 10초 후 자동 삭제
        .exclusive() // 현재 커넥션에서만 사용 가능, 커넥션 종료 시 자동 삭제
        .build();
}

 

첫번 째로 메시지를 받을 큐를 생성합니다. 

 

 

durable: 큐 정의를 디스크에 저장하여 RabbitMQ 재시작 시에도 남게 함.

-> 다만, exclusive + random UUID + x-expires 조합이라 실질적으론 일회성 큐.

x-expires: 지정된 시간(ms) 동안 사용하지 않으면 큐가 자동 삭제됨.

exclusive: 해당 커넥션에서만 접근 가능하며, 커넥션 종료 시 자동 삭제됨.

UUID: 큐 이름을 매번 다르게 만들어 중복 방지 및 독립성 확보.

 

 

 

 

 

 

🟠 2단계 : 익스체인지 만들고 큐랑 연결하기

 

코드 (익스체인지 + 바인딩)

@Bean(name = "leaveShardedExchange")
public CustomExchange leaveShardedExchange() {
    return new CustomExchange(
        LEAVE_SHARDED_EXCHANGE_NAME,
        "x-modulus-hash", // 해시 기반 커스텀 익스체인지 타입
        true,              // durable: 서버 재시작 후에도 유지
        false              // autoDelete: 자동 삭제 안 함
    );
}

@Bean(name = "leaveExchangeBinding")
public Binding leaveShardedExchangeBinding() {
    String routingKey = "10"; // 샤딩 키 (예: 사용자 ID 또는 시간 단위로 매핑됨)
    return new Binding(
        leaveEventQueue().getName(),           // 바인딩할 큐 이름
        DestinationType.QUEUE,                 // 큐 대상으로 바인딩
        leaveShardedExchange().getName(),      // 연결할 익스체인지 이름
        routingKey,                            // 라우팅 키
        new HashMap<>()                        // 추가 인자 없음
    );
}

 

다음으로 익스체인지와 바인딩 함수를 빈으로 등록합니다. CustomExchange 역시 스프링에서 제공됩니다.

 

📌 설명

  • *익스체인지(Exchange)**는 메시지를 어디로 보낼지 결정해주는 우체국입니다.
  • "x-modulus-hash" 타입은 여러 큐 중에서 routingKey 값을 해싱해서 분산 처리하는 방식이에요.

 

rabbitTemplate.convertAndSend(LEAVE_SHARDED_EXCHANGE_NAME", "10", messageBody);

 

상대가 위 코드처럼 exchange 이름을 통해 메시지를 전송하면 해당 큐를 찾아갈 수 있습니다.

 

 

 

  • x-modulus-hash:
    • RabbitMQ 플러그인을 통해 제공되는 샤딩 전용 커스텀 익스체인지 타입
    • 라우팅 키의 해시값을 기반으로 메시지를 균등하게 분산
  • routingKey = "10":
    • x-modulus-hash에서는 이 값이 내부적으로 mod 연산의 대상이 되어 큐로 분배됨
    • 다수의 큐에 분산하기 위해 다양한 키(예: "0" ~ "287" 등)를 사용
  • Binding: 큐와 익스체인지를 라우팅 키를 기준으로 연결

 

 

 

 

 

 

🔵 3단계: 메시지를 처리할 메서드 연결 (listenerAdapter)

🔹 코드

@Bean(name = "leaveListenerAdapter")
public MessageListenerAdapter listenerAdapter(RabbitMQLeaveTaskEventReceiver receiver) {
    MessageListenerAdapter adapter = new CustomMessageListenerAdapter(receiver); // 메시지를 처리할 리시버 클래스 지정
    adapter.setDefaultListenerMethod("handleMessage"); // 메시지 수신 시 호출할 메서드 이름 지정
    return adapter;
}

 

다음으로 메시지를 받았을 때 어떤 메서드를 실행할 건지 입력해주면 됩니다. RabbitMQLeaveTaskEventReceiver는 따로 만든 클래스이고 내부에는 handleMessage라는 메서드가 있습니다.

 

CustomMessageListnerAdapter는 스프링에서 제공해주는 MessageListnerAdapter를 extends 하고 있고 handleMessage라는 메서드가 있는 클래스를 넘겨주면 메세지를 받아서 handleMessage 함수를 실행할 수 있습니다.

 

리스터 컨테이너가 메시지를 받으면 어댑터가 실행되고 어댑터 내부에 주입된 handleMessage 메서드가 실행됩니다.

 

 

📌 설명

  • 이 부분은 “어떤 메서드를 실행할 건지 알려주는 어댑터”예요.
  • receiver 객체가 메시지를 처리할 주체고, 그 안의 handleMessage() 메서드를 실행해요.

 

 

 

 

🔴 4단계: 메시지를 실제로 받을 리스너 등록 (messageListenerContainer)

🔹 코드

@Bean(name = "leaveMessageListenerContainer")
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListener listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);              // RabbitMQ 커넥션 팩토리 설정
    container.setQueueNames(leaveEventQueue().getName());          // 메시지를 수신할 큐 이름
    container.setMessageListener(listenerAdapter);                 // 메시지 수신 시 실행할 리스너

    int maxThread = AsyncConfig.getMaxConcurrentTasks();           // 동시 소비자 수 설정
    container.setConcurrentConsumers(maxThread);                   // 최소 동시 소비자 수
    container.setMaxConcurrentConsumers(maxThread);                // 최대 동시 소비자 수
    container.setPrefetchCount(maxThread);                         // 메시지 선읽기 개수 (성능 최적화용)

    return container;
}

 

다음으로 리스너 컨테이너 입니다. 역할은 큐에 메세지가 오면 감지하여 리스너에 알려주는 역할을 합니다.

 

[ Producer ]
     │
     ▼
[ Exchange ] ────(routing key)────▶ [ Queue ]
                                         ▲
                                         │
                             [ SimpleMessageListenerContainer ]
                                         │
                                         ▼
                           [ MessageListenerAdapter (handleMessage) ]

 

 

📌 설명

  • 이 리스너 컨테이너는 큐를 계속 지켜보고 있다가,
  • 메시지가 오면 위에서 연결한 handleMessage() 를 실행합니다.
  • 쉽게 말하면: “✋ 누가 메시지 오면 나한테 알려줘!” 라고 큐에 말해두는 거예요.

 

항목설명

SimpleMessageListenerContainer 큐에서 지속적으로 메시지를 수신하는 기본 컨테이너
setQueueNames(...) 리스닝할 큐 이름 설정
setMessageListener(...) 메시지 수신 시 실행할 리스너 지정 (MessageListenerAdapter 사용)
setConcurrentConsumers(...) 동시에 메시지를 처리할 소비자(스레드) 수 설정
setPrefetchCount(...) 소비자가 한 번에 미리 받아올 메시지 수 (성능에 영향)

 

 

 

 

 

💥 마지막: 실제 메시지 처리 (handleMessage)

 

🔹 코드

@Override
public void handleMessage(byte[] body, Message message) {
    // 비동기 작업 도우미 객체 생성 (Async 작업 모니터링용)
    AsyncJoinUtils asyncJoinUtil = new AsyncJoinUtils(asyncSupportService);

    // 수신된 메시지를 도메인 객체로 변환
    AttendRabbitMqMessage attendMessage = AttendRabbitMqMessage.of(message);

    // 비동기 작업 실행: 메시지 처리 로직을 별도 스레드 풀에서 실행
    asyncJoinUtil.addAsyncJobs(
        CompletableFuture.supplyAsync(
            () -> asyncHandleMessage(message, attendMessage),                     // 비동기 처리 로직
            AsyncConfig.getHashIndexedExecutor(attendMessage.getCompanyUuid())    // 회사 UUID 기반 해시로 스레드 풀 선택
        )
    );

    // 등록된 모든 비동기 작업이 끝날 때까지 블로킹
    asyncJoinUtil.waitAsyncJobsEnd();
}

 

마지막으로 리스너에 handleMessage 메서드를 구현하면 됩니다. 예시에서는 성능을 위하여 비동기로 구현했습니다.

 

 

📌 설명

  • 메시지가 오면, 이 메서드가 실행돼요.
  • AttendRabbitMqMessage.of(message) → 메시지를 읽어서 DTO 객체로 만듭니다.
  • asyncHandleMessage(...) → 실제 처리 로직을 비동기로 실행합니다.
  • waitAsyncJobsEnd() → 작업이 끝날 때까지 기다립니다.

 

 

 

비동기 예시

    /**
     * 특정 스레드에서만 실행이 보장 되도록 문자열을 해시하여 스레드의 인덱스를 반환한다.
     * 보통 companyUuid를 해시하여 특정한 회사의 잡은 동일한 스레드에서 돌도록 한다.
     * @param hashString 스레드 인덱스를 얻기 위해 해시 할 문자열
     * @return ExecutorService
     */
    public static ExecutorService getHashIndexedExecutor(String hashString) {
        initHashExecutorService();
        int threadIndex = Math.abs(hashString.hashCode()) % getDefaultCpuCount();
        return hashExecutorService.get(threadIndex);

    }