Resilient Distributed Datasets: RDDs and Common Operators (Part 2)

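Each snippet below is the main method of a standalone demo object (Demo06sample, Demo07mapValues, and so on, matching the appName set in each demo). To keep the listings short, the shared imports are shown once here rather than repeated in every demo:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}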
sample: transformation operator

def main(args: Array[String]): Unit = {
  /**
   * sample: transformation operator
   * Used to draw a sample from the data
   * It takes three parameters:
   *   withReplacement: whether to sample with replacement
   *   fraction: the sampling ratio (not exact, since sampling is random)
   *   seed: the random seed
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo06sample")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  stuRDD.sample(withReplacement = false, 0.1).foreach(println)

  // To draw the same sample on every run, fix the seed
  stuRDD.sample(withReplacement = false, 0.01, 10).foreach(println)
}

mapValues: transformation operator

def main(args: Array[String]): Unit = {
  /**
   * mapValues: transformation operator
   * Similar to map, except that mapValues iterates over only the Values of a KV-format RDD
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo07mapValues")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val kvRDD: RDD[(String, Int)] = sc.parallelize(List("k1" -> 1, "k2" -> 2, "k3" -> 3))
  // Square the Value of each Key
  kvRDD.mapValues(i => i * i).foreach(println)
  // The same result implemented with map
  kvRDD.map(kv => (kv._1, kv._2 * kv._2)).foreach(println)
}

join: transformation operator

def main(args: Array[String]): Unit = {
  /**
   * join: transformation operator
   * Must be applied to two KV-format RDDs; records with the same Key are associated together
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo08join")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  // Load the student data and convert it to KV format, with the ID as the Key and the remaining fields as the Value
  val stuKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/students.txt").map(line => {
    val id: String = line.split(",")(0)
    // split: cut a string on the given separator, producing an Array
    // mkString: join an Array into a string with the given separator
    val values: String = line.split(",").tail.mkString("|")
    (id, values)
  })

  // Load the score data and convert it to KV format, with the ID as the Key and the remaining fields as the Value
  val scoKVRDD: RDD[(String, String)] = sc.textFile("Spark/data/score.txt").map(line => {
    val id: String = line.split(",")(0)
    val values: String = line.split(",").tail.mkString("|")
    (id, values)
  })

  // join: inner join
  val joinRDD1: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)
  //joinRDD1.foreach(println)

  //stuKVRDD.leftOuterJoin(scoKVRDD).foreach(println)
  //stuKVRDD.rightOuterJoin(scoKVRDD).foreach(println)
  stuKVRDD.fullOuterJoin(scoKVRDD).foreach(println)
}
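The commented-out leftOuterJoin / rightOuterJoin / fullOuterJoin calls wrap the side that may be missing in an Option, so their results usually need unwrapping before use. A minimal sketch of that, reusing the stuKVRDD and scoKVRDD defined above (the "no score" placeholder string is just an illustration):

// leftOuterJoin keeps every student; the score side becomes an Option
// result type: RDD[(String, (String, Option[String]))]
stuKVRDD.leftOuterJoin(scoKVRDD).map {
  case (id, (stu, Some(sco))) => s"$id,$stu,$sco"     // the student has a matching score record
  case (id, (stu, None))      => s"$id,$stu,no score" // no matching Key in scoKVRDD
}.foreach(println)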
union: transformation operator, used to concatenate two RDDs of the same type

def main(args: Array[String]): Unit = {
  // union: transformation operator, used to concatenate two RDDs of the same type
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo09union")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  val sample01RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
  val sample02RDD: RDD[String] = stuRDD.sample(withReplacement = false, 0.01, 1)
  println(s"number of partitions of sample01RDD: ${sample01RDD.getNumPartitions}")
  println(s"number of partitions of sample02RDD: ${sample02RDD.getNumPartitions}")
  // The RDD produced by union has as many partitions as the two input RDDs combined
  println(s"number of partitions after union: ${sample01RDD.union(sample02RDD).getNumPartitions}")

  val intRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
  //sample01RDD.union(intRDD) // two RDDs of different types cannot be unioned

  // union behaves like UNION ALL in SQL
  sample01RDD.union(sample02RDD).foreach(println)
  // To deduplicate, i.e. the equivalent of UNION in SQL, call distinct after the union
  sample01RDD.union(sample02RDD).distinct().foreach(println)
}

groupBy: group by a given field

def main(args: Array[String]): Unit = {
  /**
   * groupBy: group by a given field
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo10groupBy")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  // Count the number of students in each class
  stuRDD.groupBy(s => s.split(",")(4))
    .map(kv => s"${kv._1},${kv._2.size}")
    .foreach(println)
}

groupByKey: transformation operator, must be applied to a KV-format RDD

def main(args: Array[String]): Unit = {
  /**
   * groupByKey: transformation operator, must be applied to a KV-format RDD
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo11groupByKey")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  // Count class sizes with groupByKey
  // Turn the student data into a KV-format RDD, with the class as the Key and 1 as the Value
  val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))
  val grpRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()
  grpRDD.map(kv => s"${kv._1},${kv._2.size}").foreach(println)
}

reduceByKey: transformation operator, must be applied to a KV-format RDD; it not only groups but also aggregates

def main(args: Array[String]): Unit = {
  /**
   * reduceByKey: transformation operator, must be applied to a KV-format RDD;
   * it not only groups but also aggregates
   * It takes a function f
   * f takes two parameters of the same type as the RDD's Value, and must return a value of that same type
   * In effect, f can be seen as an aggregate function
   * Common aggregate functions (operations): max, min, sum, count, avg
   * reduceByKey performs map-side pre-aggregation, similar to the Combiner in MapReduce
   * Not every operation supports pre-aggregation; avg, for example, cannot be expressed
   * this way directly (a workaround is sketched after this demo)
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo12reduceByKey")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  // Count class sizes with reduceByKey
  // Turn the student data into a KV-format RDD, with the class as the Key and 1 as the Value
  val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))
  clazzKVRDD.reduceByKey((i1: Int, i2: Int) => i1 + i2).foreach(println)
  // Shorter equivalent forms
  clazzKVRDD.reduceByKey((i1, i2) => i1 + i2).foreach(println)
  clazzKVRDD.reduceByKey(_ + _).foreach(println)
}
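As the comment in the reduceByKey demo says, avg cannot be handed to reduceByKey directly: averaging partial averages does not give the overall average. A common workaround, shown here as a sketch rather than as part of the original demo, is to pre-aggregate a (sum, count) pair, which is associative, and divide once at the end. This fragment continues inside a main where sc is already defined, and it assumes score.txt lines look like id,course,score with an integer score field:

// Average score per student via reduceByKey
val scoreSumCntRDD: RDD[(String, (Int, Int))] = sc.textFile("Spark/data/score.txt")
  .map(line => {
    val fields: Array[String] = line.split(",")
    // the Value is a (sum, count) pair so the aggregation stays associative
    (fields(0), (fields(2).toInt, 1))
  })

scoreSumCntRDD
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)) // map-side pre-aggregation still applies
  .mapValues { case (sum, cnt) => sum.toDouble / cnt } // one division per Key at the end
  .foreach(println)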

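As a closing contrast between the last two demos: both compute the same counts, but groupByKey ships every individual 1 through the shuffle and groups them afterwards, while reduceByKey sums within each partition first and shuffles only the partial sums. A sketch, reusing clazzKVRDD from the demos above:

// Two equivalent per-Key sums; the second shuffles far less data
clazzKVRDD.groupByKey().mapValues(_.sum).foreach(println) // every 1 crosses the network
clazzKVRDD.reduceByKey(_ + _).foreach(println)            // only partial sums cross the network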