본문 바로가기
프로그래밍/기타

CQRS(Command and Query Responsibility Segregation) 맛보기 - 실습편

by 사바라다 2022. 6. 25.

안녕하세요. 이전 포스팅에서 우리는 CQRS는 어떤것이고 어떤 장점과 단점을 가지고 있는지 함께 보는 시간을 가져보았습니다. 오늘은 CQRS를 한번 실습해보는 시간을 가져보도록 하겠습니다.

이전 시간에 CQRS를 설명할 때 두가지 모델뿐만 아니라 CQRS를 지속적으로 이어주기 위한 사이드적인 매커니즘들이 필요하다고 말씀드렸었습니다. 그러한 부분을 함께 보도록 하겠습니다.

1편 : CQRS(Command and Query Responsibility Segregation) 맛보기 - 이론편

환경

오늘 포스팅에서 사용할 환경은 아래와 같습니다. write DB의 DataSource는 MongoDB를 이용합니다. 그리고 Read 모델의 Repository는 Redis 라고 명시해 두었지만 실습이기 때문에 빠른 구현을 위해 메모리에 저장한다고 가정하고 임시로 Java의 Heap에 저장해두고 가져오는 방식을 채택하도록 하겠습니다.

  • MongoDB : 5.0.9
  • Spring Boot : 2.5.5
  • kotlin : 1.5.31
  • webflux
  • coroutine
  • JUnit 5

실습 목표

오늘의 실습 목표는 위의 이미지를 구현하는 것입니다. Write 모델을 생성하면 동기화 프로세스에 의해서 Projection 코드가 실행되며 이에 의해서 Read 모델에 들어가는 구조입니다. 실습은 아래의 순서로 진행되게됩니다.

  1. Write 모델 작성
  2. Read 모델 작성
  3. Projection 코드 작성
  4. 동기화 코드 작성
  5. 테스트 코드 작성후 테스트 진행

Write 모델과 Repository 작성

먼저 Write에 저장할 모델과 Repository 코드를 작성해보도록하겠습니다. MongoDB를 설정하는 방법에 대해서는 여기서 다루지는 않고 이후에 다른 포스팅에서 관련된 내용으로 찾아뵙도록 하겠습니다. User 라는 도메인 모델을 아래의 코드로 정의했습니다. 그리고 Spring Data Mongo를 사용하기 때문에 아래의 ReactvieCrudRepository를 implement 하여 실제 MongoDB에 접근하도록 하였습니다.

@Document
class User(
    var nickname: String,
    val signUpType: SignUpType,
    var status: UserStatus,
    var isSync: Boolean = false,
    val createdAt: Instant,
    var updatedAt: Instant
) {

    @Id
    override var id: ObjectId? = null

}

필드 하나에 대해서 확인을 한 후 넘어가도록 하겠습니다. isSync 필드입니다. 해당 필드는 현재 모델이 정상적으로 read Model과 동기화 되었는지에 대해 나타내는 필드입니다. 이것은 write 모델과 read 모델 사이의 확실한 동기화를 위해서 사용하는 NoSQL의 OutBoxPattern 방법입니다. 만약 RDBMS를 사용한다면 전용 테이블을 만들고 Transaction을 이용해서 묶기도 합니다.

interface UserRepository : ReactiveCrudRepository<User, ObjectId> {

    fun findAllByIsSyncFalse(): Flux<User>

}

실제 저장되는 데이터는 아래와 같습니다.

Read 모델 및 Reository 작성

Read 모델은 Redis의 Key-Value 자료구조에 데이터를 저장하는 방식을 채택하여 구현하도록 하겠습니다. 실제 Redis를 사용하진 않고 Heap에 HashMap을 만들고 거기에 InmemoryDB 처럼 사용하도록 하겠습니다. key-value 자료구조는 아래와 같습니다. value는 Redis의 Lists 자료구조를 사용한다고 봐주시면 될 것 같습니다.

key : User::{SignUpType}
value : MutableList<String> // nickname 

아래는 저장하기 위한 Repository 입니다. SignUpType과 nickName을 가져와서 SignUpType값을 기준으로 Key를 만들고 해당 key에 append 하는 형식을 가지고 있습니다. 또한 SignUpType을 기준으로 해당 하는 key의 value를 읽는 get 메서드도 함께 구현하였습니다.

