From 28911e1fc84ee5c1a13c1a4b4269e926798cd3a9 Mon Sep 17 00:00:00 2001
From: Pxl
Date: Wed, 1 Jul 2026 09:30:05 +0800
Subject: [PATCH] [fix](be) Avoid local runtime filter merge deadlock (#64866)

Issue Number: None

Related PR: None

Problem Summary: Local runtime filter merge can deadlock when one join build instance publishes a local-merge runtime filter while another instance sends its runtime filter size. The old local merge context lock protected both the merger and the producer list, so one path could hold a producer runtime filter lock and then wait for the context lock while another path held the context lock and then waited for a producer lock.

This change gives RuntimeFilterMerger its own internal synchronization and makes LocalMergeContext expose a snapshot of the merger and producers. Publish, send-size, and sync-size paths take the context lock only while copying that snapshot, then merge filters or update producer sizes outside the context lock. RuntimeFilterMerger returns the ready transition from merge_from directly, removing the separate unlocked ready check.

None - Test: Unit Test - build-support/clang-format.sh be/src/exec/runtime_filter/runtime_filter_merger.h be/src/exec/runtime_filter/runtime_filter_mgr.cpp be/src/exec/runtime_filter/runtime_filter_mgr.h be/src/exec/runtime_filter/runtime_filter_producer.cpp be/test/exec/runtime_filter/runtime_filter_merger_test.cpp be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp - git diff --cached --check - ./run-be-ut.sh --run --filter=RuntimeFilterMgrTest.* - ./run-be-ut.sh --run --filter=RuntimeFilterMergerTest.* - Behavior changed: No - Does this need documentation: No (cherry picked from commit 9d7d3a2c4f437e0c2962bde1a49848ed0535063c) --- .../runtime_filter/runtime_filter_merger.h | 27 +++- .../runtime_filter/runtime_filter_mgr.cpp | 141 +++++++++++------- .../exec/runtime_filter/runtime_filter_mgr.h | 35 +++-- .../runtime_filter_producer.cpp | 42 +++--- be/src/runtime/runtime_state.cpp | 2 +- .../runtime_filter_merger_test.cpp | 26 ++-- .../runtime_filter_mgr_test.cpp | 33 ++-- 7 files changed, 182 insertions(+), 124 deletions(-) diff --git a/be/src/exec/runtime_filter/runtime_filter_merger.h b/be/src/exec/runtime_filter/runtime_filter_merger.h index 41ab571d6a258f..4e1d19dabc3a7b 100644 --- a/be/src/exec/runtime_filter/runtime_filter_merger.h +++ b/be/src/exec/runtime_filter/runtime_filter_merger.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "exec/runtime_filter/runtime_filter.h" #include "exec/runtime_filter/runtime_filter_definitions.h" #include "exprs/vexpr.h" @@ -47,6 +49,7 @@ class RuntimeFilterMerger : public RuntimeFilter { } std::string debug_string() override { + std::unique_lock l(_rmtx); return fmt::format( "Merger: ({}, expected_producer_num: {}, received_producer_num: {}, " "received_rf_size_num: {}, received_sum_size: {})", @@ -55,12 +58,15 @@ class RuntimeFilterMerger : public RuntimeFilter { } // If input is a disabled predicate, the final result is a disabled predicate. 
- Status merge_from(const RuntimeFilter* other) { + // Returns true only for the call that makes the merger ready. + Status merge_from(const RuntimeFilter* other, bool* ready) { + std::unique_lock l(_rmtx); _received_producer_num++; if (_expected_producer_num < _received_producer_num) { return Status::InternalError( "runtime filter merger input product more than expected, {}", debug_string()); } + *ready = _received_producer_num == _expected_producer_num; if (_received_producer_num == _expected_producer_num) { _rf_state = State::READY; } @@ -72,18 +78,26 @@ class RuntimeFilterMerger : public RuntimeFilter { return st; } - void set_expected_producer_num(int num) { + // Only raise the expected producer count. RuntimeFilterMgr may compute the + // count under its own lock and apply it after releasing that lock, so + // concurrent registrations can update the merger out of order. + void increase_expected_producer_num(int num) { + std::unique_lock l(_rmtx); if (_received_producer_num > 0 || _received_rf_size_num > 0) { throw Exception(ErrorCode::INTERNAL_ERROR, "runtime filter merger set expected producer after receive data, {}", debug_string()); } - _expected_producer_num = num; + _expected_producer_num = std::max(_expected_producer_num, num); } - int get_expected_producer_num() const { return _expected_producer_num; } + int get_expected_producer_num() { + std::unique_lock l(_rmtx); + return _expected_producer_num; + } bool add_rf_size(uint64_t size) { + std::unique_lock l(_rmtx); _received_rf_size_num++; if (_expected_producer_num < _received_rf_size_num) { throw Exception(ErrorCode::INTERNAL_ERROR, @@ -94,7 +108,10 @@ class RuntimeFilterMerger : public RuntimeFilter { return (_received_rf_size_num == _expected_producer_num); } - uint64_t get_received_sum_size() const { return _received_sum_size; } + uint64_t get_received_sum_size() { + std::unique_lock l(_rmtx); + return _received_sum_size; + } bool ready() const { return _rf_state == State::READY; } diff --git 
a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp index 49007d5c73534b..e52439d9c2691f 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp @@ -54,7 +54,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global) std::vector> RuntimeFilterMgr::get_consume_filters( int filter_id) { - std::lock_guard l(_lock); + LockGuard l(_lock); auto iter = _consumer_map.find(filter_id); if (iter == _consumer_map.end()) { return {}; @@ -68,13 +68,17 @@ Status RuntimeFilterMgr::register_consumer_filter( SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; - std::lock_guard l(_lock); - RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, consumer)); - _consumer_map[key].push_back(*consumer); + std::shared_ptr new_consumer; + RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id, &new_consumer)); + { + LockGuard l(_lock); + _consumer_map[key].push_back(new_consumer); + } + *consumer = new_consumer; return Status::OK(); } -Status RuntimeFilterMgr::register_local_merger_producer_filter( +Status RuntimeFilterMgr::register_local_merge_producer_filter( const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, std::shared_ptr producer) { if (!_is_global) [[unlikely]] { @@ -89,55 +93,57 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( } SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; + uint32_t producer_stage = producer->stage(); - LocalMergeContext* context; + std::shared_ptr context; + std::shared_ptr merger; + int expected_producer_num = 0; { - std::lock_guard l(_lock); - context = &_local_merge_map[key]; // may inplace construct default object - } + LockGuard l(_lock); + auto iter = _local_merge_map.find(key); + if (iter == _local_merge_map.end() || !iter->second || + producer_stage > iter->second->stage) { + auto new_context = std::make_shared(); + 
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc, &new_context->merger)); + new_context->stage = producer_stage; + _local_merge_map.insert_or_assign(key, new_context); + context = new_context; + } else { + context = iter->second; + } - RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer)); - return Status::OK(); -} + context->producers.emplace_back(producer); + merger = context->merger; + expected_producer_num = cast_set(context->producers.size()); + } -Status LocalMergeContext::register_producer(const QueryContext* query_ctx, - const TRuntimeFilterDesc* desc, - std::shared_ptr producer) { - std::lock_guard l(mtx); - if (producer->stage() > stage) { - // New recursive CTE round: discard stale merger and producers from - // the previous round and recreate the merger for the new round. - merger.reset(); - producers.clear(); - stage = producer->stage(); - } - if (!merger) { - RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger)); - } - producers.emplace_back(producer); - merger->set_expected_producer_num(cast_set(producers.size())); + merger->increase_expected_producer_num(expected_producer_num); // Sync the local merger's stage from the producer so that outgoing merge RPCs // (via _push_to_remote) carry the correct recursive CTE round number. 
- merger->set_stage(producer->stage()); + merger->set_stage(producer_stage); return Status::OK(); } -Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, - LocalMergeContext** local_merge_filters) { +Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t expected_stage, + std::shared_ptr* context) { if (!_is_global) [[unlikely]] { return Status::InternalError( "A local merge filter can not be registered in Local RuntimeFilterMgr"); } - std::lock_guard l(_lock); + context->reset(); + LockGuard l(_lock); auto iter = _local_merge_map.find(filter_id); if (iter == _local_merge_map.end()) { - // Filter may have been removed during a recursive CTE stage reset. // Return OK with nullptr to let the caller skip gracefully. - *local_merge_filters = nullptr; return Status::OK(); } - *local_merge_filters = &iter->second; - DCHECK(iter->second.merger); + if (!iter->second) { + return Status::InternalError("local merge context is nullptr for filter_id: {}", filter_id); + } + if (expected_stage != iter->second->stage) { + return Status::OK(); + } + *context = iter->second; return Status::OK(); } @@ -151,18 +157,28 @@ Status RuntimeFilterMgr::register_producer_filter( SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; - std::lock_guard l(_lock); - if (_producer_id_set.contains(key)) { - return Status::InvalidArgument("filter {} has been registered", key); + { + LockGuard l(_lock); + if (_producer_id_set.contains(key)) { + return Status::InvalidArgument("filter {} has been registered", key); + } + } + std::shared_ptr new_producer; + RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, &new_producer)); + { + LockGuard l(_lock); + if (_producer_id_set.contains(key)) { + return Status::InvalidArgument("filter {} has been registered", key); + } + _producer_id_set.insert(key); } - RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer)); - _producer_id_set.insert(key); + *producer = new_producer; 
return Status::OK(); } bool RuntimeFilterMgr::set_runtime_filter_params( const TRuntimeFilterParams& runtime_filter_params) { - std::lock_guard l(_lock); + LockGuard l(_lock); if (!_has_merge_addr) { _merge_addr = runtime_filter_params.runtime_filter_merge_addr; _has_merge_addr = true; @@ -195,7 +211,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cnt_val->targetv2_info = targetv2_info; RETURN_IF_ERROR( RuntimeFilterMerger::create(query_ctx.get(), runtime_filter_desc, &cnt_val->merger)); - cnt_val->merger->set_expected_producer_num(producer_size); + cnt_val->merger->increase_expected_producer_num(producer_size); return Status::OK(); } @@ -297,13 +313,13 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptrfilter_id(), &local_merge_filters)); - if (local_merge_filters == nullptr) { + std::shared_ptr context; + RETURN_IF_ERROR(get_local_merge_context(request->filter_id(), request->stage(), &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; discard stale request. 
return Status::OK(); } - for (auto producer : local_merge_filters->producers) { + for (const auto& producer : context->producers) { producer->set_synced_size(request->filter_size()); } return Status::OK(); @@ -311,18 +327,32 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) std::string RuntimeFilterMgr::debug_string() { std::string result = "Local Merger Info:\n"; - std::lock_guard l(_lock); - for (const auto& [filter_id, ctx] : _local_merge_map) { + struct LocalMergeContextSnapshot { + std::shared_ptr merger; + std::vector> producers; + }; + std::vector local_merge_contexts; + std::vector> consumers; + { + LockGuard l(_lock); + for (const auto& [filter_id, ctx] : _local_merge_map) { + DORIS_CHECK(ctx); + DORIS_CHECK(ctx->merger); + local_merge_contexts.push_back({ctx->merger, ctx->producers}); + } + for (const auto& [filter_id, filter_consumers] : _consumer_map) { + consumers.insert(consumers.end(), filter_consumers.begin(), filter_consumers.end()); + } + } + for (const auto& ctx : local_merge_contexts) { result += fmt::format("{}\n", ctx.merger->debug_string()); for (const auto& producer : ctx.producers) { result += fmt::format("{}\n", producer->debug_string()); } } result += "Consumer Info:\n"; - for (const auto& [filter_id, consumers] : _consumer_map) { - for (const auto& consumer : consumers) { - result += fmt::format("{}\n", consumer->debug_string()); - } + for (const auto& consumer : consumers) { + result += fmt::format("{}\n", consumer->debug_string()); } return result; } @@ -366,10 +396,9 @@ Status RuntimeFilterMergeControllerEntity::merge(std::shared_ptr q RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data)); - RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get())); + RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(), &is_ready)); cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id())); - is_ready = cnt_val.merger->ready(); // update is_ready in locked scope } if (is_ready) { @@ -477,7 
+506,7 @@ Status GlobalMergeContext::reset(QueryContext* query_ctx) { DORIS_CHECK(merger); int producer_size = merger->get_expected_producer_num(); RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &runtime_filter_desc, &merger)); - merger->set_expected_producer_num(producer_size); + merger->increase_expected_producer_num(producer_size); arrive_id.clear(); source_addrs.clear(); done = false; diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h b/be/src/exec/runtime_filter/runtime_filter_mgr.h index 418f9aa41b7414..d680e339f614b3 100644 --- a/be/src/exec/runtime_filter/runtime_filter_mgr.h +++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h @@ -53,15 +53,12 @@ class QueryContext; class ExecEnv; struct LocalMergeContext { - std::mutex mtx; std::shared_ptr merger; std::vector> producers; // Tracks the recursive CTE round. When a producer from a newer round - // registers, the context is reset (merger recreated, old producers dropped). + // registers, RuntimeFilterMgr replaces the whole context and old in-flight + // users keep the previous context alive through shared_ptr. uint32_t stage = 0; - - Status register_producer(const QueryContext* query_ctx, const TRuntimeFilterDesc* desc, - std::shared_ptr producer); }; struct GlobalMergeContext { @@ -92,11 +89,12 @@ class RuntimeFilterMgr { int node_id, std::shared_ptr* consumer_filter); - Status register_local_merger_producer_filter(const QueryContext* query_ctx, - const TRuntimeFilterDesc& desc, - std::shared_ptr producer); + Status register_local_merge_producer_filter(const QueryContext* query_ctx, + const TRuntimeFilterDesc& desc, + std::shared_ptr producer); - Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters); + Status get_local_merge_context(int filter_id, uint32_t expected_stage, + std::shared_ptr* context); // Create local producer. This producer is hold by RuntimeFilterProducerHelper. 
Status register_producer_filter(const QueryContext* query_ctx, const TRuntimeFilterDesc& desc, @@ -110,10 +108,10 @@ class RuntimeFilterMgr { std::string debug_string(); void remove_filter(int32_t filter_id) { - std::lock_guard l(_lock); + LockGuard l(_lock); _consumer_map.erase(filter_id); - // NOTE: _local_merge_map is NOT erased here. It is reset lazily in - // LocalMergeContext::register_producer when a producer from a newer + // NOTE: _local_merge_map is NOT erased here. It is replaced lazily in + // register_local_merge_producer_filter when a producer from a newer // recursive CTE round registers. Erasing eagerly here would race with // multi-fragment REBUILD: a consumer-only fragment's remove_filter could // delete the entry that the producer fragment just re-registered. @@ -136,16 +134,21 @@ class RuntimeFilterMgr { // RuntimeFilterMgr is owned by RuntimeState, so we only // use filter_id as key // key: "filter-id" - std::map>> _consumer_map; - std::set _producer_id_set; - std::map _local_merge_map; + // Protects fields marked GUARDED_BY(_lock). While holding this lock, only + // access RuntimeFilterMgr-owned state or copy shared_ptr snapshots; do not + // call methods on existing RuntimeFilter objects, because RF objects have + // their own locks and may call back into RuntimeFilterMgr. 
+ AnnotatedMutex _lock; + std::map>> _consumer_map + GUARDED_BY(_lock); + std::set _producer_id_set GUARDED_BY(_lock); + std::map> _local_merge_map GUARDED_BY(_lock); std::unique_ptr _tracker; TNetworkAddress _merge_addr; bool _has_merge_addr = false; - std::mutex _lock; }; // controller -> diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp b/be/src/exec/runtime_filter/runtime_filter_producer.cpp index 6105014df10df4..ede2fb9e756d3a 100644 --- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp @@ -54,16 +54,16 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table // when global consumer not exist, send_to_local_targets will do nothing, so merge rf is useless return Status::OK(); } - LocalMergeContext* context = nullptr; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _wrapper->filter_id(), &context)); - if (context == nullptr) { + std::shared_ptr context; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context( + _wrapper->filter_id(), _stage, &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; this producer is stale. 
return Status::OK(); } - std::lock_guard l(context->mtx); - RETURN_IF_ERROR(context->merger->merge_from(this)); - if (context->merger->ready()) { + bool ready = false; + RETURN_IF_ERROR(context->merger->merge_from(this, &ready)); + if (ready) { if (_has_remote_target) { RETURN_IF_ERROR(_send_to_remote_targets(state, context->merger.get())); } else { @@ -161,26 +161,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt set_state(State::WAITING_FOR_SYNCED_SIZE); if (_need_do_merge(state)) { - LocalMergeContext* merger_context = nullptr; - RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( - _wrapper->filter_id(), &merger_context)); - if (merger_context == nullptr) { + std::shared_ptr context; + RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context( + _wrapper->filter_id(), _stage, &context)); + if (!context) { // Filter was removed during a recursive CTE stage reset; this producer is stale. return Status::OK(); } - std::lock_guard merger_lock(merger_context->mtx); - if (merger_context->merger->add_rf_size(local_filter_size)) { - if (!_has_remote_target) { - for (auto filter : merger_context->producers) { - filter->set_synced_size(merger_context->merger->get_received_sum_size()); - } - return Status::OK(); - } else { - local_filter_size = merger_context->merger->get_received_sum_size(); + uint64_t received_sum_size = 0; + bool ready_to_sync = context->merger->add_rf_size(local_filter_size); + if (!ready_to_sync) { + return Status::OK(); + } + received_sum_size = context->merger->get_received_sum_size(); + if (!_has_remote_target) { + for (const auto& filter : context->producers) { + filter->set_synced_size(received_sum_size); } - } else { return Status::OK(); } + local_filter_size = received_sum_size; } else if (!_has_remote_target) { set_synced_size(local_filter_size); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 
eb12f83fedb1ac..75f05269853039 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -526,7 +526,7 @@ Status RuntimeState::register_producer_runtime_filter( DORIS_CHECK(pfc); (*producer_filter)->set_stage(pfc->rec_cte_stage()); } - RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter( + RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter( _query_ctx, desc, *producer_filter)); return Status::OK(); } diff --git a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp index da9d5b66291795..3d0bb701c8b1e9 100644 --- a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp @@ -35,15 +35,17 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { auto desc = TRuntimeFilterDescBuilder().build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); - merger->set_expected_producer_num(2); + merger->increase_expected_producer_num(2); ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(first_product_state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_FALSE(ready); ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, first_expected_state); @@ -51,7 +53,8 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(second_product_state); - 
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(), &ready)); + ASSERT_TRUE(ready); ASSERT_TRUE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, second_expected_state); } @@ -63,15 +66,17 @@ class RuntimeFilterMergerTest : public RuntimeFilterTest { std::shared_ptr merger; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); - merger->set_expected_producer_num(1); + merger->increase_expected_producer_num(1); ASSERT_FALSE(merger->ready()); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[0]->register_producer_runtime_filter(desc, &producer)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123)); producer->set_wrapper_state_and_ready_to_publish(state); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_TRUE(ready); ASSERT_TRUE(merger->ready()); PMergeFilterRequest request; @@ -99,7 +104,7 @@ TEST_F(RuntimeFilterMergerTest, add_rf_size) { std::shared_ptr merger; auto desc = TRuntimeFilterDescBuilder().build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); - merger->set_expected_producer_num(2); + merger->increase_expected_producer_num(2); ASSERT_FALSE(merger->add_rf_size(123)); ASSERT_TRUE(merger->add_rf_size(1)); @@ -118,22 +123,25 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) { std::shared_ptr merger; auto desc = TRuntimeFilterDescBuilder().build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger)); - merger->set_expected_producer_num(1); + merger->increase_expected_producer_num(1); ASSERT_FALSE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED); + bool ready = false; std::shared_ptr producer; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( 
_runtime_states[0]->register_producer_runtime_filter(desc, &producer)); producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); // ready wrapper + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(), &ready)); + ASSERT_TRUE(ready); + ASSERT_TRUE(merger->ready()); ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY); std::shared_ptr producer2; FAIL_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_states[1]->register_producer_runtime_filter(desc, &producer2)); producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY); - auto st = merger->merge_from(producer2.get()); + auto st = merger->merge_from(producer2.get(), &ready); ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR); } diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp index d6ccc080961333..5a50e762c2527f 100644 --- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp @@ -77,12 +77,12 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { // producer_filter should not be nullptr EXPECT_FALSE( global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) .ok()); // local merge filter should not be registered in local mgr EXPECT_FALSE( local_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) .ok()); // producer should not registered in global mgr EXPECT_FALSE(global_runtime_filter_mgr @@ -103,28 +103,29 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { .ok()); EXPECT_NE(producer_filter, nullptr); - LocalMergeContext* local_merge_filters = nullptr; + std::shared_ptr context; // filter_id 
not yet registered: global mgr returns OK with nullptr // (graceful skip for recursive CTE stage reset). EXPECT_TRUE(global_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); - EXPECT_EQ(local_merge_filters, nullptr); + EXPECT_EQ(context, nullptr); // local mgr always returns error (not supported) - EXPECT_FALSE(local_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) - .ok()); - // Register local merge filter - EXPECT_TRUE( - global_runtime_filter_mgr - ->register_local_merger_producer_filter(ctx.get(), desc, producer_filter) + EXPECT_FALSE( + local_runtime_filter_mgr + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); + // Register local merge filter + EXPECT_TRUE(global_runtime_filter_mgr + ->register_local_merge_producer_filter(ctx.get(), desc, producer_filter) + .ok()); EXPECT_TRUE(global_runtime_filter_mgr - ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + ->get_local_merge_context(filter_id, producer_filter->stage(), &context) .ok()); - EXPECT_NE(local_merge_filters, nullptr); - EXPECT_EQ(local_merge_filters->producers.size(), 1); - local_merge_filters->producers.front()->_rf_state = + EXPECT_NE(context, nullptr); + EXPECT_NE(context->merger, nullptr); + EXPECT_EQ(context->producers.size(), 1); + context->producers.front()->_rf_state = RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE; } {