Resilient Distributed Datasets: RDDs and Common Operators (Part 3)

aggregateByKey: a transformation that lets you combine several aggregations in a single pass, with map-side pre-aggregation. It makes up for the fact that reduceByKey alone cannot express an avg.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  /**
   * aggregateByKey: combines several aggregations in a single pass and also
   * supports map-side pre-aggregation.
   * It makes up for reduceByKey being unable to express an avg.
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo13aggregateByKey")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  val ageKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), s.split(",")(2).toInt))
  val clazzCntKVRDD: RDD[(String, Int)] = stuRDD.map(s => (s.split(",")(4), 1))

  // Sum of ages per class
  val ageSumRDD: RDD[(String, Int)] = ageKVRDD.reduceByKey(_ + _)
  // Number of students per class
  val clazzCntRDD: RDD[(String, Int)] = clazzCntKVRDD.reduceByKey(_ + _)
  // Average age per class, via join
  ageSumRDD.join(clazzCntRDD).map {
    case (clazz: String, (ageSum: Int, cnt: Int)) =>
      (clazz, ageSum.toDouble / cnt)
  }.foreach(println)

  /**
   * zeroValue: the initial value; its type is up to you and may be a container
   * seqOp: runs inside each group (each partition, i.e. each map task) -- the map-side pre-aggregation
   * combOp: runs between groups (between reduce tasks) -- the final reduce-side aggregation
   */
  // Sum of ages per class, via aggregateByKey
  ageKVRDD.aggregateByKey(0)((age1: Int, age2: Int) => {
    age1 + age2 // pre-aggregation
  }, (map1AgeSum: Int, map2AgeSum: Int) => {
    map1AgeSum + map2AgeSum // aggregation
  }).foreach(println)

  // Number of students per class, via aggregateByKey
  clazzCntKVRDD.aggregateByKey(0)((c1: Int, c2: Int) => {
    c1 + 1 // pre-aggregation: every value is 1, so just add 1 to the accumulator
  }, (map1Cnt: Int, map2Cnt: Int) => {
    map1Cnt + map2Cnt // aggregation
  }).foreach(println)

  // Average age per class, via aggregateByKey with a (sum, count) accumulator
  ageKVRDD.aggregateByKey((0, 0))((t2: (Int, Int), age: Int) => {
    val mapAgeSum: Int = t2._1 + age
    val mapCnt: Int = t2._2 + 1
    (mapAgeSum, mapCnt)
  }, (map1U: (Int, Int), map2U: (Int, Int)) => {
    val ageSum: Int = map1U._1 + map2U._1
    val cnt: Int = map1U._2 + map2U._2
    (ageSum, cnt)
  }).map {
    case (clazz: String, (sumAge: Int, cnt: Int)) =>
      (clazz, sumAge.toDouble / cnt)
  }.foreach(println)
}
```

cartesian: a transformation that computes the Cartesian product of two RDDs.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  /**
   * cartesian: computes the Cartesian product of two RDDs.
   *
   * When join keys are heavily duplicated, a join easily degenerates toward
   * a Cartesian product and the data volume balloons.
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo14cartesian")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val idNameKVRDD: RDD[(String, String)] = sc.parallelize(List(("001", "zs"), ("002", "ls"), ("003", "ww")))
  val genderAgeKVRDD: RDD[(String, Int)] = sc.parallelize(List(("男", 25), ("女", 20), ("男", 22)))
  idNameKVRDD.cartesian(genderAgeKVRDD).foreach(println)
}
```

sortBy: a transformation that sorts by a chosen field; ascending by default.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  /**
   * sortBy: sorts by a chosen field; ascending by default.
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo15sortBy")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val intRDD: RDD[Int] = sc.parallelize(List(1, 3, 6, 5, 2, 4, 6, 8, 9, 7))
  intRDD.sortBy(i => i).foreach(println)                    // ascending
  intRDD.sortBy(i => -i).foreach(println)                   // descending
  intRDD.sortBy(i => i, ascending = false).foreach(println) // descending

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
  // Sort students by age, descending
  stuRDD.sortBy(s => -s.split(",")(2).toInt).foreach(println)
}
```

Common action operators: foreach, take, collect, count, reduce, and the save family. Every action operator triggers a job.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  /**
   * Common action operators: foreach, take, collect, count, reduce, the save family.
   * Every action operator triggers a job.
   */
  val conf: SparkConf = new SparkConf()
  conf.setAppName("Demo16Action")
  conf.setMaster("local")
  val sc: SparkContext = new SparkContext(conf)

  val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

  /**
   * foreach: processes every record; unlike map, it has no return value.
   */
  stuRDD.foreach(println)

  // Save every record of stuRDD into MySQL
  /**
   * DDL:
   * CREATE TABLE `stu_rdd` (
   *   `id` int(10) NOT NULL AUTO_INCREMENT,
   *   `name` char(5) DEFAULT NULL,
   *   `age` int(11) DEFAULT NULL,
   *   `gender` char(2) DEFAULT NULL,
   *   `clazz` char(4) DEFAULT NULL,
   *   PRIMARY KEY (`id`)
   * ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   */
  // This opens a connection for every single record; creating and tearing down
  // connections that frequently is far too slow, so it is a poor fit.
  //stuRDD.foreach(line => {
  //  val splits: Array[String] = line.split(",")
  //  // 1. Open a connection
  //  val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false", "root", "123456")
  //  println("opened a connection")
  //  // 2. Create a PreparedStatement
  //  val pSt: PreparedStatement = conn.prepareStatement("insert into stu_rdd(id,name,age,gender,clazz) values(?,?,?,?,?)")
  //  // 3. Bind the parameters
  //  pSt.setInt(1, splits(0).toInt)
  //  pSt.setString(2, splits(1))
  //  pSt.setInt(3, splits(2).toInt)
  //  pSt.setString(4, splits(3))
  //  pSt.setString(5, splits(4))
  //  // 4. Execute the SQL
  //  pSt.execute()
  //  // 5. Close the connection
  //  conn.close()
  //})

  /**
   * take: an action that pulls the first N records into a Scala Array.
   */
  // foreach here is the Array method, not the RDD operator
  stuRDD.take(5).foreach(println)

  /**
   * collect: an action that pulls all records of the RDD into a Scala Array.
   */
  // foreach here is the Array method, not the RDD operator
  stuRDD.collect().foreach(println)

  /**
   * count: an action that returns the number of records in the RDD.
   */
  println(stuRDD.count())

  /**
   * reduce: an action that aggregates all records as a single group.
   */
  // Sum of all students' ages
  println(stuRDD.map(_.split(",")(2).toInt).reduce(_ + _))

  /**
   * The save family: saveAsTextFile, saveAsObjectFile
   */
}
```
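The zeroValue / seqOp / combOp contract of aggregateByKey can be sketched in plain Scala, without a cluster, by treating fixed-size chunks of a list as partitions. Everything here (the chunking, `classAverages`, the sample data) is an illustrative assumption, not Spark API:

```scala
object AggregateByKeySketch {
  // zeroValue: the starting accumulator per key, here (ageSum, count)
  val zeroValue: (Int, Int) = (0, 0)

  def classAverages(records: List[(String, Int)], partitionSize: Int): Map[String, Double] = {
    // chunks of `partitionSize` records stand in for Spark partitions
    val partitions: List[List[(String, Int)]] = records.grouped(partitionSize).toList

    // seqOp: fold each record of a partition into that partition's accumulators
    val perPartition: List[Map[String, (Int, Int)]] = partitions.map { part =>
      part.foldLeft(Map.empty[String, (Int, Int)]) { case (acc, (k, age)) =>
        val (s, c) = acc.getOrElse(k, zeroValue)
        acc.updated(k, (s + age, c + 1))
      }
    }

    // combOp: merge the per-partition accumulators key by key
    val merged: Map[String, (Int, Int)] = perPartition.flatten.groupBy(_._1).map {
      case (k, kvs) =>
        k -> kvs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    }

    // final projection: (sum, count) => average
    merged.map { case (k, (s, c)) => k -> s.toDouble / c }
  }

  def main(args: Array[String]): Unit =
    println(classAverages(List(("c1", 20), ("c1", 22), ("c2", 21), ("c1", 24), ("c2", 23)), 2))
}
```

Note that the accumulator type (a tuple) differs from the value type (an Int), which is exactly what reduceByKey cannot express and aggregateByKey can.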
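The data blow-up warned about for cartesian follows directly from its output size: n left rows times m right rows, so sizes multiply rather than add. A plain-Scala sketch of the same product (`cross` is a hypothetical helper, not Spark API):

```scala
object CartesianSketch {
  // Cross product of two lists: every left row paired with every right row
  def cross[A, B](left: List[A], right: List[B]): List[(A, B)] =
    for (l <- left; r <- right) yield (l, r)

  def main(args: Array[String]): Unit = {
    val idName = List(("001", "zs"), ("002", "ls"), ("003", "ww"))
    val genderAge = List(("男", 25), ("女", 20), ("男", 22))
    val product = cross(idName, genderAge)
    println(product.size) // 3 rows x 3 rows => 9 output rows
    product.foreach(println)
  }
}
```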
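The commented-out foreach above opens one MySQL connection per record. Spark's usual fix is foreachPartition, which hands you one whole partition's iterator so a single connection can serve all of that partition's records, e.g. `stuRDD.foreachPartition(iter => ...)`. A minimal plain-Scala sketch of the pattern, where `FakeConnection` and the hard-coded partitioning are hypothetical stand-ins for JDBC and a real RDD:

```scala
object ForeachPartitionSketch {
  // Hypothetical stand-in for a JDBC connection; counts how many get opened
  object FakeConnection { var opened = 0 }
  class FakeConnection { FakeConnection.opened += 1; def close(): Unit = () }

  // Mirrors rdd.foreachPartition(iter => ...): one connection per partition,
  // reused for every record in that partition
  def writePartitions(partitions: List[List[String]]): Unit =
    partitions.foreach { records =>
      val conn = new FakeConnection // 1. open once per partition
      records.foreach(_ => ())      // 2. "insert" each record over the same conn
      conn.close()                  // 3. close once per partition
    }

  def main(args: Array[String]): Unit = {
    writePartitions(List(List("a", "b", "c"), List("d", "e")))
    println(FakeConnection.opened) // 2 partitions => 2 connections, not 5
  }
}
```

The connection count now scales with the number of partitions rather than the number of records, which is why foreachPartition is the standard shape for writing an RDD out to an external store.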