class UserSignUpTypeRepository {

    val signUpTypeAssociationUser: MutableMap<String, MutableList<String>> = mutableMapOf()

    fun add(signUpType: SignUpType, nickname: String) {
        val userList = get(signUpType)
        userList.add(nickname)

        signUpTypeAssociationUser[getKey(signUpType)] = userList
    }

    fun get(signUpType: SignUpType): MutableList<String> {
        return signUpTypeAssociationUser[getKey(signUpType)] ?: mutableListOf()
    }

    fun getKey(signUpType: SignUpType): String = KEY.replace(SIGN_UP_TYPE, signUpType.name)

    companion object {
        const val SIGN_UP_TYPE = "{SIGN_UP_TYPE}"
        const val KEY = "User::$SIGN_UP_TYPE"
    }
}

Projection 코드 작성

Write 모델과 Read 모델의 코드를 모두 작성했으면 이제 이 두개의 모델을 연결시켜주는 Projection 코드를 작성하도록 하겠습니다. Projection 코드란 Write 모델을 input으로 Read 모델을 output으로 가지는 코드라고 봐주시면 됩니다. 아래와 같은 형태를 가질 수 있습니다.

@Component
class UserProjector(
    private val userSignUpTypeRepository: UserSignUpTypeRepository // user의 정보를 redis로 저장하고 읽기
) {

    fun project(user: User) {
        userSignUpTypeRepository.add(user.signUpType, user.nickname)
    }
}

동기화 코드 작성

위에서 함께 Write 모델, Read 모델 그리고 이를 변환시켜주는 Projection 코드를 작성해보았습니다. 다음으로 필요한 것이 Projection이 되는 트리거 코드가 필요합니다. 특정 트리거를 통해서 Projection 코드가 실행되어 Write 모델이 Read 모델로 변환되도록 해야하기 때문입니다. 트리거로 사용할 수 있는 방법으로는 아래와 같습니다.

  • 주기적으로 write를 읽어서 처리 ( 아웃박스패턴 )
  • write 할 때 read 에 넣도록 코드 작성
  • DB cdc를 이용하여 처리
  • 만약 Read 모델, Write 모델이 모두 MySQL Repository를 이용하고 있다면 내부의 trigger를 이용할 수도 있음

오늘 샘플에서는 Spring Scheduler를 통해 주기적으로 write를 읽어서 처리해보도록 하겠습니다. 코드는 아래와 같습니다. 1분에 1번씩 write 모델을 읽어서 readModel로 전달하는 것을 알 수 있습니다.

@Component
class UserSyncScheduler(
    private val userService: UserService,
    private val userProjector: UserProjector
) {

    @Scheduled(cron = "* */1 * * * *")
    suspend fun sync() {
        userService.getNeededSyncUser()
            .collect { user ->
                userProjector.project(user)
                userService.updateToSync(user)
            }
    }
}

실습용 테스트 코드 작성

테스트 코드를 작성해보도록 하겠습니다. 테스트 코드는 아래의 시나리오에 따라서 처리됩니다.

  1. user에 대한 save -> write 모델 등록
  2. 스케줄러 실행으로 read 모델 처리
  3. read 했을 때 모델 확인

2번의 경우 1분 스케줄러로 실행이 됩니다. 하지만 실제 테스트에서 1분씩 기다릴 수 없으므로 직접 실행하여 시간을 단축시킵니다.

@Autowired
private lateinit var userApiService: UserApiService

@Autowired
private lateinit var userSyncScheduler: UserSyncScheduler

@Autowired
private lateinit var userReadService: UserReadService

@Test
fun cqrsTest() = runTest {

    userApiService.save(
        nickname = "sabarada",
        signUpType = SignUpType.GOOGLE
    )

    userSyncScheduler.sync()

    val read = userReadService.read(SignUpType.GOOGLE)

    log.info { "read : $read" } // "read : [sabarada]"

}

마무리

오늘은 이렇게 CQRS를 실습해보는 시간을 가져보았습니다.

감사합니다.

참조

[1] https://www.baeldung.com/cqrs-event-sourcing-java

댓글