1. KubeRay

1.1 Install KubeRay

현재 KubeRay 의 master는 실험적인 nightly 버젼으로 사용하고 있으며,
v0.3.0 이 stable 버젼입니다. (뭔가 좀 이상한.. ㅋ)
따라서 v0.3.0 설치는 다음과 같이 합니다.

YAML 방식의 공식 설치문서를 참고 합니다.

# KubeRay Operator 설치
$ export KUBERAY_VERSION=v0.3.0
$ kubectl create -k "github.com/ray-project/kuberay/manifests/cluster-scope-resources?ref=${KUBERAY_VERSION}&timeout=90s"
$ kubectl apply -k "github.com/ray-project/kuberay/manifests/base?ref=${KUBERAY_VERSION}&timeout=90s"

# 설치 확인
$ kubectl -n ray-system get pod --selector=app.kubernetes.io/component=kuberay-operator
NAME                                READY   STATUS    RESTARTS   AGE
kuberay-operator-799899ff46-dgv6p   1/1     Running   0          2m44s

Ray Cluster 설치는 다음과 같이 합니다.
만약 default namespace에 설치하고자 한다면.. apply 할때 -n ray-cluster 빼면 됩니다.

# 먼저 다운을 받고, yaml파일안의 리소스 수정을 할 수 있습니다. 
$ wget https://raw.githubusercontent.com/ray-project/kuberay/release-0.3/ray-operator/config/samples/ray-cluster.autoscaler.yaml
$ kubectl create namespace ray-cluster
$ kubectl apply -f ray-cluster.autoscaler.yaml -n ray-cluster

# 설치 확인
$ kubectl get raycluster -n ray-cluster

이후 KubeRay operator가 자동으로 RayCluster object를 감지하고, Operator 는 Head Node 그리고 Worker Node 를 실행시켜서 Cluster를 실행시키기 시작합니다.

# Head Node 그리고 Worker Node 가 제대로 생성되는지 확인합니다. 
$ kubectl get pods --selector=ray.io/cluster=raycluster-autoscaler -n ray-cluster
NAME                                             READY   STATUS    RESTARTS   AGE
raycluster-autoscaler-head-wsdsc                 2/2     Running   0          94s
raycluster-autoscaler-worker-small-group-74rnx   1/1     Running   0          94s

일단 Head Node가 정상작동하는지 확인하는 방법은 다음과 같이 합니다.
방법은 Head Node에 직접 연결해서 python을 실행시킬 것 입니다.

# 먼저 head node의 pod 이름을 알아냅니다. 
$ kubectl get pods --selector=ray.io/cluster=raycluster-autoscaler --selector=ray.io/node-type=head -o custom-columns=POD:metadata.name --no-headers -n ray-cluster
raycluster-autoscaler-head-wsdsc

# 해당 이름을 사용해서 python을 다이렉트로 실행시킵니다. 
$ kubectl exec raycluster-autoscaler-head-wsdsc -n ray-cluster -it -c ray-head -- python -c "import ray; ray.init(); print('Done')"
2022-08-20 00:08:49,139	INFO worker.py:1224 -- Using address 127.0.0.1:6379 set in the environment variable RAY_ADDRESS
2022-08-20 00:08:49,139	INFO worker.py:1333 -- Connecting to existing Ray cluster at address: 192.168.85.95:6379...
2022-08-20 00:08:49,146	INFO worker.py:1515 -- Connected to Ray cluster. View the dashboard at http://192.168.85.95:8265 
Done

제대로 된 테스트 방법은 Job submission을 하는 것 입니다.
아래에 job submission 하는 방법에 대해서 참고 합니다.

1.2 Cluster Configuration

Head Node 그리고 Worker Node 의 container 설정부터 cpu, memory 등의 설정등을 할 수 있습니다.
이 모든 설정은 rayclusters.ray.io 의 custom resource 에 올라가 있습니다.

