Chapter 3 DataFrames and Spark SQL
spark-DataFrame learning notes - [1] Getting started
(1) book.txt:
book_1,I am book1
book_2,red had
(2) department.json:
{"name":"Develoment Dept","depId":"1"}
{"name":"Personnel Dept","depId":"2"}
{"name":"Testing","depId":"4"}
(3) newpeople.json:
{"name":"John","job number":"007","age":32,"gender":"male","depId":1,"salary":4000}
{"name":"Herry","job number":"008","age":20,"gender":"female","depId":2,"salary":4000}
{"name":"Jack","job number":"009","age":26,"gender":"male","depId":3,"salary":4000}
(4) people.json:
{"name":"Michael","job number":"001","age":33,"gender":"male","depId":1,"salary":3000}
{"name":"andy","job number":"002","age":30,"gender":"female","depId":2,"salary":4000}
{"name":"Justin","job number":"003","age":19,"gender":"male","depId":1,"salary":4000}
{"name":"John","job number":"004","age":32,"gender":"male","depId":1,"salary":4000}
{"name":"Herry","job number":"005","age":20,"gender":"female","depId":2,"salary":4000}
{"name":"Jack","job number":"006","age":26,"gender":"male","depId":3,"salary":4000}
package com.dt.spark.main.DataFrameLearn
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrameSQL {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
// Set the log level to keep the console output readable
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
val sqlContext = new HiveContext(sc)
// Load the JSON files
val people = sqlContext.read.json("./src/com/dt/spark/main/DataFrameLearn/srcFile/people.json")
val newpeople = sqlContext.read.json("./src/com/dt/spark/main/DataFrameLearn/srcFile/newpeople.json")
val dept = sqlContext.read.json("./src/com/dt/spark/main/DataFrameLearn/srcFile/department.json")
//==========================================
/*
Display a DataFrame in tabular form:
show(): prints the first 20 rows by default
show(n): prints the first n rows
*/
people.show()
// +---+-----+------+----------+-------+------+
// |age|depId|gender|job number| name|salary|
// +---+-----+------+----------+-------+------+
// | 33| 1| male| 001|Michael| 3000|
// | 30| 2|female| 002| andy| 4000|
// | 19| 1| male| 003| Justin| 4000|
// | 32| 1| male| 004| John| 4000|
// | 20| 2|female| 005| Herry| 4000|
// | 26| 3| male| 006| Jack| 4000|
// +---+-----+------+----------+-------+------+
/*
Print the schema with printSchema
*/
people.printSchema()
// root
// |-- age: long (nullable = true)
// |-- depId: long (nullable = true)
// |-- gender: string (nullable = true)
// |-- job number: string (nullable = true)
// |-- name: string (nullable = true)
// |-- salary: long (nullable = true)
//==========================================
/*
Querying basic DataFrame metadata:
columns: returns the column names as an array
count: returns the total number of rows
take(n): returns the first n rows as an array
toJSON: converts the DataFrame into an RDD of JSON strings
*/
// people.columns
people.columns.foreach(println(_))
// age
// depId
// gender
// job number
// name
// salary
println(people.count())
//6
people.take(2).foreach(print(_))
//[33,1,male,001,Michael,3000][30,2,female,002,andy,4000]
people.toJSON.collect().foreach(println(_))
// {"age":33,"depId":1,"gender":"male","job number":"001","name":"Michael","salary":3000}
// {"age":30,"depId":2,"gender":"female","job number":"002","name":"andy","salary":4000}
// {"age":19,"depId":1,"gender":"male","job number":"003","name":"Justin","salary":4000}
// {"age":32,"depId":1,"gender":"male","job number":"004","name":"John","salary":4000}
// {"age":20,"depId":2,"gender":"female","job number":"005","name":"Herry","salary":4000}
// {"age":26,"depId":3,"gender":"male","job number":"006","name":"Jack","salary":4000}
//==========================================
/*
Conditional queries on a DataFrame:
filter: keeps the rows matching a SQL-style predicate
where: an alias for filter
*/
println(people.filter("gender = 'male'").count())
//4
println(people.filter("age > 25").count())
//4
println(people.filter("age > 25").filter("gender = 'male'").count())
// 3
println(people.where("age > 25").count())
//4
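// The same predicates can also be written as Column expressions instead of SQL
// strings; a minimal variant that should again print 3 (Michael, John and Jack):
println(people.filter(people("age") > 25 && people("gender") === "male").count())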
//==========================================
/*
Sorting, and sorting within partitions
*/
people.sort(people("age").asc).show
// +---+-----+------+----------+-------+------+
// |age|depId|gender|job number| name|salary|
// +---+-----+------+----------+-------+------+
// | 19| 1| male| 003| Justin| 4000|
// | 20| 2|female| 005| Herry| 4000|
// | 26| 3| male| 006| Jack| 4000|
// | 30| 2|female| 002| andy| 4000|
// | 32| 1| male| 004| John| 4000|
// | 33| 1| male| 001|Michael| 3000|
// +---+-----+------+----------+-------+------+
people.sortWithinPartitions("gender","age").show()
// +---+-----+------+----------+-------+------+
// |age|depId|gender|job number| name|salary|
// +---+-----+------+----------+-------+------+
// | 20| 2|female| 005| Herry| 4000|
// | 30| 2|female| 002| andy| 4000|
// | 19| 1| male| 003| Justin| 4000|
// | 26| 3| male| 006| Jack| 4000|
// | 32| 1| male| 004| John| 4000|
// | 33| 1| male| 001|Michael| 3000|
// +---+-----+------+----------+-------+------+
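// orderBy is an alias for sort, and Column.desc gives descending order; for example:
people.sort(people("age").desc).show()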
//==========================================
/*
Selecting columns and adding derived columns
*/
//select only the "age" column
people.select("age").show()
// +---+
// |age|
// +---+
// | 33|
// | 30|
// | 19|
// | 32|
// | 20|
// | 26|
// +---+
//select everybody, but increment the age by 1
//select a column and a derived expression at the same time
people.select(people("age"), people("age") + 1).show()
// +---+---------+
// |age|(age + 1)|
// +---+---------+
// | 33| 34|
// | 30| 31|
// | 19| 20|
// | 32| 33|
// | 20| 21|
// | 26| 27|
// +---+---------+
people.withColumn("level",people("age")/10).show
// +---+-----+------+----------+-------+------+-----+
// |age|depId|gender|job number| name|salary|level|
// +---+-----+------+----------+-------+------+-----+
// | 33| 1| male| 001|Michael| 3000| 3.3|
// | 30| 2|female| 002| andy| 4000| 3.0|
// | 19| 1| male| 003| Justin| 4000| 1.9|
// | 32| 1| male| 004| John| 4000| 3.2|
// | 20| 2|female| 005| Herry| 4000| 2.0|
// | 26| 3| male| 006| Jack| 4000| 2.6|
// +---+-----+------+----------+-------+------+-----+
//==========================================
/*
Renaming a column
*/
people.withColumnRenamed("job number","jobId").show()
// +---+-----+------+-----+-------+------+
// |age|depId|gender|jobId| name|salary|
// +---+-----+------+-----+-------+------+
// | 33| 1| male| 001|Michael| 3000|
// | 30| 2|female| 002| andy| 4000|
// | 19| 1| male| 003| Justin| 4000|
// | 32| 1| male| 004| John| 4000|
// | 20| 2|female| 005| Herry| 4000|
// | 26| 3| male| 006| Jack| 4000|
// +---+-----+------+-----+-------+------+
//==========================================
/*
The unionAll operation (deprecated in Spark 2.x in favor of union)
*/
people.unionAll(newpeople).show()
// +---+-----+------+----------+-------+------+
// |age|depId|gender|job number| name|salary|
// +---+-----+------+----------+-------+------+
// | 33| 1| male| 001|Michael| 3000|
// | 30| 2|female| 002| andy| 4000|
// | 19| 1| male| 003| Justin| 4000|
// | 32| 1| male| 004| John| 4000|
// | 20| 2|female| 005| Herry| 4000|
// | 26| 3| male| 006| Jack| 4000|
// | 32| 1| male| 007| John| 4000|
// | 20| 2|female| 008| Herry| 4000|
// | 26| 3| male| 009| Jack| 4000|
// +---+-----+------+----------+-------+------+
//==========================================
/*
Aggregation with groupBy
*/
people.unionAll(newpeople).groupBy("name").count().show
// +-------+-----+
// | name|count|
// +-------+-----+
// | Jack| 2|
// | John| 2|
// |Michael| 1|
// | Justin| 1|
// | Herry| 2|
// | andy| 1|
// +-------+-----+
val depAgg = people.groupBy("depId").agg(
Map(
"age"->"max",
"gender"->"count"
)
)
depAgg.show()
//+-----+--------+-------------+
//|depId|max(age)|count(gender)|
//+-----+--------+-------------+
//| 1| 33| 3|
//| 2| 30| 2|
//| 3| 26| 1|
//+-----+--------+-------------+
depAgg.toDF("depId","maxAge","countGender").show()
// +-----+------+-----------+
// |depId|maxAge|countGender|
// +-----+------+-----------+
// | 1| 33| 3|
// | 2| 30| 2|
// | 3| 26| 1|
// +-----+------+-----------+
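// The aggregates can also be named directly with the max/count functions from
// org.apache.spark.sql.functions (imported above), avoiding the toDF rename:
people.groupBy("depId").agg(max("age").as("maxAge"), count("gender").as("countGender")).show()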
//==========================================
/*
Deduplication with distinct
*/
people.unionAll(newpeople).select("name").distinct().show()
// +-------+
// | name|
// +-------+
// | Jack|
// | John|
// |Michael|
// | Justin|
// | Herry|
// | andy|
// +-------+
//==========================================
/*
Set difference (except) and intersection (intersect)
*/
people.select("name").except(newpeople.select("name")).show()
// +-------+
// | name|
// +-------+
// | andy|
// | Justin|
// |Michael|
// +-------+
people.select("name").intersect(newpeople.select("name")).show()
// +-----+
// | name|
// +-----+
// |Herry|
// | John|
// | Jack|
// +-----+
//==========================================
/*
Joins between DataFrames
*/
people.join(dept,people("depId")===dept("depId"),"inner").show()
// +---+-----+------+----------+-------+------+-----+----------------+
// |age|depId|gender|job number| name|salary|depId| name|
// +---+-----+------+----------+-------+------+-----+----------------+
// | 33| 1| male| 001|Michael| 3000| 1|Development Dept|
// | 19| 1| male| 003| Justin| 4000| 1|Development Dept|
// | 32| 1| male| 004| John| 4000| 1|Development Dept|
// | 26| 3| male| 006| Jack| 4000| 3| Testing Dept|
// | 30| 2|female| 002| andy| 4000| 2| Personnel Dept|
// | 20| 2|female| 005| Herry| 4000| 2| Personnel Dept|
// +---+-----+------+----------+-------+------+-----+----------------+
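// Since this chapter pairs DataFrames with Spark SQL, the same join can also be
// written in SQL once the DataFrames are registered as temporary tables; a sketch:
people.registerTempTable("people")
dept.registerTempTable("dept")
sqlContext.sql("select p.name, d.name as deptName, p.salary from people p join dept d on p.depId = d.depId").show()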
sc.stop()
}
}
spark-DataFrame learning notes - [2] Removing the duplicate columns produced by a DataFrame JOIN (fixing "Reference '***' is ambiguous")
package com.dt.spark.main.DataFrameLearn
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
* spark-DataFrame learning notes - [2] Removing the duplicate columns produced by a DataFrame JOIN (fixing "Reference '***' is ambiguous")
*/
object DataFrameSQL_2 {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
// Set the log level
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(Array(
("one", "A", 1), ("one", "B", 2), ("two", "A", 3), ("two", "B", 4)
)).toDF("key1", "key2", "value")
df.show()
// +----+----+-----+
// |key1|key2|value|
// +----+----+-----+
// | one| A| 1|
// | one| B| 2|
// | two| A| 3|
// | two| B| 4|
// +----+----+-----+
val df2 = sc.parallelize(Array(
("one", "A", 5), ("two", "A", 6)
)).toDF("key1", "key2", "value2")
df2.show()
// +----+----+------+
// |key1|key2|value2|
// +----+----+------+
// | one| A| 5|
// | two| A| 6|
// +----+----+------+
println("==========")
df = df.unionAll(df2)
df.show()
println("==========")
val joined = df.join(df2, df("key1") === df2("key1") && df("key2") === df2("key2"), "left_outer")
joined.show()
// +----+----+-----+----+----+------+
// |key1|key2|value|key1|key2|value2|
// +----+----+-----+----+----+------+
// | two| A| 3| two| A| 6|
// | two| B| 4|null|null| null|
// | one| A| 1| one| A| 5|
// | one| B| 2|null|null| null|
// +----+----+-----+----+----+------+
df.join(df2, Seq("key1", "key2"), "left_outer").show()
// +----+----+-----+------+
// |key1|key2|value|value2|
// +----+----+-----+------+
// | two| A| 3| 6|
// | two| B| 4| null|
// | one| A| 1| 5|
// | one| B| 2| null|
// +----+----+-----+------+
df.join(df2, Seq("key1")).show()
//df
// +----+----+-----+
// |key1|key2|value|
// +----+----+-----+
// | one| A| 1|
// | one| B| 2|
// | two| A| 3|
// | two| B| 4|
// +----+----+-----+
//df2
// +----+----+------+
// |key1|key2|value2|
// +----+----+------+
// | one| A| 5|
// | two| A| 6|
// +----+----+------+
// +----+----+-----+----+------+
// |key1|key2|value|key2|value2|
// +----+----+-----+----+------+
// | two| A| 3| A| 6|
// | two| B| 4| A| 6|
// | one| A| 1| A| 5|
// | one| B| 2| A| 5|
// +----+----+-----+----+------+
val df22 = df2.withColumnRenamed("key1","k1").withColumnRenamed("key2","k2")
df.join(df22,df("key1") === df22("k1") && df("key2") === df22("k2"), "left_outer").show()
// +----+----+-----+----+----+------+
// |key1|key2|value| k1| k2|value2|
// +----+----+-----+----+----+------+
// | two| A| 3| two| A| 6|
// | two| B| 4|null|null| null|
// | one| A| 1| one| A| 5|
// | one| B| 2|null|null| null|
// +----+----+-----+----+----+------+
df.join(df22, df("key1") === df22("k1") && df("key2") === df22("k2"), "left_outer").columns.foreach(println(_))
// key1
// key2
// value
// k1
// k2
// value2
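// Another common workaround: alias each DataFrame, qualify the join keys, and
// select only the wanted columns; a sketch using the same data:
val a = df.as("a")
val b = df2.as("b")
a.join(b, $"a.key1" === $"b.key1" && $"a.key2" === $"b.key2", "left_outer")
.select($"a.key1", $"a.key2", $"a.value", $"b.value2")
.show()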
sc.stop()
}
}
spark-DataFrame learning notes - [3] Building a DataFrame from an RDD of JSON strings
package com.dt.spark.main.DataFrameLearn
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* spark-DataFrame learning notes - [3] Building a DataFrame from an RDD of JSON strings
(1) interpolating free variables into a string with $
(2) converting an RDD of JSON strings into a DataFrame
*/
object DataFrameSQL_3 {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
// Set the log level
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
val sqlContext = new HiveContext(sc)
val name = "yourName"
val age = 12
val tesRDD = sc.makeRDD(s"""{"num":"$name","age":"$age"}""" :: Nil)
val tesRDD2DF = sqlContext.read.json(tesRDD)
tesRDD2DF.show()
// +---+--------+
// |age| num|
// +---+--------+
// | 12|yourName|
// +---+--------+
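// The same pattern should scale to many JSON strings, with schema inference
// running over the whole RDD; a small sketch with generated records:
val manyRDD = sc.makeRDD((1 to 3).map(i => s"""{"num":"user_$i","age":"${i * 10}"}"""))
sqlContext.read.json(manyRDD).show()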
sc.stop()
}
}
spark-DataFrame learning notes - [4] Aggregation with agg and countDistinct
package com.dt.spark.main.DataFrameLearn
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by hjw on 17/3/25.
*/
//http://stackoverflow.com/questions/30218140/spark-how-to-translate-countdistinctvalue-in-dataframe-apis#
object DataFrameSQL_agg {
case class KV(page: String, visitor: String)
case class Logs(city_id: String, ctype: String, uuid: String)
def main(args: Array[String]) {
val list = List(("PAG1", "V1"),
("PAG1", "V1"),
("PAG2", "V1"),
("PAG2", "V2"),
("PAG2", "V1"),
("PAG1", "V1"),
("PAG1", "V2"),
("PAG1", "V1"),
("PAG1", "V2"),
("PAG1", "V1"),
("PAG2", "V2"),
("PAG1", "V3"))
val list2 = List(
("1001", "android", "V1"),
("1001", "android", "V1"),
("1001", "android", "V2"),
("1001", "iphone", "V1"),
("1001", "iphone", "V1"),
("1002", "android", "V1"),
("1002", "android", "V1"),
("1002", "android", "V2"),
("1002", "iphone", "V1"),
("1002", "iphone", "V1")
)
val list3 = List(
("1001", "android", "V1"),
("1001", "android", "V1"),
("1001", "android", "V2"),
("1001", "android", "V1"),
("1001", "android", "V1"),
("1002", "android", "V1"),
("1002", "android", "V1"),
("1002", "android", "V2"),
("1002", "iphone", "V1"),
("1002", "iphone", "V1")
)
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val logs = sc.parallelize(list.map(iter => KV(iter._1, iter._2))).toDF
//====== Approach 1: SQL ===============
logs.registerTempTable("logs")
val sqlResult = sqlContext.sql(
"""
select
page,
visitor as vs
from(
select
page,
count(distinct visitor) as visitor
from logs
group by page
)A
""")
val result = sqlResult.map(x => (x(0).toString, x(1).toString))
result.foreach(println)
// (PAG1,3)
// (PAG2,2)
//====== Approach 2: DataFrame API ===============
//df.groupBy("a").agg()
//df.select('a).distinct.count
//df.groupBy("word").agg(countDistinct("title")).show()
val result2 = logs.select("page", "visitor").groupBy('page).agg('page, countDistinct('visitor))
result2.foreach(println)
val Log = sc.parallelize(list2.map(iter => Logs(iter._1, iter._2, iter._3))).toDF
println("==========test3=========")
val result3 = Log.groupBy('city_id,'ctype).agg(countDistinct('uuid))
result3.foreach(println)
// [1001,android,2]
// [1001,iphone,1]
// [1002,iphone,1]
// [1002,android,2]
println("==========test4=========")
val result4 = Log.groupBy('city_id).agg(countDistinct('uuid))
result4.foreach(println)
// [1001,2]
// [1002,2]
println("==========test5=========")
val Log3 = sc.parallelize(list3.map(iter => Logs(iter._1, iter._2, iter._3))).toDF
val result5 = Log3.groupBy('city_id).agg(countDistinct('uuid),countDistinct('ctype))
result5.foreach(println)
// [1002,2,2]
// [1001,2,1]
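println("==========test6=========")
// For large data, approxCountDistinct (HyperLogLog-based, also in
// org.apache.spark.sql.functions) trades a small error bound for far less memory:
val result6 = Log3.groupBy('city_id).agg(approxCountDistinct('uuid))
result6.foreach(println)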
sc.stop()
}
}
spark-DataFrame learning notes - [5] explode
package com.dt.spark.main.DataFrameLearn
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
case class Book(title:String ,words:String)
case class Word(word:String)
/**
* Created by hjw on 16/7/17.
*/
/**
* explode
*/
object DataFrameSQL_explode {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
// Set the log level
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
// Load the text file
val book = sc.textFile("./src/com/dt/spark/main/DataFrameLearn/srcFile/book.txt")
//==========================================
val df = book.map(_.split(",")).map(b=>Book(b(0),b(1))).toDF()
val allWords = df.explode('words){case Row(words:String)=>words.split(" ").map(Word(_))}
allWords.map{
row => "0 = " + row(0) + "; 1 = " + row(1) + "; 2 = " + row(2)
}.foreach(println(_))
// 0 = book_1; 1 = I am book1; 2 = I
// 0 = book_1; 1 = I am book1; 2 = am
// 0 = book_1; 1 = I am book1; 2 = book1
// 0 = book_2; 1 = red had; 2 = red
// 0 = book_2; 1 = red had; 2 = had
allWords.groupBy("word").agg(countDistinct("title")).show()
// +-----+------------+
// | word|count(title)|
// +-----+------------+
// | had| 1|
// |book1| 1|
// | am| 1|
// | I| 1|
// | red| 1|
// +-----+------------+
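// An alternative sketch using the explode and split functions from
// org.apache.spark.sql.functions instead of DataFrame.explode:
df.select('title, explode(split('words, " ")).as("word")).groupBy("word").agg(countDistinct("title")).show()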
sc.stop()
}
}
spark-DataFrame learning notes - [6] Converting between RDDs and DataFrames with a programmatic schema
package com.dt.spark.main.DataFrameToRDDLearn
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
object DataFrameToRDD_2 {
def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
// Set the log level
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
val sqlContext = new HiveContext(sc)
//==========================================
/*
Programmatically specify a schema to turn an RDD into a DataFrame
*/
val people = sc.textFile("./src/com/dt/spark/main/DataFrameToRDDLearn/srcFile/people.txt")
// For the implicit toDF() route you would instead import:
// import sqlContext.implicits._
//==========================================
/*
Specify the column names the schema will contain
*/
val schemaString = "name age"
//==========================================
/*
Build the schema
(uses org.apache.spark.sql.types.{StructField, StringType, StructType}, imported above)
*/
val schema = StructType(schemaString.split(" ").
map(fieldName=>StructField(fieldName,StringType,true)))
//==========================================
/*
Convert each record of the RDD into a Row instance
*/
val rowRDD= people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
val peopleDF = sqlContext.createDataFrame(rowRDD,schema)
peopleDF.registerTempTable("peoletablefromschema")
//==========================================
/*
List all table names in the current database
*/
sqlContext.tableNames().foreach(println(_))
//peoletablefromschema
//==========================================
/*
Query with SQL
*/
val teenagers = sqlContext.sql("select name from peoletablefromschema where age >= 13 and age <= 19")
teenagers.map(t=>"Name: " + t(0)).collect().foreach(println)
// Name: Justin
// Name: Herry
val dt = "20161208"
val squaresDF = sc.makeRDD(s"""{"instid":"1","ctime":"1","event_type":"1","status":"1","event_id":"10014","path_info":"OK"}""".stripMargin:: Nil)
println(squaresDF)
val otherPeople = sqlContext.read.json(squaresDF)
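// The reverse direction (DataFrame back to RDD) is just DataFrame.rdd, which
// yields an RDD[Row]; for example:
val backToRDD = peopleDF.rdd.map(row => (row.getString(0), row.getString(1)))
backToRDD.foreach(println)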
sc.stop()
}
}