Post

카프카핵심가이드 5장 - 프로그램 내에서 코드로 카프카 관리하기

카프카 핵심가이드 책을 읽은 내용 정리

AdminClient

클라이언트 어플리케이션에서 직접 Kafka 관리 명령을 내려야 하는 경우(아래의 예시)에 사용

  • 사용자 입력에 기반한 동적 토픽 생성
  • 토픽 존재 여부 확인 : 이벤트 기록 전 토픽이 존재하는지 검사

-> 이러한 작업을 쉽고 유연하게 처리하기 위해 AdminClient를 사용

비동기적이고 최종적 일관성을 가지는 API

카프카의 AdminClient비동기 적으로 작동하며 최종적 일관성을 지님

  • 비동기적으로 작동
    클러스터 컨트롤러로 요청을 전송한 뒤 Future객체를 리턴

Future 객체
비동기 작업의 결과이며 작업 결과 확인, 취소, 완료 대기, 완료 후 실행 함수 등을 지정하는 메서드를 가짐.

  • 최종적 일관성
    카프카 컨트롤러로부터 브로커의 메타데이터 전파가 비동기적으로 이루어지기 때문에, AdminClient API가 리턴하는 Future 객체들은 컨트롤러의 상태가 완전히 업데이트된 시점에서 완료된 것으로 간주

    그러나 이 시점에서 모든 브로커가 전부 다 새로운 상태에 대해 알고 있지는 못할 수 있기 때문에, listTopics 요청은 최신상태를 전달받지 않은 크로커에 의해 처리될 수 있음.

    작업이 처리되도 모든 브로커가 즉시 일관성있는 상태를 유지하지는 않지만, 시간이 지남에 따라 일관성을 유지하는 최종적 일관성의 특징을 지님

옵션

Options객체
AdminClinet의 메서드별로 특정한 Options객체를 인수로 받음

  • listTopics 메서드 -> ListTopicsOptions
  • describeCluster 메서드 -> DescribeClusterOptions

timeoutMs
모든 AdminClient 메서드가 가진 매개변수

  • 클러스터로부터 응답을 기다리는 시간 -> 이후 TimeoutException

수평구조

모든 어드민 작업은 KafkaAdminClient에 구현되어 있는 아파치 카프카 프로토콜을 사용해서 이루어짐

추가 참고 사항

대부분의 어드민 작업은 AdminClient를 통해서 수행되거나 아니면 주키퍼에 저장되어있는 메타데이터를 직접 수정하는 방식으로 이루어짐

주키퍼를 직접 수정하는 것을 절대 쓰면 안됨.

현재 주키퍼보다는 KRaft모드를 사용하는 것으로 업데이트 됨. 카프카 4.0 버전 부터는 Kraft라는 방식으로 주키퍼 없이 운영이 가능

AdminClient 사용법: 생성, 설정, 닫기

  • AdminClient 생성, 설정, 닫기
    1
    2
    3
    4
    5
    6
    7
    
      // 설정
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
      // 생성
      AdminClient admin = AdminClient.create(props);
      // 닫기
      admin.close(Duration.ofSeconds(30));
    

client.dns.lookup

카프카 2.1.0에서 추가됨
부트스트랩 서버 설정에 포함된 호스트명을 기준으로 연결을 검증, 해석, 생성

DNS 별칭을 사용하는 경우

DNS alias 사용하는 경우

  • 브로커들 broker1.com, broker2.com … -> all-brokers.com
  • SASL 인증시 호스트명이 달라 문제 발생

    SASL
    Simple Authentication and Security Layer는 인터넷 프로토콜에서 인증과 데이터보안을 위한 프레임워크(Kerberos, Password Server, Zookeeper, Oauth Server)

다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우

하나의 DNS에 여러 IP가 매핑된 경우

  • 첫번째 해석된 호스트명으로만 연결 시도
  • client.dns.lookup=use_all_dns_ips

request.timeout.ms

AdminClient가 응답을 기다리는 최대 시간을 정의(default: 120초)

