1. Installation

1.1 Docker Compose Installation

See https://docs.docker.com/compose/install/ for reference.

$ sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
$ sudo chmod +x /usr/local/bin/docker-compose
$ docker-compose --version
Docker Compose version v2.2.2

1.2 docker-compose.yml

Set KAFKA_ADVERTISED_HOST_NAME to the host IP.
If it is set to a loopback address such as localhost or 127.0.0.1, you cannot run multiple brokers.
This setup is for development only and runs a single broker, so 127.0.0.1 is used here.

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

$ docker-compose -f docker-compose.yml up -d
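
docker-compose up -d returns once the containers are started, which can be before the broker is accepting connections. Below is a minimal sketch, assuming the 9092 port mapping from docker-compose.yml, that polls the port from the host. wait_for_port is a hypothetical helper name, and an open TCP port only shows that the listener is up, not that Kafka is fully ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling wait_for_port('127.0.0.1', 9092) before starting producer.py or consumer.py is one way to avoid connection errors right after startup.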

1.3 Creating a Topic

First, open a shell inside the kafka container. Topics can be created from there.

$ docker exec -it kafka /bin/sh
$ cd /opt/kafka/bin/

# Create the topic
$ kafka-topics.sh --create \
                  --zookeeper zookeeper:2181 \
                  --replication-factor 1 \
                  --partitions 1 \
                  --topic TestTopic

# Verify that the topic exists.
$ kafka-topics.sh --list --zookeeper zookeeper:2181
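
The same create command can also be issued from the host without opening an interactive shell. Below is a minimal sketch, assuming the container is named kafka and the scripts live in /opt/kafka/bin as above. topic_create_command and create_topic are hypothetical helper names, and running create_topic requires docker on the host.

```python
import subprocess


def topic_create_command(topic: str, partitions: int = 1, replication_factor: int = 1,
                         container: str = 'kafka', zookeeper: str = 'zookeeper:2181') -> list:
    """Build the docker exec argument list for kafka-topics.sh --create."""
    return [
        'docker', 'exec', container,
        '/opt/kafka/bin/kafka-topics.sh', '--create',
        '--zookeeper', zookeeper,
        '--replication-factor', str(replication_factor),
        '--partitions', str(partitions),
        '--topic', topic,
    ]


def create_topic(topic: str) -> None:
    """Run the command on the host; raises CalledProcessError if it fails."""
    subprocess.run(topic_create_command(topic), check=True)
```

Passing the arguments as a list avoids shell quoting problems, and check=True turns a failed topic creation into an exception instead of a silent error.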

1.4 Example

producer.py

import json
from kafka import KafkaProducer

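def produce():
    producer = KafkaProducer(
        acks=0,  # fire-and-forget: do not wait for a broker acknowledgement
        bootstrap_servers=[
            'localhost:9092',
        ],
        api_version=(2, 8, 1),
        # Serialize each value to JSON, then encode it as UTF-8 bytes.
        value_serializer=lambda x: json.dumps(x).encode('utf-8'))

    for i in range(10):
        print(f'Sending: {i}')
        producer.send('TestTopic', value=f'test {i}')

    # Block until all buffered messages have been sent.
    producer.flush()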

if __name__ == '__main__':
    produce()

consumer.py

import json
from kafka import KafkaConsumer

def consume():
    consumer = KafkaConsumer(
        'TestTopic',
        bootstrap_servers=[
            'localhost:9092',
        ],
        api_version=(2, 8, 1),
        auto_offset_reset='earliest',
        group_id='my-group',
        enable_auto_commit=True,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    for message in consumer:
        print(message)

if __name__ == '__main__':
    consume()
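
The producer JSON-encodes each value before sending it, so the consumer has to reverse that step with json.loads to get the original value back. Below is a minimal sketch of the round trip that runs without a broker. serialize and deserialize are illustrative names, not part of kafka-python.

```python
import json


def serialize(value) -> bytes:
    """Same transformation as the producer's value_serializer."""
    return json.dumps(value).encode('utf-8')


def deserialize(raw: bytes):
    """Inverse of serialize: decode the bytes, then parse the JSON text."""
    return json.loads(raw.decode('utf-8'))


payload = 'test 0'
wire = serialize(payload)
print(wire)               # b'"test 0"'
print(deserialize(wire))  # test 0
```

Checking the pair this way makes it easy to see when the two sides do not match: calling json.dumps on the consumer side would wrap the already encoded text in a second layer of quotes instead of parsing it.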