RabbitMQ 개념
메시지 송신 과정
🟢 1단계: 메시지 발송 메서드 (sendScheduleEventMessage)
public void sendScheduleEventMessage(RabbitMqEventBaseDto rabbitMqEventDto) {
try {
// 메시지 본문을 JSON으로 직렬화
String jsonMessage = JsonUtils.writeValue(rabbitMqEventDto);
// 지연 시간 (ms), 설정값에 따라 0 또는 지정된 값
Long expirationMs = noWaitAlarmDelayedQueue ? 0 : rabbitMqEventDto.getEventInfo().getDelayMilliSeconds();
// 메시지 발행 시간 기록 (ISO 형식)
String eventIssuedTime = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
// 메시지 생성 (본문, 타입, 회사 UUID, 발행 시간, 지연 시간 포함)
Message message = MessageBuilder.withBody(jsonMessage.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setHeader(HEADER_MQ_MESSAGE_TYPE, rabbitMqEventDto.getMessageType())
.setHeader(HEADER_MQ_COMPANY_UUID, rabbitMqEventDto.getCompanyUuid())
.setHeader(HEADER_MQ_EVENT_ISSUED_TIME, eventIssuedTime)
.setExpiration(String.format("%d", expirationMs))
.build();
// 라우팅 키 생성 (발생 시간 기반)
String routingKey = makeRandomRoutingKey(rabbitMqEventDto.getEventInfo().getMqEventTriggerTime());
// 메시지 전송 (지연 익스체인지에 라우팅 키와 함께)
rabbitTemplate.send(SCHEDULE_DELAYED_EXCHANGE_NAME, routingKey, message);
log.debug("Message sent: {}", jsonMessage);
} catch (Exception e) {
// 예외 발생 시 로그 남기고 커스텀 예외 발생
throw new DopBatchException(RABBITMQ_PUBLISH_ERROR, e);
}
}
RabbitMq로 메시지를 보내는 방법은 간단합니다. 보낼 메시지를 직렬화 하고 메시지를 생성합니다. 필요한 속성들을 정의해주고 라우팅 키를 생성 후 전송하면 됩니다. rabbitTemplate는 스프링에서 제공됩니다.
메시지 수신 과정
🟢 1단계: 내 전용 큐 만들기
@Bean(name = "leaveEventQueue")
public Queue leaveEventQueue() {
return QueueBuilder
.durable(LEAVE_QUEUE_NAME_PREFIX + "_" + UUID.randomUUID()) // 서버 재시작에도 큐 유지 (하지만 아래 설정들과 함께 사용 시 의미 제한적)
.withArgument("x-expires", 10000) // 큐를 사용하지 않으면 10초 후 자동 삭제
.exclusive() // 현재 커넥션에서만 사용 가능, 커넥션 종료 시 자동 삭제
.build();
}
첫번 째로 메시지를 받을 큐를 생성합니다.
durable: 큐 정의를 디스크에 저장하여 RabbitMQ 재시작 시에도 남게 함.
-> 다만, exclusive + random UUID + x-expires 조합이라 실질적으론 일회성 큐.
x-expires: 지정된 시간(ms) 동안 사용하지 않으면 큐가 자동 삭제됨.
exclusive: 해당 커넥션에서만 접근 가능하며, 커넥션 종료 시 자동 삭제됨.
UUID: 큐 이름을 매번 다르게 만들어 중복 방지 및 독립성 확보.
🟠 2단계 : 익스체인지 만들고 큐랑 연결하기
코드 (익스체인지 + 바인딩)
@Bean(name = "leaveShardedExchange")
public CustomExchange leaveShardedExchange() {
return new CustomExchange(
LEAVE_SHARDED_EXCHANGE_NAME,
"x-modulus-hash", // 해시 기반 커스텀 익스체인지 타입
true, // durable: 서버 재시작 후에도 유지
false // autoDelete: 자동 삭제 안 함
);
}
@Bean(name = "leaveExchangeBinding")
public Binding leaveShardedExchangeBinding() {
String routingKey = "10"; // 샤딩 키 (예: 사용자 ID 또는 시간 단위로 매핑됨)
return new Binding(
leaveEventQueue().getName(), // 바인딩할 큐 이름
DestinationType.QUEUE, // 큐 대상으로 바인딩
leaveShardedExchange().getName(), // 연결할 익스체인지 이름
routingKey, // 라우팅 키
new HashMap<>() // 추가 인자 없음
);
}
다음으로 익스체인지와 바인딩 함수를 빈으로 등록합니다. CustomExchange 역시 스프링에서 제공됩니다.
📌 설명
- *익스체인지(Exchange)**는 메시지를 어디로 보낼지 결정해주는 우체국입니다.
- "x-modulus-hash" 타입은 여러 큐 중에서 routingKey 값을 해싱해서 분산 처리하는 방식이에요.
rabbitTemplate.convertAndSend(LEAVE_SHARDED_EXCHANGE_NAME", "10", messageBody);
상대가 위 코드처럼 exchange 이름을 통해 메시지를 전송하면 해당 큐를 찾아갈 수 있습니다.
- x-modulus-hash:
- RabbitMQ 플러그인을 통해 제공되는 샤딩 전용 커스텀 익스체인지 타입
- 라우팅 키의 해시값을 기반으로 메시지를 균등하게 분산
- routingKey = "10":
- x-modulus-hash에서는 이 값이 내부적으로 mod 연산의 대상이 되어 큐로 분배됨
- 다수의 큐에 분산하기 위해 다양한 키(예: "0" ~ "287" 등)를 사용
- Binding: 큐와 익스체인지를 라우팅 키를 기준으로 연결
🔵 3단계: 메시지를 처리할 메서드 연결 (listenerAdapter)
🔹 코드
@Bean(name = "leaveListenerAdapter")
public MessageListenerAdapter listenerAdapter(RabbitMQLeaveTaskEventReceiver receiver) {
MessageListenerAdapter adapter = new CustomMessageListenerAdapter(receiver); // 메시지를 처리할 리시버 클래스 지정
adapter.setDefaultListenerMethod("handleMessage"); // 메시지 수신 시 호출할 메서드 이름 지정
return adapter;
}
다음으로 메시지를 받았을 때 어떤 메서드를 실행할 건지 입력해주면 됩니다. RabbitMQLeaveTaskEventReceiver는 따로 만든 클래스이고 내부에는 handleMessage라는 메서드가 있습니다.
CustomMessageListnerAdapter는 스프링에서 제공해주는 MessageListnerAdapter를 extends 하고 있고 handleMessage라는 메서드가 있는 클래스를 넘겨주면 메세지를 받아서 handleMessage 함수를 실행할 수 있습니다.
리스터 컨테이너가 메시지를 받으면 어댑터가 실행되고 어댑터 내부에 주입된 handleMessage 메서드가 실행됩니다.
📌 설명
- 이 부분은 “어떤 메서드를 실행할 건지 알려주는 어댑터”예요.
- receiver 객체가 메시지를 처리할 주체고, 그 안의 handleMessage() 메서드를 실행해요.
🔴 4단계: 메시지를 실제로 받을 리스너 등록 (messageListenerContainer)
🔹 코드
@Bean(name = "leaveMessageListenerContainer")
public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListener listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory); // RabbitMQ 커넥션 팩토리 설정
container.setQueueNames(leaveEventQueue().getName()); // 메시지를 수신할 큐 이름
container.setMessageListener(listenerAdapter); // 메시지 수신 시 실행할 리스너
int maxThread = AsyncConfig.getMaxConcurrentTasks(); // 동시 소비자 수 설정
container.setConcurrentConsumers(maxThread); // 최소 동시 소비자 수
container.setMaxConcurrentConsumers(maxThread); // 최대 동시 소비자 수
container.setPrefetchCount(maxThread); // 메시지 선읽기 개수 (성능 최적화용)
return container;
}
다음으로 리스너 컨테이너 입니다. 역할은 큐에 메세지가 오면 감지하여 리스너에 알려주는 역할을 합니다.
[ Producer ]
│
▼
[ Exchange ] ────(routing key)────▶ [ Queue ]
▲
│
[ SimpleMessageListenerContainer ]
│
▼
[ MessageListenerAdapter (handleMessage) ]
📌 설명
- 이 리스너 컨테이너는 큐를 계속 지켜보고 있다가,
- 메시지가 오면 위에서 연결한 handleMessage() 를 실행합니다.
- 쉽게 말하면: “✋ 누가 메시지 오면 나한테 알려줘!” 라고 큐에 말해두는 거예요.
항목설명
SimpleMessageListenerContainer | 큐에서 지속적으로 메시지를 수신하는 기본 컨테이너 |
setQueueNames(...) | 리스닝할 큐 이름 설정 |
setMessageListener(...) | 메시지 수신 시 실행할 리스너 지정 (MessageListenerAdapter 사용) |
setConcurrentConsumers(...) | 동시에 메시지를 처리할 소비자(스레드) 수 설정 |
setPrefetchCount(...) | 소비자가 한 번에 미리 받아올 메시지 수 (성능에 영향) |
💥 마지막: 실제 메시지 처리 (handleMessage)
🔹 코드
@Override
public void handleMessage(byte[] body, Message message) {
// 비동기 작업 도우미 객체 생성 (Async 작업 모니터링용)
AsyncJoinUtils asyncJoinUtil = new AsyncJoinUtils(asyncSupportService);
// 수신된 메시지를 도메인 객체로 변환
AttendRabbitMqMessage attendMessage = AttendRabbitMqMessage.of(message);
// 비동기 작업 실행: 메시지 처리 로직을 별도 스레드 풀에서 실행
asyncJoinUtil.addAsyncJobs(
CompletableFuture.supplyAsync(
() -> asyncHandleMessage(message, attendMessage), // 비동기 처리 로직
AsyncConfig.getHashIndexedExecutor(attendMessage.getCompanyUuid()) // 회사 UUID 기반 해시로 스레드 풀 선택
)
);
// 등록된 모든 비동기 작업이 끝날 때까지 블로킹
asyncJoinUtil.waitAsyncJobsEnd();
}
마지막으로 리스너에 handleMessage 메서드를 구현하면 됩니다. 예시에서는 성능을 위하여 비동기로 구현했습니다.
📌 설명
- 메시지가 오면, 이 메서드가 실행돼요.
- AttendRabbitMqMessage.of(message) → 메시지를 읽어서 DTO 객체로 만듭니다.
- asyncHandleMessage(...) → 실제 처리 로직을 비동기로 실행합니다.
- waitAsyncJobsEnd() → 작업이 끝날 때까지 기다립니다.
비동기 예시
/**
* 특정 스레드에서만 실행이 보장 되도록 문자열을 해시하여 스레드의 인덱스를 반환한다.
* 보통 companyUuid를 해시하여 특정한 회사의 잡은 동일한 스레드에서 돌도록 한다.
* @param hashString 스레드 인덱스를 얻기 위해 해시 할 문자열
* @return ExecutorService
*/
public static ExecutorService getHashIndexedExecutor(String hashString) {
initHashExecutorService();
int threadIndex = Math.abs(hashString.hashCode()) % getDefaultCpuCount();
return hashExecutorService.get(threadIndex);
}