필수적인 토픽 관리 기능

  • 클러스터에 있는 토픽 목록 조회
    get()은 서버가 토픽 이름 집합을 리턴할 때까지 대기
    1
    2
    
      ListTopicsResult topics = admin.listTopics();
      topics.names().get().forEach(System.out::println);
    
  • 토픽이 존재하는지 확인하고, 없으면 만드는 예제
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
      // 토픽 확인
      DescribeTopicsResult demoTopic = admin.describeTopics(TOPIC_LIST);
    
      try {
          // topicDescription = demoTopic.values().get(TOPIC_NAME).get();
          // kafka 3.1부터는 values()는 중단. topicNameValues()를 사용
          topicDescription = demoTopic.topicNameValues().get(TOPIC_NAME).get();
          System.out.println("Description of demo topic:" + topicDescription);
    
          if (topicDescription.partitions().size() != NUM_PARTITIONS) {
              System.out.println("Topic has wrong number of partitions. Exiting.");
              System.exit(-1);
          }
      } catch (ExecutionException e) {
          // 종류를 막론하고 예외가 발생하면 바로 종료
          if (! (e.getCause() instanceof UnknownTopicOrPartitionException)) {
              e.printStackTrace();
              throw e;
          }
        
          // 여기까지 진행됐다면, 토픽은 존재하지 않음
          System.out.println("Topic " + TOPIC_NAME + " does not exist. Going to create it now");
    
          // 파티션 수와 레플리카 수는 선택 사항임에 유의. 만약 이 값들을 지정하지않으면 카프카 브로커에 설정된 기본값이 사용됨
          CreateTopicsResult newTopic = admin.createTopics(Collections.singletonList(new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR)));
            
          // 토픽이 제대로 생성됐는지 확인
          if (newTopic.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {
              System.out.println("Topic has wrong number of partitions.");
              System.exit(-1);
          }
      }
    
  • 토픽 삭제
    토픽은 삭제시 되돌릴 수 없으니 유의
    1
    2
    3
    4
    5
    6
    7
    8
    
      // 토픽이 삭제되었는지 확인.
      // 삭제작업이 비동기적으로 이루어지는 만큼 이 시점에서 토픽이 여전히 남아 있을 수 있음
      try {
          topicDescription = demoTopic.values().get(TOPIC_NAME).get();
          System.out.println("Topic " + TOPIC_NAME + " is still around");
      } catch (ExecutionException e) {
          System.out.println("Topic " + TOPIC_NAME + " is gone");
      }
    
  • 많은 어드민 요청을 처리해야하는 경우
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
      vertx.createHttpServer().requestHandler(request -> {
          String topic = request.getParam("topic");
          String timeout = request.getParam("timeout");
          int timeoutMs = NumberUtils.toInt(timeout, 1000);
          // 토픽 설명을 비동기적으로 조회
          // 설정된 timeoutMs로 세팅
          DescribeTopicsResult demoTopic = admin.describeTopics(Collections.singletonList(topic), new DescribeTopicsOptions().timeoutMs(timeoutMs));
          // Future 작업이 완료되면 호출될 함수 생성
          // 비동기적으로 결과 처리 
          demoTopic.values().get(topic).whenComplete(new KafkaFuture.BiConsumer<TopicDescription, Throwable>() {
              @Override
              public void accept(final TopicDescription topicDescription, final Throwable throwable) {
                  if (throwable != null) {
                      request.response().end("Error trying to describe topic " + topic + " due to " + throwable.getMessage());
                  } else {
                      request.response().end(topicDescription.toString());
                  }
              }
          });
      }).listen(8080);
    

설정관리

ConfigResource 객체를 사용해서 할 수 있음

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
String TOPIC_NAME = "test";

// ConfigResource - 특정 토픽 설정 확인
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
DescribeConfigsResult configsResult = admin.describeConfigs(Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);

// isDefault() - 기본 값 여부 확인
// 기본값이 아닌 설정을 출력
configs.entries().stream().filter(entry -> !entry.isDefault()).forEach(System.out::println);

// 토픽에 압착 설정이 되어있는지 확인
ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);

if (!configs.entries().contains(compaction)) {
    // 트픽에 압착 설정이 되어 있지 않을 경우 해줌
    Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();
    configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
    Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
    alterConf.put(configResource, configOp);
    admin.incrementalAlterConfigs(alterConf).all().get();
} else {
    System.out.println("Topic " + TOPIC_NAME + " is compacted topic");
}

컨슈머 그룹 관리

AdminClinet를 사용해서 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회, 수정하는 방법에 대해 설명

