博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
43、内置函数及每日uv、销售额统计案例
阅读量:5220 次
发布时间:2019-06-14

本文共 5687 字,大约阅读时间需要 18 分钟。

一、spark1.5内置函数

在Spark 1.5.x版本,增加了一系列内置函数到DataFrame API中,并且实现了code-generation的优化。与普通的函数不同,DataFrame的函数并不会执行后立即返回一个结果值,而是返回一个Column对象,用于在并行作业中进行求值。Column可以用在DataFrame的操作之中,比如select,filter,groupBy等。函数的输入值,也可以是Column。

   种类

                                                           函数

聚合函数

approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct

集合函数

array_contains, explode, size, sort_array

日期/时间函数

日期时间转换

unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
从日期时间中提取字段
year, month, dayofmonth, hour, minute, second

日期/时间函数

日期/时间计算

datediff, date_add, date_sub, add_months, last_day, next_day, months_between
获取当前时间等
current_date, current_timestamp, trunc, date_format

数学函数

abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod,

pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex

混合函数

array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId,

nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when

字符串函数

ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim,

printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper

窗口函数

cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

二、案例

案例实战:根据每天的用户访问日志和用户购买日志,统计每日的uv和销售额

1、UV案例scala实现

package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.IntegerTypeimport org.apache.spark.sql.functions._object DailyUV {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local").setAppName("DailyUV")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)        // 要使用Spark SQL的内置函数,就必须在这里导入SQLContext下的隐式转换    import sqlContext.implicits._        // 构造用户访问日志数据,并创建DataFrame        // 模拟用户访问日志,日志用逗号隔开,第一列是日期,第二列是用户id    val userAccessLog = Array(        "2019-08-04,1122",        "2019-08-04,1122",        "2019-08-04,1123",        "2019-08-04,1124",        "2019-08-04,1124",        "2019-08-05,1122",        "2019-08-05,1121",        "2019-08-05,1123",        "2019-08-05,1123");        val userAccessLogRDD = sc.parallelize(userAccessLog, 5)        // 将模拟出来的用户访问日志RDD,转换为DataFrame    // 首先,将普通的RDD,转换为元素为Row的RDD      // String到Int : toInt    val userAccessLogRowRDD = userAccessLogRDD        .map { log => Row(log.split(",")(0), log.split(",")(1).toInt) }        // 构造DataFrame的元数据    // 将一个RDD转换为DataFrame,这一步经常需要生成一个StructType来生成DataFrame的schema    // 通过StructType直接指定每个字段的schema    val structType = StructType(Array(        StructField("date", StringType, true),        StructField("userid", IntegerType, true)))          // 使用SQLContext创建DataFrame    val userAccessLogRowDF = sqlContext.createDataFrame(userAccessLogRowRDD, structType)          // 这里讲解一下uv的基本含义和业务    // 每天都有很多用户来访问,但是每个用户可能每天都会访问很多次    // 所以,uv,指的是,对用户进行去重以后的访问总数        // 这里,正式开始使用Spark 1.5.x版本提供的最新特性,内置函数,countDistinct    // 讲解一下聚合函数的用法    // 首先,对DataFrame调用groupBy()方法,对某一列进行分组    // 然后,调用agg()方法 ,第一个参数,必须传入之前在groupBy()方法中出现的字段,前面要写一个单引号    // 第二个参数,传入countDistinct、sum、first等,Spark提供的内置函数    // 内置函数中,传入的参数,也是用单引号作为前缀的,其他的字段    userAccessLogRowDF.groupBy("date")        .agg('date, countDistinct('userid))          .map { row => Row(row(1), row(2)) }           .collect()        .foreach(println)      }}

2、销售额案例scala实现

package cn.spark.study.sqlimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.sql.SQLContextimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types.StructTypeimport org.apache.spark.sql.types.StructFieldimport org.apache.spark.sql.types.StringTypeimport org.apache.spark.sql.types.DoubleTypeimport org.apache.spark.sql.functions._/** * @author Administrator */ object DailySale {    def main(args: Array[String]): Unit = {    val conf = new SparkConf()        .setMaster("local")        .setAppName("DailySale")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)      import sqlContext.implicits._        // 说明一下,业务的特点    // 实际上呢,我们可以做一个,单独统计网站登录用户的销售额的统计    // 有些时候,会出现日志的上报的错误和异常,比如日志里丢了用户的信息,那么这种,我们就一律不统计了        // 模拟数据    val userSaleLog = Array("2019-08-04,55.05,1122",        "2019-08-04,23.15,1133",        "2019-08-04,15.20,",        "2019-08-05,56.05,1144",        "2019-08-05,78.87,1155",        "2019-08-05,113.02,1123")    val userSaleLogRDD = sc.parallelize(userSaleLog, 5)        // 进行有效销售日志的过滤    val filteredUserSaleLogRDD = userSaleLogRDD        .filter { log => if (log.split(",").length == 3) true else false }          val userSaleLogRowRDD = filteredUserSaleLogRDD        .map { log => Row(log.split(",")(0), log.split(",")(1).toDouble) }        val structType = StructType(Array(        StructField("date", StringType, true),        StructField("sale_amount", DoubleType, true)))        val userSaleLogDF = sqlContext.createDataFrame(userSaleLogRowRDD, structType)          // 开始进行每日销售额的统计    userSaleLogDF.groupBy("date")        .agg('date, sum('sale_amount))        .map { row => Row(row(1), row(2)) }        .collect()        .foreach(println)    }  }

转载于:https://www.cnblogs.com/weiyiming007/p/11304198.html

你可能感兴趣的文章
cf519D. A and B and Interesting Substrings(前缀和)
查看>>
.net framework
查看>>
一致性哈希算法(consistent hashing)(转)
查看>>
【BZOJ3518】点组计数 欧拉函数
查看>>
redis分布式锁
查看>>
cf(数学思维题)
查看>>
hdu1690(最短路floyd)
查看>>
并行编程OpenMP基础及简单示例
查看>>
深度学习的并行问题
查看>>
约数的计算
查看>>
005 Fiddler get请求
查看>>
再生龙软件使用
查看>>
手机开发-安卓手机的更新换代
查看>>
ElasticSearch1.7.1 安装
查看>>
异常记录
查看>>
Laravel 设置语言不生效的问题
查看>>
PHP知识梳理
查看>>
pointer of 2d array and address
查看>>
php 中间件
查看>>
【转】sizeof 总结
查看>>