1. Introduction

대용량 데이터를 처리하다보면 주로 Spark에서 나온 Parquet 파일을 다루는 일이 많습니다.
이런 데이터를 통해서 모델을 학습시키게 되는데, 워낙 데이터 사이즈가 크다 보니 데이터를 모두 메모리에 올려서 사용하는 방법은 적절하지 않습니다.
예를 들어 시스템에 메모리가 수천 기가바이트가 아닌 이상 Pandas 또는 Numpy를 통해 인메모리로 모두 올리는 것인 매우 비효율적입니다.
본문에서는 PyArrow 의 주요 기능들에 대해서 레퍼런스 형태로 빠르게 설명하도록 하겠습니다.

2 ParquetDataset

2.1 Basic Usage

ParquetDataset 은 다음의 특징을 갖고 있습니다.

  1. 모든 데이터를 인메모리로 올리지 않음 (아래 Pyarrow size 보면 64이고, Pandas는 692,478)
  2. 디렉토리 지정시 Partition된 데이터를 한꺼번에 가져옴 (그 안에 dt=20230101 디렉토리 모두 읽음)
  3. fragments 통해서 각 파일의 메타정보를 꺼낼수 있음 (row 갯수)
  4. schema 통해서 어떤 컬럼들이 있는지 확인 가능

주요 parameters

  • filters: rows 를 필터링 할 수 있음!! 매우 유용
    ('x', '=', 0)
    ('y', 'in', ['a', 'b', 'c'])
    ('z', 'not in', {'a','b'})
    
  • memory_map: source가 file path 인 경우, memory_map 사용시 performance 증가 함
  • coerce_int96_timestamp_unit: INT96 으로 저장된 timestamp 를 특정 resolution으로 변환 (‘ms’)
  • metadata_nthreads: 메타데이터 읽을때 쓰레드 갯수를 지정하며, 특히 partitioned dataset 읽을 때 속도 증가 (현재 use_legacy_dataset 사용시 지원 안됨)
  • use_legacy_dataset: false 을 경우 새로운 Arrow Dataset API 를 사용함 -> filters 사용 가능하게 해줌

예제 코드

from pyarrow.parquet import ParquetDataset

dataset = ParquetDataset('./data',
                         memory_map=True,                         
                         use_legacy_dataset=False)
df = pd.read_parquet('./data')

file_rows = [frag.count_rows() for frag in dataset.fragments]

print('Pandas shape :', df.shape)
print('Pandas  size :', sys.getsizeof(df))
print('Pyarrow size :', sys.getsizeof(dataset))
print('files        :', dataset.files)
print('fragments    :', dataset.fragments)
print('files rows   :', file_rows)
print('column size  :', len(dataset.schema))
Pandas shape : (1000, 14)
Pandas  size : 692478
Pyarrow size : 64
files        : ['./data/dt=20230101/userdata.parquet']
fragments    : [<pyarrow.dataset.ParquetFileFragment path=./data/dt=20230101/userdata.parquet partition=[dt=20230101]>]
files rows   : [1000]
column size  : 14

2.2 Iteration

왜 이렇게 해야하냐하면.. 데이터 사이즈가 크기 때문입니다.
그냥 쉽게 pandas 로 불러올 수가 없기 때문입니다.
frag.to_batches() 사용시 batch iteration 을 할 수 있고, 이때 중요한 건 batch 는 실제로 데이터를 읽어온 객체 입니다. 따라서 해당 배치 사이즈 만큼의 데이터가 메모리에 올라가게 됩니다.
따라서 순차적 읽기에는 해당 iteration을 사용해서 처리시 효과적으로 처리 가능하나.. random access 가 필요할 시에는 문제가 발생 할 수 있습니다.

  • to_pandas() 함수 사용해서 pandas 로 꺼낼 수 있습니다.
  • Fragment Wiki
dataset = ParquetDataset('./data', use_legacy_dataset=False)

for frag in dataset.fragments:
    for batch in frag.to_batches():
        df = batch.to_pandas()
        row = batch.take(pa.array([0]))  # 특정 row 를 꺼낼 수 있음
        
        print('num rows    :', batch.num_rows)
        print('Pandas shape:', df.shape)
        print(row['gender'])
num rows    : 1000
Pandas shape: (1000, 13)
[
  "Female"
]
	registration_dttm	id	first_name	last_name	email	gender	ip_address	cc	country	birthdate	salary	title	comments
0	2016-02-03 07:55:29	1	Amanda	Jordan	ajordan0@com.com	Female	1.197.201.2	6759521864920116	Indonesia	3/8/1971	49756.53	Internal Auditor	1E+02

위에서 설명한대로, 좀 더 random access 가 필요하다고 한다면.. 저라면 array안에 메타 정보를 넣어놓고..
binary search 를 통해서 문제를 해결하겠습니다.

idx = 0
parquet_indices = []
for frag in dataset.fragments:
    parquet_file = ParquetFile(frag.path)
    for i, row_group in enumerate(frag.row_groups):
        start_idx = idx
        end_idx = idx + row_group.num_rows
        parquet_indices.append((i, start_idx, end_idx, parquet_file, row_group.id, row_group.num_rows))
        idx += row_group.num_rows

나중에 binary search로 찾은뒤에 다음 예문처럼 꺼내 쓸수 있습니다.
포인트는 read_row_group 에서 row group 의 id를 사용시 parquet file의 일부분만 쪼개서 읽어 올 수 있습니다.

pf = ParquetFile(frag.path)
table = pf.read_row_group(0)

2.3 Pytorch Dataset

링크 에 위의 예제를 사용한 코드를 넣어놨습니다.