星星博客 »  > 

SparkSQL

SparkSQL

  • SparkSQL概述
    • 什么是SparkSQL
    • 特点
    • DataFrame
    • DataSet
  • SparkSQL编程
    • SparkSession
    • DataFrame
      • 创建
        • 从Spark数据源进行创建
        • 1.我们先创建一个json文件,并上传到linux
          • 2.spark.read
          • 3.展示结果
        • 从RDD转换、HiveTable返回
      • SQL风格语法
      • DSL风格语法
      • RDD转换为DataFrame

SparkSQL概述

什么是SparkSQL

SparkSQL是Spark用来结构化数据的一个模块,它提供了两个编程抽象:DataFrame、DataSet,并且作为分布式SQL查询引擎的作用。
将SparkSQL转换为RDD,然后提交至集群,效率非常高

特点

1.易整合
2.统一的数据访问方式
3.兼容Hive
4.标准的数据连接

DataFrame

与RDD类似,DataFrame也是一个分布式的数据容器。而DataFrame更像传统数据库的二维表格,除记录数据以外,还记录数据的结构信息schema,同时,和Hive类似,DataFrame也支持嵌套数据类型(map、array、struct)
在这里插入图片描述
RDD类型以Person类型作为参数,但是Spark框架本身并不了解Person的内部结构,而右侧的DataFrame却提供了详细的结构信息,使得SparkSQL清楚的直到数据集中包含了哪些列(字段)

DataSet

1)是Dataframe API的一个扩展,是Spark最新的数据抽象。
2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。
6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。

SparkSQL编程

SparkSession

旧的版本中,SparkSQL提供了两种SQL:SQLContext、HiveContext
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以SQLContext和HiveContext上可用的API在SparkSession上也是可用的。SparkSession内部封装了sparkContext,所以实际上是由SparkContext完成的
spark中的SparkSession对象

scala> spark
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4d22265c

DataFrame

创建

可以读取的格式:

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

创建DataFrame有三种方式:通过Spark数据源创建、从一个存在的RDD进行转换、还可以从HiveTable查询返回

从Spark数据源进行创建

1.我们先创建一个json文件,并上传到linux

{“age”:18,“name”:“yyx”}
{“age”:19,“name”:“dd”}
{“age”:20,“name”:“nn”}

2.spark.read

本地路径读取:

可以看到创建成功DataFrame
scala> spark.read.json("file:///opt/module/spark/data/test_sparkSQL/people.json")
res10: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
3.展示结果
scala> val df = spark.read.json("file:///opt/module/spark/data/test_sparkSQL/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+---+----+
|age|name|
+---+----+
| 18| yyx|
| 19|  dd|
| 20|  nn|
+---+----+

从RDD转换、HiveTable返回

SQL风格语法

1.创建一个DataFrame
2.对DataFrame创建一个临时视图(视图名为person)

之所以是试图而不是表格,是因为表格可以修改,而Spark RDD通常都是val,不支持修改

scala> df.createOrReplaceTempView("person")

3.通过SQL语句查询:

scala> val dfSQL = spark.sql("select * from person")
dfSQL: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> dfSQL.show
+---+----+
|age|name|
+---+----+
| 18| yyx|
| 19|  dd|
| 20|  nn|
+---+----+

scala> val dfSQL_morethan19 = spark.sql("select * from person where age > 19")
dfSQL_morethan19: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> dfSQL
dfSQL   dfSQL_morethan19

scala> dfSQL_morethan19.show
+---+----+
|age|name|
+---+----+
| 20|  nn|
+---+----+

注意,临时表是Session范围内的,Session退出后,表就会失效,如果想应用范围内有效,就要使用全局表,使用全局表时要全路径访问,比如:global_temp.people
创建一个全局表,并使用

scala> df.createGlobalTempView("people")

scala> spark.sql("select * from global_temp.people").show
+---+----+
|age|name|
+---+----+
| 18| yyx|
| 19|  dd|
| 20|  nn|
+---+----+

在新的Session查询:

scala> spark.newSession().sql("select * from global_temp.people").show
+---+----+
|age|name|
+---+----+
| 18| yyx|
| 19|  dd|
| 20|  nn|
+---+----+

而之前的person表在新的Session查询:

scala> spark.newSession().sql("select * from person").show
org.apache.spark.sql.AnalysisException: Table or view not found: person; line 1 pos 14

DSL风格语法

创建一个DateFrame

scala> val df = spark.read.json("file:///opt/module/spark/data/test_sparkSQL/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

查看DataFrame信息

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

查看name列数据

scala> df.select("name").show()
+----+
|name|
+----+
| yyx|
|  dd|
|  nn|
+----+

查看name列并且age列+1,错误用法:
会被认为是一整个字段age1

scala> df.select("name","age"+1).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`age1`' given input columns: [age, name];;
'Project [name#80, 'age1]

正确方法:
用$必须都用

scala> df.select($"name",$"age"+1).show
+----+---------+
|name|(age + 1)|
+----+---------+
| yyx|       19|
|  dd|       20|
|  nn|       21|
+----+---------+

查询age大于等于19的数据filter

scala> df.filter($"age">=19).show
+---+----+
|age|name|
+---+----+
| 19|  dd|
| 20|  nn|
+---+----+

按照age分组:

scala> df.groupBy("age").count.show
+---+-----+                                                                     
|age|count|
+---+-----+
| 19|    1|
| 18|    1|
| 20|    1|
+---+-----+

RDD转换为DataFrame

RDD与DF、DS之间的操作,要引入import spark.implicits._

scala> import spark.implicits._
import spark.implicits._
此处spark是sparkSession对象名

创建RDD

scala> val rdd = sc.makeRDD(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[68] at makeRDD at <console>:27

转换

scala> rdd.toDF.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
+-----+

scala> rdd.toDF("id").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

再来一次

scala> val rdd = sc.makeRDD(Array((1,"zhangsan"),(2,"lisi"),(3,"wangwu")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[75] at makeRDD at <console>:27

scala> rdd.toDF("id","name").show
+---+--------+
| id|    name|
+---+--------+
|  1|zhangsan|
|  2|    lisi|
|  3|  wangwu|
+---+--------+

相关文章