본문 바로가기

💻Spring

Spring boot 3.2에서 Aws Sqs Temporary Queue를 이용한 요청 응답 구현하기

반응형

서론

정말 오랜만에 글을 쓴다 ! 

최근 헥사고날 아키텍쳐에대한 관심이 높아져 그간 진행해오던 사이드 프로젝트를 헥사고날 아키텍쳐로 변경해보며 공부를 하고 있다.

프로젝트는 도메인별로 분리하여 최종적으로는 MSA 형태처럼 구현해보는것이 목표이다. 

그러기 위해서는 각 도메인간 의존성이 존재하지 않아야 하기 때문에 각 도메인간 통신을 Spring에서 제공하는 EventPublisher를 통해 진행을 하였다. 

그러던 중 회원과 인증에 대한 도메인을 분리하여 회원가입 및 로그인 기능을 구현하고 있었는데, 인증 서버로 들어온 요청에 대해 토큰을 반환하는 API 작업 중, 회원 서버를 거쳐야하는 동기 로직이 필요했다. 일반적인 http 요청 응답 형식으로도 진행을 할 수 있었지만

Aws의 Sqs의 기능 중 Temporary Queue를 이용하여 요청 응답처럼 구현해서 사용 할 수 있는 방법을 알게되어 사용해보았다.

 

본론

- 해당 글은 Spring Boot 3.2.2 , Java 17 버전을 기준으로 작성되었습니다.

 

필자는 기존에 SQS를 사용해본 적도 없고, 공식 문서와 예제를 찾아봐도.. 자세한 구현 방법이 많이 나오질 않아 애를 먹었다.

기본 SQS 설정 방법에 예제와 코드를 분석해보며 정리한 글이니 설명에 틀린점이 있다면 꼭.. 댓글좀 부탁드립니다...

 

https://docs.aws.amazon.com/ko_kr/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-temporary-queues.html#request-reply-messaging-pattern 

 

Amazon SQS 임시 대기열 - Amazon Simple Queue Service

ReceiveMessage 작업을 포함하여 대기열에서 이루어진 모든 API 작업은 대기열을 비유휴 상태로 표시합니다.

docs.aws.amazon.com

 

https://devbksheen.tistory.com/entry/Spring-boot-3x-Amazon-SQS-%EC%A0%81%EC%9A%A9%EA%B8%B0

 

Spring boot 3.x + Amazon SQS 적용기

개요 업무상 Amazon SQS를 적용할 일이 있어 예전에 정리한 SQS 설정 방법대로 설정했지만 Listener이 작동하지 않는 이슈가 있었습니다. 서칭해본 결과 Spring Cloud AWS 2.x 버전은 Spring Boot 3.x에 완전히

devbksheen.tistory.com

위의 예제들을 참고하며 구현을 했다.

 

Aws의 SQS는 많이 들어봤겠지만, Aws에서 제공하는 Message Queue이다. 

그 중 Temporary Queue란, 일반적인 MQ 형식인 단방향 메세징 제공에서, 임시 대기열을 추가하여 응답을 받을 수 있게 해주는 방법 이라고 한다.

 

앞서 말한듯, 나는 인증 도메인으로 들어온 로그인 요청을 회원 도메인에서 이메일과 패스워드 체크 후 회원의 ID 값을 응답으로 받아와 토큰으로 변환하는 로직을 구현하려고 했기 때문에, 동기적으로 도메인간 요청, 응답이 필요했다.

 

 

예.. 그림을 굉장히 못그렸지만... 아무튼 도식화 하자면 이런 로직이다.

 

Aws SQS, IAM생성

우선 Aws의 SQS를 생성해야한다. 필자는 Aws 프리티어를 이용중이기 때문에 월 100만건의 메세지가 무료라고 한다.

 

AWS에 들어가 검색에 SQS를 입력하여 대기열 생성을 눌러준다. 

 

