一、spark1.5内置函数
在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。
种类 | 函数 |
聚合函数 | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
集合函数 | array_contains, explode, size, sort_array |
日期/时间函数 | 日期时间转换 unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp 从日期时间中提取字段 year, month, dayofmonth, hour, minute, second |
日期/时间函数 | 日期/时间计算 datediff, date_add, date_sub, add_months, last_day, next_day, months_between 获取当前时间等 current_date, current_timestamp, trunc, date_format |
数学函数 | abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
混合函数 | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when |
字符串函数 | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
窗口函数 | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |
二、案例
案例实战:根据每天的用户访问日志和用户购买日志,统计每日的uv和销售额
1、UV案例scala实现
package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.IntegerTypeimport org.apache.spark.sql.functions._object DailyUV { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("DailyUV") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) // 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换 import sqlContext.implicits._ // 构造用户访问日志数据,并创建DataFrame // 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id val userAccessLog = Array( "2019-08-04,1122", "2019-08-04,1122", "2019-08-04,1123", "2019-08-04,1124", "2019-08-04,1124", "2019-08-05,1122", "2019-08-05,1121", "2019-08-05,1123", "2019-08-05,1123"); val userAccessLogRDD = sc.parallelize(userAccessLog, 5) // 将模拟出来的用户访问日志RDD,转换为DataFrame // 首先,将普通的RDD,转换为元素为Row的RDD // String到Int : toInt val userAccessLogRowRDD = userAccessLogRDD .map { log => Row(log.split(",")(0), log.split(",")(1).toInt) } // 构造DataFrame的元数据 // 将一个RDD转换为DataFrame,这一步经常需要生成一个StructType来生成DataFrame的schema // 通过StructType直接指定每个字段的schema val structType = StructType(Array( StructField("date", StringType, true), StructField("userid", IntegerType, true))) // 使用SQLContext创建DataFrame val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType) // 这里讲解一下uv的基本含义和业务 // 每天都有很多用户来访问,但是每个用户可能每天都会访问很多次 // 所以,uv,指的是,对用户进行去重以后的访问总数 // 这里,正式开始使用Spark 1.5.x版本提供的最新特性,内置函数,countDistinct // 讲解一下聚合函数的用法 // 首先,对DataFrame调用groupBy()方法,对某一列进行分组 // 然后,调用agg()方法 ,第一个参数,必须传入之前在groupBy()方法中出现的字段,前面要写一个单引号 // 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数 // 内置函数中,传入的参数,也是用单引号作为前缀的,其他的字段 userAccessLogRowDF.groupBy("date") .agg('date, countDistinct('userid)) .map { row => Row(row(1), row(2)) } .collect() .foreach(println) }}
2、销售额案例scala实现
package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.DoubleTypeimport org.apache.spark.sql.functions._/** * @author Administrator */ object DailySale { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName("DailySale") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 说明一下,业务的特点 // 实际上呢,我们可以做一个,单独统计网站登录用户的销售额的统计 // 有些时候,会出现日志的上报的错误和异常,比如日志里丢了用户的信息,那么这种,我们就一律不统计了 // 模拟数据 val userSaleLog = Array("2019-08-04,55.05,1122", "2019-08-04,23.15,1133", "2019-08-04,15.20,", "2019-08-05,56.05,1144", "2019-08-05,78.87,1155", "2019-08-05,113.02,1123") val userSaleLogRDD = sc.parallelize(userSaleLog, 5) // 进行有效销售日志的过滤 val filteredUserSaleLogRDD = userSaleLogRDD .filter { log => if (log.split(",").length == 3) true else false } val userSaleLogRowRDD = filteredUserSaleLogRDD .map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) } val structType = StructType(Array( StructField("date", StringType, true), StructField("sale_amount", DoubleType, true))) val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType) // 开始进行每日销售额的统计 userSaleLogDF.groupBy("date") .agg('date, sum('sale_amount)) .map { row => Row(row(1), row(2)) } .collect() .foreach(println) } }