Getting Started with Flink WordCount

The following word-count example is a quick, hands-on introduction to using Flink for both stream processing (Streaming) and batch processing (Batch).
Word Count (Batch Processing)

  1. Add the dependencies
```xml
<!-- Flink core library -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Flink stream processing library -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
```
  2. Implement the code
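```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        String inputFile = "E:\\data\\word.txt";
        String outPutFile = "E:\\data\\wordResult.txt";
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // 1. Read the data, one record per line of the input file
        DataSource<String> dataSource = executionEnvironment.readTextFile(inputFile);

        // 2. Process the data: turn each line into (word, 1) pairs
        FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator = dataSource.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        // 3. Group the data: identical words (tuple field 0) end up in the same group
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = flatMapOperator.groupBy(0);

        // 4. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);

        // 5. Write the result as CSV; parallelism 1 produces a single output file
        sum.writeAsCsv(outPutFile).setParallelism(1);

        // Trigger execution of the job
        executionEnvironment.execute("wordcount batch process");
    }
}
```

Run the main method to get the result. The word.txt I used for testing contains the following: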
```
ni hao hi
wang mei mei
liu mei
ni hao
wo hen hao
this is a good idea
Apache Flink
```

The resulting output file:
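```
a,1
mei,3
Apache,1
Flink,1
good,1
hen,1
hi,1
idea,1
ni,2
is,1
liu,1
this,1
wo,1
hao,3
wang,1
```

Word Count (Streaming Data)

Requirement: use a socket to simulate words being sent in real time, and have Flink receive the data in real time. Flink aggregates the data within a given time window (for example 5 s), recomputes the totals every 1 s, and prints the result for each time window.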
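With a window size of 5 s and a slide of 1 s, the windows overlap, so every record falls into 5 of them. The sketch below illustrates that assignment arithmetic for a single processing-time timestamp in milliseconds. It is plain Java with no Flink dependency, and the class name, method name and sample timestamp are hypothetical. It follows the idea behind Flink's sliding window assigner, but it is an illustrative re-implementation, not Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative re-implementation (not Flink code) of sliding-window assignment.
public class SlidingWindowSketch {

    /** Start times of every window [start, start + size) that contains timestamp (all in ms). */
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned window start that is <= timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5_000L;       // corresponds to Time.seconds(5)
        long slide = 1_000L;      // corresponds to Time.seconds(1)
        long timestamp = 12_300L; // hypothetical processing time of one record, in ms
        for (long start : windowStarts(timestamp, size, slide)) {
            System.out.println("[" + start + ", " + (start + size) + ")");
        }
    }
}
```

For the hypothetical timestamp 12300 ms, main prints five windows, from [12000, 17000) down to [8000, 13000). Each of them contains 12300.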
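```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCountStream {
    public static void main(String[] args) throws Exception {
        int port = 7000;
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read text from the socket at 192.168.56.103:7000; each "\n"-terminated line is one record
        DataStreamSource<String> textStream = executionEnvironment.socketTextStream("192.168.56.103", port, "\n");

        // Split each line on whitespace and emit (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textStream.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] split = s.split("\\s");
                        for (String word : split) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        // Key by word, apply a 5 s window that slides every 1 s, and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> word = tuple2SingleOutputStreamOperator
                .keyBy(0)
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum(1);

        // Print each window's results to stdout
        word.print();
        executionEnvironment.execute("wordcount stream process");
    }
}
```

The socketTextStream source connects to the given host and port as a client. A server must therefore already be listening there when the job starts; otherwise the source cannot connect and the job fails. For testing, you can use the netcat tool. Install it on the Linux machine (192.168.56.103 in this example) and start a listener with the following command. Then run the main method: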
```shell
nc -lk 7000
```

Then type lines into the netcat session to send data to the job.
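The simulation below can help predict what the job prints when you type into netcat. It applies the same whitespace split, 5 s / 1 s window assignment and per-word sum to a few lines, each given a hypothetical arrival time in milliseconds. It is only a plain-Java model of keyBy(0).timeWindow(...).sum(1), and the class, field and method names are made up for illustration. The real Flink job prints one (word, count) tuple per key each time a window fires, not one map per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink involved) of a 5 s / 1 s sliding-window word count.
public class SlidingWordCountSimulation {

    // Stand-ins for Time.seconds(5) and Time.seconds(1), in milliseconds
    public static final long SIZE = 5_000L;
    public static final long SLIDE = 1_000L;

    /** One line typed into nc, with an assumed arrival (processing) time in ms. */
    public static final class Event {
        public final long timeMs;
        public final String line;

        public Event(long timeMs, String line) {
            this.timeMs = timeMs;
            this.line = line;
        }
    }

    /** Returns window start -> (word -> summed count), both sorted for readable output. */
    public static Map<Long, Map<String, Integer>> countPerWindow(List<Event> events) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            // Same tokenisation as the job's flatMap: split on single whitespace characters
            for (String word : e.line.split("\\s")) {
                // Latest slide-aligned window start that is <= the event time
                long lastStart = e.timeMs - Math.floorMod(e.timeMs, SLIDE);
                // Add (word, 1) to every window [start, start + SIZE) containing the event
                for (long start = lastStart; start > e.timeMs - SIZE; start -= SLIDE) {
                    result.computeIfAbsent(start, k -> new TreeMap<>())
                          .merge(word, 1, Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(12_300L, "hello world")); // hypothetical input
        events.add(new Event(13_500L, "hello"));       // hypothetical input
        countPerWindow(events).forEach((start, counts) ->
                System.out.println("[" + start + ", " + (start + SIZE) + ") " + counts));
    }
}
```

In this example, "hello world" arrives at 12300 ms and "hello" arrives at 13500 ms. As a result, hello is counted twice in the windows starting at 9000 through 12000 ms. It is counted once in [8000, 13000) and once in [13000, 18000). This is why a word you type keeps reappearing in the printed output for about five seconds.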