当前位置: > 财经>正文

从RDD转换得到DataFrame的两种方式 信托的两种方式分别是

2023-09-01 16:14:39 互联网 未知 财经

从RDD转换得到DataFrame的两种方式

在本地文件夹中创建一个文件people.txt 作为样例数据,其内容: Bob, 25 Jack, 22 Tom, 31 现在把people.txt加载到内存中生成一个DataFrame,并查询其中数据。

1.利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,需要首先定义一个case class, 因为只有case class 才能被Spark 隐式地转换为DataFrame

case class Person(name:String, age:Long)这句话一定要写在object外面*

添加隐式转换:import spark.implicits._ 创建DataFrame

val peopleDF = spark.sparkContext.textFile("E:/pictures_test/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF()

peopleDF 必须注册为临时表,才能供查询使用 peopleDF.createOrReplaceTempView("people")

添加查询的SQL语句 val personsRDD = spark.sql("select name, age from people where age > 20")

输出查询结果 personsRDD.map(t => "name:" + t(0) + "," + "age:" + t(1)).show()

全部代码:

import org.apache.spark.sql.SparkSessioncase class Person(name:String, age:Long) // 定义一个case classobject RDDToDataFrame_1 { def main(args:Array[String]): Unit ={ val spark = SparkSession .builder() .appName("RDDToDataFrame_1") .master("local[2]") .getOrCreate() import spark.implicits._ val peopleDF = spark.sparkContext.textFile("E:/pictures_test/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() peopleDF.createOrReplaceTempView("people") val personsRDD = spark.sql("select name, age from people where age > 30") personsRDD.map(t => "name:" + t(0) + "," + "age:" + t(1)).show() }} 2.利用编程方式定义RDD模式

当无法提前定义case class 时,就需要采用编程方式定义RDD模式

通过编程方式定义RDD模式的实现过程: 1.制作“表头“ 2.制作”表中的记录“ 3.把”表头“和”表中的记录“拼装在一起

import org.apache.spark.sql.{DataFrame, Row, SparkSession}import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}object RDDToDataFrame_2 { def main(args: Array[String]): Unit ={ val spark = SparkSession .builder() .appName("RDDToDataFrame_2") .master("local[2]") .getOrCreate() //添加隐式转换 import spark.implicits._ val fields = Array(StructField("name", StringType, true), StructField("age", IntegerType, true)) val schema = StructType(fields) val peopleRDD= spark.sparkContext.textFile("E:/pictures_test/people.txt") val rowRDD = peopleRDD.map(_.split(",")) .map(a => Row(a(0), a(1).trim.toInt)) val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.createOrReplaceTempView("people") val results: DataFrame = spark.sql("select name, age from people") results.map(t => "name:" + t(0) + "," + "age:" + t(1)).show() }}

结果截图:

版权声明: 本站仅提供信息存储空间服务,旨在传递更多信息,不拥有所有权,不承担相关法律责任,不代表本网赞同其观点和对其真实性负责。如因作品内容、版权和其它问题需要同本网联系的,请发送邮件至 举报,一经查实,本站将立刻删除。