유형은 표준과 FIFO가 있는데, 자세한건 구글링하면 차이점이 나온다 ! 필자는 표준으로 진행하였다.

이름도 원하는대로 작성을하고 하위 항목들은 암호화만 비활성화로 처리 후 기본으로 설정 후에 대기열을 생성하였다.

 

 

해당 SQS에 접근하기 위해서는 AWS의 Credentials과 Region을 입력해주어야 하기 때문에 IAM을 설정하도록 한다.

필자는 기존에 S3 설정을 하며 사용하던 IAM이 있기 때문에 해당 계정에 SQS 권한을 추가해주었다

 

사용자 생성 -> 직접 정책 연결 -> SQS 입력 -> SQSFullAccess를 선택 후 정책을 생성해준다.

 

설정 후 public 액세스키와 private 액세스키가 나오는데, 이 키를 property에 입력해주어야 하니 따로 복사해둔다.

 

이제 SQS 및 IAM 설정을 마쳤으니 구현을 해보자.

 

의존성 설정

build.gradle에 sqs, temporary queue와 관련된 dependency를 추가해준다.

// aws sqs
implementation platform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.1")
implementation 'io.awspring.cloud:spring-cloud-aws-starter-sqs'

// aws temporary queue
implementation 'com.amazonaws:amazon-sqs-java-temporary-queues-client:2.0.1'

 

 

Property 설정

SQS 설정을 위해 application.yml에 방금 복사해둔 공개, 비공개 액세스키를 입력한다

위 블로그에서 써있듯 Spring Cloud 3.x 버전에서는 다음과 같이 property 계층을 사용한다.

저 sqs 항목은 필수 설정은 아니지만, 구현 코드에서 사용하기 때문에 선언을 해주었다.

 

Bean 등록

설정한 property를 기반으로 Sqs 라이브러리를 사용하기 위해 빈을 등록해준다. 

Config 파일을 생성해준다.

@Import(SqsBootstrapConfiguration.class)
@Configuration
public class AwsSqsConfig {

    @Value("${spring.cloud.aws.credentials.access-key}")
    private String AWS_ACCESS_KEY;

    @Value("${spring.cloud.aws.credentials.secret-key}")
    private String AWS_SECRET_KEY;

    @Value("${spring.cloud.aws.region.static}")
    private String AWS_REGION;
    @Value("${spring.cloud.aws.sqs.endpoint}")
    private String requestQueueUrl;

    // sqs Client For Sync
    @Bean
    public SqsClient sqsClient() {
        return SqsClient.builder()
                .credentialsProvider(() -> new AwsCredentials() {
                    @Override
                    public String accessKeyId() {
                        return AWS_ACCESS_KEY;
                    }
                    @Override
                    public String secretAccessKey() {
                        return AWS_SECRET_KEY;
                    }
                })
                .region(Region.of(AWS_REGION))
                .build();
    }

    // Sync Aws Publisher
    @Bean
    public AmazonSQSRequester publisherFactory() {
        return AmazonSQSRequesterClientBuilder.standard()
                .withAmazonSQS(sqsClient())
                .build();
    }

    // Sync Aws Consumer
    @Bean
    public AmazonSQSResponder consumerFactory() {
        return AmazonSQSResponderClientBuilder.standard()
                .withAmazonSQS(sqsClient())
                .build();
    
}

 

위와 같이 SqsClient, Publisher와 Consumer용 빈을 등록해준다.

 

Publisher

@MessageQueue
class LoginMemberMessagePublisher implements LoadMemberInfoPort {
    private final AmazonSQSRequester sqsRequester;
    private final String requestQueueUrl;

    public LoginMemberMessagePublisher(@Value("${spring.cloud.aws.sqs.endpoint}") String requestQueueUrl, AmazonSQSRequester sqsRequester) {
        this.sqsRequester = sqsRequester;
        this.requestQueueUrl = requestQueueUrl;
    }
    
