Chapter 3 DataFrame and Spark SQL

spark-DataFrame learning notes - [1] First look

(1) Contents of book.txt:

book_1,I am book1

book_2,red had

(2) department.json

{"name":"Develoment Dept","depId":"1"}

{"name":"Personnel Dept","depId":"2"}

{"name":"Testing","depId":"4"}

(3) newpeople.json

{"name":"John","job number":"007","age":32,"gender":"male","depId":1,"salary":4000}

{"name":"Herry","job number":"008","age":20,"gender":"female","depId":2,"salary":4000}

{"name":"Jack","job number":"009","age":26,"gender":"male","depId":3,"salary":4000}

(4) people.json

{"name":"Michael","job number":"001","age":33,"gender":"male","depId":1,"salary":3000}

{"name":"andy","job number":"002","age":30,"gender":"female","depId":2,"salary":4000}

{"name":"Justin","job number":"003","age":19,"gender":"male","depId":1,"salary":4000}

{"name":"John","job number":"004","age":32,"gender":"male","depId":1,"salary":4000}

{"name":"Herry","job number":"005","age":20,"gender":"female","depId":2,"salary":4000}

{"name":"Jack","job number":"006","age":26,"gender":"male","depId":3,"salary":4000}

package com.dt.spark.main.DataFrameLearn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameSQL {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //Set the log level
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sqlContext = new HiveContext(sc)
    //Load the JSON files
    val people = sqlContext.read.json("./src/com/dt/spark/main/DataFrameLearn/srcFile/people.json")
    val newpeople = sqlContext.read.json("./src/com/dt/spark/main/DataFrameLearn/srcFile/newpeople.json")
    val dept = sqlContext.read.json("./src/com/dt/spark/main/DataFrameLearn/srcFile/department.json")

    //==========================================
    /*
    Display DataFrame contents as a table
    show():
            prints the first 20 rows by default
    show(n):
            prints the first n rows
     */

    people.show()
    //    +---+-----+------+----------+-------+-------+
    //    |age|depId|gender|job number|   name| salary|
    //    +---+-----+------+----------+-------+-------+
    //    | 33|    1|  male|       001|Michael|   3000|
    //    | 30|    2|female|       002|   andy|   4000|
    //    | 19|    1|  male|       003| Justin|   4000|
    //    | 32|    1|  male|       004|   John|   4000|
    //    | 20|    2|female|       005|  Herry|   4000|
    //    | 26|    3|  male|       006|   Jack|   4000|
    //    +---+-----+------+----------+-------+-------+
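
    //show(n) prints only the first n rows; a quick sketch (would print the first 2 rows of the table above):
    people.show(2)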

    /*
      Print the schema with printSchema
    */
    people.printSchema()
    //    root
    //    |-- age: long (nullable = true)
    //    |-- depId: long (nullable = true)
    //    |-- gender: string (nullable = true)
    //    |-- job number: string (nullable = true)
    //    |-- name: string (nullable = true)
    //    |-- salary: long (nullable = true)

    //==========================================
    /*
    Querying basic DataFrame information
    columns: returns the column names as an array
    count:   returns the total number of rows
    take:    returns the first n rows as an array
    toJSON:  converts the DataFrame to an RDD of JSON strings
     */

    // people.columns
    people.columns.foreach(println(_))
    //    age
    //    depId
    //    gender
    //    job number
    //    name
    //    salary

    println(people.count())
    //6

    people.take(2).foreach(print(_))
    //[33,1,male,001,Michael,3000][30,2,female,002,andy,4000]

    people.toJSON.collect().foreach(println(_))

    //    {"age":33,"depId":1,"gender":"male","job number":"001","name":"Michael","salary":3000}
    //    {"age":30,"depId":2,"gender":"female","job number":"002","name":"andy","salary":4000}
    //    {"age":19,"depId":1,"gender":"male","job number":"003","name":"Justin","salary":4000}
    //    {"age":32,"depId":1,"gender":"male","job number":"004","name":"John","salary":4000}
    //    {"age":20,"depId":2,"gender":"female","job number":"005","name":"Herry","salary":4000}
    //    {"age":26,"depId":3,"gender":"male","job number":"006","name":"Jack","salary":4000}

   //==========================================
    /*
     Conditional queries on a DataFrame
     filter: filter rows using a SQL-style condition string (or a Column expression)
     where:  an alias for filter
    */

    println(people.filter("gender = 'male'").count())
    //4
    println(people.filter("age > 25").count())
    //4

    println(people.filter("age > 25").filter("gender = 'male'").count())
    // 3

    println(people.where("age > 25").count())
    //4
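
    //filter/where also accept Column expressions instead of condition strings;
    //a sketch equivalent to chaining the two string filters above:
    println(people.filter(people("age") > 25 && people("gender") === "male").count())
    //3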

    //==========================================
    /*
    Sorting, and sorting within partitions
     */
    people.sort(people("age").asc).show

    //    +---+-----+------+----------+-------+------+
    //    |age|depId|gender|job number|   name|salary|
    //    +---+-----+------+----------+-------+------+
    //    | 19|    1|  male|       003| Justin|  4000|
    //    | 20|    2|female|       005|  Herry|  4000|
    //    | 26|    3|  male|       006|   Jack|  4000|
    //    | 30|    2|female|       002|   andy|  4000|
    //    | 32|    1|  male|       004|   John|  4000|
    //    | 33|    1|  male|       001|Michael|  3000|
    //    +---+-----+------+----------+-------+------+

    people.sortWithinPartitions("gender","age").show()
    //    +---+-----+------+----------+-------+------+
    //    |age|depId|gender|job number|   name|salary|
    //    +---+-----+------+----------+-------+------+
    //    | 20|    2|female|       005|  Herry|  4000|
    //    | 30|    2|female|       002|   andy|  4000|
    //    | 19|    1|  male|       003| Justin|  4000|
    //    | 26|    3|  male|       006|   Jack|  4000|
    //    | 32|    1|  male|       004|   John|  4000|
    //    | 33|    1|  male|       001|Michael|  3000|
    //    +---+-----+------+----------+-------+------+
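
    //sort/orderBy can also take several columns and descending order; a sketch:
    people.orderBy(people("salary").desc, people("age").asc).show()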

    //==========================================
    /*
    Adding columns
     */

    //select only "name" column
    people.select("age").show()

    //    +---+
    //    |age|
    //    +---+
    //    | 33|
    //    | 30|
    //    | 19|
    //    | 32|
    //    | 20|
    //    | 26|
    //    +---+

    //select the age column, and also the age incremented by 1
    //(the derived column is defined inline in the select)
    people.select(people("age"), people("age") + 1).show()

    //      +---+---------+
    //      |age|(age + 1)|
    //      +---+---------+
    //      | 33|       34|
    //      | 30|       31|
    //      | 19|       20|
    //      | 32|       33|
    //      | 20|       21|
    //      | 26|       27|
    //      +---+---------+

    people.withColumn("level",people("age")/10).show
    //    +---+-----+------+----------+-------+------+-----+
    //    |age|depId|gender|job number|   name|salary|level|
    //    +---+-----+------+----------+-------+------+-----+
    //    | 33|    1|  male|       001|Michael|  3000|  3.3|
    //    | 30|    2|female|       002|   andy|  4000|  3.0|
    //    | 19|    1|  male|       003| Justin|  4000|  1.9|
    //    | 32|    1|  male|       004|   John|  4000|  3.2|
    //    | 20|    2|female|       005|  Herry|  4000|  2.0|
    //    | 26|    3|  male|       006|   Jack|  4000|  2.6|
    //    +---+-----+------+----------+-------+------+-----+


    //==========================================
    /*
    Renaming a column
     */
    people.withColumnRenamed("job number","jobId").show()
//    +---+-----+------+-----+-------+------+
//    |age|depId|gender|jobId|   name|salary|
//    +---+-----+------+-----+-------+------+
//    | 33|    1|  male|  001|Michael|  3000|
//    | 30|    2|female|  002|   andy|  4000|
//    | 19|    1|  male|  003| Justin|  4000|
//    | 32|    1|  male|  004|   John|  4000|
//    | 20|    2|female|  005|  Herry|  4000|
//    | 26|    3|  male|  006|   Jack|  4000|
//    +---+-----+------+-----+-------+------+


    //==========================================
    /*
    The unionAll operation
     */

    people.unionAll(newpeople).show()
    //
    //    +---+-----+------+----------+-------+------+
    //    |age|depId|gender|job number|   name|salary|
    //    +---+-----+------+----------+-------+------+
    //    | 33|    1|  male|       001|Michael|  3000|
    //    | 30|    2|female|       002|   andy|  4000|
    //    | 19|    1|  male|       003| Justin|  4000|
    //    | 32|    1|  male|       004|   John|  4000|
    //    | 20|    2|female|       005|  Herry|  4000|
    //    | 26|    3|  male|       006|   Jack|  4000|
    //    | 32|    1|  male|       007|   John|  4000|
    //    | 20|    2|female|       008|  Herry|  4000|
    //    | 26|    3|  male|       009|   Jack|  4000|
    //    +---+-----+------+----------+-------+------+


    //==========================================
    /*
    Aggregation with groupBy
     */
    people.unionAll(newpeople).groupBy("name").count().show
    //      +-------+-----+
    //      |   name|count|
    //      +-------+-----+
    //      |   Jack|    2|
    //      |   John|    2|
    //      |Michael|    1|
    //      | Justin|    1|
    //      |  Herry|    2|
    //      |   andy|    1|
    //      +-------+-----+

    val depAgg = people.groupBy("depId").agg(
      Map(
        "age"->"max",
        "gender"->"count"
      )
    )
    depAgg.show()
    //+-----+--------+-------------+
    //|depId|max(age)|count(gender)|
    //+-----+--------+-------------+
    //|    1|      33|            3|
    //|    2|      30|            2|
    //|    3|      26|            1|
    //+-----+--------+-------------+
    depAgg.toDF("depId","maxAge","countGender").show()
    //    +-----+------+-----------+
    //    |depId|maxAge|countGender|
    //    +-----+------+-----------+
    //    |    1|    33|          3|
    //    |    2|    30|          2|
    //    |    3|    26|          1|
    //    +-----+------+-----------+
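
    //The Map-based agg above can also be written with column functions; a sketch
    //(uses a local import of org.apache.spark.sql.functions, which this listing does not import at the top):
    import org.apache.spark.sql.functions.{count, max}
    people.groupBy("depId").agg(max("age").as("maxAge"), count("gender").as("countGender")).show()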


    //==========================================
    /*
    Deduplication with distinct
     */
    people.unionAll(newpeople).select("name").distinct().show()
    //    +-------+
    //    |   name|
    //    +-------+
    //    |   Jack|
    //    |   John|
    //    |Michael|
    //    | Justin|
    //    |  Herry|
    //    |   andy|
    //    +-------+

    //==========================================
    /*
    Set difference (except) and intersection (intersect)
     */
    people.select("name").except(newpeople.select("name")).show()
    //    +-------+
    //    |   name|
    //    +-------+
    //    |   andy|
    //    | Justin|
    //    |Michael|
    //    +-------+
    people.select("name").intersect(newpeople.select("name")).show()
    //    +-----+
    //    | name|
    //    +-----+
    //    |Herry|
    //    | John|
    //    | Jack|
    //    +-----+

    //==========================================
    /*
    Joins between DataFrames
     */
    people.join(dept,people("depId")===dept("depId"),"inner").show()
    //    +---+-----+------+----------+-------+------+-----+---------------+
    //    |age|depId|gender|job number|   name|salary|depId|           name|
    //    +---+-----+------+----------+-------+------+-----+---------------+
    //    | 33|    1|  male|       001|Michael|  3000|    1|Develoment Dept|
    //    | 19|    1|  male|       003| Justin|  4000|    1|Develoment Dept|
    //    | 32|    1|  male|       004|   John|  4000|    1|Develoment Dept|
    //    | 26|    3|  male|       006|   Jack|  4000|    3|   Testing Dept|
    //    | 30|    2|female|       002|   andy|  4000|    2| Personnel Dept|
    //    | 20|    2|female|       005|  Herry|  4000|    2| Personnel Dept|
    //    +---+-----+------+----------+-------+------+-----+---------------+
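
    //The joined result keeps both depId and both name columns; one way to avoid the duplicates
    //is to select explicitly after the join (a sketch):
    people.join(dept, people("depId") === dept("depId"), "inner")
      .select(people("name"), dept("name").as("deptName"), people("salary"))
      .show()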



    sc.stop()

  }

}

spark-DataFrame learning notes - [2] Resolving duplicate columns after a DataFrame join (fixing the "Reference '***' is ambiguous" error)

package com.dt.spark.main.DataFrameLearn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
/**
  * spark-DataFrame learning notes - [2] Resolving duplicate columns after a DataFrame join
  * (fixing the "Reference '***' is ambiguous" error)
  */
object DataFrameSQL_2 {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //Set the log level
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._


    val df = sc.parallelize(Array(
      ("one", "A", 1), ("one", "B", 2), ("two", "A", 3), ("two", "B", 4)
    )).toDF("key1", "key2", "value")
    df.show()
    //    +----+----+-----+
    //    |key1|key2|value|
    //    +----+----+-----+
    //    | one|   A|    1|
    //    | one|   B|    2|
    //    | two|   A|    3|
    //    | two|   B|    4|
    //    +----+----+-----+

    val df2 = sc.parallelize(Array(
      ("one", "A", 5), ("two", "A", 6)
    )).toDF("key1", "key2", "value2")
    df2.show()

    //    +----+----+------+
    //    |key1|key2|value2|
    //    +----+----+------+
    //    | one|   A|     5|
    //    | two|   A|     6|
    //    +----+----+------+

    println("==========")
    df = df.unionAll(df2)
    df.show()
    println("==========")

    val joined = df.join(df2, df("key1") === df2("key1") && df("key2") === df2("key2"), "left_outer")
    joined.show()

    //    +----+----+-----+----+----+------+
    //    |key1|key2|value|key1|key2|value2|
    //    +----+----+-----+----+----+------+
    //    | two|   A|    3| two|   A|     6|
    //    | two|   B|    4|null|null|  null|
    //    | one|   A|    1| one|   A|     5|
    //    | one|   B|    2|null|null|  null|
    //    +----+----+-----+----+----+------+

    df.join(df2, Seq("key1", "key2"), "left_outer").show()

    //    +----+----+-----+------+
    //    |key1|key2|value|value2|
    //    +----+----+-----+------+
    //    | two|   A|    3|     6|
    //    | two|   B|    4|  null|
    //    | one|   A|    1|     5|
    //    | one|   B|    2|  null|
    //    +----+----+-----+------+


    df.join(df2, Seq("key1")).show()
    //df (for reference):
    //    +----+----+-----+
    //    |key1|key2|value|
    //    +----+----+-----+
    //    | one|   A|    1|
    //    | one|   B|    2|
    //    | two|   A|    3|
    //    | two|   B|    4|
    //    +----+----+-----+

    //df2 (for reference):
    //    +----+----+------+
    //    |key1|key2|value2|
    //    +----+----+------+
    //    | one|   A|     5|
    //    | two|   A|     6|
    //    +----+----+------+

    //result of df.join(df2, Seq("key1")):
    //    +----+----+-----+----+------+
    //    |key1|key2|value|key2|value2|
    //    +----+----+-----+----+------+
    //    | two|   A|    3|   A|     6|
    //    | two|   B|    4|   A|     6|
    //    | one|   A|    1|   A|     5|
    //    | one|   B|    2|   A|     5|
    //    +----+----+-----+----+------+


    val df22 = df2.withColumnRenamed("key1","k1").withColumnRenamed("key2","k2")

    df.join(df22,df("key1") === df22("k1") && df("key2") === df22("k2"), "left_outer").show()
    //    +----+----+-----+----+----+------+
    //    |key1|key2|value|  k1|  k2|value2|
    //    +----+----+-----+----+----+------+
    //    | two|   A|    3| two|   A|     6|
    //    | two|   B|    4|null|null|  null|
    //    | one|   A|    1| one|   A|     5|
    //    | one|   B|    2|null|null|  null|
    //    +----+----+-----+----+----+------+
    df.join(df22, df("key1") === df22("k1") && df("key2") === df22("k2"), "left_outer").columns.foreach(println(_))
    //    key1
    //    key2
    //    value
    //    k1
    //    k2
    //    value2
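
    //An alternative to withColumnRenamed: alias each DataFrame and qualify the columns (a sketch):
    val dfA = df.as("a")
    val dfB = df2.as("b")
    dfA.join(dfB, $"a.key1" === $"b.key1" && $"a.key2" === $"b.key2", "left_outer")
      .select($"a.key1", $"a.key2", $"value", $"value2")
      .show()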

    sc.stop()

  }

}

spark-DataFrame learning notes - [3] Building an RDD from JSON strings and converting it to a DataFrame

package com.dt.spark.main.DataFrameLearn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * spark-DataFrame learning notes - [3] Building an RDD from JSON strings and converting it to a DataFrame
  * (1) Interpolating free variables into the string with $
  * (2) Building an RDD from the JSON string and reading it as a DataFrame
  */
object DataFrameSQL_3 {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //Set the log level
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sqlContext = new HiveContext(sc)

    val name = "yourName"
    val age  = 12
    val tesRDD = sc.makeRDD(s"""{"num":"$name","age":"$age"}""" :: Nil)
    val tesRDD2DF = sqlContext.read.json(tesRDD)
    tesRDD2DF.show()
    //    +---+--------+
    //    |age|     num|
    //    +---+--------+
    //    | 12|yourName|
    //    +---+--------+
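
    //The same approach works for several JSON strings; a sketch with a second, made-up record:
    val moreJson = sc.makeRDD(Seq(
      s"""{"num":"$name","age":"$age"}""",
      """{"num":"otherName","age":"30"}"""
    ))
    sqlContext.read.json(moreJson).show()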

    sc.stop()

  }

}

spark-DataFrame learning notes - [4] Aggregation with agg and countDistinct

package com.dt.spark.main.DataFrameLearn

import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by hjw on 17/3/25.
  */
//http://stackoverflow.com/questions/30218140/spark-how-to-translate-countdistinctvalue-in-dataframe-apis#
object DataFrameSQL_agg {

  case class KV(page: String, visitor: String)

  case class Logs(city_id: String, ctype: String, uuid: String)

  def main(args: Array[String]) {
    val list = List(("PAG1", "V1"),
      ("PAG1", "V1"),
      ("PAG2", "V1"),
      ("PAG2", "V2"),
      ("PAG2", "V1"),
      ("PAG1", "V1"),
      ("PAG1", "V2"),
      ("PAG1", "V1"),
      ("PAG1", "V2"),
      ("PAG1", "V1"),
      ("PAG2", "V2"),
      ("PAG1", "V3"))

    val list2 = List(
      ("1001", "android", "V1"),
      ("1001", "android", "V1"),
      ("1001", "android", "V2"),
      ("1001", "iphone", "V1"),
      ("1001", "iphone", "V1"),
      ("1002", "android", "V1"),
      ("1002", "android", "V1"),
      ("1002", "android", "V2"),
      ("1002", "iphone", "V1"),
      ("1002", "iphone", "V1")
    )



    val list3 = List(
      ("1001", "android", "V1"),
      ("1001", "android", "V1"),
      ("1001", "android", "V2"),
      ("1001", "android", "V1"),
      ("1001", "android", "V1"),
      ("1002", "android", "V1"),
      ("1002", "android", "V1"),
      ("1002", "android", "V2"),
      ("1002", "iphone", "V1"),
      ("1002", "iphone", "V1")
    )


    val conf = new SparkConf()

    conf.setAppName("test")
    conf.setMaster("local")

    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val logs = sc.parallelize(list.map(iter => KV(iter._1, iter._2))).toDF

    //====== Method 1: SQL ===============
    logs.registerTempTable("logs")
    val sqlResult = sqlContext.sql(
      """
       select
         page,
         visitor  as vs
       from(
       select
       page,
       count(distinct visitor) as visitor
       from logs
        group by page
        )A
      """)
    val result = sqlResult.map(x => (x(0).toString, x(1).toString))
    result.foreach(println)
    //    (PAG1,3)
    //    (PAG2,2)

    //====== Method 2: DataFrame API ===============
    //df.groupBy("a").agg()
    //df.select('a).distinct.count
    //df.groupBy("word").agg(countDistinct("title")).show()
    val result2 = logs.select("page", "visitor").groupBy('page).agg('page, countDistinct('visitor))
    result2.foreach(println)



    val Log = sc.parallelize(list2.map(iter => Logs(iter._1, iter._2, iter._3))).toDF

    println("==========test3=========")
    val result3 = Log.groupBy('city_id,'ctype).agg(countDistinct('uuid))
    result3.foreach(println)
    //      [1001,android,2]
    //      [1001,iphone,1]
    //      [1002,iphone,1]
    //      [1002,android,2]

    println("==========test4=========")
    val result4 = Log.groupBy('city_id).agg(countDistinct('uuid))
    result4.foreach(println)
    //      [1001,2]
    //      [1002,2]

    println("==========test5=========")
    val Log3 = sc.parallelize(list3.map(iter => Logs(iter._1, iter._2, iter._3))).toDF
    val result5 = Log3.groupBy('city_id).agg(countDistinct('uuid),countDistinct('ctype))
    result5.foreach(println)
     //      [1002,2,2]
     //      [1001,2,1]
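
    //For large data, approxCountDistinct (an approximate count) is a cheaper alternative to countDistinct; a sketch:
    Log3.groupBy('city_id).agg(approxCountDistinct('uuid)).foreach(println)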


    sc.stop()
  }

}

spark-DataFrame learning notes - [5] explode

package com.dt.spark.main.DataFrameLearn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

case class Book(title: String, words: String)
case class Word(word: String)
/**
  * Created by hjw on 16/7/17.
  */

//df.groupby("a").agg()
//df.select('a).distinct.count
//df.groupBy("word").agg(countDistinct("title")).show()


/**
  * explode
  */
object DataFrameSQL_explode {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //Set the log level
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)

    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    //Load the txt file
    val book = sc.textFile("./src/com/dt/spark/main/DataFrameLearn/srcFile/book.txt")


    //==========================================

    val df = book.map(_.split(",")).map(b=>Book(b(0),b(1))).toDF()
    val allWords = df.explode('words){case Row(words:String)=>words.split(" ").map(Word(_))}
    allWords.map{
      row => "0 = " + row(0) + "; 1 = " + row(1) + "; 2 = " + row(2)
    }.foreach(println(_))
    //    0 = book_1; 1 = I am book1; 2 = I
    //    0 = book_1; 1 = I am book1; 2 = am
    //    0 = book_1; 1 = I am book1; 2 = book1
    //    0 = book_2; 1 = red had; 2 = red
    //    0 = book_2; 1 = red had; 2 = had


    //import org.apache.spark.sql.functions._
    allWords.groupBy("word").agg(countDistinct("title")).show()
//    +-----+------------+
//    | word|count(title)|
//    +-----+------------+
//    |  had|           1|
//    |book1|           1|
//    |   am|           1|
//    |    I|           1|
//    |  red|           1|
//    +-----+------------+
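
    //The same word counts can also be produced with functions.split + functions.explode (a sketch):
    df.select('title, explode(split('words, " ")).as("word"))
      .groupBy("word").agg(countDistinct("title")).show()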
    sc.stop()

  }

}

spark-DataFrame learning notes - [6] RDD and DataFrame conversion

package com.dt.spark.main.DataFrameToRDDLearn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}


object DataFrameToRDD_2 {
  def main(args: Array[String]) {

    val conf = new SparkConf()
    conf.setAppName("test")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //Set the log level
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)



    val sqlContext = new HiveContext(sc)
    //==========================================
    /*
    Convert an RDD to a DataFrame by programmatically specifying the schema
     */
    val people = sc.textFile("./src/com/dt/spark/main/DataFrameToRDDLearn/srcFile/people.txt")

    //To use the toDF implicit conversion instead, you would add:
    //import sqlContext.implicits._

    //==========================================
    /*
    The column names the schema will contain
     */
    val schemaString = "name age"

    //==========================================
    /*
    Build the schema
    //import org.apache.spark.sql.types.{StructField, StringType, StructType}
     */
     val schema = StructType(schemaString.split(" ").
     map(fieldName=>StructField(fieldName,StringType,true)))
     //org.apache.spark.sql.types.StructType(schemaString.split(" ")
    //.map(fieldName=>(fieldName,StringType,true)))


    //==========================================
    /*
    Convert each RDD record into a Row
    (map each record in the RDD to a Row instance)
     */
    val rowRDD= people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))

    val peopleDF = sqlContext.createDataFrame(rowRDD,schema)

    peopleDF.registerTempTable("peoletablefromschema")

    //==========================================
    /*
    List all table names in the current database
     */
    sqlContext.tableNames().foreach(println(_))
    //peoletablefromschema

    //==========================================
    /*
     Query with SQL
     */
    val teenagers = sqlContext.sql("select name from peoletablefromschema where age >= 13 and age <= 19")
    teenagers.map(t=>"Name: " + t(0)).collect().foreach(println)
    //    Name: Justin
    //    Name: Herry
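
    //The reverse direction: a DataFrame exposes its rows as an RDD[Row] via .rdd; a sketch:
    val backToRdd = peopleDF.rdd.map(r => (r.getString(0), r.getString(1)))
    backToRdd.collect().foreach(println)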

    val dt = "20161208"
    val squaresDF = sc.makeRDD(s"""{"instid":"1","ctime":"1","event_type":"1","status":"1","event_id":"10014","path_info":"OK"}""".stripMargin:: Nil)
    println(squaresDF)
    val otherPeople = sqlContext.read.json(squaresDF)

    sc.stop()

  }

}