컨슈머 그룹 살펴보기

  • 컨슈머 그룹 목록 조회
    1
    2
    3
    4
    5
    
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(props);
    
      admin.listConsumerGroups().valid().get().forEach(System.out::println);
    
  • 특정 그룹 상세정보 조회
    1
    2
    3
    4
    5
    6
    7
    8
    
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(props);
    
      ConsumerGroupDescription groupDescription = admin
              .describeConsumerGroups(Arrays.asList("test1_consumer_group"))
              .describedGroups().get("test1_consumer_group").get();
      System.out.println("Description of group " + "test1_consumer_group" + ":" + groupDescription);
    
  • 커밋 정보 조회
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(props);
    
      // listConsumerGroupOffsets - 하나의 컨슈머 그룹의 마지막 커밋된 오프셋 벨류
      // key : 사용중인 모든 토픽 파티션
      // value : 마지막으로 커밋된 오프셋
    
      Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets("test1_consumer_group").partitionsToOffsetAndMetadata().get();
      Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
    
      for(TopicPartition tp: offsets.keySet()) {
          // OffestSpec.latest() : 마지막 오프셋
          // OffestSpec.earliest : 파티션의 첫번째 오프셋
          // OffestSpec.forTimestamp() - 특정 시간이후의 오프셋
          requestLatestOffsets.put(tp, OffsetSpec.latest());
      }
    
      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = admin.listOffsets(requestLatestOffsets).all().get();
                
      for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
          String topic = e.getKey().topic();
          int partition = e.getKey().partition(); 
          long committedOffset = e.getValue().offset(); // 마지막 커밋 오프셋
          long latestOffset = latestOffsets.get(e.getKey()).offset(); // 파티션의 마지막 오프셋
          System.out.println("Consumer group " + "test1_consumer_group"
                  + " has committed offset " + committedOffset
                  + " to topic " + topic + " partition " + partition
                  + ". The latest offset in the partition is "
                              +  latestOffset + " so consumer group is "
                              + (latestOffset - committedOffset) + " records behind");
      }
    

    컨슈머 그룹 수정하기

  • 오프셋 리셋
    오프셋을 강제로 변경하기 때문에, 오프셋 변경 전에 처리된 값에 대응해야함.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(props);
    
      // 각 파티션의 맨 앞 오프셋을 가져옴
      Map<TopicPartition, OffsetSpec> requestEarliestOffsets = new HashMap<>();
      requestEarliestOffsets.put(new TopicPartition("test11", 0), OffsetSpec.earliest());
      requestEarliestOffsets.put(new TopicPartition("test11", 1), OffsetSpec.earliest());
    
      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = admin.listOffsets(requestEarliestOffsets).all().get();
      Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
    
      // listOffests의 ListOffestResultInfo -> OffsetAndMetaData 객체로 변환
      // OffsetAndMetaData - alterConsumerGroupOffests 호출에 필요
      for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e: earliestOffsets.entrySet()) {
          resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
      }
    
      try {
          // get()은 서버가 토픽 이름 집합을 리턴할 때까지 대기
          admin.alterConsumerGroupOffsets("test1_consumer_group", resetOffsets).all().get();
      } catch (ExecutionException e) {
          System.out.println("Failed to update the offsets committed by group " + "test1_consumer_group" + " with error " + e.getMessage());
            
          // 컨슈머 애플리케이션을 정지시키지 않고 작업시 Exception 발생
          if (e.getCause() instanceof UnknownMemberIdException) {
              System.out.println("Check if consumer group is still active.");
          }
      }
    

클러스터 메타데이터

  • 클러스터 정보 조회
    1
    2
    3
    4
    5
    
      DescribeClusterResult cluster = admin.describeCluster();
      System.out.println("Connected to cluster " + cluster.clusterId().get());
      System.out.println("The brokers in the cluster are:");
      cluster.nodes().get().forEach(node -> System.out.println(" * " + node));
      System.out.println("The controller is: " + cluster.controller().get());
    

고급 어드민 작업

SRE에게는 매우 중요하지만 잘 쓰이지도 않고, 위험한 메소드들.

토픽에 파티션 추가하기

여러 토픽을 한번에 확정할 경우 일부 토픽은 성공하고 나머지는 실패할 수도 있음

1
2
3
4
5
6
Map<String, NewPartitions> newPartitions = new HashMap<>();
// 파티션 수 지정.
// 토픽을 확자앟ㄹ 때는 새로 추가될 파티션의 수가 아닌,
// 파티션이 추가된 두의 파티션 수를 지정해주어야함.
newPartitions.put(TOPIC_NAME, NewPartitions.increaseTo(NUM_PARTITIONS + 2));
admin.createPartitions(newPartitions).all().get();

토픽에서 레코드 삭제하기

레코드를 디스크에서 실제로 지우는 작업은 비동기적으로 일어남 deleteRecords()listOffsets()를 사용하면 특정 시각 이전에 쓰여진 레코드들을 지울 수 있음

1
2
3
4
5
6
7
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets = admin.listOffsets(requestOlderOffsets).all().get();
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e: olderOffsets.entrySet()) {
    recordsToDelete.put(e.getKey(), RecordsToDelete.beforeOffset(e.getValue().offset()));
}
// 지정된 오프셋 보다 오래된 레코드 삭제 
admin.deleteRecords(recordsToDelete).all().get();

리더 선출

아래의 두 가지 서로 다른 형태의 리더 선출 가능

1. 선호 리더 선출

  • 각 파티션은 선호 리더라 불리는 레플리카를 갖음.
  • 모든 파티션의 리더가 선호 리더 레플리카인 경우 -> 브로커마다 할당되는 리더의 개수가 균형을 이룸
  • auto.leader.rebalacne.enable -> false
  • electLeader() 메소드 호출

