PySpark dataframe에서 UDF 사용하기

SPARK에서 dataframe을 다루다보면 종종 UDF(User Defined Functions)를 사용해야 하는 경우 있습니다.

RDD에서는 map(lambda x: my_function(x))의 형태로 간단하게 사용할 수 있지만 dataframe에서는 추가적인 과정이 필요하다보니 이를 정리해 보았습니다.

def convertFirstcharToUppercase(x) :

	res_x = ""
	for idx, token in enumerate(res_x.split(' ')) :
		if idx > 0 :
			res_x = res_x + ' '
		res_x = res_x + token[0:1].upper() + token[1:]

return res_x

위와 같이 단어의 첫 글자를 대문자로 변경하는 python 함수를 만들어 보았는데요. 이것을 SPARK의 dataframe에서 쓰려면 PySpark SQL의 UDF로 변경해야 합니다.

import pyspark.sql.functions as F
from pyspark.sql.types import StringType
...

my_udf = F.udf(lambda x: convertFirstcharToUppercase(x), StringType())

여기서 StringType()은 udf의 default return value여서 다음과 같이 생략이 가능합니다.

my_udf = F.udf(lambda x: convertFirstcharToUppercase(x))

이후로는 아래와 같이 호출하여 사용할 수 있습니다.

df.withColumn("UserName", my_udf(F.col("username"))).show()

#혹은
df.select("username", my_udf(F.col("username")).alias("UserName")).show()

또 annotation으로 좀 더 코드를 단순화 할수 있습니다.

@udf(returnType=StringType())
def convertFirstcharToUppercase(x) :

	res_x = ""
	for idx, token in enumerate(res_x.split(' ')) :
		if idx > 0 :
			res_x = res_x + ' '
		res_x = res_x + token[0:1].upper() + token[1:]

return res_x

df.withColumn("UserName", convertFirstcharToUppercase(F.col("username"))).show()

그리고 마지막으로 SQL에서는 아래처럼 query string에서도 udf를 인식할 수 있도록 등록 후 사용할 수 있습니다.

spark.udf.register("my_udf", convertFirstcharToUppercase, StringType())
df.createOrReplaceTempView("our_username_table")

spark.sql("select username, my_udf(username) as UserName from our_username_table") \
.show()