Spark: Custom UDF Example
Published: 2019-06-29


UDF (user defined functions) and UDAF (user defined aggregate functions) are key components of big data languages such as Pig and Hive. They allow you to extend the language constructs to do ad hoc processing on distributed datasets. Previously I have blogged about how to write custom UDF/UDAF in Pig and Hive. In this post I will focus on writing a custom UDF in Spark. UDF and UDAF are fairly new features in Spark, only just released in Spark 1.5.1, so they are still evolving and quite limited in what you can do, especially when trying to write generic UDAFs. I will talk about the current limitations later on.

As a motivating example, assume we are given student data containing each student's name, subject, and score, and we want to convert the numerical score into ordinal categories based on the following logic:

  • A –> if score >= 80
  • B –> if score >= 60
  • C –> if score >= 35
  • D –> otherwise
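Before turning to Spark, note that the mapping itself is just an ordinary function. A plain-Python sketch of the logic above (the name `score_to_category` is purely illustrative):

```python
# A minimal sketch of the grading logic, in plain Python (no Spark needed)
def score_to_category(score):
    if score >= 80:
        return 'A'
    elif score >= 60:
        return 'B'
    elif score >= 35:
        return 'C'
    else:
        return 'D'

print([score_to_category(s) for s in (95, 80, 79, 60, 35, 10)])
# → ['A', 'A', 'B', 'B', 'C', 'D']
```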

Below is the relevant Python code if you are using PySpark.

# Generate Random Data
import itertools
import random

students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)
data = []
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))

# Create Schema Object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("student", StringType(), nullable=False),
    StructField("subject", StringType(), nullable=False),
    StructField("score", IntegerType(), nullable=False)
])

# Create DataFrame
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Define udf
from pyspark.sql.functions import udf
def scoreToCategory(score):
    if score >= 80: return 'A'
    elif score >= 60: return 'B'
    elif score >= 35: return 'C'
    else: return 'D'

udfScoreToCategory = udf(scoreToCategory, StringType())
df.withColumn("category", udfScoreToCategory("score")).show(10)

The first block is basic Python: we generate a random dataset that looks something like this:

STUDENT SUBJECT SCORE
John Math 13
Mike Sci 45
Mike Geography 65
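The data-generation part can be tried outside Spark as well; a minimal sketch using only the standard library (same names as in the PySpark code above):

```python
import itertools
import random

students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)  # fixed seed so the run is reproducible
data = [(student, subject, random.randint(0, 100))
        for (student, subject) in itertools.product(students, subjects)]
print(len(data))    # 3 students x 4 subjects = 12 rows
print(data[0][:2])  # itertools.product yields ('John', 'Math') first
```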

The next block constructs the DataFrame: we define an explicit schema, parallelize the data into an RDD, and build the DataFrame from the two. The main part of the code is the UDF definition: we first define our function in a normal Python way, then wrap it with `udf`, supplying the return type, and apply it to the score column via `withColumn`.
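Conceptually, `udf` just lifts an ordinary Python function so Spark can apply it to a column row by row. The effect of the `withColumn` call can be emulated in plain Python (the helper `with_category` below is purely illustrative, not a Spark API):

```python
def scoreToCategory(score):
    if score >= 80: return 'A'
    elif score >= 60: return 'B'
    elif score >= 35: return 'C'
    else: return 'D'

def with_category(rows):
    # Emulates df.withColumn("category", udfScoreToCategory("score")):
    # append a derived column computed per row
    return [(student, subject, score, scoreToCategory(score))
            for (student, subject, score) in rows]

rows = [('John', 'Math', 13), ('Mike', 'Sci', 45), ('Mike', 'Geography', 65)]
print(with_category(rows))
# → [('John', 'Math', 13, 'D'), ('Mike', 'Sci', 45, 'C'), ('Mike', 'Geography', 65, 'B')]
```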

Below is the Scala version of the same:

// Construct Dummy Data
import util.Random
import org.apache.spark.sql.Row

implicit class Crossable[X](xs: Traversable[X]) {
  def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
}

val students = Seq("John", "Mike", "Matt")
val subjects = Seq("Math", "Sci", "Geography", "History")
val random = new Random(1)
val data = (students cross subjects).map { x =>
  Row(x._1, x._2, random.nextInt(100))
}.toSeq

// Create Schema Object
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
val schema = StructType(Array(
  StructField("student", StringType, nullable = false),
  StructField("subject", StringType, nullable = false),
  StructField("score", IntegerType, nullable = false)
))

// Create DataFrame
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
val rdd = sc.parallelize(data)
val df = sqlContext.createDataFrame(rdd, schema)

// Define udf
import org.apache.spark.sql.functions.udf
val udfScoreToCategory = udf((score: Int) => {
  score match {
    case t if t >= 80 => "A"
    case t if t >= 60 => "B"
    case t if t >= 35 => "C"
    case _ => "D"
  }
})
df.withColumn("category", udfScoreToCategory(df("score"))).show(10)


Reposted from: https://my.oschina.net/qzhli/blog/1573021
