AWS MSK (Managed Streaming for Apache Kafka)

1. Architecture
- Broker Nodes
- MSK 생성시 각각의 Availability Zone 안에 몇개의 Broker Node를 생성할지 설정 할 수 있다.
- 위의 그림에서는 각각의 availability zone마다 1개의 broker node가 존재하고 있으며,
각각의 availability zone은 각각의 VPC subnet을 갖고 있다
- ZooKeeper Nodes
- Highly reliable distributed coordination 을 구현
- Producers, Consumers and Topic Creators
- Topic 생성 및 데이터 produce 또는 consume 을 하는데 사용
- Cluster Operations
- CLI 또는 SDK를 통해서 control-plane operations 을 수행할수 있습니다.
- 예를 들어 cluster 생성, 삭제 또는 클러스터 리스팅, 클러스터 속성 보기, 또는 Broker 변경등을 수행할 수 있습니다.
2. AWS MSK Cluster
2.1 Create a VPC for MSK Cluster
https://console.aws.amazon.com/vpc/ 링크로 들어가서 Launch VPC Wizrd 를 선택합니다.
먼저 첫번째 Subnet을 생성합니다.
- 설정
- VPC name:
kafka_vpc_test
- Availability Zone:
us-east-2a
선택 - Subnet name:
kafka_vpc_subnet_01
- Create VPC 선택
- VPC name:
Kafka VPC ID(vpc-123456789abcdefgh
) 는 따로 적어놓습니다.
2.2 High Availability and Fault Tolerance
High availability 그리고 Fault Tolerance를 구현하기 위해서 추가적으로 다른 availability zones 에 2개의 subnet을 추가합니다.
두번째 Subnet을 생성합니다.
두번째 subnet은 다음과 같이 생성합니다.
- Subnets 메뉴를 선택하고 이전에 생성한
kafka_vpc_subnet_01
을 찾고 -> Route Table을 찾고 복사 (rtb-0fabcd123456789ff
)합니다. - Create Subnet 버튼을 누릅니다.
- VPC ID: 이전에 생성한 VPC ID (
kafka_vpc_test
) 를 선택 - Subnet name:
kafka_vpc_subnet_02
- Availability Zone:
us-east-2b
- CIDR block:
10.0.1.0/24
- 생성!
- VPC ID: 이전에 생성한 VPC ID (
바로 이전에 만든 subnet을 선택하고, 아래 메뉴에서 Route table
을 선택후 Edit route table association
을 선택
Route Table ID 드랍박스에서, 첫번째 만들었던 route table id(rtb-0fabcd123456789ff
)을 선택하고 저장합니다.
동일한 방식으로 Availability Zone 만 바꿔서 생성합니다.
- VPC ID: 이전에 생성한 VPC ID (
kafka_vpc_test
) 를 선택 - Subnet name:
kafka_vpc_subnet_03
- Availability Zone:
us-east-2c
- CIDR block:
10.0.2.0/24
2.3 Create Cluster
MSK 서비스로 이동후 Create Cluster 를 누릅니다.
- Custom Create 선택
- Cluster name:
kafka-test
- Apache Kafka Version: 2.8.1
- Configuration: MSK default configuration
- Networking: 이전의 생성한 Subnets을 선택합니다.
2.4 Create EC2 & Security Group Configuration
AWS MSK는 좀 특이하게도.. 외부 접속이 되지를 않습니다.
내부적으로, MSK는 VPC zone 내부에 존재하고 있으며 Zookeep, broker url은 private ip로 사용됩니다.
VPC 내부의 EC2를 생성해서 접속해야 합니다.
- Configure Instance
- Network: 이전에 설정한 kafka_vpc_test 를 선택합니다.
- Auto-assign Public IP: Enable
그 다음에 할일은 Kafka Cluster 의 Security Group 의 inbound 를 편집합니다.
여기서 All Traffic 으로 하되 Source를 새로 생성한 EC2 instance의 security group 을 넣으면 됩니다.
자세한 방법은 아래와 같습니다.
- EC2 Instance
- Instances 에서 생성된 instance를 클릭하고 Security 메뉴에서 Security Groups 를 어딘가에 복사합니다.
sg-instance-security-group
으로 이름을 예제로 사용하겠습니다.
- Kafka Cluster
- Cluster 선택
- Networking -> Security groups applied 를 찾습니다.
- 예제에서는
sg-kafka-cluster-security-group
으로 사용하겠습니다. - 그냥 클릭합니다.
- Kafka Cluster 의 Security Group의 INBOUND 편집
- Security Group
- Inbound Rules 탭
- Edit Inbound Rules 선택
- Add Rule 선택
- Type: All traffic
- Source: Custom 선택후 복사한 Security Group을 넣습니다.
3. Kafka Client and Topic
3.1 Installing Kafka Client
.bashrc 에 다음과 같이 JAVA_HOME
을 설정해 줍니다.
$ sudo apt-get install openjdk-11-jdk
$ sudo apt install mlocate
추가적으로 aws cli 그리고 aws-mfa 설치해야 합니다.
다음의 내용을 ~/.bashrc 에 넣습니다.
# KAFKA
export PATH=$PATH:/home/ubuntu/.local/bin/:/usr/local/kafka/bin
export JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64
# EC2 Instance
# 2.8.1 인 경우
$ wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
# 2.6.2 인 경우
$ wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
$ tar -xzf kafka_2.12-2.8.1.tgz
$ sudo mv kafka_2.12-2.8.1 /usr/local/kafka
$ sudo chown anderson:anderson -R /usr/local/kafka
3.2 Installing Local Kafka Server (optional)
Kafka Server를 설치합니다.
이 부분은 local 실행을 어떻게 하는지 남기기 위해서 남기는 것이고, 현재 MSK를 사용하기 때문에 그냥 패스해도 됩니다.
Zookeeper 와 Kafka service 에 대한 systemd unit files 을 생성합니다.
이를 통해서 kafka service 를 start/stop 명령어로 좀 더 쉽게 관리할수 있도록 합니다.
Kafka Systemd Unit File 을 생성합니다.
sudo vim /etc/systemd/system/zookeeper.service
으로 열고 아래와 같이 설정합니다.
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
sudo vim /etc/systemd/system/kafka.service
으로 열고 아래와 같이 설정합니다.
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.11.0-openjdk-amd64"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
Systemd daemon을 리로드 해줍니다.
$ sudo systemctl daemon-reload
$ sudo systemctl start zookeeper
$ sudo systemctl start kafka
$ sudo systemctl status zookeeper
$ sudo systemctl status kafka
3.3 Create Topic
다음의 명령어로 Zookeeper Connect 를 알아냅니다.
ZookeeperConnectString 값을 확인합니다.
# Local Computer
# aws kafka describe-cluster --region <region> --cluster-arn <CLUSTER_ARN>
$ aws kafka describe-cluster --region ap-northeast-2 --cluster-arn <CLUSTER_ARN> | jq .ClusterInfo.ZookeeperConnectString
다음과 같이 생성합니다.
ZookeeperConnectString 부분은 위에서 grep으로 잡은 전체 정보를 넣어야 합니다.
# EC2 Instance
$ kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 3 --partitions 1 --topic TestTopic
Created topic TestTopic.
$ kafka-topics.sh --list --zookeeper <ZookeeperConnectString>
TestTopic
__amazon_msk_canary
__consumer_offsets
3.4 Broker URL
아래에서 ClusterArn 는 수정해야 합니다.
# Local
$ aws kafka get-bootstrap-brokers --region us-east-2 --cluster-arn ClusterArn
{
"BootstrapBrokerString": "b-2.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092,b-1.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092,b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092",
"BootstrapBrokerStringTls": "b-2.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9094,b-1.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9094,b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9094",
"BootstrapBrokerStringSaslIam": "b-2.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9098,b-1.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9098,b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9098"
}
3.3 SSH Tunnelling
MSK의 가장 큰 문제점은 역시.. 외부에서 topic 연결이 안된다는 것이고.
이를 해결하는 방법은 많은데.. 제가 사용하는 방법은 그냥 ssh tunneling을 사용하는 것 입니다.
기본적으로 ssh -i aws.pem -N -L {local port}:{MSK Broker}:9092 ubuntu@{EC2 address}
이렇게 사용합니다.
예제에서는 3개의 broker를 사용하기 때문에.. “-L {local port}:{MSK Broker}:9092” 방식으로 추가를 더 해주면 됩니다.
ssh -i ~/.ssh/aws.pem -N -L 9093:b-3.kafka-test.allwn4.c3.kafka.us-east-2.amazonaws.com:9092 ubuntu@ec2-5-20-100-100.us-east-2.compute.amazonaws.com
근데 실제 사용해보니.. SSH Tunneling은 매우 느립니다.
가장 쉽게 해결할수 있는 방법은 EC2에 OpenVPN을 설치해서 연결하는게 가장 편리합니다.
3.3 Python Example
pip3 install kafka-python
producer.py 는 다음과 같이 작성합니다.
bootstrap_servers
에는 aws kafka get-bootstrap-brokers
명령어로 알아낸 BootstrapBrokerString
을 적어 넣으면 됩니다.
import json
from kafka import KafkaProducer
def produce():
producer = KafkaProducer(
acks=0,
bootstrap_servers=[
'a-1.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
'a-2.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
'a-3.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
],
api_version=(2, 8, 1),
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
for i in range(10):
print(f'Sending: {i}')
r = producer.send('TestTopic', value=f'test {i}')
producer.flush()
if __name__ == '__main__':
produce()
Consumer 코드는 다음과 같습니다.
import json
from kafka import KafkaConsumer
def consume():
consumer = KafkaConsumer(
'TestTopic',
bootstrap_servers=[
'a-1.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
'a-2.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
'a-3.kafka-dev.a11111.c4.kafka.ap-northeast-2.amazonaws.com:9092',
],
api_version=(2, 8, 1),
auto_offset_reset='earliest',
group_id='my-group',
enable_auto_commit=True,
value_deserializer=lambda x: json.dumps(x.decode('utf-8')))
for message in consumer:
print(message)
if __name__ == '__main__':
consume()