Hudi 數據湖的插入,更新,查詢,分析操作示例

Hudi 數據湖的插入 , 更新,查詢,分析操作示例【Hudi 數據湖的插入,更新,查詢,分析操作示例】作者:Grey
原文地址:
博客園:Hudi 數據湖的插入,更新,查詢 , 分析操作示例
CSDN:Hudi 數據湖的插入 , 更新,查詢,分析操作示例
前置工作首先,需要先完成
Linux 下搭建 Kafka 環境
Linux 下搭建 Hadoop 環境
Linux 下搭建 HBase 環境
Linux 下搭建 Hive 環境
本文基于上述四個環境已經搭建完成的基礎上進行 Hudi 數據湖的插入,更新,查詢操作 。
開發環境Scala 2.11.8
JDK 1.8
需要熟悉 Maven 構建項目和 Scala 一些基礎語法 。
操作步驟master 節點首先啟動集群,執行:
stop-dfs.sh && start-dfs.sh啟動 yarn,執行:
stop-yarn.sh && start-yarn.sh然后準備一個 Mave 項目 , 在 src/main/resources 目錄下 , 將 Hadoop 的一些配置文件拷貝進來,分別是
$HADOOP_HOME/etc/hadoop/core-site.xml 文件
<?xml version="1.0" encoding="UTF-8"?><?xml-stylesheet type="text/xsl" ?><configuration><property><name>fs.default.name</name><value>hdfs://master:9000</value></property><property><name>hadoop.tmp.dir</name><value>/usr/local/hadoop/tmp</value></property></configuration>注意,需要在你訪問集群的機器上配置 host 文件,這樣才可以識別 master 節點 。
$HADOOP_HOME/etc/hadoop/hdfs-site.xml 文件
<?xml version="1.0" encoding="UTF-8"?><?xml-stylesheet type="text/xsl" ?><configuration><property><name>dfs.replication</name><value>1</value></property><property><name>dfs.permissions</name><value>false</value></property></configuration>$HADOOP_HOME/etc/hadoop/yarn-site.xml 文件,目前還沒有任何配置
<?xml version="1.0"?><configuration></configuration>然后,設計實體的數據結構,
package git.snippet.entitycase class MyEntity(uid: Int,uname: String,dt: String)插入數據代碼如下
package git.snippet.testimport git.snippet.entity.MyEntityimport git.snippet.util.JsonUtilimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}object DataInsertion {def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val sparkConf = new SparkConf().setAppName("MyFirstDataApp").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[*]")val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()val ssc = sparkSession.sparkContextssc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")insertData(sparkSession)}def insertData(sparkSession: SparkSession) = {import org.apache.spark.sql.functions._import sparkSession.implicits._val commitTime = System.currentTimeMillis().toString //生成提交時間val df = sparkSession.read.text("/mydata/data1").mapPartitions(partitions => {partitions.map(item => {val jsonObject = JsonUtil.getJsonData(item.getString(0))MyEntity(jsonObject.getIntValue("uid"), jsonObject.getString("uname"), jsonObject.getString("dt"))})})val result = df.withColumn("ts", lit(commitTime)) //添加ts 時間戳列.withColumn("uuid", col("uid")).withColumn("hudipart", col("dt")) //增加hudi分區列result.write.format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", 2).option("hoodie.upsert.shuffle.parallelism", 2).option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交時間列.option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一標示列.option("hoodie.table.name", "myDataTable").option("hoodie.datasource.write.partitionpath.field", "hudipart") //分區列.mode(SaveMode.Overwrite).save("/snippet/data/hudi")}}然后,在 master 節點先準備好數據
vi data1輸入如下數據
{'uid':1,'uname':'grey','dt':'2022/09'}{'uid':2,'uname':'tony','dt':'2022/10'}然后創建文件目錄,
hdfs dfs -mkdir /mydata/把 data1 放入目錄下
hdfs dfs -put data1 /mydata/訪問:http://192.168.100.130:50070/explorer.html#/mydata
可以查到這個數據

Hudi 數據湖的插入,更新,查詢,分析操作示例

文章插圖
接下來執行插入數據的 scala 代碼 , 執行完畢后,驗證一下
訪問:http://192.168.100.130:50070/explorer.html#/snippet/data/hudi/2022
可以查看到插入的數據

推薦閱讀