教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

Schema怎樣實(shí)現(xiàn)反射機(jī)制推斷?

更新時(shí)間:2021年03月23日17時(shí)09分 來源:傳智教育 瀏覽次數(shù):


在Windows系統(tǒng)下開發(fā)Scala代碼,可以使用本地環(huán)境測試,因此我們首先需要在本地磁盤準(zhǔn)備文本數(shù)據(jù)文件,這里將HDFS中的/spark/person.txt文件下載到本地D:/spark/person.txt路徑下。從文件4-1可以看出,當(dāng)前數(shù)據(jù)文件共3列,我們可以非常容易的分析出,這三列分別是編號、姓名、年齡。但是計(jì)算機(jī)無法像人一樣直觀的感受字段的實(shí)際含義,因此我們需要通過反射機(jī)制來推斷包含特定類型對象的Schema信息。


Schema的開發(fā)


接下來我們打開IDEA開發(fā)工具,創(chuàng)建名為“spark_chapter04”的Maven工程,講解實(shí)現(xiàn)反射機(jī)制推斷Schema的開發(fā)流程。

1.添加Spark SQL依賴

在pom.xml文件中添加Spark SQL依賴,代碼片段如下所示。

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>


2.編寫代碼

實(shí)現(xiàn)反射機(jī)制推斷Schema需要定義一個(gè)case class樣例類,定義字段和屬性,樣例類的參數(shù)名稱會被利用反射機(jī)制作為列名,編寫代碼如文件1所示。

文件1 CaseClassSchema.scala

   import org.apache.spark.SparkContext
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.{DataFrame, Row, SparkSession}
   //定義樣例類
   case class Person(id:Int,name:String,age:Int)
   object CaseClassSchema {
     def main(args: Array[String]): Unit = {
       //1.構(gòu)建SparkSession
       val spark : SparkSession = SparkSession.builder()
                       .appName("CaseClassSchema ")
                       .master("local[2]")
                       .getOrCreate()
      //2.獲取SparkContext
      val sc : SparkContext =spark.sparkContext
      //設(shè)置日志打印級別
      sc.setLogLevel("WARN")
      //3.讀取文件
      val data: RDD[Array[String]] =
          sc.textFile("D://spark//person.txt").map(x=>x.split(" "))
      //4.將RDD與樣例類關(guān)聯(lián)
      val personRdd: RDD[Person] = 
                 data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
      //5.獲取DataFrame
      //手動導(dǎo)入隱式轉(zhuǎn)換
      import spark.implicits._
      val personDF: DataFrame = personRdd.toDF
      //------------DSL風(fēng)格操作開始-------------
      //1.顯示DataFrame的數(shù)據(jù),默認(rèn)顯示20行
      personDF.show()
      //2.顯示DataFrame的schema信息
      personDF.printSchema()
      //3.統(tǒng)計(jì)DataFrame中年齡大于30的人數(shù)
      println(personDF.filter($"age">30).count())
      //-----------DSL風(fēng)格操作結(jié)束-------------
      //-----------SQL風(fēng)格操作開始-------------
      //將DataFrame注冊成表
      personDF.createOrReplaceTempView("t_person")
      spark.sql("select * from t_person").show()
      spark.sql("select * from t_person where name='zhangsan'").show()
      //-----------SQL風(fēng)格操作結(jié)束-------------
      //關(guān)閉資源操作
      sc.stop()
      spark.stop()
    }

在文件1中,第5行代碼表示定義了一個(gè)Person的Case類,這是因?yàn)樵诶梅瓷錂C(jī)制推斷RDD模式時(shí),首先需要定義一個(gè)Case類,因?yàn)镾park SQL能夠自動將包含Case類的RDD隱式轉(zhuǎn)換成DataFrame,Case類定義了Table的結(jié)構(gòu),Case類的屬性通過反射機(jī)制變成表的列名。第9-14行代碼中通過SparkSession.builder()方法構(gòu)建名為“spark”的SparkSession對象,并通過spark對象獲取SparkContext。第18-26行代碼中,通過sc對象讀取文件,系統(tǒng)會將文件加載到內(nèi)存中生成一個(gè)RDD,將RDD 與case class Person進(jìn)行匹配,personRdd對象即為RDD【Person】,toDF()方法是將RDD轉(zhuǎn)換為DataFrame,在調(diào)用toDF()方法之前需要手動添加“spark.implicits._”包。第29-39行代碼表示當(dāng)前創(chuàng)建DataFrame對象后,使用DSL和SQL兩種語法操作風(fēng)格進(jìn)行數(shù)據(jù)查詢。DataFrame操作和之前在Spark-Shell操作示例大致相同,因此這里將不再展示執(zhí)行效果。



猜你喜歡:

Scala的方法和函數(shù)介紹【大數(shù)據(jù)文章】

Sequence File是什么?簡單介紹幾種文件儲存格式

Spark SQL架構(gòu)的工作原理和工作流程是什么?

Schema約束的優(yōu)勢是什么?

傳智教育大數(shù)據(jù)項(xiàng)目開發(fā)培訓(xùn)

0 分享到:
和我們在線交談!