    @Override
    // 회원 도메인과 통신하여 memberId를 가져온다.
    public Long signIn(LogInRequestDto requestDto) {
        String body = new Gson().toJson(requestDto);
        SendMessageRequest request = SendMessageRequest.builder()
                .queueUrl(requestQueueUrl)
                .messageBody(body)
                .build();
         try {
            Message reply = sqsRequester.sendMessageAndGetResponse(request,2, TimeUnit.SECONDS);
            return Long.parseLong(reply.body());
        } catch (SdkClientException | TimeoutException e) {
             log.error(e.getMessage());
            throw new MessagingException(ErrorMessage.COMMUNICATION_EXCEPTION.getMessage());
        } catch (NumberFormatException e) {
             throw new IllegalArgumentException(e.getMessage());
         }
//        finally {
//            sqsRequester.shutdown();
//        }
    }
}

 

@MessageQueue는 빈으로 등록하기 위해 생성한 커스텀 어노테이션이다 

빈으로 등록한 Requester를 Publisher에서 사용하고,

Request Url은 테스트레벨에서 변경하여 사용 예정이므로 생성자로 받아준다.

sqsRequester을 이용하여 sendMessageAndGetResponse 메서드를 통하여 응답을 반환받을 수 있고,

requester의 경우 임시 대기열을 사용하지 않을 경우 shutdown을 호출해주어야 하기 때문에 finally 메서드로 선언을 해주었다. 

-> shutdown()을 저렇게 호출해주니 RequestQueue를 찾지 못하는 현상이 발생하여 주석처리 하였고, 해당 메서드에 대해 추후 확인이 필요 할 것 같다. 

Consumer (Listener)

@MessageQueue
@RequiredArgsConstructor
public class LoginMemberMessageListener {
    private final AmazonSQSResponder sqsResponder;
    private final SignInUseCase signInUseCase;

    @SqsListener("${spring.cloud.aws.sqs.queue-name}")
    public void messageListener(Message message) {
        handleLoginRequest(message);
    }

    private void handleLoginRequest(Message message) {
        LoginMemberRequestDto requestDto = new Gson().fromJson(message.body(), LoginMemberRequestDto.class);
        try {
            Long memberId = signInUseCase.signIn(requestDto.email(), requestDto.password());
            sqsResponder.sendResponseMessage(MessageContent.fromMessage(message),new MessageContent(String.valueOf(memberId)));
        } catch (IllegalArgumentException ex) {
            sqsResponder.sendResponseMessage(MessageContent.fromMessage(message),new MessageContent(ex.getMessage()));
        }
    }
}

 

Listener에서는 @SqsListener를 통해 요청 큐에대한 데이터를 수신하고, 

Responder을 통해 임시 대기열을 생성하여 Request를 보낸쪽으로 응답을 보낼 수 있다.

Responder에도 shutdown() 메서드가 있었으나, 구현체를 보니 비어있기 때문에 Listener에서는 별다른 처리를 해주지 않았다.

 

 

Test

테스트엔 AwsConfig Bean만 로드시킨 후 ,

테스트를 개별적으로 진행하기 위해 테스트 시작 전과 후 큐를 추가하고 삭제해주었다.

(큐의 삭제는 60초에 한 번 가능하기 때문에 Before/AfterEach가 아닌 Before/AfterAll로 진행)

 

Publisher Test

@SpringBootTest(classes = AwsSqsConfig.class)
class LoginMemberMessagePublisherTest {
    private static final String QUEUE_NAME = "respalMessageTestQueue";
    @Autowired
    AmazonSQSRequester requester;
    @Autowired
    AmazonSQSResponder responder;
    private static String queueUrl;

    @BeforeAll
    private static void beforeAll(@Autowired AmazonSQSRequester requester) {
        queueUrl = requester.getAmazonSQS()
                .createQueue(CreateQueueRequest
                        .builder()
                        .queueName(QUEUE_NAME)
                        .build())
                .queueUrl();
    }

