Post

카프카핵심가이드 3장 - 카프카 프로듀서

카프카 핵심가이드 책을 읽은 내용 정리

프로듀서

카프카를 사용하는 용도와 요구사항은 카프카에 메시지를 쓰는 프로듀서 API를 사용하는 방법과 구성에 영향을 준다.

예를 들어 신용카드 트랜잭션 처리 시스템은 메시지에 어떠한 유실이나 중복도 허용되지않는다. 반면에 웹사이트에서 생성되는 클릭정보를 저장하는 경우는 메세지가 유실되거나 중복되는것은 크게 문제되지않는다.

아래의 그림은 카프카에 데이터를 전송할 때 수행되는 주요 단계들이다.

aaa

producerRecord 객체를 생성함으로써 카프카에 메시지를 쓰는 작업은 시작된다.

레코드가 저장될 때 key와 partition지정은 선택, topic, value는 필수사항으로 지정해야한다.

프로듀서 생성

카프카에 메시지를 쓰려면 우선 아래의 3개 필수 속성값을 갖는다.

bootstrap.server

카프카 클러스터와 첫 연결을 생성하기 위해 제공해야하는 브로커의 host목록이다.

프로듀서가 첫 연결을 생성한 후, 추가 정보를 받아오기 때문에 모든 브로커의 주소를 제공할 필요는 없지만 브로커에 장애가 발생하더라도 계속해서 클러스터에 연결될 수 있도록, 최소 2개 이상의 브로커를 지정할 것을 권장한다.

key.serializer

레코드의 key를 직렬화하기 위해 사용하는 Serializer이다. 기본적으로 여러 구현체((ByteArray, String, Integer, …))를 제공한다. 레코드에 키를 사용하지 않더라도 key.serializer는 반드시 설정해야 함

value.serializer

레코드의 value를 직렬화하기 위해 사용하는 Serializer이다. 직렬화 클래스 이름을 제공해야 한다.

java app에서는 아래와 같이 설정할 수 있다.

1
2
3
4
5
6
Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092"); // server, kafka host
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

카프카로 메시지 전달

메시지 전송방법에는 크게 3가지가 있다.

  1. Fire and Forget

클러스터(브로커)에 메시지를 전송하고, 성공·실패 여부는 신경 쓰지 않는다.

  1. Synchronous send

다음 메시지를 보내기 위해선, 이전 메시지 전송의 성공·실패 여부를 확인해야한다.

  1. Asynchronous send

기본적으로 프로듀서는 항상 비동기적으로 작동하며 응답을 받는 시점에 콜백 메서드를 자동으로 호출

위의 3가지 방법에 대해 java code를 보자.

  1. Fire and Forget

    1
    2
    3
    
     ProducerRecord<String, String> record = new ProducerRecord<>("A", "B", "C"); // topic, key, value
        
     producer.send(record);
    
  2. Synchronous send

다음은 동기적으로 메시지를 전송하는 방법이다. 동기적0으로 메시지를 전송할 경우 전송을 요청한 스레드는 기다려야하기 때문에 실제 어플리케이션에서는 잘 사용하지 않는다.

Future.get 을 사용하여 대기한다. 레코드 전송 실패하면 get()은 예외를 발생시키며 전송 성공하면 RecordMetaData 객체를 return한다.

1
2
3
ProducerRecord<String, String> record = new ProducerRecord<>("A", "B", "C"); // topic, key, value

producer.send(record).get();
  1. Asynchronous send

다음은 비동기적으로 메시지를 전송하는 방법이다.에러를 처리하기 위해 콜백을 사용하였다.

카프카가 에러를 리턴하면, onCompletion() 메서드는 null이 아닌 Exception객체를 받게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
private class DemoProducerCallBack implements Callback {
	@Override
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
		if(e != null {
			e.printStackTrace();
		}
	}	
}

ProducerRecord<String, String> record = new ProducerRecord<>("A", "B", "C"); // topic, key, value

producer.send(record, new DemoProducerCallback());

