当前位置: 首页 > 编程日记 > 正文

Spark SQL基本操作以及函数的使用

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

引语:

本篇博客主要介绍了Spark SQL中的filter过滤数据、去重、集合等基本操作,以及一些常用日期函数,随机函数,字符串操作等函数的使用,并列编写了示例代码,同时还给出了代码当中用到的一些数据,放在最文章最后。

SparkSQL简介

Spark SQL是Spark生态系统中非常重要的组件,其前身为Shark。Shark是Spark上的数据仓库,最初设计成与Hive兼容,但是该项目于2014年开始停止开发,转向Spark SQL。Spark SQL全面继承了Shark,并进行了优化。 Spark SQL增加了SchemaRDD(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以来自Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范。

Spark SQL的优点

Spark SQL可以很好地支持SQL查询,一方面,可以编写Spark应用程序使用SQL语句进行数据查询,另一方面,也可以使用标准的数据库连接器(比如JDBC或ODBC)连接Spark进行SQL查询 。

Spark SQL基本操作

去重

distinct:根据每条数据进行完整去重。

dropDuplicates:根据字段去重。

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  DistinctDemo* 作者   彭三青* 创建时间  2018-11-29 15:02* 版本  1.0* 描述: $ 去重操作:distinct、drop*/
​
object DistinctDemo {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("Operations").getOrCreate()import spark.implicits._
​val employeeDF: DataFrame = spark.read.json("E://temp/person.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
​println("--------------------distinct---------------------")// 根据每条数据进行完整的去重employeeDS.distinct().show()
​println("--------------------dropDuplicates---------------------")// 根据字段进行去重employeeDS.dropDuplicates(Seq("name")).show()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

过滤

filter():括号里的参数可以是过滤函数、函数返回的Boolean值(为true则保留,false则过滤掉)、列名或者表达式。

except:过滤出当前DataSet中有,但在另一个DataSet中不存在的。

intersect:获取两个DataSet的交集。

提示:except和intersect使用的时候必须要是相同的实例,如果把另外一个的Employee换成一个同样的字段的Person类就会报错。

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  FilterDemo* 作者   彭三青* 创建时间  2018-11-29 15:09* 版本  1.0* 描述: $*/
​
object FilterDemo {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FilterDemo").getOrCreate()import spark.implicits._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]val employee2DF: DataFrame = spark.read.json("E://temp/employee2.json")val employee2DS: Dataset[Employee] = employee2DF.as[Employee]
​println("--------------------employee--------------------")employeeDS.show()
​println("--------------------employee2--------------------")employee2DS.show()
​println("       ┏┓   ┏┓\n" +"     ┏┛┻━━━┛┻┓\n" +"   ┃       ┃\n" +"   ┃   ━   ┃\n" +"   ┃ ┳┛ ┗┳ ┃\n" +"   ┃       ┃\n" +"   ┃   ┻   ┃\n" +"   ┃       ┃\n" +"   ┗━┓   ┏━┛\n" +"     ┃   ┃\n" +"      ┃   ┃\n" +"     ┃   ┗━━━┓\n" +"     ┃       ┣┓\n" +"     ┃       ┏┛\n" +"     ┗┓┓┏━┳┓┏┛\n" +"      ┃┫┫ ┃┫┫\n" +"      ┗┻┛ ┗┻┛\n")
​println("-------------------------------------------------")
​// 如果参数返回true,就保留该元素,否则就过滤掉employeeDS.filter(employee => employee.age == 35).show()employeeDS.filter(employee => employee.age > 30).show()// 获取当前的DataSet中有,但是在另外一个DataSet中没有的元素employeeDS.except(employee2DS).show()// 获取两个DataSet的交集employeeDS.intersect(employee2DS).show()
​spark.stop()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

集合

collect_set:将一个分组内指定字段的值都收集到一起,不去重

collect_list:讲一个分组内指定字段的值都收集到一起,会去重

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  CollectSetAndList* 作者   彭三青* 创建时间  2018-11-29 15:24* 版本  1.0* 描述: $ collect_list、 collect_set*/
​
object CollectSetAndList {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FilterDemo").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
​// collect_list:将一个分组内指定字段的值都收集到一起,不去重// collect_set:同上,但唯一区别是会去重employeeDS.groupBy(employeeDS("depId")).agg(collect_set(employeeDS("name")), collect_list(employeeDS("name"))).show()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

joinWith和sort

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  JoinAndSort* 作者   彭三青* 创建时间  2018-11-29 15:19* 版本  1.0* 描述: $*/
​
object JoinAndSort {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("FilterDemo").getOrCreate()import spark.implicits._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]val departmentDF: DataFrame = spark.read.json("E://temp/department.json")val departmentDS: Dataset[Department] = departmentDF.as[Department]
​println("----------------------employeeDS----------------------")employeeDS.show()println("----------------------departmentDS----------------------")departmentDS.show()println("------------------------------------------------------------")
​// 等值连接employeeDS.joinWith(departmentDS, $"depId" === $"id").show()// 按照年龄进行排序,并降序排列employeeDS.sort($"age".desc).show()}
}
case class Department(id: Long, name: String)
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

函数的使用

日期函数:

current_time():获取当前日期。

current_timestamp():获取当前时间戳。

数学函数

rand():生成0~1之间的随机数

round(e: column,scale: Int ):column列名,scala精确到小数点的位数。

round(e: column):一个参数默认精确到小数点1位。

字符串函数

concat_ws(seq: String, exprs: column*):字符串拼接。参数seq传入的拼接的字符,column传入的需要拼接的字符,可以指定多个列,不同列之间用逗号隔开。

package spark2x
​
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
​
/*** 类名  FunctionsDemo* 作者   彭三青* 创建时间  2018-11-29 15:56* 版本  1.0* 描述: $*/
​
object FunctionsDemo {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("Operations").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._
​val employeeDF: DataFrame = spark.read.json("E://temp/employee.json")val employeeDS: Dataset[Employee] = employeeDF.as[Employee]
​employeeDS.select(employeeDS("name"), current_date(), current_timestamp(),rand(), round(employeeDS("salary"), 2),// 取随机数,concat(employeeDS("gender"), employeeDS("age")),concat_ws("|", employeeDS("gender"), employeeDS("age"))).show()
​spark.stop()}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)

数据

employee.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}

employee2.json

{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}

department.json

{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}

转载于:https://my.oschina.net/u/3875806/blog/2964314

相关文章:

OpenCV提取轮廓(去掉面积小的轮廓)

转自&#xff1a;http://www.kaixuela.net/?p23 #include <stdio.h> #include "cv.h" #include "cxcore.h" #include "highgui.h" #include <iostream> using namespace std; #pragma comment(lib,"cv.lib") #pra…

软工作业 5:词频统计——增强功能

一、基本信息 1.1 编译环境、项目名称、作者 1 #编译环境:python3.6 2 #项目名称&#xff1a;软工作业5-词频统计—增强功能 3 #作者&#xff1a;1613072055 潘博 4 # 1613072056 侯磊 1.2项目地址 本次作业地址: https://www.cnblogs.com/panboo/项目git地址: https://g…

Linux之文件权限管理

chmod ux转载于:https://www.cnblogs.com/chaoren399/archive/2013/03/11/2953727.html

如果三十年前有这些AI技术,可可西里的悲剧不会发生

作者 | 神经小姐姐来源 | HyperAI超神经&#xff08;ID&#xff1a;HyperAI&#xff09;而被盗猎者大量的非法捕杀。多种野生动物都处于濒临灭绝的局面&#xff0c;人工智能等技术&#xff0c;能够在帮助保护野生动物上&#xff0c;发挥比较大的作用&#xff0c;让我们能够生存…

Percona-Server-5.5.30安装

1、安装系统环境 yum install -y gcc gcc-c autoconf automake zlib* libxml* ncurses-devel libmcrypt* libtool-ltdl-devel* cmake bison 2、下载源码包 1 http://www.percona.com/downloads/ 2 3 wget -c http://www.percona.com/redir/downloads/Percona-Server-5.5/Perc…

OpenCV中SVM的使用

转自&#xff1a;http://download.csdn.net/download/gaogaogao124/3125857 略有改动&#xff1a; #include"stdafx.h" #include<opencv2/opencv.hpp> #include<cmath> #include<ctime> using namespace std; int _tmain(int argc,_TCHAR…

数据不够,用GAN来凑!

作者 | CV君来源 | 我爱计算机视觉&#xff08;ID&#xff1a;aicvml&#xff09;在计算机视觉领域&#xff0c;深度学习方法已全方位在各个方向获得突破&#xff0c;这从近几年CVPR 的论文即可看出。但这往往需要大量的标注数据&#xff0c;比如最著明的ImageNet数据集&#x…

MySQL的登陆错误:ERROR 1049 (42000): Unknown database 'root'

刚刚装上数据库的时候&#xff0c;直接按照这个格式就登陆上去了&#xff0c;突然莫名其妙登陆不上去了 但是现在突然死活登陆不上去了 于是拿着这个报错信息在网上找啊找&#xff0c;终于找了了错误的原因 -p和密码是连在一起的&#xff0c;赶紧一试&#xff0c;果然可以登陆&…

分布式缓存系统Memcached简介与实践

缘起: 在数据驱动的web开发中&#xff0c;经常要重复从数据库中取出相同的数据&#xff0c;这种重复极大的增加了数据库负载。缓存是解决这个问题的好办法。但是ASP.NET中的虽然已经可以实现对页面局部进行缓存&#xff0c;但还是不够灵活。此时Memcached或许是你想要的。Memca…

Windows7 libsvm库中grid.py的使用步骤

1、从http://www.csie.ntu.edu.tw/~cjlin/libsvm/下载最新的libsvm-3.12库(libsvm-3.12.tar.gz或libsvm-3.12.zip)&#xff0c;将其放到F:\libsvm文件夹下解压缩&#xff0c;生成一个libsvm-3.12文件夹&#xff1b; 2、从http://www.gnuplot.info/下载最新的gnuplot即gp460-wi…

基于GEMM实现的CNN底层算法被改?Google提出全新间接卷积算法

作者 | Marat Dukhan from Google Research译者 | 凯隐责编 | Jane出品 | AI科技大本营&#xff08;ID: rgznai100&#xff09;【导读】本文介绍的内容主要聚焦Google 的一项最新工作&#xff1a;改变基于 GEMM 实现的 CNN底层算法提出的新方法。通用矩阵乘法&#xff08;Gener…

共享内存跨进程通信

通过共享内存通信是最快的&#xff0c;不过既然是共享资源&#xff0c;那么就必须要有同步机制。 创建共享内存有两种方式shm和mmap的方式。 mmap是在磁盘上建立一个文件&#xff0c;每个进程地址空间中开辟出一块空间进行映射。而对于shm而言&#xff0c;shm每个进程最终会映射…

扶稳!四大步“上手”超参数调优教程,就等你出马了 | 附完整代码

作者 | Matthew Stewart译者 | Monanfei责编 | Jane出品 | AI科技大本营&#xff08;ID: rgznai100&#xff09;【导读】在本文中&#xff0c;我们将为大家介绍如何对神经网络的超参数进行优化调整&#xff0c;以便在 Beale 函数上获得更高性能&#xff0c;Beale 函数是评价优化…

读好书,写好程序

本人是做.NET开发的&#xff0c;以企业应用为主&#xff0c;以互联网为爱好&#xff0c;这里给大家推荐一些适合.NET程序员的书&#xff1a; 软件设计《企业应用架构模式》 Martin Fowler 的大作之一&#xff0c;总结了多种常见的企业应用架构模式&#xff0c;这些模式是脱离具…

SIFT特征点匹配中KD-tree与Ransac算法的使用

转自&#xff1a;http://blog.csdn.net/ijuliet/article/details/4471311 Step1:BBF算法&#xff0c;在KD-tree上找KNN。第一步做匹配咯~ 1.什么是KD-tree&#xff08;fromwiki&#xff09; K-Dimension tree&#xff0c;实际上是一棵平衡二叉树。 一般的KD-tree构造过程&a…

jQuery带缩略图的宽屏焦点图插件

在线演示 本地下载

追溯XLNet的前世今生:从Transformer到XLNet

作者丨李格映来源 | 转载自CSDN博客导读&#xff1a;2019 年 6 月&#xff0c;CMU 与谷歌大脑提出全新 XLNet&#xff0c;基于 BERT 的优缺点&#xff0c;XLNet 提出一种泛化自回归预训练方法&#xff0c;在 20 个任务上超过了 BERT 的表现&#xff0c;并在 18 个任务上取得了当…

微软MCITP系列课程

http://liushuo890.blog.51cto.com/5167996/d-1转载于:https://blog.51cto.com/showcart/1156172

在Ubuntu11.10中安装配置OpenCV2.3.1和CodeBlocks

1、 打开终端&#xff1b; 2、 执行指令&#xff0c;删除ffmpeg and x264旧版本&#xff1a;sudo apt-get removeffmpeg x264 libx264-dev 3、下载安装x264和ffmpeg所有的依赖&#xff1a;sudo apt-get update sudo apt-get installbuild-essential checkinstall git cmake…

深入浅出Rust Future - Part 1

本文译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 1&#xff0c;时间&#xff1a;2018-12-02&#xff0c;译者:motecshine, 简介&#xff1a;motecshine 欢迎向Rust中文社区投稿,投稿地址,好文将在以下地方直接展示 Rust中文社区首页Rust…

cmd 修改文件属性

现在的病毒基本都会采用一种方式&#xff0c;就是将病毒文件的属性设置为系统隐藏属性以逃避一般用户的眼睛&#xff0c;而且由于Windows系统的关系&#xff0c;这类文件在图形界面下是不能修改其属性的。但是好在Windows还算做点好事&#xff0c;留下了一个attrib命令可以让我…

Django 视图

Django之视图 目录 一个简单的视图CBV和FBV FBV版&#xff1a;CBV版&#xff1a;给视图加装饰器 使用装饰器装饰FBV使用装饰器装饰CBVrequest对象 请求相关的常用值属性方法Response对象 使用属性JsonResponse对象Django shortcut functions render()redirect()Django的View&am…

喜大普奔!GitHub官方文档推出中文版

原创整理 | Python开发者&#xff08;ID&#xff1a;PythonCoder&#xff09;最近程序员交友圈出了一个大新闻&#xff0c;GitHub 帮助文档正式推出中文版了&#xff0c;之前一直都是只有英文文档&#xff0c;看起来费劲不方便。这份中文文当非常详尽&#xff0c;可以说有了它 …

Linux中获取当前程序路径的方法

1、命令行实现&#xff1a;转自&#xff1a;http://www.linuxdiyf.com/viewarticle.php?id84177 #!/bin/sh cur_dir$(pwd) echo $cur_dir 注意&#xff1a;在cur_dir后没空格&#xff0c;后面也不能有空格&#xff0c;不然它会认为空格不是路径而报错 2、程序实现&#xf…

android 关于字符转化问题

今日在写android的客户端&#xff0c;发现字符转化是个大问题。 下面是Unicode转UTF-8的转化&#xff0c;便于以后使用 private static String decodeUnicode(String theString) { char aChar; int len theString.length(); StringBuffer outBuffer new Strin…

30分钟看懂XGBoost的基本原理

作者 | 梁云1991转载自Python与算法之美&#xff08;ID: Python_Ai_Road&#xff09;一、XGBoost和GBDTxgboost是一种集成学习算法&#xff0c;属于3类常用的集成方法(bagging,boosting,stacking)中的boosting算法类别。它是一个加法模型&#xff0c;基模型一般选择树模型&…

Linux下遍历文件夹的实现

转自&#xff1a;http://blog.csdn.net/wallwind/article/details/7528474 linux C 遍历目录及其子目录 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <dirent.h> #include <sys/stat.h> #include <unistd.h&…

如何用Python画一棵漂亮的树

Tree海龟绘图turtle 在1966年&#xff0c;Seymour Papert和Wally Feurzig发明了一种专门给儿童学习编程的语言——LOGO语言&#xff0c;它的特色就是通过编程指挥一个小海龟&#xff08;turtle&#xff09;在屏幕上绘图。 海龟绘图&#xff08;Turtle Graphics&#xff09;后来…

windows7下,Java中利用JNI调用c++生成的动态库的使用步骤

1、从http://www.oracle.com/technetwork/java/javase/downloads/jdk-7u2-download-1377129.html下载jdk-7u2-windows-i586.exe&#xff0c;安装到D:\ProgramFiles\Java&#xff0c;并将D:\ProgramFiles\Java\jdk1.7.0_02\bin添加到环境变量中&#xff1b; 2、从http://www.ec…

外观模式 - 设计模式学习

外观模式(Facade)&#xff0c;为子系统中的一组接口提供一个一致的界面&#xff0c;此模式定义了一个高层接口&#xff0c;这个接口使得这一子系统更加容易使用。 怎么叫更加容易使用呢&#xff1f;多个方法变成一个方法&#xff0c;在外观看来&#xff0c;只需知道这个功能完成…