1. Architecture

  • Broker Nodes
    • MSK 생성시 각각의 Availability Zone 안에 몇개의 Broker Node를 생성할지 설정 할 수 있다.
    • 위의 그림에서는 각각의 availability zone마다 1개의 broker node가 존재하고 있으며,
      각각의 availability zone은 각각의 VPC subnet을 갖고 있다
  • ZooKeeper Nodes
    • Highly reliable distributed coordination 을 구현
  • Producers, Consumers and Topic Creators
    • Topic 생성 및 데이터 produce 또는 consume 을 하는데 사용
  • Cluster Operations
    • CLI 또는 SDK를 통해서 control-plane operations 을 수행할수 있습니다.
    • 예를 들어 cluster 생성, 삭제 또는 클러스터 리스팅, 클러스터 속성 보기, 또는 Broker 변경등을 수행할 수 있습니다.

2. AWS MSK Cluster

2.1 Create a VPC for MSK Cluster

https://console.aws.amazon.com/vpc/ 링크로 들어가서 Launch VPC Wizrd 를 선택합니다.

먼저 첫번째 Subnet을 생성합니다.

  1. 설정
    1. VPC name: kafka_vpc_test
    2. Availability Zone: us-east-2a 선택
    3. Subnet name: kafka_vpc_subnet_01
    4. Create VPC 선택

Kafka VPC ID(vpc-123456789abcdefgh) 는 따로 적어놓습니다.

2.2 High Availability and Fault Tolerance

High availability 그리고 Fault Tolerance를 구현하기 위해서 추가적으로 다른 availability zones 에 2개의 subnet을 추가합니다.

두번째 Subnet을 생성합니다.
두번째 subnet은 다음과 같이 생성합니다.

  1. Subnets 메뉴를 선택하고 이전에 생성한 kafka_vpc_subnet_01 을 찾고 -> Route Table을 찾고 복사 (rtb-0fabcd123456789ff)합니다.
  2. Create Subnet 버튼을 누릅니다.
    1. VPC ID: 이전에 생성한 VPC ID (kafka_vpc_test) 를 선택
    2. Subnet name: kafka_vpc_subnet_02
    3. Availability Zone: us-east-2b
    4. CIDR block: 10.0.1.0/24
    5. 생성!

바로 이전에 만든 subnet을 선택하고, 아래 메뉴에서 Route table을 선택후 Edit route table association을 선택

Route Table ID 드랍박스에서, 첫번째 만들었던 route table id(rtb-0fabcd123456789ff)을 선택하고 저장합니다.

동일한 방식으로 Availability Zone 만 바꿔서 생성합니다.

  1. VPC ID: 이전에 생성한 VPC ID (kafka_vpc_test) 를 선택
  2. Subnet name: kafka_vpc_subnet_03
  3. Availability Zone: us-east-2c
  4. CIDR block: 10.0.2.0/24

2.3 Create Cluster

MSK 서비스로 이동후 Create Cluster 를 누릅니다.

  1. Custom Create 선택
  2. Cluster name: kafka-test
  3. Apache Kafka Version: 2.8.1
  4. Configuration: MSK default configuration
  5. Networking: 이전의 생성한 Subnets을 선택합니다.

2.4 Create EC2 & Security Group Configuration

AWS MSK는 좀 특이하게도.. 외부 접속이 되지를 않습니다.
내부적으로, MSK는 VPC zone 내부에 존재하고 있으며 Zookeep, broker url은 private ip로 사용됩니다.
VPC 내부의 EC2를 생성해서 접속해야 합니다.

  1. Configure Instance
    1. Network: 이전에 설정한 kafka_vpc_test 를 선택합니다.
    2. Auto-assign Public IP: Enable

그 다음에 할일은 Kafka Cluster 의 Security Group 의 inbound 를 편집합니다.
여기서 All Traffic 으로 하되 Source를 새로 생성한 EC2 instance의 security group 을 넣으면 됩니다.
자세한 방법은 아래와 같습니다.

  1. EC2 Instance
    1. Instances 에서 생성된 instance를 클릭하고 Security 메뉴에서 Security Groups 를 어딘가에 복사합니다.
    2. sg-instance-security-group 으로 이름을 예제로 사용하겠습니다.
  2. Kafka Cluster
    1. Cluster 선택
    2. Networking -> Security groups applied 를 찾습니다.
    3. 예제에서는 sg-kafka-cluster-security-group 으로 사용하겠습니다.
    4. 그냥 클릭합니다.
  3. Kafka Cluster 의 Security Group의 INBOUND 편집
    1. Security Group
    2. Inbound Rules 탭
    3. Edit Inbound Rules 선택
    4. Add Rule 선택
      1. Type: All traffic
      2. Source: Custom 선택후 복사한 Security Group을 넣습니다.

3. Kafka Client and Topic

3.1 Installing Kafka Client

.bashrc 에 다음과 같이 JAVA_HOME을 설정해 줍니다.

$ sudo apt-get install openjdk-11-jdk
$ sudo apt install mlocate

추가적으로 aws cli 그리고 aws-mfa 설치해야 합니다.
다음의 내용을 ~/.bashrc 에 넣습니다.