    @AfterAll
    private static void afterEach(@Autowired AmazonSQSResponder responder) {
        responder.getAmazonSQS()
                .deleteQueue(DeleteQueueRequest
                        .builder()
                        .queueUrl(queueUrl)
                        .build());
    }

    @Nested
    @DisplayName("로그인 이벤트 발송 테스트")
    class SignIn {
        private static final String EMAIL = "ailiartsual2@gmail.com";
        private static final String PASSWORD = "123456789a!";

        LoginMemberMessagePublisher loginMemberMessagePublisher = new LoginMemberMessagePublisher(queueUrl, requester);
        LogInRequestDto logInRequestDto = new LogInRequestDto(EMAIL, PASSWORD);
        Message message = Message.builder()
                .body(new Gson().toJson(logInRequestDto))
                .messageAttributes(Map.of(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME,
                        MessageAttributeValue.builder()
                                .stringValue(queueUrl)
                                .build()))
                .build();

        @Test
        @DisplayName("로그인시 회원 ID를 받아온다.")
        void test1() {
            //given
            Long memberId = 0L;
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            ExecutorService executorService2 = Executors.newSingleThreadExecutor();

            executorService.submit(() -> {
                // 임의 전송
                responder.sendResponseMessage(MessageContent.fromMessage(message),new MessageContent(String.valueOf(memberId)));
            });

            executorService2.submit(() -> {
                // when
                Long expectedMemberId = loginMemberMessagePublisher.signIn(logInRequestDto);

                //then
                assertThat(expectedMemberId).isEqualTo(memberId);
            });

            executorService.shutdown();
            executorService2.shutdown();
        }

        @Test
        @DisplayName("통신 간 Timeout Exception 발생 시 MessagingException이 발생한다.")
        void test2() throws TimeoutException {
            //given & when & then
            assertThatThrownBy(() -> loginMemberMessagePublisher.signIn(logInRequestDto))
                    .isInstanceOf(MessagingException.class)
                    .hasMessage(ErrorMessage.COMMUNICATION_EXCEPTION.getMessage());
        }

        @Test
        @DisplayName("응답받은 회원 ID가 회원 ID 타입 (long)이 아닌 경우 MessagingException이 발생한다.")
        void test3() throws TimeoutException {
            //given
            String memberId = "invalid";
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            ExecutorService executorService2 = Executors.newSingleThreadExecutor();

            executorService.submit(() -> {
                responder.sendResponseMessage(MessageContent.fromMessage(message),new MessageContent(memberId));
            });

            //when & then
            executorService2.submit(() -> {
                assertThatThrownBy(() -> loginMemberMessagePublisher.signIn(logInRequestDto))
                        .isInstanceOf(MessagingException.class)
                        .hasMessage(ErrorMessage.COMMUNICATION_EXCEPTION.getMessage());
            });

            executorService.shutdown();
            executorService2.shutdown();
        }
    }
}

 

 

Publisher 테스트는 요청에 대한 응답이 필요하여 요청과 응답에 대해 개별 스레드를 추가하고,

스레드내에서 응답을 변경해가며 테스트를 진행했다. 

 

테스트 결과

 

Listener Test

@SpringBootTest(classes = AwsSqsConfig.class)
class LoginMemberMessageListenerTest {

    private static final String QUEUE_NAME = "respalMessageTestQueue";
    
    private static String queueUrl;
    
    @Autowired
    AmazonSQSResponder responder;

    @BeforeAll
    private static void beforeAll(@Autowired AmazonSQSResponder responder) {
        queueUrl = responder.getAmazonSQS().createQueue(CreateQueueRequest.builder().queueName(QUEUE_NAME).build()).queueUrl();
    }

    @AfterAll
    private static void afterEach(@Autowired AmazonSQSResponder responder) {
        responder.getAmazonSQS().deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
    }

