UDF (User defined functions) and UDAF (User defined aggregate functions) are key components of big data languages such as Pig and Hive. They allow to extend the language constructs to do adhoc processing on distributed dataset. Previously I have blogged about how to write custom UDF/UDAF in Pig () and Hive(Part & ) . In this post I will focus on writing custom UDF in spark. UDF and UDAF is fairly new feature in spark and was just released in Spark 1.5.1. So its still in evolution stage and quite limited on things you can do, especially when trying to write generic UDAFs. I will talk about its current limitations later on.
As a motivating example assume we are given some student data containing student’s name, subject and score and we want to convert numerical score into ordinal categories based on the following logic:
- A –> if score >= 80
- B –> if score >= 60
- C –> if score >= 35
- D –> otherwise
Below is the relevant python code if you are using pyspark.
# Generate Random Dataimport itertoolsimport randomstudents = ['John', 'Mike','Matt']subjects = ['Math', 'Sci', 'Geography', 'History']random.seed(1)data = [] for (student, subject) in itertools.product(students, subjects): data.append((student, subject, random.randint(0, 100))) # Create Schema Objectfrom pyspark.sql.types import StructType, StructField, IntegerType, StringTypeschema = StructType([ StructField("student", StringType(), nullable=False), StructField("subject", StringType(), nullable=False), StructField("score", IntegerType(), nullable=False) ]) # Create DataFrame from pyspark.sql import HiveContextsqlContext = HiveContext(sc)rdd = sc.parallelize(data)df = sqlContext.createDataFrame(rdd, schema) # Define udffrom pyspark.sql.functions import udfdef scoreToCategory(score): if score >= 80: return 'A' elif score >= 60: return 'B' elif score >= 35: return 'C' else: return 'D' udfScoreToCategory=udf(scoreToCategory, StringType())df.withColumn("category", udfScoreToCategory("score")).show(10)
2-10 is the basic python stuff. We are generating a random dataset that looks something like this:
STUDENT | SUBJECT | SCORE |
---|---|---|
John | Math | 13 |
… | … | … |
Mike | Sci | 45 |
Mike | Geography | 65 |
… | … | … |
Next line 12-24 are dealing with constructing the dataframe. The main part of the code is in line 27-34. We first define our function in a normal python way.
Below is scala example of the same:
// Construct Dummy Dataimport util.Randomimport org.apache.spark.sql.Rowimplicit class Crossable[X](xs: Traversable[X]) { def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)}val students = Seq("John", "Mike","Matt")val subjects = Seq("Math", "Sci", "Geography", "History")val random = new Random(1)val data =(students cross subjects).map{x => Row(x._1, x._2,random.nextInt(100))}.toSeq // Create Schema Objectimport org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}val schema = StructType(Array( StructField("student", StringType, nullable=false), StructField("subject", StringType, nullable=false), StructField("score", IntegerType, nullable=false) )) // Create DataFrame import org.apache.spark.sql.hive.HiveContextval rdd = sc.parallelize(data)val df = sqlContext.createDataFrame(rdd, schema) // Define udfimport org.apache.spark.sql.functions.udfdef udfScoreToCategory=udf((score: Int) => { score match { case t if t >= 80 => "A" case t if t >= 60 => "B" case t if t >= 35 => "C" case _ => "D" }})df.withColumn("category", udfScoreToCategory(df("score"))).show(10)