Apache Kafka with Docker

1. Installation
1.1 Docker Compose Installation
Refer to https://docs.docker.com/compose/install/ for details.
$ sudo curl -L "https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
$ sudo chmod +x /usr/local/bin/docker-compose
$ docker-compose --version
Docker Compose version v2.2.2
1.2 docker-compose.yml
Set KAFKA_ADVERTISED_HOST_NAME to the host IP.
If it is set to a loopback name or address such as localhost or 127.0.0.1, multiple brokers cannot be run.
This setup is for development only and does not use multiple brokers, so 127.0.0.1 is used here.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
$ docker-compose -f docker-compose.yml up -d
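Once the containers are up, it can help to confirm that the published ports are reachable from the host before going further. The following is a minimal sketch using only the Python standard library; the helper name port_is_open is hypothetical, and the ports are assumed to be the ones published in docker-compose.yml (2181 and 9092).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, port_is_open('127.0.0.1', 9092) checks the Kafka port and port_is_open('127.0.0.1', 2181) checks ZooKeeper. An open port only shows that something is listening; the broker may still need a few seconds before it accepts requests.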
1.3 Creating Topic
First open a shell inside the kafka container; topics can be created from there.
$ docker exec -it kafka /bin/sh
$ cd /opt/kafka/bin/
# Create the topic (named TestTopic to match producer.py and consumer.py)
$ kafka-topics.sh --create \
    --zookeeper zookeeper:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic TestTopic
# Verify that the topic exists
$ kafka-topics.sh --list --zookeeper zookeeper:2181
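The same topic creation can also be scripted from the host instead of typed into an interactive shell. The sketch below builds the equivalent docker exec command as an argument list. The function names are hypothetical, and the defaults assume the container name (kafka), the ZooKeeper address (zookeeper:2181), and the script location (/opt/kafka/bin) used in this document.

```python
import subprocess


def build_create_topic_cmd(topic, partitions=1, replication_factor=1,
                           container="kafka", zookeeper="zookeeper:2181"):
    """Build the argv list that creates a topic inside the Kafka container."""
    return [
        "docker", "exec", container,
        "/opt/kafka/bin/kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def create_topic(topic, **kwargs):
    """Run the command; raises subprocess.CalledProcessError if it fails."""
    subprocess.run(build_create_topic_cmd(topic, **kwargs), check=True)
```

Calling create_topic('TestTopic') would create the topic that the Python examples in this document publish to and read from. Passing the arguments as a list avoids shell quoting issues.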
1.4 Example
producer.py
import json

from kafka import KafkaProducer


def produce():
    producer = KafkaProducer(
        acks=0,  # fire-and-forget: do not wait for a broker acknowledgement
        bootstrap_servers=[
            'localhost:9092',
        ],
        api_version=(2, 8, 1),
        # Serialize each value as UTF-8 encoded JSON
        value_serializer=lambda x: json.dumps(x).encode('utf-8'))
    for i in range(10):
        print(f'Sending: {i}')
        producer.send('TestTopic', value=f'test {i}')
    # Block until all buffered messages have been sent
    producer.flush()


if __name__ == '__main__':
    produce()
consumer.py
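import json

from kafka import KafkaConsumer


def consume():
    consumer = KafkaConsumer(
        'TestTopic',
        bootstrap_servers=[
            'localhost:9092',
        ],
        api_version=(2, 8, 1),
        # Start from the oldest message when the group has no committed offset
        auto_offset_reset='earliest',
        group_id='my-group',
        enable_auto_commit=True,
        # Decode the UTF-8 bytes and parse the JSON back into a Python object
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    # Iterating blocks and yields one record per received message
    for message in consumer:
        print(message)


if __name__ == '__main__':
    consume()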
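The serializer and deserializer have to be inverses of each other: the producer turns a Python object into JSON bytes, and the consumer must parse those bytes with json.loads (calling json.dumps there would encode the text a second time instead of decoding it). The sketch below shows the round trip without a broker; the function names serialize and deserialize are hypothetical stand-ins for the two lambdas.

```python
import json


def serialize(value):
    """Producer side: encode a Python object as UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Consumer side: decode UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


payload = serialize("test 0")
print(payload)               # b'"test 0"'
print(deserialize(payload))  # test 0
```

The bytes on the wire include the JSON quotes, and the consumer gets the original string back once they are parsed.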