我的 Kafka 旅程 - Consumer

kafka采用Consumer消費者Pull主動拉取數據的方式,當Broker無數據時,消費者空轉 。Kafka并不刪除已消費的消息,各自獨立的消費者可消費同一個Broker分區數據 。
消費流程1、消費者發起網絡消費請求

  1. # 每批次最小抓取設置(推薦1字節)

  2. fetch.min.bytes

  3. # 每批次最大抓取大小設置(推薦500ms)

  4. fetch.max.bytes

  5. # 未達到大小的超時設置(推薦50M)

  6. fetch.max.wait.ms

2、拉取數據到內存消費隊列中
  1. # 單次拉取最大消息條數設置(推薦500條)

  2. max.poll.records

2.1、反序列化處理(對應了Producer端的序列化動作)
2.2、攔截器處理(如:匯總統計記錄)
3、數據的后續處理保存等的消費端動作 。
offset當一個消費者掛掉或重啟后,是否還記得消費到的位置了?offset解決了此問題 。

對于每一個topic,都會維持一個分區日志 , 分區中的每一個記錄都會分配一個Id來表示順序,稱之為offset,offset用來唯一的標識分區中每條記錄 , 并將每次的消費位置提交到topic中 。消費者恢復啟動后接著按序消費數據 。
自動提交
  1. # 開啟自動提交

  2. enable.auto.commit = true

  3. # 每次提交間隔(推薦5秒)

  4. auto.commit.interval.ms = 5000

手動提交先關閉自動提交后,在Consumer客戶端的代碼中 , 通過調用方法函數提交 , 通常的方法名:
  1. # 同步提交,等提交完成才可下一次再消費

  2. .CommitSync

  3. # 異步提交,可直接進行下一個消費,也有可能提交失敗

  4. .CommitAync

指定消費在Consumer客戶端的代碼中,手動指定offset的位置進行消費,關聯到的方法函數名:
  1. # 按指定時間得出offset值

  2. .offsetsForTimes

  3. # 按指定offset值繼續消費

  4. .seek

初始策略
  1. # earliest: 最早消費;無offset時,從頭開始消費 。

  2. # latest: 最新消費;無offset時,從最新的數據開始消費 。

  3. # none: 無offset時 , 引發異常 。

  4. auto.offset.reset = earliest | latest | none

消費現象重復消費:offset未提交成功,下次消費還是舊的offset 。
漏消費:offset提交成功,消費者端后續的數據處理未完成(建議下游步驟事務處理) 。
消費者組為了實現橫向擴展 , 應用程序需要創建一個消費者群組,然后往群組里添加消費者來提高處理效率,群組里的每個消費者只處理一部分消息 。

推薦閱讀