介紹
生產者-消費者
模型用於解耦生產者與消費者,平衡兩者之間的能力不平衡,該模型廣泛應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至隊列中,然後由消費者從隊列中消費,更具體一點,對於更新操作,生產者會將文件中老的記錄放入隊列中等待消費者消費,消費后交由HoodieMergeHandle
處理;對於插入操作,生產者會將新記錄放入隊列中等待消費者消費,消費后交由HandleCreateHandle
處理。
入口
前面的文章中提到過無論是HoodieCopyOnWriteTable#handleUpdate
處理更新時直接生成了一個SparkBoundedInMemoryExecutor
對象,還是HoodieCopyOnWriteTable#handleInsert
處理插入時生成了一個CopyOnWriteLazyInsertIterable
對象,再迭代時調用該對象的CopyOnWriteLazyInsertIterable#computeNext
方法生成SparkBoundedInMemoryExecutor
對象。最後兩者均會調用SparkBoundedInMemoryExecutor#execute
開始記錄的處理,該方法核心代碼如下
public E execute() {
try {
ExecutorCompletionService<Boolean> producerService = startProducers();
Future<E> future = startConsumer();
// Wait for consumer to be done
return future.get();
} catch (Exception e) {
throw new HoodieException(e);
}
}
該方法會啟動所有生產者和單個消費者進行處理。
Hudi定義了BoundedInMemoryQueueProducer
接口表示生產者,其子類實現如下
- FunctionBasedQueueProducer,基於
Function
來生產記錄,在合併日誌log文件和數據parquet文件時使用,以便提供RealTimeView
。 - IteratorBasedQueueProducer,基於迭代器來生產記錄,在插入更新時使用。
定義了BoundedInMemoryQueueConsumer
類表示消費者,其主要子類實現如下
- CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要處理
CopyOnWrite
表類型時的插入。- MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理
MergeOnRead
- MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理
表類型時的插入,其為CopyOnWriteInsertHandler
的子類。
- CopyOnWriteLazyInsertIterable$UpdateHandler,主要處理
CopyOnWrite
表類型時的更新。
整個生產消費相關的類繼承結構非常清晰。
對於生產者的啟動,startProducers
方法核心代碼如下
public ExecutorCompletionService<Boolean> startProducers() {
// Latch to control when and which producer thread will close the queue
final CountDownLatch latch = new CountDownLatch(producers.size());
final ExecutorCompletionService<Boolean> completionService =
new ExecutorCompletionService<Boolean>(executorService);
producers.stream().map(producer -> {
return completionService.submit(() -> {
try {
preExecute();
producer.produce(queue);
} catch (Exception e) {
logger.error("error producing records", e);
queue.markAsFailed(e);
throw e;
} finally {
synchronized (latch) {
latch.countDown();
if (latch.getCount() == 0) {
// Mark production as done so that consumer will be able to exit
queue.close();
}
}
}
return true;
});
}).collect(Collectors.toList());
return completionService;
}
該方法使用CountDownLatch
來協調生產者線程與消費者線程的退出動作,然後調用produce
方法開始生產,對於插入更新時的IteratorBasedQueueProducer
而言,其核心代碼如下
public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
...
while (inputIterator.hasNext()) {
queue.insertRecord(inputIterator.next());
}
...
}
可以看到只要迭代器還有記錄(可能為插入時的新記錄或者更新時的舊記錄),就會往隊列中不斷寫入。
對於消費者的啟動,startConsumer
方法的核心代碼如下
private Future<E> startConsumer() {
return consumer.map(consumer -> {
return executorService.submit(() -> {
...
preExecute();
try {
E result = consumer.consume(queue);
return result;
} catch (Exception e) {
queue.markAsFailed(e);
throw e;
}
});
}).orElse(CompletableFuture.completedFuture(null));
}
消費時會先進行執行前的準備,然後開始消費,其中consume
方法的核心代碼如下
public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
Iterator<I> iterator = queue.iterator();
while (iterator.hasNext()) {
consumeOneRecord(iterator.next());
}
// Notifies done
finish();
return getResult();
}
可以看到只要隊列中還有記錄,就可以獲取該記錄,然後調用不同BoundedInMemoryQueueConsumer
子類的consumeOneRecord
進行更新插入處理。
值得一提的是Hudi對隊列進行了流控,生產者不能無限制地將記錄寫入隊列中,隊列緩存的大小由用戶配置,隊列能放入記錄的條數由採樣的記錄大小和隊列緩存大小控制。
在生產時,會調用BoundedInMemoryQueue#insertRecord
將記錄寫入隊列,其核心代碼如下
public void insertRecord(I t) throws Exception {
...
rateLimiter.acquire();
// We are retrieving insert value in the record queueing thread to offload computation
// around schema validation
// and record creation to it.
final O payload = transformFunction.apply(t);
adjustBufferSizeIfNeeded(payload);
queue.put(Option.of(payload));
}
首先獲取一個許可(Semaphore
),未成功獲取會被阻塞直至成功獲取,然後獲取記錄的負載以便調整隊列,然後放入內部隊列(LinkedBlockingQueue
)中,其中adjustBufferSizeIfNeeded
方法的核心代碼如下
private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
return;
}
final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
final long newAvgRecordSizeInBytes =
Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
final int newRateLimit =
(int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));
// If there is any change in number of records to cache then we will either release (if it increased) or acquire
// (if it decreased) to adjust rate limiting to newly computed value.
if (newRateLimit > currentRateLimit) {
rateLimiter.release(newRateLimit - currentRateLimit);
} else if (newRateLimit < currentRateLimit) {
rateLimiter.acquire(currentRateLimit - newRateLimit);
}
currentRateLimit = newRateLimit;
avgRecordSizeInBytes = newAvgRecordSizeInBytes;
numSamples++;
}
首先看是否已經達到採樣頻率,然後計算新的記錄平均大小和限流速率,如果新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),否則需要獲取(回收)一些許可(許可變少後生產速率自然就降低了)。該操作可根據採樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載太小時,放入同等個數,從而起到動態調節作用。
在消費時,會調用BoundedInMemoryQueue#readNextRecord
讀取記錄,其核心代碼如下
private Option<O> readNextRecord() {
...
rateLimiter.release();
Option<O> newRecord = Option.empty();
while (expectMoreRecords()) {
try {
throwExceptionIfFailed();
newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
if (newRecord != null) {
break;
}
} catch (InterruptedException e) {
throw new HoodieException(e);
}
}
...
if (newRecord != null && newRecord.isPresent()) {
return newRecord;
} else {
// We are done reading all the records from internal iterator.
this.isReadDone.set(true);
return Option.empty();
}
}
可以看到首先會釋放一個許可,然後判斷是否還可以讀取記錄(還在生產或者停止生產但隊列不為空都可讀取),然後從內部隊列獲取記錄或返回。
上述便是生產者-消費者
在Hudi中應用的分析。
總結
Hudi採用了生產者-消費者
模型來控制記錄的處理,與傳統多生產者-多消費者
模型不同的是,Hudi現在只支持多生產者-單消費者
模型,單消費者意味着Hudi暫時不支持文件的併發寫入。而對於生產消費的隊列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue
,而是採用了更精細化的速率控制,保證速率會隨着記錄負載大小的變化和配置的隊列緩存大小而動態變化,這也降低了系統發生OOM的概率。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】
※收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表
※網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線
※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益
※公開收購3c價格,不怕被賤賣!
※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象