프로듀서 설정하기

필수 설정값 이외에 다양한 설정값이 존재한다.

1. client.id

프로듀서와 애플리케이션을 구분하기 위한 논리적 식별자.

  • IP 104.27.155.134에서 인증 실패가 자주 발생하네?
  • 주문 확인 서비스가 인증에 실패하고 있는 듯한데

2. acks

프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야 하는지 결정. 메시지 유실 가능성 즉, 신뢰성에 영향을 끼침.

  • acks = 0

    프로듀서는 메시지가 성공적으로 전달되었다고 간주하고 중개인의 응답을 기다리지 않음

  • acks = 1

    프로듀서는 리더 레플리카가 메시지를 받는 순간 성공 응답을 받음. 만약, 리더에 메시지를 쓸 수 없다면 프로듀서는 에러를 응답받고, 데이터 유실을 피하기 위해 재전송.

  • acks = all

    프로듀서는 모든 인-싱크 레플리카(in-sync replica)에 메시지가 전달된 후에야 중개인으로부터 성공했다는 응답을 받음.

⇒ acks설정을 내려잡아서 신뢰성을 낮추면 그만큼 레코드를 빠르게 보낼 수 있다. 하지만 신뢰성과 프로듀서 지연사이에는 trade off 관계가 있다는 말이다.

3. 메시지 전달 시간

send()를 호출했을 때 성공 혹은 실패하기 까지 얼마나 시간이 걸리는가? 에대한 설정을 할 수 있다.

아파치 카프카 2.1부터 개발진은 ProducerRecord를 보낼 때 걸리는 시간을 두 구간으로 나누어 따로 처리할 수 있도록 하였다.

  • send()에 대한 비동기 호출이 이뤄진 시각부터 결과를 리턴할 때까지 걸리는 시간 (이 시간동안 send()를 호출한 스레드는 블록)
  • send()에 대한 비동기 호출이 성공적으로 리턴한 시각부터(성공했든 실패했든) 콜백이 호출될 때 까지 걸리는 시간

image

  • max.blocks.ms

    프로듀서가 얼마나 오랫동안 블록되는지를 결정한다.

  • delivery.timeout.ms

    레코드 전송 준비가 완료된 시점(send()가 무사히 리턴되고 레코드가 배치에 저장된 시점)부터 브로커의 응답을 받거나 전송을 포기하게 되는 시점까지의 제한시간을 결정한다.

    위의 그림에서 볼 수 있듯이 이는 linger.ms + request.timeout.ms보다 delivery.timeout.ms 의 값이 커야한다.

    카프카가 재시도를 하는 도중 delivery.timeout.ms가 넘어가버린다면, 마지막으로 재시작 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백이 호출된다.

  • request.timeout.ms

    서버로부터 응답을 받기 위해 얼마나 기다릴지를 정의한다. 재시도 시간이나, 실제 전송 이전에 소요되는 시간은 포함하지 않고, 실패한다면 TimeoutException콜백을 호출한다.

  • retries, retry.backoff.ms

    retries는 기본적으로 에어를 발생시킬 때까지 메시지를 재전송하는 횟수를 정의한다. 기본적으로 프로듀서는 각각의 재시도 사이에 100ms동안 대기하는데 이는 trety.backoff.ms 매게변수를 사용하여 이 간격을 조정할 수 있다.

4. linger.ms

배치를 전송하기 전까지 대기하는 시간을 정의한다. linger.ms값을 0보다 크게 설정하면 브로커에 메시지 배치를 전송하기 전에 메시지를 추가할 수 있도록 추가한 값만큼 더 기다릴 수 있다.

5. buffer.memory

프로듀서가 메시지를 전송하기 전 메시지를 대기시키는 버퍼의 크기를 결정한다. max.block.ms만큼 기다리고 버퍼가 안비워지면 에러가 발생한다.

이 타임아웃 에러는 send()에서 발생하는 것이지, send() 가 리턴하는 Future 객체에서 발생하지않는다.

6. compression.type

snappy, gzip, lz4 …등의 압축 타입을 지정한다.

snappy는 cpu 부하가 작으면서 성능이 좋다. gzip은 cpu와 시간을 더 많이 사용하지만 압축률이 좋다.

7. batch.size

각각의 배치에 사용될 메모리의 양을 결정한다.(갯수가 아니고 바이트 단위)

같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 배치 단위를 모아서 한꺼번에 전송한다.

배치가 가득 차면 배치에 들어 있는 모든 메시지가 한꺼번에 전송되지만, 배치가 가득 찰 때까지 기다리는 것은 아니고, 한개만 들어있는 배치도 전송한다. batch.size를 큰 값으로 해도 메시지 전송에 지연이 발생하지 않는다.

배치 크기보다 큰 메시지가 있으면 배치에 들어가지 않고 바로 전송된다.

8. max.in.flight.requests.per.connection

프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지 수를 결정한다.

이 값이 올라가면 메모리 사용량이 증가하지만 처리량 또한 증가한다.

순서보장: p.63을 보면 retires값을 0보다 크게 설정해놓은 상태에서 첫번째 배치때는 실패 두번째 배치때 성공한다면 메시지들의 순서가 뒤집어진다. 이를 대비하기 위해 enable.idepotence=ture 로 설정을 하면 메시지들의 순서를 보장할 수 있다.

9. max.request.size

프로듀서가 한번에 전송하는 쓰기 요청의 크기를 결정한다. default값은 1MB이다.

브로커에도 메시지 최대 사이즈 설정값인 message.max.bytes 의 값도 동일하게 맞추어야 브로커에서 프로듀서가 보내는 메시지를 온전히 받을 수 있다.

10. receive.buffer.bytes, send.buffer.bytes

TCP 소켓 송수신 버퍼 크기를 결정한다. -1 일경우 OS의 기본값이 사용된다.

11. enable.idempotence

정확히 한 번 전송할 수 있는 기능을 제공한다. 프로듀서가 레코드를 보낼 때 마다 순차적인 번호를 붙여서 보냅니다. 만약 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장하게되며, 프로듀서는 별다른 문제를 발생시키지않는 DuplicationSequenceException을 받는다.

시리얼라이저

String, Integer, Byte 등의 시리얼라이저를 기본으로 제공하지만, 이것만으로 모든 데이터를 직렬화할 수 없다. 이러한 경우 직렬화를 하는 방법 은 크게 두 가지가 있을 수 있다.

  1. avro, thrift, protobuf와 같은 직렬화 라이브러리 사용
    • example.avsc

      1
      2
      3
      4
      5
      6
      7
      8
      9
      
        {"namespace": "example.avro",
         "type": "record",
         "name": "User",
         "fields": [
             {"name": "name", "type": "string"},
             {"name": "favorite_number",  "type": ["int", "null"]},
             {"name": "favorite_color", "type": ["string", "null"]}
         ]
        }
      
  2. 객체를 직렬화하기 위한 커스텀 직렬화 로직 작성

1번 방법이 보다 선호되는 방식이다.

파티션

카프카의 토픽들은 여러 파티션으로 나눠진다. 토픽이 카프카에서 일종의 논리적인 개념이라면, 파티션은 토픽에 속한 레코드를 실제 저장소에 저장하는 가장 작은 단위를 의미한다.

1
2
3
4
5
 // topic, key, value
ProducerRecord<String, String> record = new ProducerRecord<>("A", "B", "C");

// key is null
ProducerRecord<String, String> record = new ProducerRecord<>("B", "C");

키 값은 하나의 토픽에 속한 여러개의 파티션 중 해당 메시지가 저장될 파티션을 결정짓는 기준점이기도 하다.

기본 파티셔너는 키값이 null인 레코드가 주어지는 경우, 레코드는 현재 사용 가능한 토픽의 파티션 중 하나에 랜덤하게 저장된다. 각 파티션 별로 저장되는 메시지 갯수의 균형을 맞추기 위해 라운드 로빈 알고리즘을 사용한다. 키값이 존재할 경우는 키값을 해싱하여 파티션을 결정한다.

