Athena 만으로 nested JSON을 파헤쳐 보자

JSON 데이터를 RDB의 테이블로 만들기 위해서는 json schema 그대로 nested 된 자료형을 명시하는 것이 정석이지만

CREATE EXTERNAL TABLE financials_raw (
	symbol string, 
	financials 
	array< 
	    struct<
	        reportdate: string, 
	        grossprofit: bigint, 
	        totalrevenue: bigint, 
	        totalcash: bigint, 
	        totaldebt: bigint, 
	        researchanddevelopment: bigint
	    >
	>,
	...
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://athena-json/~~~~~'

실무 시 접하는 대부분의 JSON은 schema가 복잡하다보니 이를 처리하는 데에는 많은 시간과 노력이 필요합니다.
실제 예를 들어가며 말씀을 이어갈텐데요, 예시에 사용될 데이터는 shopify의 주문 이력이며 형태는 다음과 같습니다.

root
 |-- billing_address: struct (nullable = true)
...
 |-- cart_token: string (nullable = true)
 |-- checkout_id: long (nullable = true)
 |-- checkout_token: string (nullable = true)
...
 |-- created_at: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- current_subtotal_price: string (nullable = true)
 |-- current_subtotal_price_set: struct (nullable = true)
...
 |-- current_total_discounts: string (nullable = true)
 |-- current_total_discounts_set: struct (nullable = true)
...
 |-- current_total_price: string (nullable = true)
 |-- current_total_price_set: struct (nullable = true)
...
 |-- current_total_tax: string (nullable = true)
 |-- current_total_tax_set: struct (nullable = true)
...
 |-- customer: struct (nullable = true)
 |    |-- accepts_marketing: boolean (nullable = true)
 |    |-- accepts_marketing_updated_at: string (nullable = true)
...
 |    |-- email: string (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- id: long (nullable = true)
...
 |    |-- marketing_opt_in_level: string (nullable = true)
...
 |-- id: long (nullable = true)
 |-- line_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
...
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
...
 |    |    |-- price: string (nullable = true)
...
 |    |    |-- quantity: long (nullable = true)
...
 |-- name: string (nullable = true)
 |-- order_number: long (nullable = true)
 ...
 |-- shipping_address: struct (nullable = true)
...
 |-- taxes_included: boolean (nullable = true)
...
 |-- total_discounts: string (nullable = true)
 |-- total_discounts_set: struct (nullable = true)
...
 |-- total_line_items_price: string (nullable = true)
 |-- total_line_items_price_set: struct (nullable = true)
...
 |-- total_price: string (nullable = true)
 |-- total_price_set: struct (nullable = true)
...
 |-- total_price_usd: string (nullable = true)
 |-- total_shipping_price_set: struct (nullable = true)
...
 |-- total_tax: string (nullable = true)
 |-- total_tax_set: struct (nullable = true)
...
 |-- updated_at: string (nullable = true)
 |-- user_id: string (nullable = true)

예시에서는 필요한 것들만 추스려 보여드렸지만 실제 컬럼은 500여개 가까이 되고 또 많은 컬럼이 struct, array 형태로 중첩되어(nested) 있다보니 이를 위해 create 문을 만들기도 또 table을 구성하기에도 어려움이 많습니다. 그래서 대부분의 데이터 조직에서 이런 귀찮음 때문에 ETL 작업은 spark에서 dataframe으로 해소하고 있을 것으로 예상하고 있습니다.

(spark에서의 처리 꿀팁에 대해 좀 더 말씀드리면 spark에서 JSON을 불러와 s3에 parquet로 저장한 후 athena의 DDL문에서 like parquet ~ 구문을 사용하면 스키마를 일일히 입력할 필요가 없습니다. 여기에 처리 용량이 크지 않다면 lambda와 glue의 조합으로 EMR을 띄우지 않고도 처리가능 할테구요.)

그렇다보니 분석가가 각 상품의 구매금액을 알기 위해서는
(json’s root→line_items → item[n].price 찾기 위해서는)

(1) 정보계 조직에게 정규화 작업을 요청하여 BI툴 등을 통해 접근하거나
(2) adhoc 데이터 추출을 별도 요청하여 엑셀파일 형태 등으로 공유받거나
(3) 직접 빅데이터 시스템에 접속하여 JSON을 파싱하여 금액 부분을 추출하거나

하는 식의 방식으로 진행될 것 같습니다.

(1)은 리소스가 많이 들어가기 때문에 조직의 정규작업에 반영하는 등의 의사결정이 필요할 정도로 부담스러운 일이고 (2)는 업무의 우선순위에서 밀리는 요청으로 간주되는 경우가 많아 적절한 시점에 데이터를 공급받기 어렵고 (3)은 시스템 접근권한과 더불어 데이터 엔지니어링 소양이 있어야 가능하기 때문에 사실상 불가능 합니다.

이 때 제가 접근하는 방식을 소개해 드려볼텐데요.
컨셉은 “미리 만들어 놓지 말고 필요한 시점에 필요한 필드만 쉽게 빼오자!” 입니다.​

예를들어 고객의 LTV와 상품추천 등을 작업을 수행하려면 “고객ID, 주문일자, 주문번호, 주문금액, 상품”이 필요합니다.
위의 방대한 Shopify json에서 해당 필드만 추스려 athena에 table을 만들어 보겠습니다.

create external table order_json_raw (
    id string,
    created_at string,
    updated_at string,
    total_price string,
    line_items string
)
partitioned by (
    year string,
    month string,
    day string
)
row format serde 'org.openx.data.jsonserde.JsonSerDe'
with serdeproperties (
    'case.insensitive'='false'
)
location 's3://airguy/shopify_json_order_sample/'

msck repair table order_json_raw
or
alter table order_json_raw add partition(~) location '~'

JSON 파일을 역직렬화 하면서 JSON 내의 속성명 중 id, created_at, updated_at, total_price, line_items만을 사용하여 테이블을 생성하는 구문입니다.

조회해보면 필요한 항목만 잘 추스려 졌습니다.

다만 line_items(장바구니)는 자세히 살펴보면 함께 구매한 상품들이 array<struct<>> 구조로 구성되어 있어서 후속 처리가 필요해 보입니다.

WITH shopify_order AS (
    select
        *
    from (
        select 
            id, created_at, updated_at, total_price, 
            cast(json_extract(line_items, '$') as array(map(varchar, json))) as line_items_array,
            row_number() over(partition by id order by updated_at desc) as seq
        from
            order_json_raw
    ) a
    where
        seq = 1
)

중복되는 주문데이터가 존재할 경우 가장 최신의 것만 사용하며 (row_number & seq=1) string으로 선언된 line_items을 json으로 변경한 후 각 속성을 key→value구조(map)로 변경하는 구문입니다. value의 자료형은 다양한 모양이 나올 수 있으므로 json으로 선언 했습니다. 사용하는 시점에 cast 하면서 사용하겠습니다.

​line_items을 string → json → array<map>의 과정으로 변경하면서 iterable하게 만들었으니 이제 장바구니를 아이템 별로 쪼개어(unnest) 보겠습니다.

이렇게 쪼개진 데이터는 map형태이기에 아래 쿼리 처럼 map[key] => value 형식으로 조회하거나 참조할 수 있습니다.

select
    id, created_at, updated_at,
    line_item['name'] as item_name,
    line_item['product_id'] as item_id,
    cast(line_item['price'] as double) as item_price,
    cast(line_item['quantity'] as integer) as item_quantity
from
    shopify_order cross join unnest(line_items_array) as t(line_item)

한 주문을 아이템 갯수만큼 분할해 놨으며 id를 기준으로 다시 합칠 수도 있습니다. (array_agg)
이제 전처리가 완료되어 필요한 필드만 단순한 형태로 추스려 가독성을 높였으니 주문관련 AI 학습용 데이터로 사용할 수 있게 되었습니다.​

HADOOP 클러스터나 EMR을 구성하여 로그를 집중시킨 후 이곳에서 SPARK 프로그래밍을 하여 ETL 작업을 거친 후 테이블화하여 mart에 적재하는, 많은 비용과 시간이 소요되는 빅데이터 과정을 “Cloud Native”환경에서 단지 4개의 쿼리와 몇 $의 비용으로 처리해 봤습니다.

또 최소한의 권한만 부여받으면 되는 외부참조(external table)여서 정보 시스템에서 흔히 경험하는 “휴먼에러”로부터 안전하고 조회 시 알아서 분산 처리되기에 무거운 작업이라도 기존 환경에 영향을 미치지 않는, 클라우드의 장점을 충분히 활용할 수도 있습니다.

이를 위해 사전에 한 일(아마도 정보시스템을 담당하시는 분들이 하는 일) 이라곤 정해진 디렉토리 형식(파티션)에 맞춰 파일을 S3에 저장하는 것 뿐이였습니다. 이 과정도 필수는 아니나 이렇게 하면 쿼리 시 full scan을 회피하여 비용 또한 상당히 절감할 수 있습니다.

특히 데이터 엔지니어링 전문가가 아니여도 충분히 할 수 있구요!!!!

말씀드리고나니 JAVA 혹은 SPARK의 Lazy Evaluation이 연상되네요.
데이터를 활용하기 위한 과정에는 DW→Stage→Mart 등을 설계하며 나가는 전통적인 방식도 있겠지만, 또 이러한 전통적인 방식이 필수인 곳이 있겠지만 다른 한 편에서는 빠르게 MVP를 만들고 살을 붙여나가며 대응하는 방식도 급변하는 환경에 필요한 듯하여 이에 어울리는 모델링 과정을 정리해 봤습니다.

조금이나마 도움이 되셨기를 바라며 긴 글 읽어주셔서 감사합니다.