Ignite實戰( 四 )

2.5.7 廣播任務該方法在與計算實例關聯的所有節點broadcast()上執行任務 。
// Limit broadcast to remote nodes only.IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());// Print out hello message on remote nodes in the cluster group.compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));2.5.8 異步執行前幾節中描述的所有方法都有異步對應物:

  • callAsync(…?)
  • runAsync(…?)
  • applyAsync(…?)
  • broadcastAsync(…?)
異步方法返回一個IgniteFuture表示操作結果的值 。在以下示例中,異步執行一組可調用任務 。
IgniteCompute compute = ignite.compute();Collection<IgniteCallable<Integer>> calls = new ArrayList<>();// Iterate through all words in the sentence and create callable jobs.for (String word : "Count characters using a callable".split(" "))calls.add(word::length);IgniteFuture<Collection<Integer>> future = compute.callAsync(calls);future.listen(fut -> {// Total number of characters.int total = fut.get().stream().mapToInt(Integer::intValue).sum();System.out.println("Total number of characters: " + total);});2.5.9 執行超時任務您可以設置任務執行的超時時間 。如果任務沒有在給定的時間范圍內完成,它會被停止并取消該任務產生的所有作業 。
要執行超時任務,請使用withTimeout(…?)計算接口的方法 。該方法返回一個計算接口,該接口以時間限制的方式執行給它的第一個任務 。后續任務沒有超時:您需要調用withTimeout(…?)每個應該有超時的任務 。
IgniteCompute compute = ignite.compute();compute.withTimeout(300_000).run(() -> {// your computation// ...});2.5.10 在本地節點上的作業之間共享狀態在一個節點上執行的不同計算作業之間共享狀態通常很有用 。為此 , 每個節點上都有一個共享的并發本地映射 。
IgniteCluster cluster = ignite.cluster();ConcurrentMap<String, Integer> nodeLocalMap = cluster.nodeLocalMap();節點局部值類似于線程局部變量,因為這些值不分布并且僅保留在本地節點上 。節點本地數據可用于在計算作業之間共享狀態 。它也可以被部署的服務使用 。
在以下示例中,作業每次在某個節點上執行時都會增加一個節點本地計數器 。結果,每個節點上的節點本地計數器告訴我們作業在該節點上執行了多少次 。
IgniteCallable<Long> job = new IgniteCallable<Long>() {@IgniteInstanceResourceprivate Ignite ignite;@Overridepublic Long call() {// Get a reference to node local.ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite.cluster().nodeLocalMap();AtomicLong cntr = nodeLocalMap.get("counter");if (cntr == null) {AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());if (old != null)cntr = old;}return cntr.incrementAndGet();}};2.5.11 從計算任務訪問數據如果您的計算任務需要訪問存儲在緩存中的數據,您可以通過以下實例來完成Ignite:
public class MyCallableTask implements IgniteCallable<Integer> {@IgniteInstanceResourceprivate Ignite ignite;@Overridepublic Integer call() throws Exception {IgniteCache<Long, Person> cache = ignite.cache("person");// Get the data you needPerson person = cache.get(1L);// do with the data what you need to doreturn 1;}}請注意,上面顯示的示例可能不是最有效的方法 。原因是key對應的person對象1可能位于與執行任務的節點不同的節點上 。在這種情況下,對象是通過網絡獲取的 。這可以通過將任務與數據放在一起來避免 。
注意:如果要在IgniteCallable和IgniteRunnable任務中使用鍵和值對象,請確保鍵和值類部署在所有集群節點上 。2.6 SQL查詢與處理Ignite 帶有符合 ANSI-99、水平可擴展和容錯的分布式 SQL 數據庫 。根據用例,通過跨集群節點對數據進行分區或完全復制來提供分布 。
作為 SQL 數據庫,Ignite 支持所有 DML 命令 , 包括 SELECT、UPDATE、INSERT 和 DELETE 查詢,并且還實現了與分布式系統相關的 DDL 命令子集 。
您可以通過連接來自外部工具和應用程序的JDBC或ODBC驅動程序與 Ignite 進行交互,就像與任何其他啟用了 SQL 的存儲一樣 。Java、.NET 和 C++ 開發人員可以利用本機 SQL API 。
在內部 , SQL 表與鍵值緩存具有相同的數據結構 。這意味著您可以更改數據的分區分布并利用親和力托管技術來獲得更好的性能 。
Ignite 的默認 SQL 引擎使用 H2 數據庫來解析和優化查詢并生成執行計劃,但也可以啟用基于 Apache Calcite 的 SQL 引擎來執行查詢 。
2.6.1 分布式查詢針對分區表的查詢以分布式方式執行: