1. Installation

1.1 Create a Topic on Docker

$ docker exec -it kafka /bin/sh
$ cd /opt/kafka/bin/
$ kafka-topics.sh --create \
                  --zookeeper zookeeper:2181 \
                  --replication-factor 1 \
                  --partitions 1 \
                  --topic "test-topic"
$ kafka-topics.sh --list --zookeeper zookeeper:2181

1.2 Installing Confluent Connectors (Optional)

먼저 connector는 confluent 를 통해서 검색할수 있습니다.
Confluent 를 통해서는 commercial 서비스를 받을 수 있고, 기본적으로 제공되는 plugins 외에 다양한 connectors 를 설치 할 수 있습니다.

다운로드 받을 connectors는 다음과 같습니다.
Download 버튼을 눌러서 직접 다운로드 받고 설치 하면 됩니다.

  1. 다운로드 받습니다.
  2. unzip 시킵니다.
  3. /usr/local/kafka/plugins/ 위치로 lib 디렉토리를 이동시킵니다.
  4. 이후 plugin.path=/usr/local/kafka/plugins 를 설정파일에 추가하면 됩니다.

중요한건 lib 디렉토리안에 들어있는 jar 파일들이고
/usr/local/kafka/plugins 아래에 있는 디렉토리들은 바로 jar 파일들이 존재해야 합니다.
즉 /usr/local/kafka/plugins/new-connector/lib/aaa.jar 이런식으로 존재하면 안됩니다.

2. Running Connector

2.1 Standalone

connect-file-source.properties 파일 생성

  • connector.class=FileStreamSource : 파일 커넥터 클래스를 사용한다고 정의
  • topic=test-topic : 보낼 Topic 이름을 지정
  • FileStreamSource 뒷쪽에 스페이스가 존재할경우 에러가 날수 있습니다. ㅠㅠ 4시간 날려먹음
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/home/anderson/Downloads/test.txt
topic=test-topic

connect-file-sink.properties

name=local-file-sink  
connector.class=FileStreamSink
tasks.max=1
file=/home/anderson/Downloads/test.sink.txt  
topics=test-topic

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect-file.offsets

# 아래설정을 해주면 기본값보다 더 빠르게 메세지를 보냅니다. 
offset.flush.timeout.ms=1000
buffer.memory=100
plugin.path=/usr/local/kafka/plugins

또는 StringConverter 도 사용할수 있습니다.
위의 코드를 대체하면 됩니다.

key.converter=org.apache.kafka.connect.storage.StringConverter  
value.converter=org.apache.kafka.connect.storage.StringConverter  

Standalone 모드 실행

$ connect-standalone.sh connect-standalone.properties connect-file-source.properties connect-file-sink.properties

터미널을 새로 열고..

$ tail -f test.sink.txt

또 새로운 터미널을 열고 테스트를 해봅니다.

$ echo Hello >> test.txt
$ echo "I am Anderson" >> test.txt
$ echo "I am a boy you are a girl" >> test.txt
$ echo '{"hello": 123.4, "key": "Stock Market"}' >> test.txt

2.2 Distributed Mode

Distributed Mode 에서는 여러개의 workers가 동일한 group.id 로 Kafka Connect를 실행하게 됩니다.
또한 offset, task configuration, status 등을 topic 에 저장해놓기 때문에 이것도 standalone 과 다릅니다.

connect-distributed.properties

bootstrap.servers=localhost:9092

# unique name for the cluster
# consumer group ID와 중복되면 안됨
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# offsets을 저장할 topic / the topic should have many partitions and be replicated and compacted. 
# 해당 토픽은 자동으로 생성됨 
# default replication factor = 3 이며 특별한 케이스에서는 더 커질수도 있다
# 개발환경에서는 single-broker 로 움직이기 때문에 1로 설정
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# connector 그리고 task configuration을 저장할 Topic을 설정합니다. 
# this should be a single partition, highly replicated and compacted topic. 
# 해당 토픽은 자동으로 생성됨
# default replication factor = 3 이며 특별한 케이스에서는 더 커질수도 있다
# 개발환경에서는 single-broker 로 움직이기 때문에 1로 설정
config.storage.topic=connect-configs
config.storage.replication.factor=1

# status를 저장할 Topic 설정. 
# this topic can have multiple partitions and should be replicated and compacted. 
# 해당 토픽은 자동으로 생성됨 
# default replication factor = 3 이며 특별한 케이스에서는 더 커질수도 있다
# 개발환경에서는 single-broker 로 움직이기 때문에 1로 설정
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# 테스트/디버깅을 위해서 더 빠르게 설정해 놓습니다. 
offset.flush.interval.ms=1000
plugin.path=/usr/local/kafka/plugins

Distributed Mode 로 Connector를 실행시킵니다.

# production 에서 daemon 으로 돌릴려면.. -daemon 을 맨 앞에 넣어서 돌리면 됨
# connect-distributed.sh -daemon connect-distributed.properties
$ connect-distributed.sh connect-distributed.properties

정상 작동 확인합니다.

# 정상적으로 작동하는지 확인합니다.
$ curl -s localhost:8083 | jq
{
  "version": "2.8.1",
  "commit": "839b886f9b732b15",
  "kafka_cluster_id": "zrADZwVuRQ2CnAMIW_6DZw"
}

# 사용 가능한 plugins 리스트 
$ curl -s localhost:8083/connector-plugins | jq
[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.8.1"
  }
]

connect-file-source.json

{
    "name": "local-file-source",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": 1,
        "file": "/home/anderson/Downloads/input.txt",
        "topic": "test-topic"
    }
}
$ curl  -d @"connect-file-source.json" \
        --header 'Content-Type: application/json' \
        --request POST 'localhost:8083/connectors' | jq

# 등록 확인
$ curl -s localhost:8083/connectors/local-file-source | jq

connect-file-sink.json

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": 1,
        "file": "/home/anderson/Downloads/output.txt",
        "topics": "test-topic"
    }
}
$ curl  -d @"connect-file-sink.json" \
        --header 'Content-Type: application/json' \
        --request POST 'localhost:8083/connectors' | jq

# 등록 확인 
$ curl -s localhost:8083/connectors/local-file-sink | jq

등록을 확인합니다.

$ curl -s localhost:8083/connectors | jq
[
  "local-file-source",
  "local-file-sink"
]

# 로그 확인
$ docker logs -f kafka

커넥터 삭제는 다음과 같이 합니다.

$ curl -X DELETE http://localhost:8083/connectors/file-source-connector

확인은 다음과 같이 합니다.
먼저 터미널을 새로 열고..

$ touch output.txt
$ tail -f output.txt

새로운 터미널을 열고..

$ touch input.txt
$ echo hello >> input.txt

2.3 ESK Distributed Mode

  • NOT_ENOUGH_REPLICAS 에러가 발생하면 connect-offsets, connect-configs, 그리고 connect-status
    이 3개의 topics의 replication.factor 의 값이 min.insync.replica 보다 작아서 발생하는 문제입니다.
    이 문제가 발생할시 3개의 토픽을 삭제하고 replication.factor=3 으로 맞추고 다시 생성하면 됩니다 .
  • 2.2의 distributed mode 와 모두 동일하지만 replication 설정만 조금 다릅니다.

먼저 min.insync.replicas 를 설정한 Topic을 생성합니다.

$ kafka-topics.sh --zookeeper $ZooKeeperConnect --create --topic test-topic \
                  --config min.insync.replicas=1 --partitions 10 --replication-factor 3

connect-distributed.properties

bootstrap.servers=<Bootstrap Connect>

# unique name for the cluster
# consumer group ID와 중복되면 안됨
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# offsets을 저장할 topic / the topic should have many partitions and be replicated and compacted. 
# 해당 토픽은 자동으로 생성됨 
# default replication factor = 3 이며 특별한 케이스에서는 더 커질수도 있다
# 개발환경에서는 single-broker 로 움직이기 때문에 1로 설정
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
#offset.storage.partitions=25

# connector 그리고 task configuration을 저장할 Topic을 설정합니다. 
# this should be a single partition, highly replicated and compacted topic. 
# 해당 토픽은 자동으로 생성됨
# default replication factor = 3 이며 특별한 케이스에서는 더 커질수도 있다
# 개발환경에서는 single-broker 로 움직이기 때문에 1로 설정
config.storage.topic=connect-configs
config.storage.replication.factor=3

# status를 저장할 Topic 설정. 
# this topic can have multiple partitions and should be replicated and compacted. 
# 해당 토픽은 자동으로 생성됨 
# default replication factor = 3 이며 특별한 케이스에서는 더 커질수도 있다
# 개발환경에서는 single-broker 로 움직이기 때문에 1로 설정
status.storage.topic=connect-status
status.storage.replication.factor=3
#status.storage.partitions=5

# 테스트/디버깅을 위해서 더 빠르게 설정해 놓습니다.
offset.flush.interval.ms=1000
plugin.path=/usr/local/kafka/plugins