# KAFKA 
export PATH=$PATH:/home/ubuntu/.local/bin/:/usr/local/kafka/bin
export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64
# EC2 Instance
# 2.8.1 인 경우
$ wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz

# 2.6.2 인 경우
$ wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz

$ tar -xzf kafka_2.12-2.8.1.tgz
$ sudo mv kafka_2.12-2.8.1 /usr/local/kafka
$ sudo chown anderson:anderson -R /usr/local/kafka

3.2 Installing Local Kafka Server (optional)

Kafka Server를 설치합니다.
이 부분은 local 실행을 어떻게 하는지 남기기 위해서 남기는 것이고, 현재 MSK를 사용하기 때문에 그냥 패스해도 됩니다.
Zookeeper 와 Kafka service 에 대한 systemd unit files 을 생성합니다.
이를 통해서 kafka service 를 start/stop 명령어로 좀 더 쉽게 관리할수 있도록 합니다.

Kafka Systemd Unit File 을 생성합니다.
sudo vim /etc/systemd/system/zookeeper.service 으로 열고 아래와 같이 설정합니다.

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

sudo vim /etc/systemd/system/kafka.service 으로 열고 아래와 같이 설정합니다.

[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

Systemd daemon을 리로드 해줍니다.

$ sudo systemctl daemon-reload

$ sudo systemctl start zookeeper
$ sudo systemctl start kafka

$ sudo systemctl status zookeeper
$ sudo systemctl status kafka

3.3 Create Topic

다음의 명령어로 Zookeeper Connect 를 알아냅니다.

ZookeeperConnectString 값을 확인합니다.

# Local Computer
# aws kafka describe-cluster --region <region> --cluster-arn <CLUSTER_ARN> 
$ aws kafka describe-cluster --region ap-northeast-2 --cluster-arn <CLUSTER_ARN> | jq .ClusterInfo.ZookeeperConnectString

다음과 같이 생성합니다.
ZookeeperConnectString 부분은 위에서 grep으로 잡은 전체 정보를 넣어야 합니다.

# EC2 Instance 
$ kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 3 --partitions 1 --topic TestTopic
Created topic TestTopic.

$ kafka-topics.sh  --list --zookeeper <ZookeeperConnectString>
TestTopic
__amazon_msk_canary
__consumer_offsets

3.4 Broker URL

아래에서 ClusterArn 는 수정해야 합니다.

# Local 
$ aws kafka get-bootstrap-brokers --region us-east-2 --cluster-arn ClusterArn
{
    "BootstrapBrokerString": "b-2.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092,b-1.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092,b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092",
    "BootstrapBrokerStringTls": "b-2.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9094,b-1.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9094,b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9094",
    "BootstrapBrokerStringSaslIam": "b-2.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9098,b-1.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9098,b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9098"
}

3.3 SSH Tunnelling

MSK의 가장 큰 문제점은 역시.. 외부에서 topic 연결이 안된다는 것이고.
이를 해결하는 방법은 많은데.. 제가 사용하는 방법은 그냥 ssh tunneling을 사용하는 것 입니다.

기본적으로 ssh -i aws.pem -N -L {local port}:{MSK Broker}:9092 ubuntu@{EC2 address} 이렇게 사용합니다.
예제에서는 3개의 broker를 사용하기 때문에.. “-L {local port}:{MSK Broker}:9092” 방식으로 추가를 더 해주면 됩니다.

ssh -i ~/.ssh/aws.pem -N -L 9093:b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092 ubuntu@ec2-5-20-100-100.us-east-2.compute.amazonaws.com

근데 실제 사용해보니.. SSH Tunneling은 매우 느립니다.
가장 쉽게 해결할수 있는 방법은 EC2에 OpenVPN을 설치해서 연결하는게 가장 편리합니다.

3.3 Python Example

pip3 install kafka-python

producer.py 는 다음과 같이 작성합니다.
bootstrap_servers 에는 aws kafka get-bootstrap-brokers명령어로 알아낸 BootstrapBrokerString 을 적어 넣으면 됩니다.

import json
from kafka import KafkaProducer

def produce():
    producer = KafkaProducer(
        acks=0,
        bootstrap_servers=[
            'a-1.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
            'a-2.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
            'a-3.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
        ],
        api_version=(2, 8, 1),
        value_serializer=lambda x: json.dumps(x).encode('utf-8'))

    for i in range(10):
        print(f'Sending: {i}')
        r = producer.send('TestTopic', value=f'test {i}')
        producer.flush()

if __name__ == '__main__':
    produce()

Consumer 코드는 다음과 같습니다.

import json
from kafka import KafkaConsumer

def consume():
    consumer = KafkaConsumer(
        'TestTopic',
        bootstrap_servers=[
            'a-1.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
            'a-2.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
            'a-3.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
        ],
        api_version=(2, 8, 1),
        auto_offset_reset='earliest',
        group_id='my-group',
        enable_auto_commit=True,
        value_deserializer=lambda x: json.dumps(x.decode('utf-8')))

    for message in consumer:
        print(message)

if __name__ == '__main__':
    consume()