整合營銷服務商

          電腦端+手機端+微信端=數據同步管理

          免費咨詢熱線:

          深入解讀HBase2.0新功能之Assignment

          深入解讀HBase2.0新功能之AssignmentManagerV2

          AssignmentManager模塊是HBase中一個非常重要的模塊,Assignment Manager(之后簡稱AM)負責了HBase中所有region的Assign,UnAssign,以及split/merge過程中region狀態變化的管理等等。在HBase-0.90之前,AM的狀態全部存在內存中,自從HBASE-2485之后,AM把狀態持久化到了Zookeeper上。在此基礎上,社區對AM又修復了大量的bug和優化(見此文章),最終形成了用在HBase-1.x版本上的這個AM。

          老Assignment Mananger的問題

          相信深度使用過HBase的人一般都會被Region RIT的狀態困擾過,長時間的region in transition狀態簡直令人抓狂。

          除了一些確實是由于Region無法被RegionServer open的case,大部分的RIT,都是AM本身的問題引起的??偨Y一下HBase-1.x版本中AM的問題,主要有以下幾點:

          region狀態變化復雜

          這張圖很好地展示了region在open過程中參與的組件和狀態變化。可以看到,多達7個組件會參與region狀態的變化。并且在region open的過程中多達20多個步驟!越復雜的邏輯意味著越容易出bug

          region狀態多處緩存

          region的狀態會緩存在多個地方,Master中RegionStates會保存Region的狀態,Meta表中會保存region的狀態,Zookeeper上也會保存region的狀態,要保持這三者完全同步是一件很困難的事情。同時,Master和RegionServer都會修改Meta表的狀態和Zookeeper的狀態,非常容易導致狀態的混亂。如果出現不一致,到底以哪里的狀態為準?每一個region的transition流程都是各自為政,各自有各自的處理方法

          重度依賴Zookeeper

          在老的AM中,region狀態的通知完全通過Zookeeper。比如說RegionServer打開了一個region,它會在Zookeeper把這個region的RIT節點改成OPEN狀態,而不去直接通知Master。Master會在Zookeeper上watch這個RIT節點,通過Zookeeper的通知機制來通知Master這個region已經發生變化。Master再根據Zookeeper上讀取出來的新狀態進行一定的操作。嚴重依賴Zookeeper的通知機制導致了region的上線/下線的速度存在了一定的瓶頸。特別是在region比較多的時候,Zookeeper的通知會出現嚴重的滯后現象。

          正是這些問題的存在,導致AM的問題頻發。我本人就fix過多個AM導致region無法open的issue。比如說這三個相互關聯的“連環”case:HBASE-17264,HBASE-17265,HBASE-17275。

          Assignment Mananger V2

          面對這些問題的存在,社區也在不斷嘗試解決這些問題,特別是當region的規模達到100w級別的時候,AM成為了一個嚴重的瓶頸。HBASE-11059中提出的ZK-less Region Assignment就是一個非常好的改良設計。在這個設計中,AM完全擺脫了Zookeeper的限制,在測試中,zk-less的assign比zk的assign快了一個數量級!

          但是在這個設計中,它摒棄了Zookeeper這個持久化的存儲,一些region transition過程中的中間狀態無法被保存。因此,在此基礎上,社區又更進了一步,提出了Assignment Mananger V2在這個方案。在這個方案中,仍然摒棄了Zookeeper參與Assignment的整個過程。但是,它引入了ProcedureV2這個持久化存儲來保存Region transition中的各個狀態,保證在master重啟時,之前的assing/unassign,split等任務能夠從中斷點重新執行。具體的來說,AMv2方案中,主要的改進有以下幾點:

          Procedure V2

          關于Procedure V2,我之后將獨立寫文章介紹。這里,我只大概介紹下ProcedureV2和引入它所帶來的價值。

          我們知道,Master中會有許多復雜的管理工作,比如說建表,region的transition。這些工作往往涉及到非常多的步驟,如果master在做中間某個步驟的時候宕機了,這個任務就會永遠停留在了中間狀態(RIT因為之前有Zookeeper做持久化因此會繼續從某個狀態開始執行)。比如說在enable/disable table時,如果master宕機了,可能表就停留在了enabling/disabling狀態。需要一些外部的手段進行恢復。那么從本質上來說,ProcedureV2提供了一個持久化的手段(通過ProcedureWAL,一種類似RegionServer中WAL的日志持久化到HDFS上),使master在宕機后能夠繼續之前未完成的任務繼續完成。同時,ProcedureV2提供了非常豐富的狀態轉換并支持回滾執行,即使執行到某一個步驟出錯,master也可以按照用戶的邏輯對之前的步驟進行回滾。比如建表到某一個步驟失敗了,而之前已經在HDFS中創建了一些新region的文件夾,那么ProcedureV2在rollback的時候,可以把這些殘留刪除掉。

          Procedure中提供了兩種Procedure框架,順序執行和狀態機,同時支持在執行過程中插入subProcedure,從而能夠支持非常豐富的執行流程。在AMv2中,所有的Assign,UnAssign,TableCreate等等流程,都是基于Procedure實現的。

          去除Zookeeper依賴

          有了Procedure V2之后,所有的狀態都可以持久化在Procedure中,Procedure中每次的狀態變化,都能夠持久化到ProcedureWAL中,因此數據不會丟失,宕機后也能恢復。同時,AMv2中region的狀態扭轉(OPENING,OPEN,CLOSING,CLOSE等)都會由Master記錄在Meta表中,不需要Zookeeper做持久化。再者,之前的AM使用的Zookeeper watch機制通知master region狀態的改變,而現在每當RegionServer Open或者close一個region后,都會直接發送RPC給master匯報,因此也不需要Zookeeper來做狀態的通知。綜合以上原因,Zookeeper已經在AMv2中沒有了存在的必要。

          減少狀態沖突的可能性

          之前我說過,在之前的AM中,region的狀態會同時存在于meta表,Zookeeper和master的內存狀態。同時Master和regionserver都會去修改Zookeeper和meta表,維護狀態統一的代價非常高,非常容易出bug。而在AMv2中,只有master才能去修改meta表。并在region整個transition中做為一個“權威”存在,如果regionserver匯報上來的region狀態與master看到的不一致,則master會命令RegionServer abort。Region的狀態,都以master內存中保存的RegionStates為準。

          除了上述這些優化,AMv2中還有許多其他的優化。比如說AMv2依賴Procedure V2提供的一套locking機制,保證了對于一個實體,如一張表,一個region或者一個RegionServer同一時刻只有一個Procedure在執行。同時,在需要往RegionServer發送命令,如發送open,close等命令時,AMv2實現了一個RemoteProcedureDispatcher來對這些請求做batch,批量把對應服務器的指令一起發送等等。在代碼結構上,之前處理相應region狀態的代碼散落在AssignmentManager這個類的各個地方,而在AMv2中,每個對應的操作,都有對應的Procedure實現,如AssignProcedure,DisableTableProcedure,SplitTableRegionProcedure等等。這樣下來,使AssignmentManager這個之前雜亂的類變的清晰簡單,代碼量從之前的4000多行減到了2000行左右。

          AssignProcedure

          AMv2中有太多的Procedure對應各種不同的transition,這里不去詳細介紹每個Procedure的操作。我將以AssignProcedure為例,講解一下在AMv2中,一個region是怎么assign給一個RegionServer,并在對應的RS上Open的。

          AssignProcedure是一個基于Procedure實現的狀態機。它擁有3個狀態:

          • REGION_TRANSITION_QUEUE: Assign開始時的狀態。在這個狀態時,Procedure會對region狀態做一些改變和存儲,并丟到AssignmentManager的assign queue中。對于單獨region的assign,AssignmentManager會把他們group起來,再通過LoadBalancer分配相應的服務器。當這一步驟完成后,Procedure會把自己標為REGION_TRANSITION_DISPATCH,然后看是否已經分配服務器,如果還沒有被分配服務器的話,則會停止繼續執行,等待被喚醒。
          • REGION_TRANSITION_DISPATCH: 當AssignmentManager為這個region分配好服務器時,Procedure就會被喚醒?;蛘逷rocedure在執行完REGION_TRANSITION_QUEUE狀態時master宕機,Procedure被恢復后,也會進入此步驟執行。所以在此步驟下,Procedure會先檢查一下是否分配好了服務器,如果沒有,則把狀態轉移回REGION_TRANSITION_QUEUE,否則的話,則把這個region交給RemoteProcedureDispatcher,發送RPC給對應的RegionServer來open這個region。同樣的,RemoteProcedureDispatcher也會對相應的指令做一個batch,批量把一批region open的命令發送給某一臺服務器。當命令發送完成之后,Procedure又會進入休眠狀態,等待RegionServer成功OPen這個region后,喚醒這個Procedure
          • REGION_TRANSITION_FINISH: 當有RegionServer匯報了此region被打開后,會把Procedure的狀態置為此狀態,并喚醒Procedure執行。此時,AssignProcedure會做一些狀態改變的工作,并修改meta表,把meta表中這個region的位置指向對應的RegionServer。至此,region assign的工作全部完成。

          AMv2中提供了一個Web頁面(Master頁面中的‘Procedures&Locks’鏈接)來展示當前正在執行的Procedure和持有的鎖。

          其實通過log,我們也可以看到Assign的整個過程。

          假設,一臺server宕機,此時master會產生一個ServerCrashProcedure 來處理,在這個Procedure中,會做一系列的工作,比如WAL的restore。當這些前置的工作做完后,就會開始assign之前在宕掉服務器上的region,比如56f985a727afe80a184dac75fbf6860c。此時會在ServerCrashProcedure產生一系列的子任務:

          2017-05-23 12:04:24,175 INFO [ProcExecWrkr-30] procedure2.ProcedureExecutor: Initialized subprocedures=[{pid=1178, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=bfd57f0b72fd3ca77e9d3c5e3ae48d76, target=ve0540.halxg.cloudera.com,16020,1495525111232}, {pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232}]
          

          可以看到,ServerCrashProcedure的pid(Procedure ID)為1178,在此Procedure中產生的assign 56f985a727afe80a184dac75fbf6860c這個region的子Procedure的pid為1179,同時他的ppid(Parent Procedure ID)為1178。在AMv2中,通過追蹤這些ID,就非常容易把一個region的transition整個過程全部串起來。

          接下來,pid=1170這個Procedure開始執行,首先執行的是REGION_TRANSITION_QUEUE狀態的邏輯,然后進入睡眠狀態。

          2017-05-23 12:04:24,241 INFO [ProcExecWrkr-30] assignment.AssignProcedure: Start pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_QUEUE; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OFFLINE, location=ve0540.halxg.cloudera.com,16020,1495525111232; forceNewPlan=false, retain=false
          

          當target server被指定時,Procedure進入REGION_TRANSITION_DISPATCH狀態,dispatch了region open的請求,同時把meta表中region的狀態改成了OPENING,然后再次進入休眠狀態

          2017-05-23 12:04:24,494 INFO [ProcExecWrkr-38] assignment.RegionStateStore: pid=1179 updating hbase:meta row=IntegrationTestBigLinkedList,H\xE3@\x8D\x964\x9D\xDF\x8F@9'\x0F\xC8\xCC\xC2,1495566261066.56f985a727afe80a184dac75fbf6860c., regionState=OPENING, regionLocation=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:24,498 INFO [ProcExecWrkr-38] assignment.RegionTransitionProcedure: Dispatch pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_DISPATCH; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OPENING, location=ve0540.halxg.cloudera.com,16020,1495525111232
          

          最后,當RegionServer打開了這個region后,會發RPC通知master,那么在通知過程中,這個Procedure再次被喚醒,開始執行REGION_TRANSITION_FINISH的邏輯,最后更新meta表,把這個region置為打開狀態。

          2017-05-23 12:04:26,643 DEBUG [RpcServer.default.FPBQ.Fifo.handler=46,queue=1,port=16000] assignment.RegionTransitionProcedure: Received report OPENED seqId=11984985, pid=1179, ppid=1176, state=RUNNABLE:REGION_TRANSITION_DISPATCH; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232; rit=OPENING, location=ve0540.halxg.cloudera.com,16020,1495525111232 2017-05-23 12:04:26,643 INFO [ProcExecWrkr-9] assignment.RegionStateStore: pid=1179 updating hbase:meta row=IntegrationTestBigLinkedList,H\xE3@\x8D\x964\x9D\xDF\x8F@9'\x0F\xC8\xCC\xC2,1495566261066.56f985a727afe80a184dac75fbf6860c., regionState=OPEN, openSeqNum=11984985, regionLocation=ve0540.halxg.cloudera.com,16020,1495525111232
          2017-05-23 12:04:26,836 INFO [ProcExecWrkr-9] procedure2.ProcedureExecutor: Finish suprocedure pid=1179, ppid=1176, state=SUCCESS; AssignProcedure table=IntegrationTestBigLinkedList, region=56f985a727afe80a184dac75fbf6860c, target=ve0540.halxg.cloudera.com,16020,1495525111232
          

          一路看下來,由于整個region assign的過程都是在Procedure中執行,整個過程清晰明了,非常容易追述,也沒有了Zookeeper一些event事件的干擾。

          總結

          Assignment Mananger V2依賴Procedure V2實現了一套清晰明了的region transition機制。去除了Zookeeper依賴,減少了region狀態沖突的可能性。整體上來看,代碼的可讀性更強,出了問題也更好查錯。對于解決之前AM中的一系列“頑疾”,AMv2做了很好的嘗試,也是一個非常好的方向。

          AMv2之所以能保持簡潔高效的一個重要原因就是重度依賴了Procedure V2,把一些復雜的邏輯都轉移到了Procedure V2中。但是這樣做的問題是:一旦ProcedureWAL出現了損壞,或者Procedure本身存在bug,這個后果就是災難性的。事實上在我們的測試環境中,就出現過PRocedureWAL損壞導致region RIT的情況。

          另外需要注意的是,截止目前為止,HBCK仍然無法支持AMv2,這會導致一旦出現問題,修復起來會比較困難。

          當然,新的事務還是要有一段成熟期,相信經過一段時間的bug修復和完善后,我相信AMv2一定會完美解決之前的一些問題,給HBase的運維上帶來一些不同的體驗。愿世界不再被HBase的RIT困擾 :-)。

          云端使用

          阿里HBase目前已經在阿里云提供商業化服務,任何有需求的用戶都可以在阿里云端使用深入改進的、一站式的HBase服務。云HBase版本與自建HBase相比在運維、可靠性、性能、穩定性、安全、成本等方面均有很多的改進。

          同時,云HBase2.0 在2018年6月6日正式發布,點擊了解更多!

          文作者為京東算法服務部的張穎和段學浩,并由 Apache Hive PMC,阿里巴巴技術專家李銳幫忙校對。主要內容為:

          1.背景

          2.Flink SQL 的優化

          3.總結

          一、背景

          目前,京東搜索推薦的數據處理流程如上圖所示??梢钥吹綄崟r和離線是分開的,離線數據處理大部分用的是 Hive / Spark,實時數據處理則大部分用 Flink / Storm。

          這就造成了以下現象:在一個業務引擎里,用戶需要維護兩套環境、兩套代碼,許多共性不能復用,數據的質量和一致性很難得到保障。且因為流批底層數據模型不一致,導致需要做大量的拼湊邏輯;甚至為了數據一致性,需要做大量的同比、環比、二次加工等數據對比,效率極差,并且非常容易出錯。

          而支持批流一體的 Flink SQL 可以很大程度上解決這個痛點,因此我們決定引入 Flink 來解決這種問題。

          在大多數作業,特別是 Flink 作業中,執行效率的優化一直是 Flink 任務優化的關鍵,在京東每天數據增量 PB 級情況下,作業的優化顯得尤為重要。

          寫過一些 SQL 作業的同學肯定都知道,對于 Flink SQL 作業,在一些情況下會造成同一個 UDF 被反復調用的情況,這對一些消耗資源的任務非常不友好;此外,影響執行效率大致可以從 shuffle、join、failover 策略等方面考慮;另外,Flink 任務調試的過程也非常復雜,對于一些線上機器隔離的公司來說尤甚。

          為此,我們實現了內嵌式的 Derby 來作為 Hive 的元數據存儲數據庫 (allowEmbedded);在任務恢復方面,批式作業沒有 checkpoint 機制來實現failover,但是 Flink 特有的 region 策略可以使批式作業快速恢復;此外,本文還介紹了對象重用等相關優化措施。

          二、 Flink SQL 的優化

          1. UDF 重用

          在 Flink SQL 任務里會出現以下這種情況:如果相同的 UDF 既出現在 LogicalProject 中,又出現在 Where 條件中,那么 UDF 會進行多次調用 (見https://issues.apache.org/jira/browse/FLINK-20887)。但是如果該 UDF 非常耗 CPU 或者內存,這種多余的計算會非常影響性能,為此我們希望能把 UDF 的結果緩存起來下次直接使用。在設計的時候需要考慮:(非常重要:請一定保證 LogicalProject 和 where 條件的 subtask chain 到一起)

          • 一個 taskmanager 里面可能會有多個 subtask,所以這個 cache 要么是 thread (THREAD LOCAL) 級別要么是 tm 級別;
          • 為了防止出現一些情況導致清理 cache 的邏輯走不到,一定要在 close 方法里將 cache 清掉;
          • 為了防止內存無限增大,選取的 cache 最好可以主動控制 size;至于 “超時時間”,建議可以配置一下,但是最好不要小于 UDF 先后調用的時間;
          • 上文有提到過,一個 tm 里面可能會有多個 subtask,相當于 tm 里面是個多線程的環境。首先我們的 cache 需要是線程安全的,然后可根據業務判斷需不需要鎖。

          根據以上考慮,我們用 guava cache 將 UDF 的結果緩存起來,之后調用的時候直接去cache 里面拿數據,最大可能降低任務的消耗。下面是一個簡單的使用(同時設置了最大使用 size、超時時間,但是沒有寫鎖):

          public class RandomFunction extends ScalarFunction {
              private static Cache<String, Integer> cache=CacheBuilder.newBuilder()
                      .maximumSize(2)
                      .expireAfterWrite(3, TimeUnit.SECONDS)
                      .build();
          
              public int eval(String pvid) {
                  profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
                  Integer result=cache.getIfPresent(pvid);
                  if (null==result) {
                      int tmp=(int)(Math.random() * 1000);
                      cache.put("pvid", tmp);
                      return tmp;
                  }
                  return result;
              }
              @Override
              public void close() throws Exception {
                  super.close();
                  cache.cleanUp();
              }
          }

          2. 單元測試

          大家可能會好奇為什么會把單元測試也放到優化里面,大家都知道 Flink 任務調試過程非常復雜,對于一些線上機器隔離的公司來說尤甚。京東的本地環境是沒有辦法訪問任務服務器的,因此在初始階段調試任務,我們耗費了很多時間用來上傳 jar 包、查看日志等行為。

          為了降低任務的調試時間、增加代碼開發人員的開發效率,實現了內嵌式的 Derby 來作為 Hive 的元數據存儲數據庫 (allowEmbedded),這算是一種優化開發時間的方法。具體思路如下:

          首先創建 Hive Conf:

          public static HiveConf createHiveConf() {
              ClassLoader classLoader=new HiveOperatorTest().getClass().getClassLoader();
              HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
          
              try {
                  TEMPORARY_FOLDER.create();
                  String warehouseDir=TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
                  String warehouseUri=String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
          
                  HiveConf hiveConf=new HiveConf();
                  hiveConf.setVar(
                          HiveConf.ConfVars.METASTOREWAREHOUSE,
                          TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
                  hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
          
                  hiveConf.set("datanucleus.connectionPoolingType", "None");
                  hiveConf.set("hive.metastore.schema.verification", "false");
                  hiveConf.set("datanucleus.schema.autoCreateTables", "true");
                  return hiveConf;
              } catch (IOException e) {
                  throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
              }
          }

          接下來創建 Hive Catalog:(利用反射的方式調用 embedded 的接口)

          public static void createCatalog() throws Exception{
              Class clazz=HiveCatalog.class;
              Constructor c1=clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
              c1.setAccessible(true);
              hiveCatalog=(HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
              hiveCatalog.open();
          }

          創建 tableEnvironment:(同官網)

          EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
          TableEnvironment tableEnv=TableEnvironment.create(settings);
          TableConfig tableConfig=tableEnv.getConfig();
          Configuration configuration=new Configuration();
          configuration.setInteger("table.exec.resource.default-parallelism", 1);
          tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
          tableEnv.useCatalog(hiveCatalog.getName());

          最后關閉 Hive Catalog:

          public static void closeCatalog() {
              if (hiveCatalog !=null) {
                  hiveCatalog.close();
              }
          }

          此外,對于單元測試,構建合適的數據集也是一個非常大的功能,我們實現了 CollectionTableFactory,允許自己構建合適的數據集,使用方法如下:

          CollectionTableFactory.reset();
          CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
          StringBuilder sbFilesSource=new StringBuilder();
          sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + "  `pvid` string) with ('connector.type'='COLLECTION','is-bounded'='true')");
          tableEnv.executeSql(sbFilesSource.toString());

          3. join 方式的選擇

          傳統的離線 Batch SQL (面向有界數據集的 SQL) 有三種基礎的實現方式,分別是 Nested-loop Join、Sort-Merge Join 和 Hash Join。


          效率

          空間

          備注

          Nested-loop Join

          占用大


          Sort-Merge Join

          有sort merge開銷

          占用小

          有序數據集的一種優化措施

          Hash Join

          占用大

          適合大小表

          • Nested-loop Join 最為簡單直接,將兩個數據集加載到內存,并用內嵌遍歷的方式來逐個比較兩個數據集內的元素是否符合 Join 條件。Nested-loop Join 的時間效率以及空間效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 來禁用。以下兩張圖片是禁用前和禁用后的效果 (如果你的禁用沒有生效,先看一下是不是 Equi-Join):
          • Sort-Merge Join 分為 Sort 和 Merge 兩個階段:首先將兩個數據集進行分別排序,然后再對兩個有序數據集分別進行遍歷和匹配,類似于歸并排序的合并。(Sort-Merge Join 要求對兩個數據集進行排序,但是如果兩個輸入是有序的數據集,則可以作為一種優化方案)。
          • Hash Join 同樣分為兩個階段:首先將一個數據集轉換為 Hash Table,然后遍歷另外一個數據集元素并與 Hash Table 內的元素進行匹配。第一階段和第一個數據集分別稱為 build 階段和 build table;第二個階段和第二個數據集分別稱為 probe 階段和 probe table。Hash Join 效率較高但是對空間要求較大,通常是作為 Join 其中一個表為適合放入內存的小表的情況下的優化方案 (并不是不允許溢寫磁盤)。

          注意:Sort-Merge Join 和 Hash Join 只適用于 Equi-Join ( Join 條件均使用等于作為比較算子)。

          Flink 在 join 之上又做了一些細分,具體包括:


          特點

          使用

          Repartition-Repartition strategy

          對數據集分別進行分區和shuffle,如果數據集大的時候效率極差

          兩個數據集相差不大

          Broadcast-Forward strategy

          將小表的數據全部發送到大表數據的機器上

          兩個數據集有較大的差距

          • Repartition-Repartition strategy:Join 的兩個數據集分別對它們的 key 使用相同的分區函數進行分區,并經過網絡發送數據;
          • Broadcast-Forward strategy:大的數據集不做處理,另一個比較小的數據集全部復制到集群中一部分數據的機器上。

          眾所周知,batch 的 shuffle 非常耗時間。

          • 如果兩個數據集有較大差距,建議采用 Broadcast-Forward strategy;
          • 如果兩個數據集差不多,建議采用 Repartition-Repartition strategy。

          可以通過:table.optimizer.join.broadcast-threshold 來設置采用 broadcast 的 table 大小,如果設置為 “-1”,表示禁用 broadcast。

          下圖為禁用前后的效果:

          4. multiple input

          在 Flink SQL 任務里,降低 shuffle 可以有效的提高 SQL 任務的吞吐量,在實際的業務場景中經常遇到這樣的情況:上游產出的數據已經滿足了數據分布要求 (如連續多個 join 算子,其中 key 是相同的),此時 Flink 的 forward shuffle 是冗余的 shuffle,我們希望將這些算子 chain 到一起。Flink 1.12 引入了 mutiple input 的特性,可以消除大部分沒必要的 forward shuffle,把 source 的算子 chain 到一起。

          table.optimizer.multiple-input-enabled:true

          下圖為開了 multiple input 和沒有開的拓撲圖 ( operator chain 功能已經打開):?

          5. 對象重用

          上下游 operator 之間會經過序列化 / 反序列化 / 復制階段來進行數據傳輸,這種行為非常影響 Flink SQL 程序的性能,可以通過啟用對象重用來提高性能。但是這在 DataStream 里面非常危險,因為可能會發生以下情況:在下一個算子中修改對象意外影響了上面算子的對象。

          但是 Flink 的 Table / SQL API 中是非常安全的,可以通過如下方式來啟用:

          StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
          env.getConfig().enableObjectReuse();

          或者是通過設置:pipeline-object-reuse:true

          為什么啟用了對象重用會有這么大的性能提升?在 Blink planner 中,同一任務的兩個算子之間的數據交換最終將調用 BinaryString#copy,查看實現代碼,可以發現 BinaryString#copy 需要復制底層 MemorySegment 的字節,通過啟用對象重用來避免復制,可以有效提升效率。

          下圖為沒有開啟對象重用時相應的火焰圖:

          6. SQL 任務的 failover 策略

          batch 任務模式下 checkpoint 以及其相關的特性全部都不可用,因此針對實時任務的基于 checkpoint 的 failover 策略是不能應用在批任務上面的,但是 batch 任務允許 Task 之間通過 Blocking Shuffle 進行通信,當一個 Task 因為任務未知的原因失敗之后,由于 Blocking Shuffle 中存儲了這個 Task 所需要的全部數據,所以只需要重啟這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務即可:

          jobmanager.execution.failover-strategy:region (已經 finish 的 operator 可直接恢復)

          table.exec.shuffle-mode:ALL_EDGES_BLOCKING (shuffle 策略)。

          7. shuffle

          Flink 里的 shuffle 分為 pipeline shuffle 和 blocking shuffle。

          • pipeline shuffle 性能好,但是對資源的要求高,而且容錯比較差 (會將該 operator 分到前面的一個 region 里面,對于 batch 任務來說,如果這個算子出問題,將從上一個 region 恢復);
          • blocking shuffle 就是傳統的 batch shuffle,會將數據落盤,這種 shuffle 的容錯好,但是會產生大量的磁盤、網絡 io (如果為了省心的話,建議用 blocking suffle)。blocking shuffle 又分為 hash shuffle 和 sort shuffle,如果你的磁盤是 ssd 并且并發不太大的話,可以選擇使用 hash shuffle,這種 shuffle 方式產生的文件多、隨機讀多,對磁盤 io 影響較大;如果你是 sata 并且并發比較大,可以選擇用 sort-merge shuffle,這種 shuffle 產生的數據少,順序讀,不會產生大量的磁盤 io,不過開銷會更大一些 (sort merge)。

          相應的控制參數:

          table.exec.shuffle-mode,該參數有多個參數,默認是 ALL_EDGES_BLOCKING,表示所有的邊都會用 blocking shuffle,不過大家可以試一下 POINTWISE_EDGES_PIPELINED,表示 forward 和 rescale edges 會自動開始 pipeline 模式。

          taskmanager.network.sort-shuffle.min-parallelism ,將這個參數設置為小于你的并行度,就可以開啟 sort-merge shuffle;這個參數的設置需要考慮一些其他的情況,具體的可以按照官網設置。

          三、總結

          本文著重從 shuffle、join 方式的選擇、對象重用、UDF 重用等方面介紹了京東在 Flink SQL 任務方面做的優化措施。另外,感謝京東實時計算研發部付海濤等全部同事的支持與幫助。

          原文鏈接:http://click.aliyun.com/m/1000288770/

          本文為阿里云原創內容,未經允許不得轉載。

          .創建主鍵

          create table ESC_STOTE.TF_B_AIR_CONFIG(

          TYPE_ID VARCHAR2(20) not null,

          PROVINCE_CODE VARCHAR2(4) not null,

          PROVINCE_TYPE VARCHAR2(2) not null,

          LIMIT_NUM VARCHAR2(2) not null,

          EFFECTIVE_FALG VARCHAR2 default '1',

          UPDATE_TIME DATE default sysdate,

          constraint TF_B_AIR_CONFIG_PK primary key(TYPE_ID)--單列主鍵

          )

          create table ECS_STORE.TF_B_AIR_CONFIG(

          TYPE_ID VARCHAR2(20) not null,

          PROVINCE_CODE VARCHAR2(4) not null,

          PARAMETER_TYPE VARCHAR2(2) not null,

          LIMIT_NUM VARCHAR2(4) not null,

          EFFECTIVE_FALG VARCHAR2(2) default '1',

          UPDATE_TIME DATE default sysdate,

          constraint TF_B_AIR_CONFIG_PK primary key(TYPE_ID , PROVINCE_CODE)--復合主鍵

          )

          第二種:創建表后,再創建約束

          alter table table_name add constraint constraint_name primary key(col1,col2,...coln);

          示例:

          ----創建TF_B_AIR_CONFIG表

          create table ECS_STORE.TF_B_AIR_CONFIG(

          TYPE_ID VARCHAR2(20) not null,

          PROVINCE_CODE VARCHAR2(4) not null,

          PARAMETER_TYPE VARCHAR2(2) not null,

          LIMIT_NUM VARCHAR2(4) not null,

          EFFECTIVE_FALG VARCHAR2(2) default '1',

          UPDATE_TIME DATE default sysdate

          )

          --單列主鍵

          alter table ECS_STORE.TF_B_AIR_CONFIG add constraint TF_B_AIR_CONFIG_PK primary key (TYPE_ID);

          --聯合主鍵

          alter table ECS_STORE.TF_B_AIR_CONFIG add constraint TF_B_AIR_CONFIG_PK primary key (TYPE_ID , PROVINCE_CODE);

          2.禁用主鍵

          alter table table_name disable constraint constraint_name;

          alter table ECS_STORE.TF_B_AIR_CONFIG disable constraint TF_B_AIR_CONFIG_PK ;

          3.啟用主鍵

          alter table table_name enable constraint constraint_name;

          alter table ECS_STORE.TF_B_AIR_CONFIG enable constraint TF_B_AIR_CONFIG_PK ;

          4.刪除主鍵

          alter table table_name drop constraint constraint_name;


          主站蜘蛛池模板: 日美欧韩一区二去三区| 麻豆一区二区三区精品视频| 一区二区三区无码视频免费福利| 亚洲一区在线观看视频| 精品一区二区三区中文| 日韩精品一区二区三区老鸭窝 | 精品一区二区三区在线成人| 动漫精品专区一区二区三区不卡| 日韩毛片基地一区二区三区| 无码国产精品一区二区免费模式| 狠狠色婷婷久久一区二区三区| 国产精品乱码一区二区三| 亚洲一区中文字幕| 性色av一区二区三区夜夜嗨 | 中文字幕乱码亚洲精品一区| 久久国产精品一区免费下载| 国产在线aaa片一区二区99| 国产日韩AV免费无码一区二区三区| 在线免费观看一区二区三区| 无码人妻精品一区二区三区9厂 | 日本福利一区二区| 无码丰满熟妇一区二区| 亚洲变态另类一区二区三区| 日韩一区二区视频在线观看| 国产日韩一区二区三区在线播放| 亚洲av乱码一区二区三区| 人妻激情偷乱视频一区二区三区 | 亚洲一区二区三区高清不卡| 国产一区二区久久久| 日本国产一区二区三区在线观看| 中文字幕一区在线观看视频 | 国产成人久久精品麻豆一区| 国产天堂一区二区综合| 国产综合无码一区二区辣椒| 国偷自产一区二区免费视频| 亚洲高清美女一区二区三区 | 性色AV 一区二区三区| 国产激情一区二区三区成人91| 99精品久久精品一区二区| 亚洲av无码一区二区三区不卡| 精品三级AV无码一区|