彈性分布式數據集 RDD及常用算子

RDD(彈性分布式數據集)及常用算子RDD(Resilient Distributed Dataset)叫做彈性分布式數據集 , 是 Spark 中最基本的數據
處理模型 。代碼中是一個抽象類,它代表一個彈性的、不可變、可分區、里面的元素可并行
計算的集合 。
彈性

  • 存儲的彈性:內存與磁盤的自動切換;
  • 容錯的彈性:數據丟失可以自動恢復;
  • 計算的彈性:計算出錯重試機制;
  • 分片的彈性:可根據需要重新分片 。
【彈性分布式數據集 RDD及常用算子】分布式:數據存儲在大數據集群不同節點上
數據集:RDD 封裝了計算邏輯,并不保存數據
數據抽象:RDD 是一個抽象類,需要子類具體實現
不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變 , 只能產生新的 RDD,在
  • 新的 RDD 里面封裝計算邏輯
可分區、并行計算
五大特性:A list of partitionsA function for computing each splitA list of dependencies on other RDDsOptionally, a Partitioner for key-value RDDsOptionally, a list of preferred locations to compute each split on
彈性分布式數據集 RDD及常用算子

文章插圖
基礎編程RDD 創建從集合中創建 RDD,Spark 主要提供了兩個方法:parallelize 和 makeRDD
val conf = new SparkConf().setMaster("local").setAppName("spark")val sc = new SparkContext(conf)val rdd1 = sc.parallelize( List(1,2,3,4))val rdd2 = sc.makeRDD( List(1,2,3,4))rdd1.collect().foreach(println)rdd2.collect().foreach(println)sc.stop()從外部存儲(文件)創建 RDDval conf = new SparkConf().setMaster("local").setAppName("spark")val sc = new SparkContext(conf)val fileRDD: RDD[String] = sc.textFile("input")fileRDD.collect().foreach(println)sc.stop()RDD 轉換算子RDD 根據數據處理方式的不同將算子整體上分為 Value 類型、雙 Value 類型和 Key-Value
類型
/*** 在Spark所有的操作可以分為兩類:* 1、Transformation操作(算子)* 2、Action操作(算子)** 轉換算子是懶執行的,需要由Action算子觸發執行* 每個Action算子會觸發一個Job** Spark的程序的層級劃分:* Application --> Job --> Stage --> Task** 怎么區分Transformation算子和Action算子?* 看算子的返回值是否還是RDD,如果是由一個RDD轉換成另一個RDD,則該算子是轉換算子* 如果由一個RDD得到其他類型(非RDD類型或者沒有返回值),則該算子是行為算子** 在使用Spark處理數據時可以大體分為三個步驟:* 1、加載數據并構建成RDD* 2、對RDD進行各種各樣的轉換操作,即調用轉換算子* 3、使用Action算子觸發Spark任務的執行*/map算子/*** map算子:轉換算子* 需要接受一個函數f* 函數f:參數的個數只有一個,類型為RDD中數據的類型 => 返回值類型自己定義* 可以將函數f作用在RDD中的每一條數據上 , 需要函數f必須有返回值,最終會得到一個新的RDD* 傳入一條數據得到一條數據*/ def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf()conf.setAppName("Demo03map")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val linesRDD: RDD[String] = sc.textFile("Spark/data/words.txt")linesRDD.map(line => {println("執行了map方法")line}).foreach(println)linesRDD.map(line => {println("執行了map方法")line}).foreach(println)linesRDD.map(line => {println("執行了map方法")line}).foreach(println)linesRDD.map(line => {println("執行了map方法")line}).foreach(println)List(1,2,3,4).map(line=>{println("List的map方法不需要什么Action算子觸發")line})}flatMap:轉換算子def main(args: Array[String]): Unit = {/*** flatMap:轉換算子* 同map算子類似 , 只不過所接受的函數f需要返回一個可以遍歷的類型* 最終會將函數f的返回值進行展開(扁平化處理),得到一個新的RDD* 傳入一條數據 會得到 多條數據*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo04flatMap")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)// 另一種構建RDD的方式:基于Scala本地的集合例如Listval intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))intRDD.foreach(println)val strRDD: RDD[String] = sc.parallelize(List("java,java,scala", "scala,scala,python", "python,python,python"))strRDD.flatMap(_.split(",")).foreach(println)}filter:轉換算子def main(args: Array[String]): Unit = {/*** filter:轉換算子* 用于過濾數據,需要接受一個函數f* 函數f:參數只有一個,類型為RDD中每一條數據的類型 => 返回值類型必須為Boolean* 最終會基于函數f返回的Boolean值進行過濾,得到一個新的RDD* 如果函數f返回的Boolean為true則保留數據* 如果函數f返回的Boolean為false則過濾數據*/val conf: SparkConf = new SparkConf()conf.setAppName("Demo05filter")conf.setMaster("local")val sc: SparkContext = new SparkContext(conf)val seqRDD: RDD[Int] = sc.parallelize(1 to 100, 4)println(seqRDD.getNumPartitions) // getNumPartitions并不是算子,它只是RDD的一個屬性//seqRDD.foreach(println)// 將奇數過濾出來seqRDD.filter(i => i % 2 == 1).foreach(println)// 將偶數過濾出來seqRDD.filter(i => i % 2 == 0).foreach(println)}

推薦閱讀