위의 예제에서 ray-cluster.autoscaler.yaml 예제를 사용해서 Cluster 를 설치했고, 해당 yaml 파일안에
headGroupSpec 그리고 workerGroupSpecs 등에서 메모리, cpu, container image 등을 수정할 수 있습니다.
ray-cluster.autoscaler.yaml 예제 이외에 다른 예제들은 KubeRay Official Config Samples를 참고 합니다.

# 현재 KubeRay Cluster Configuration 확인
$ kubectl get crd rayclusters.ray.io -o yaml

2. Job Submission

2.1 Ray Job Submission for Test

Ray는 Job requests를 Dashboard Server를 통해서 받습니다.
Dashboard는 Kubernetes Service 를 통해서 접속을 할 수 있으며, 해당 서비스에 port-forward 시켜서 접속해서 dashboard 와 연결할 수 있습니다.

Submitting Job -> Service (ClusterIP) -> Head Node Pod -> ray-head container -> Dashboard (Port: 8265)

기본값으로 ClusterIP 로 제공되기 때문 같은 EKS 클러스터내의 같은 namespace에서 접속이 가능합니다.
다른 방식의 접근은 Ingress 또는 LoadBalancer 등이 될 수 있지만, LoadBalancer의 경우 public으로 만든다는 단점이 있습니다. 자세한 내용은 Services and Networking 을 참조 합니다.

# 서비스 -> Dashboard 를 port-forward 시킵니다.
$ kubectl port-forward -n ray-cluster service/raycluster-autoscaler-head-svc 8265:8265

# Submit Job
$ ray job submit --address http://localhost:8265 -- python -c "import ray; ray.init(); print(ray.cluster_resources())"

2.2 Submitting a Ray Job in CLI

위와 동일하게 8265서버를 port-forward로 오픈합니다.

$ kubectl port-forward -n ray-cluster service/raycluster-autoscaler-head-svc 8265:8265

script.py 로 파이썬 파일을 아래의 코드로 제작을 합니다.

import ray

@ray.remote
def hello_world():
    print(ray.cluster_resources())
    return "hello world"

ray.init()
print(ray.get(hello_world.remote()))

Kubernetes 에 던져서 실행시키 위해서는 먼저 ray address 를 설정해줘야 하며,
RAY_ADDRESS=http://localhost:8265 환경변수 설정
또는 모든 ray 명령어마다 --address=http://localhost:8265 를 써주면 됩니다.

$ export RAY_ADDRESS="http://127.0.0.1:8265"
$ ray job submit --working-dir . -- python script.py

2.3 Submitting a Ray Job in Python

위에서는 ray job submit CLI 명령어로 script.py를 클러스터에 올리고 실행했다면,
Python에서도 동일하게 job submission을 할 수 있습니다.

JobSubmissionClient 를 사용해서 붙으며, submit_job 함수를 사용해서 job을 올릴수 있습니다.
이때 중요한 부분이 runtime_env={"working_dir": "./"를 설정해야지만 로컬 파일들이 업로드되며,
내부적으로 upload_working_dir_if_needed 함수가 호출되서 리모트 클러스터에 업로드 됩니다.

from ray.job_submission import JobSubmissionClient, JobStatus
import time

client = JobSubmissionClient("http://localhost:8265")
job_id = client.submit_job(
    entrypoint="python script.py",
    runtime_env={
        'working_dir': './' # 이게 있어야 리모트 클러스터에 업로드
    }
)
print(job_id)


def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(1)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)
raysubmit_a8xtDCpRqUHhEYAG
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
<생략> 
(hello_world pid=1856) {'memory': 1536870912.0, 'object_store_memory': 338895666.0, 'node:192.168.85.95': 1.0, 'CPU': 2.0, 'node:192.168.11.172': 1.0}
hello world

3. Clean Up

3.1 Deleting a Ray Cluster & KubeRay Operator

$ kubectl delete raycluster raycluster-autoscaler
$ kubectl delete -k "github.com/ray-project/kuberay/ray-operator/config/default?ref=v0.3.0&timeout=90s"