您好,欢迎来到化拓教育网。
搜索
您的当前位置:首页SparkSQL(二)——基本操作

SparkSQL(二)——基本操作

来源:化拓教育网
SparkSQL(⼆)——基本操作

SparkSession新的起点

在⽼的版本中,SparkSQL提供两种SQL查询起始点:⼀个叫SQLContext,⽤于Spark⾃⼰提供的SQL查询;⼀个叫HiveContext,⽤于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可⽤的API在SparkSession上同样是可以使⽤的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext或者HiveContext完成的。

DataFrame基本操作

创建在Spark SQL中SparkSession是创建DataFrame和执⾏SQL的⼊⼝,创建DataFrame有三种⽅式:通过Spark的数据源进⾏创建;从⼀个存在的RDD进⾏转换;还可以从HiveTable进⾏查询返回。1)通过spark的数据源创建

查看SparkSession⽀持哪些⽂件格式创建dataframe(在spark shell中,spark.read.+tab)csv format jdbc json load option options orc parquet schema table text textFile以json格式为例:

{\"name\":\"zhangsan\{\"name\":\"lisi\

{\"name\":\"wangwu\

scala> spark.read.json(\"file:///home/chxy/spark/user.json\")

res2: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

它可以⾃动地判断出数据的字段和字段类型

2)从⼀个存在的RDD中进⾏转换

注意:如果需要RDD与DF或者DS之间操作,那么都需要引⼊ import spark.implicits._(1)⼿动转换

//⾸先引⼊隐式转换

scala> import spark.implicits._ import spark.implicits._

//创建⼀个RDD

scala> def rdd = spark.sparkContext.makeRDD(List((\"zhangsan\rdd: org.apache.spark.rdd.RDD[(String, Int)]

//⼿动指定dataframe的数据结构

scala> val dataframe = rdd.toDF(\"name\

dataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]

(2)通过case类来转换⾸先创建样例类

scala> case class People(name:String, age:Int)defined class People

将rdd中的数据转换为样例类的实例,rdd中的数据类型变为People

scala> val peopleRdd = rdd.map{ d => {People(d._1,d._2)}}

peopleRdd: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at

将peopleRdd转换为dataframe,此时⽆需指定数据结构,spark可以直接将含有case类的RDD转换为DataFrame

scala> val peopleDataframe = peopleRdd.toDF

peopleDataframe: org.apache.spark.sql.DataFrame = [name: string, age: int]

将dataframe转换为rdd

scala> peopleDataframe.rdd

res3: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[7] at rdd at :32

注意:转换后的数据类型已经不是People,⽽是Row,也就是⾏,它⽆法还原出原来的数据类型。

3)从hive查询的tab中反馈()

基本操作查看数据

scala> dataframe.show()+--------+---+| name|age|+--------+---+|zhangsan| 21|| lisi| 22|| wangwu| 23|+--------+---+

创建临时视图

scala> dataframe.createTempView(\"user\")

从临时视图查询数据

//从临时视图返回的数据会组成⼀个新的DataFramescala> spark.sql(\"select * from user\")

res8: org.apache.spark.sql.DataFrame = [name: string, age: int]scala> spark.sql(\"select * from user\").show+--------+---+| name|age|+--------+---+|zhangsan| 21|| lisi| 22|| wangwu| 23|+--------+---+

scala> spark.sql(\"select name from user\").show+--------+| name|+--------+|zhangsan|| lisi|| wangwu|+--------+

创建⼀个全局临时视图

scala> dataframe.createGlobalTempView(\"emp\")

访问该全局临时视图

scala> spark.sql(\"select * from global_temp.emp\").show+--------+---+| name|age|+--------+---+|zhangsan| 21|| lisi| 22|| wangwu| 23|+--------+---+

临时表是Session范围内的,Session退出后,表就失效了。如果想应⽤范围内有效,可以使⽤全局表。注意使⽤全局表时需要全路径访问,如:global_temp.emp在另⼀个session范围内访问该视图:

scala> spark.newSession.sql(\"select * from global_temp.emp\").show+--------+---+| name|age|+--------+---+|zhangsan| 21|| lisi| 22|| wangwu| 23|+--------+---+

注意:

1)视图⼀旦定义则不可修改的;2)session的概念:

⼴义:连接状态,⽐如⼀次通信。狭义:内存中的⼀块存储空间

DataSet

Dataset是具有强类型的数据集合,需要提供对应的类型信息。

创建创建⼀个样例类

scala> case class People(name:String, age:Int)defined class People

创建DataSet(直接从Seq中创建)

scala> val peopleDataset = Seq(People(\"zhangsan\",20),People(\"lisi\",21),People(\"wangwu\",22)).toDS()peopleDataset: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

RDD转换为DataSet

SparkSQL能够⾃动将包含有case类的RDD转换成DataSet直接从peopleRdd开始演⽰:

scala> peopleRdd

res10: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[3] at map at //RDD中的People case类直接可以映射为DataSet的类型scala> peopleRdd.toDS

res11: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

DataSet转换成RDD

直接调⽤rdd⽅法,⽽且可以保留RDD的case类的类型

scala> res11.rdd

res12: org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[9]

DataFrame与DataSet的互转

DataFrame转换成DataSet:DataFrame有结构,但是没有类型,DataSet既有结构也有类型,因此只需要加上类型

scala> peopleDataframe.as[People]

res14: org.apache.spark.sql.Dataset[People] = [name: string, age: int]

DataSet转换成DataFrame:同样的道理,只需要忽略类型

scala> peopleDataset.toDF

res15: org.apache.spark.sql.DataFrame = [name: string, age: int]

RDD DataFrame,DataSet三者之间的互转总结如下:

重要补充:

1.增删改查,四⼤sql常⽤操作,增、删、改是否被dataframel所⽀持呢?⾸先从⽂件创建⼀个dataframe,并创建临时视图:

scala> val userDF = spark.read.json(\"file:///home/chxy/spark/user.json\")

userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> userDF.createTempView(\"userView\")

执⾏插⼊操作,抛出异常:

scala> spark.sql(\"insert into userView values('sasa',24)\")

org.apache.hadoop.fs.ParentNotDirectoryException: Parent path is not a directory: file:/home/chxy/spark/user.json

org.apache.hadoop.fs.ParentNotDirectoryException.这个异常是由hdfs⽂件系统抛出的。很容易理解,因为hdfs天⽣不⽀持⽂件的插⼊操作。对于增加和删除操作,因该会得到相同的结果。

执⾏更新操作,抛出异常:

spark.sql(\"update userView set name = 'sasa' where id = 1\")org.apache.spark.sql.catalyst.parser.ParseException:

mismatched input 'update' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT

sparksql不⽀持update执⾏删除操作,抛出异常:

spark.sql(\"delete from user where age = 20\")

org.apache.spark.sql.catalyst.parser.ParseException:Operation not allowed: delete from(line 1, pos 0)== SQL ==

delete from user where age = 20^^^

at org.apache.spark.sql.catalyst.parser.ParserUtils$.operationNotAllowed(ParserUtils.scala:39)

该操作不被允许。

2.关于视图:

视图在driver端是不可见的

scala> userView

:24: error: not found: value userView userView ^

如何删除⼀个视图

spark.sql(\"drop table userView\")

3.关于dataset与dataframe中的算⼦如何使⽤以map算⼦为例:

package sparksql

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo1 {

def main(args: Array[String]): Unit = { //创建SparkConf()并设置App名称 val spark = SparkSession .builder()

.appName(\"Spark SQL basic example\")

.config(\"spark.some.config.option\", \"some-value\") .master(\"local[*]\") .getOrCreate() import spark.implicits._

val raw: RDD[(String, Int)] = spark.sparkContext.makeRDD(List((\"zhangsan\", 21), (\"lisi\", 22), (\"wangwu\", 23)))//创建dataframe

val df: DataFrame = raw.toDF(\"name\", \"age\") df.show()

//调⽤map⽅法,数据数据类型是:Row(col1,col2...coln) df.map{

case Row(col1:String,col2:Int)=> println(col1);println(col2) col1 case _=> \"\" }.show()

//同RDD,会⽣成⼀个新的DataFrame

spark.stop() }}

dataset:

package sparksql

import org.apache.spark.sql.{Dataset, SparkSession}

object Demo2 {

case class People(name:String, age:Int)//声明case类 def main(args: Array[String]): Unit = { //创建SparkConf()并设置App名称 val spark = SparkSession .builder()

.appName(\"Spark SQL basic example\")

.config(\"spark.some.config.option\", \"some-value\") .master(\"local[*]\") .getOrCreate() import spark.implicits._

val peopleDataset = Seq(People(\"zhangsan\",20),People(\"lisi\",21),People(\"wangwu\",22)).toDS()//创建dataset val newDataset: Dataset[String] = peopleDataset.map { case People(name: String, age: Int) => println(name) name case _ => \"\" }

newDataset.show() spark.stop() }}

遇到的坑:

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo9.cn 版权所有 赣ICP备2023008801号-1

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务