    @Nested
    @DisplayName("로그인 이벤트 핸들링 테스트")
    class HandleLoginRequest {
        private static final Long MEMBER_ID = 0L;
        private static final String EMAIL = "ailiartsual2@gmail.com";
        private static final String PASSWORD = "123456789a!";

        SignInUseCase signInUseCaseMock = mock(SignInUseCase.class);
        LoginMemberMessageListener loginMemberMessageListenerMock = new LoginMemberMessageListener(responder, signInUseCaseMock);
        LoginMemberRequestDto loginMemberRequestDto = new LoginMemberRequestDto(EMAIL, PASSWORD);
        Message message = Message.builder()
                .body(new Gson().toJson(loginMemberRequestDto))
                .messageAttributes(Map.of(Constants.RESPONSE_QUEUE_URL_ATTRIBUTE_NAME,
                        MessageAttributeValue.builder()
                                .stringValue(queueUrl)
                                .build()))
                .build();

        @Test
        @DisplayName("로그인 수신시 messageListener 에서 consume 후 응답한다.")
        void test1() {
            //given
            when(signInUseCaseMock.signIn(loginMemberRequestDto.email(), loginMemberRequestDto.password()))
                    .thenReturn(MEMBER_ID);

            //when
            loginMemberMessageListenerMock.messageListener(message);
            String response = receiveMessage();

            //then
            assertThat(response).isEqualTo(String.valueOf(MEMBER_ID));
        }

        @Test
        @DisplayName("비회원 로그인시 에러 메세지 응답")
        void test2() {
            //given
            when(signInUseCaseMock.signIn(loginMemberRequestDto.email(), loginMemberRequestDto.password()))
                    .thenThrow(new IllegalArgumentException(ErrorMessage.NOT_EXIST_MEMBER_EXCEPTION.getMessage()));

            //when
            loginMemberMessageListenerMock.messageListener(message);
            String response = receiveMessage();

            // then
            assertThat(response).isEqualTo(ErrorMessage.NOT_EXIST_MEMBER_EXCEPTION.getMessage());
        }


        @Test
        @DisplayName("비밀번호가 틀린 경우 에러 메세지 응답")
        void test3() {
            //given
            when(signInUseCaseMock.signIn(loginMemberRequestDto.email(), loginMemberRequestDto.password()))
                    .thenThrow(new IllegalArgumentException(ErrorMessage.NOT_MATCH_PASSWORD_EXCEPTION.getMessage()));

            //when
            loginMemberMessageListenerMock.messageListener(message);
            String response = receiveMessage();

            // then
            assertThat(response).isEqualTo(ErrorMessage.NOT_MATCH_PASSWORD_EXCEPTION.getMessage());
        }

        private String receiveMessage() {
            ReceiveMessageResponse receiveMessageResponse = responder.getAmazonSQS()
                    .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(1).build());
            return receiveMessageResponse.messages().get(0).body();
        }
    }
}

 

Listener 테스트엔 임의로 메세지를 넣어 이벤트 수신 후 핸들링하여 응답이 잘 되는지 테스트를 진행하였다. 

 

테스트 결과

 

 

Postman Test

로그인에 성공한 경우

 

존재하지 않는 회원이 로그인 한 경우

 

비밀번호가 틀린 경우

 

결론

동기적으로 메세지큐를 사용하는건 결코 바람직하지 않은 사용법이겠지만,

의존성을 분리시키며 요청 응답 대기 시간동안 동기적으로 구현해야 하는 경우

위와 같이 Aws Sqs의 Temporary Queue를 이용 할 수 있었다.

하지만 바람직하지 않은 사용법 때문이였는지.. 구현 예제가 없어 코드를 일일이 찾아가보며 작업을 했고,,

특히 외부 라이브러리를 사용하기 때문에 테스트 코드 작성하는 방법이 굉장히 까다로웠던 것 같다.

더 공부를 해보자..!     

반응형