2. 언클린 리더 선출

  • 리더 레플리카가 사용 불능인 경우 -> 리더가 될 수 없는 레플리카를 리더로 (데이터 손실)

레플리카 재할당

alterPartitionReassignments()를 사용하면 파티션에 속한 각각의 레플리카의 위치를 정밀하게 제어 가능
레플리카 위치가 맘에 안들때, 브로커에 너무 많은 레플리카가 있을 때, 몇몇 토픽에 대한 요청이 너무 많아서 나머지에서 따로 분리해 놓고 싶을 때…

1
2
3
4
5
6
7
8
9
10
11
12
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = new HashMap<>();
reassignment.put(new TopicPartition(TOPIC_NAME, 0), Optional.of(new NewPartitionReassignment(Arrays.asList(0,1))));
reassignment.put(new TopicPartition(TOPIC_NAME, 1), Optional.of(new NewPartitionReassignment(Arrays.asList(1))));
reassignment.put(new TopicPartition(TOPIC_NAME, 2), Optional.of(new NewPartitionReassignment(Arrays.asList(1,0))));
reassignment.put(new TopicPartition(TOPIC_NAME, 3), Optional.empty());

admin.alterPartitionReassignments(reassignment).all().get();
System.out.println("currently reassigning: " + admin.listPartitionReassignments().reassignments().get());

demoTopic = admin.describeTopics(TOPIC_LIST);
topicDescription = demoTopic.values().get(TOPIC_NAME).get();
System.out.println("Description of demo topic:" + topicDescription);

테스트하기

아파치 카프카는 원하는 수만큼의 브로커를 설정해서 초기화할 수 있는 MockAdminClienttest class를 제공

  • admin client를 사용하여 토픽을 생성하는 클래스 정의
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
      public TopicCreator(AdminClient admin) {
          this.admin = admin;
      }
      // 토픽 이름이 "test"로 시작할 경우 생성하는 예제 메소드
      public void maybeCreateTopic(String topicName) throws ExecutionException, InterruptedException {
          Collection<NewTopic> topics = new ArrayList<>();
          topics.add(new NewTopic(topicName, 1, (short) 1));
          if (topicName.toLowerCase().startsWith("test")) {
              admin.createTopics(topics);
              // 설정 변경
              ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
              ConfigEntry compaction = new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
              Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();
              configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
              Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
              alterConf.put(configResource, configOp);
              admin.incrementalAlterConfigs(alterConf).all().get();
          }
      }
    
  • 목업 클라이언트를 생성하며 테스트 setup
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
      @Before
      public void setUp() {
          Node broker = new Node(0,"localhost",9092);
          this.admin = spy(new MockAdminClient(Collections.singletonList(broker), broker));
    
          // 아래 내용이 없으면 테스트가
          // java.lang.UnsupportedOperationException: Not implemented yet 예외를 발생시킴
          AlterConfigsResult emptyResult = mock(AlterConfigsResult.class);
    
          doReturn(KafkaFuture.completedFuture(null)).when(emptyResult).all();
          doReturn(emptyResult).when(admin).incrementalAlterConfigs(any());
      }
    
  • maybecreateTopic()의 동작 확인

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
      @Test
      public void testCreateTestTopic() throws ExecutionException, InterruptedException {
          TopicCreator tc = new TopicCreator(admin);
    
          tc.maybeCreateTopic("test.is.a.test.topic11");
    
          verify(admin, times(1)).createTopics(any());
    
          ListTopicsResult topics = admin.listTopics();
          topics.names().get().forEach(System.out::println);
    
          admin.close(Duration.ofSeconds(30));
      }
    
      @Test
      public void testNotTopic() throws ExecutionException, InterruptedException {
          TopicCreator tc = new TopicCreator(admin);
    
          tc.maybeCreateTopic("not.a.test");
    
          verify(admin, never()).createTopics(any());
      }
    
  • pom.xml
    MockAdminClient를 test jar에 담아서 공개하기 위해 dependency에 추가
    1
    2
    3
    4
    5
    6
    7
    
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.5.0</version>
          <classifier>test</classifier>
          <scope>test</scope>
      </dependency>
    

    요약

    즉석에서 토픽을 생성하거나 애플리케이션이 사용할 토픽이 올바른 설정을 가지고 있는지를 확인해야 하는 애플리케이션 개발자들에게 유용.
    AdminClient는 툴을 개발해야 하거나, 카프카 작업을 자동화하거나, 사고가 발생했을 때 복구해야하는 운영자나 SRE(Site Reliability Engineer, 신뢰성 엔지니어)에게도 쓸모가 많음.

This post is licensed under CC BY 4.0 by the author.