教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

編程的方法定義Schema信息【Python大數(shù)據(jù)技術(shù)文章】

更新時間:2021年09月08日18時13分 來源:傳智教育 瀏覽次數(shù):

好口碑IT培訓(xùn)

當case類不能提前定義的時候,就需要采用編程方式定義Schema信息,定義DataFrame主要包含3個步驟,具體如下:

(1)創(chuàng)建一個Row對象結(jié)構(gòu)的RDD;

(2)基于StructType類型創(chuàng)建Schema;

(3)通過SparkSession提供的createDataFrame()方法來拼接Schema。

根據(jù)上述步驟,創(chuàng)建SparkSqlSchema. scala文件,使用編程方式定義Schema信息的具體代碼如文件4-3所示。

文件4-3 SparkSqlSchema.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sq1.types.
{IntegerType,StringType,StructField,StructType}
import org.apache.spark.sql.(DataFrame,Row,Sparkession)
object SparkSqlSchema {
def main(args: Array[string]): Unit=(
//1.創(chuàng)建SparkSession
val spark: sparkSession=Sparksession.bullder()
.appName ("SparkSq1Schema")
.master ("1oca1[2]")
.getOrCreate ()
//2.獲取sparkConttext對象
val sc: SparkContext=spark.sparkContext
//設(shè)置日志打印級別
sc.setLogLevel ( "WARN")
//3.加載數(shù)據(jù)
val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt")
//4.切分每一行
val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" "))
//5.加載數(shù)據(jù)到Row對象中
val personRDD:RDD[Row]=
dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
//6.創(chuàng)建Schema
val schema:StructType=StructType(Seq(
StructField("id",IntegerType,false),
StructField("name",StringType,false),
StructField("age", IntegerType, false)
))
//7.利用personRDD與Schema創(chuàng)建DataFrame
val personDF:DataFrame=spark.createDataFrame(personRDD,schema)
//8.DSL操作顯示DataFrame的數(shù)據(jù)結(jié)果
personDF . show ()
//9.將DataFrame注冊成表
personDF.createOrReplaceTempView ("t_person")
//10.sq1語句操作
spark.sq1 ("select¥from t_ person") .show()
//11.關(guān)閉資源
sc.stop()
spark.stop ()

在文件4-3中,第9~23行代碼表示將文件轉(zhuǎn)換成為RDD的基本步驟,第25~29行代碼即為編程方式定義Schema的核心代碼,Spark SQL提供了Class StructType( val fields:Array[StructField])類來表示模式信息,生成一個StructType對象,需要提供fields作為輸入?yún)?shù),fields是個集合類型,StructField(name,dataTypenullable)參數(shù)分別表示為字段名稱、字段數(shù)據(jù)類型、字段值是否允許為空值,根據(jù)person.txt文本數(shù)據(jù)文件分別設(shè)置id、name、age字段作為Schema,第31行代碼表示通過調(diào)用spark.createDataFrame()方法將RDD和Schema進行合并轉(zhuǎn)換為DataFrame,第33~40行代碼即為操作DataFrame進行數(shù)據(jù)查詢。

猜你喜歡:

Kerberos是什么?Kerberos怎樣做身份認證?

如何對序列執(zhí)行切片操作?【Python切片教程】

怎樣使用CLI調(diào)動Hive的一些功能?

MySQL表數(shù)據(jù)導(dǎo)入到Hive文件【圖文詳解】

傳智教育python大數(shù)據(jù)開發(fā)培訓(xùn)

0 分享到:
和我們在線交談!