SPARK에서 dataframe을 다루다보면 종종 UDF(User Defined Functions)를 사용해야 하는 경우 있습니다.
RDD에서는 map(lambda x: my_function(x))의 형태로 간단하게 사용할 수 있지만 dataframe에서는 추가적인 과정이 필요하다보니 이를 정리해 보았습니다.
def convertFirstcharToUppercase(x) :
res_x = ""
for idx, token in enumerate(res_x.split(' ')) :
if idx > 0 :
res_x = res_x + ' '
res_x = res_x + token[0:1].upper() + token[1:]
return res_x
위와 같이 단어의 첫 글자를 대문자로 변경하는 python 함수를 만들어 보았는데요. 이것을 SPARK의 dataframe에서 쓰려면 PySpark SQL의 UDF로 변경해야 합니다.
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
...
my_udf = F.udf(lambda x: convertFirstcharToUppercase(x), StringType())
여기서 StringType()은 udf의 default return value여서 다음과 같이 생략이 가능합니다.
my_udf = F.udf(lambda x: convertFirstcharToUppercase(x))
이후로는 아래와 같이 호출하여 사용할 수 있습니다.
df.withColumn("UserName", my_udf(F.col("username"))).show()
#혹은
df.select("username", my_udf(F.col("username")).alias("UserName")).show()
또 annotation으로 좀 더 코드를 단순화 할수 있습니다.
@udf(returnType=StringType())
def convertFirstcharToUppercase(x) :
res_x = ""
for idx, token in enumerate(res_x.split(' ')) :
if idx > 0 :
res_x = res_x + ' '
res_x = res_x + token[0:1].upper() + token[1:]
return res_x
df.withColumn("UserName", convertFirstcharToUppercase(F.col("username"))).show()
그리고 마지막으로 SQL에서는 아래처럼 query string에서도 udf를 인식할 수 있도록 등록 후 사용할 수 있습니다.
spark.udf.register("my_udf", convertFirstcharToUppercase, StringType())
df.createOrReplaceTempView("our_username_table")
spark.sql("select username, my_udf(username) as UserName from our_username_table") \
.show()