Hudi Data Lake Insert, Update, Query, and Analysis Examples (Part 2)


Prepare a data2 file:

cp data1 data2 && vi data2

Update the contents of data2 to:

{'uid':1,'uname':'grey1','dt':'2022/11'}
{'uid':2,'uname':'tony1','dt':'2022/12'}

Then upload it to HDFS:

hdfs dfs -put data2 /mydata/

The code for updating data needs only a few adjustments to the earlier insert code. The complete code is as follows:
package git.snippet.test

import git.snippet.entity.MyEntity
import git.snippet.util.JsonUtil
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object DataUpdate {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf()
      .setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    updateData(sparkSession)
  }

  def updateData(sparkSession: SparkSession) = {
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val commitTime = System.currentTimeMillis().toString // generate the commit time
    val df = sparkSession.read.text("/mydata/data2")
      .mapPartitions(partitions => {
        partitions.map(item => {
          val jsonObject = JsonUtil.getJsonData(item.getString(0))
          MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))
        })
      })
    val result = df.withColumn("ts", lit(commitTime)) // add the ts timestamp column
      .withColumn("uuid", col("uid")) // add the uuid column
      .withColumn("hudipart", col("dt")) // add the Hudi partition column
    result.write.format("org.apache.hudi")
      //.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option("hoodie.insert.shuffle.parallelism", 2)
      .option("hoodie.upsert.shuffle.parallelism", 2)
      .option("hoodie.datasource.write.precombine.field", "ts") // precombine field: the commit-time column
      .option("hoodie.datasource.write.recordkey.field", "uuid") // record key: the unique uuid column
      .option("hoodie.table.name", "myDataTable")
      .option("hoodie.datasource.write.partitionpath.field", "hudipart") // partition column
      .mode(SaveMode.Append)
      .save("/snippet/data/hudi")
  }
}

Run the update code.
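The code above reuses the MyEntity case class and the JsonUtil helper from the insert example in the first part of this series, so they are not repeated here. If you do not have them at hand, a minimal hypothetical version is sketched below; it assumes JsonUtil is just a thin wrapper around fastjson (which also tolerates the single-quoted records in the sample files). Adjust it to match your actual code from part one.

// git/snippet/entity/MyEntity.scala (hypothetical minimal version)
package git.snippet.entity

// simple case class matching the fields in the sample data
case class MyEntity(uid: Int, uname: String, dt: String)

// git/snippet/util/JsonUtil.scala (hypothetical minimal version, assumes fastjson is on the classpath)
package git.snippet.util

import com.alibaba.fastjson.{JSON, JSONObject}

object JsonUtil {
  // parse one line of text into a JSONObject
  def getJsonData(data: String): JSONObject = JSON.parseObject(data)
}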
To verify, visit: http://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022
The updated data files can be seen there.
The data query code is also simple; the complete code is as follows:
package git.snippet.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DataQuery {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf = new SparkConf()
      .setAppName("MyFirstDataApp")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    ssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")
    queryData(sparkSession)
  }

  def queryData(sparkSession: SparkSession) = {
    val df = sparkSession.read.format("org.apache.hudi").load("/snippet/data/hudi/*/*")
    df.show()
    println(df.count())
  }
}

Run it; the printed rows and row count confirm the update succeeded.
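The same check can also be done from Spark rather than the NameNode web UI: every Hudi record carries metadata columns such as _hoodie_commit_time and _hoodie_record_key, so selecting them shows which commit each row was last written by. A minimal sketch that could be appended inside queryData after the load above:

    // inspect Hudi's metadata columns alongside the business columns
    df.select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_partition_path", "uid", "uname", "dt")
      .orderBy("_hoodie_commit_time")
      .show(false)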
Hudi queries also support many query options, such as incremental queries and queries over a time range.
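For example, an incremental query returns only the records written after a given commit instant. A minimal sketch using Hudi's incremental query options is shown below; the begin and end instants are placeholder commit timestamps that you would replace with real commit times from your own table:

    val incrementalDF = sparkSession.read.format("org.apache.hudi")
      .option("hoodie.datasource.query.type", "incremental") // default is a snapshot query
      .option("hoodie.datasource.read.begin.instanttime", "20221101000000") // only commits after this instant
      .option("hoodie.datasource.read.end.instanttime", "20221231235959") // optional upper bound for a time-range query
      .load("/snippet/data/hudi")
    incrementalDF.show()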
Next comes the Flink real-time data analysis service. First start Kafka on the master node and create a topic named mytopic; see the earlier article "Linux 下搭建 Kafka 環境" (setting up a Kafka environment on Linux) for details.
The relevant commands are as follows.
Create the topic:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --create --topic mytopic

Start a console producer:

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic mytopic

Start a console consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic mytopic

Then run the following code:
package git.snippet.analyzer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class DataAnalyzer {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.100.130:9092");
        properties.setProperty("group.id", "snippet");
        // build the FlinkKafkaConsumer for the mytopic topic
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("mytopic", new SimpleStringSchema(), properties);
        // start consuming from the latest offset
        myConsumer.setStartFromLatest();
        final DataStream<String> stream = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        stream.print();
        try {
            env.execute("DataAnalyzer");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
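With this job running alongside the console producer started above, any line typed into the producer is printed to the job's standard output, which is an easy way to confirm that the Kafka to Flink pipeline works before adding real analysis logic.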