위의 과정을 처리하는 중에 sticky처리를 지원한다. ( kafka 2.4 버전부터 default partition은 sticky partition이다.) https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/

p.74 그림을 보면 키 값이 null인 메시지들이 키 값이 있는 메시지들 뒤에 붙어서 라운드 로빈 방식으로 배치된 것을 볼 수 있다. 이렇게 할 경우 한번에 브로커로 보낼 수 있는 배치 수를 줄일 수 있다.

⇒ sticky 처리를 하는방식은

  1. 첫 번째 메시지를 보낼 때 특정 파티션을 선택하고, 그 파티션에 연속해서 메시지를 전송한다.
  2. 메시지 배치가 차거나 또는 다른 트리거 조건(lingers.ms 시간 초과)이 발생하면, 새로운 파티션을 선택하여 그 파티션에 메시지를 전송한다.

위의 방식대로 처리가된다.

🙋‍♀️ 라운드 로빈으로 다음 파티션을 선택할 때 메시지의 크기로 결정하나요? 메시지의 갯수로 결정하나요?

파티션을 선택하는 것이 결국 라운드 로빈 방식으로 지정(sticky 처리를 지원하던 안하던)이 되는데 파티션의 크기를 보고 결정하는 것이 아니라 메시지의 갯수를 보고 라운드 로빈으로 다음 파티션을 선택한다고한다. 이러한 방식은 파티션 간의 부하 분산보다는 프로듀서 성능 최적화에 중점을 두고 있으며, 네트워크 요청 횟수를 줄여 성능을 향상시킬 수 있습니다. 그렇지만 파티션간의 용량 편차는 발생할수밖에 없으며 이를 해결하기 위해 메시지의 key값을 설정하여 파티션이 고정되도록 처리하거나, 특정한 파티셔닝 전략(https://factored.ai/power-of-kafka-partitioning/)을 사용해 용량 분배를 좀 더 균일하게 만듭니다.

헤더

레코드의 키/밸류값을 건들지 않고 메타데이터를 심을 때 사용된다.

메시지의 전달 내역을 기록하는 것이 주목적이며 메시지를 파싱 할 필요 없이 헤더에 심어진 정보만으로 메시지를 라우팅 하거나 출처를 추적할 수 있다.

1
2
ProducerRecord<String, String> record = new ProducerRecord<>("A", "B", "C"); // topic, key, value
record.headers().add("privvacy-leve", "YOLO".getBytes(StandradCharsets.UTF_8);

인터셉터

카프카 클라이언트의 코드를 고치지 않으면서 작동을 변경해야 하는 경우, ProducerInterceptor를 사용한다.

ProducerRecord <K, V> onSend(ProducerRecord <K, V> record)

프로듀서가 레코드를 보내기전, 직렬화되기 직전에 호출된다. 레코드에 담긴 정보를 볼 수 있고, 고칠 수 도 있다.

void onAcknowledgement(RecordMetadata metadata, Exception e)

브로커의 응답을 클라이언트가 받았을 때 호출된다. 카프카 브로커가 보낸 응답을 수정할 수는 없지만, 그에 담긴 정보를 읽을 수 있음.

쿼터, 스로틀링

카프카 프로커에는 쓰기/일긱 속도를 제한할 수 있다. 한도(쿼터)를 설정해주면 된다. 3가지 종류가 있다.

  1. 쓰기 쿼터
  2. 읽기 쿼터
  3. 요청쿼터

일기와 쓰기 쿼터는 클라이언트가 데이터를 전송하거나 받는 속도를 초당 바이트 수로 제한하며, 요청 쿼터는 브로커가 요청을 처리하는 시간 비율 단위로 제한한다.

쿼터는 기본값을 설정하거나, 특정한 client.id값에 대해 설정하거나, 특정한 사용자에 대해 설정할 수 있다.

This post is licensed under CC BY 4.0 by the author.