Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support event scheduler for scan/exchange/agg (part 2) #54338

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ set(EXEC_FILES
pipeline/group_execution/execution_group.cpp
pipeline/group_execution/execution_group_builder.cpp
pipeline/group_execution/group_operator.cpp
pipeline/schedule/timeout_tasks.cpp
pipeline/schedule/event_scheduler.cpp
pipeline/schedule/observer.cpp
pipeline/schedule/pipeline_timer.cpp
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,10 +729,12 @@ bool Aggregator::is_chunk_buffer_empty() {
}

ChunkPtr Aggregator::poll_chunk_buffer() {
auto notify = defer_notify_sink();
return _limited_buffer->pull();
}

void Aggregator::offer_chunk_to_buffer(const ChunkPtr& chunk) {
auto notify = defer_notify_source();
_limited_buffer->push(chunk);
}

Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "exec/chunk_buffer_memory_manager.h"
#include "exec/limited_pipeline_chunk_buffer.h"
#include "exec/pipeline/context_with_dependency.h"
#include "exec/pipeline/schedule/observer.h"
#include "exec/pipeline/spill_process_channel.h"
#include "exprs/agg/aggregate_factory.h"
#include "exprs/expr.h"
Expand Down Expand Up @@ -402,6 +403,15 @@ class Aggregator : public pipeline::ContextWithDependency {

HashTableKeyAllocator _state_allocator;

void attach_sink_observer(RuntimeState* state, pipeline::PipelineObserver* observer) {
_pip_observable.attach_sink_observer(state, observer);
}
void attach_source_observer(RuntimeState* state, pipeline::PipelineObserver* observer) {
_pip_observable.attach_source_observer(state, observer);
}
auto defer_notify_source() { return _pip_observable.defer_notify_source(); }
auto defer_notify_sink() { return _pip_observable.defer_notify_sink(); }

protected:
AggregatorParamsPtr _params;

Expand Down Expand Up @@ -510,6 +520,8 @@ class Aggregator : public pipeline::ContextWithDependency {
// aggregate combinator functions since they are not persisted in agg hash map
std::vector<AggregateFunctionPtr> _combinator_function;

pipeline::PipeObservable _pip_observable;

public:
void build_hash_map(size_t chunk_size, bool agg_group_by_with_limit = false);
void build_hash_map(size_t chunk_size, std::atomic<int64_t>& shared_limit_countdown, bool agg_group_by_with_limit);
Expand Down
22 changes: 20 additions & 2 deletions be/src/exec/chunk_buffer_memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,17 @@ class ChunkBufferMemoryManager {
}

void update_memory_usage(int64_t memory_usage, int64_t num_rows) {
_memory_usage += memory_usage;
_buffered_num_rows += num_rows;
bool prev_full = is_full();
size_t prev_memusage = _memory_usage.fetch_add(memory_usage);
size_t prev_num_rows = _buffered_num_rows.fetch_add(num_rows);
bool is_full =
prev_memusage + memory_usage >= _max_memory_usage || prev_num_rows + num_rows >= _max_buffered_rows;
bool expect = false;
bool full_changed = prev_full != is_full;
if (!full_changed) {
return;
}
_full_events_changed.compare_exchange_strong(expect, full_changed);
}

size_t get_memory_limit_per_driver() const { return _max_memory_usage_per_driver; }
Expand All @@ -65,12 +74,21 @@ class ChunkBufferMemoryManager {
_buffered_num_rows = 0;
}

bool full_events_changed() {
if (!_full_events_changed.load(std::memory_order_acquire)) {
return false;
}
bool val = true;
return _full_events_changed.compare_exchange_strong(val, false);
}

private:
std::atomic<size_t> _max_memory_usage{128UL * 1024 * 1024 * 1024}; // 128GB
size_t _max_memory_usage_per_driver = 128 * 1024 * 1024UL; // 128MB
size_t _max_buffered_rows{};
std::atomic<int64_t> _memory_usage{};
std::atomic<int64_t> _buffered_num_rows{};
size_t _max_input_dop;
std::atomic<bool> _full_events_changed{};
};
} // namespace starrocks::pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Status AggregateBlockingSinkOperator::prepare(RuntimeState* state) {
_aggregator->limit() != -1 && // has limit
_aggregator->conjunct_ctxs().empty() && // no 'having' clause
_aggregator->get_aggr_phase() == AggrPhase2); // phase 2, keep it to make things safe
_aggregator->attach_sink_observer(state, this->_observer);
return Status::OK();
}

Expand All @@ -46,6 +47,7 @@ void AggregateBlockingSinkOperator::close(RuntimeState* state) {
Status AggregateBlockingSinkOperator::set_finishing(RuntimeState* state) {
if (_is_finished) return Status::OK();
ONCE_DETECT(_set_finishing_once);
auto notify = _aggregator->defer_notify_source();
auto defer = DeferOp([this]() {
COUNTER_UPDATE(_aggregator->input_row_count(), _aggregator->num_input_rows());
_aggregator->sink_complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class AggregateBlockingSinkOperatorFactory final : public OperatorFactory {

~AggregateBlockingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

Status prepare(RuntimeState* state) override;

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bool AggregateBlockingSourceOperator::is_finished() const {
}

Status AggregateBlockingSourceOperator::set_finished(RuntimeState* state) {
auto notify = _aggregator->defer_notify_sink();
return _aggregator->set_finished();
}

Expand All @@ -37,6 +38,12 @@ void AggregateBlockingSourceOperator::close(RuntimeState* state) {
SourceOperator::close(state);
}

Status AggregateBlockingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

StatusOr<ChunkPtr> AggregateBlockingSourceOperator::pull_chunk(RuntimeState* state) {
RETURN_IF_CANCELLED(state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class AggregateBlockingSourceOperator : public SourceOperator {

bool has_output() const override;
bool is_finished() const override;
Status prepare(RuntimeState* state) override;

Status set_finished(RuntimeState* state) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ namespace starrocks::pipeline {
Status AggregateDistinctBlockingSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), _unique_metrics.get()));
return _aggregator->open(state);
RETURN_IF_ERROR(_aggregator->open(state));
_aggregator->attach_sink_observer(state, this->_observer);
return Status::OK();
}

void AggregateDistinctBlockingSinkOperator::close(RuntimeState* state) {
Expand All @@ -35,6 +37,7 @@ void AggregateDistinctBlockingSinkOperator::close(RuntimeState* state) {
Status AggregateDistinctBlockingSinkOperator::set_finishing(RuntimeState* state) {
if (_is_finished) return Status::OK();
ONCE_DETECT(_set_finishing_once);
auto notify = _aggregator->defer_notify_source();
auto defer = DeferOp([this]() {
COUNTER_UPDATE(_aggregator->input_row_count(), _aggregator->num_input_rows());
_aggregator->sink_complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class AggregateDistinctBlockingSinkOperatorFactory final : public OperatorFactor

~AggregateDistinctBlockingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(OperatorFactory::prepare(state));
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ void AggregateDistinctBlockingSourceOperator::close(RuntimeState* state) {
SourceOperator::close(state);
}

Status AggregateDistinctBlockingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

StatusOr<ChunkPtr> AggregateDistinctBlockingSourceOperator::pull_chunk(RuntimeState* state) {
RETURN_IF_CANCELLED(state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class AggregateDistinctBlockingSourceOperator : public SourceOperator {

bool has_output() const override;
bool is_finished() const override;
Status prepare(RuntimeState* state) override;

Status set_finished(RuntimeState* state) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Status AggregateDistinctStreamingSinkOperator::prepare(RuntimeState* state) {
if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) {
_limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size;
}
_aggregator->attach_sink_observer(state, this->_observer);
return _aggregator->open(state);
}

Expand All @@ -37,6 +38,7 @@ void AggregateDistinctStreamingSinkOperator::close(RuntimeState* state) {
}

Status AggregateDistinctStreamingSinkOperator::set_finishing(RuntimeState* state) {
auto notify = _aggregator->defer_notify_source();
_is_finished = true;

// skip processing if cancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class AggregateDistinctStreamingSinkOperatorFactory final : public OperatorFacto

~AggregateDistinctStreamingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateDistinctStreamingSinkOperator>(
this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@

#include "aggregate_distinct_streaming_source_operator.h"

#include "common/status.h"
#include "exec/pipeline/source_operator.h"
#include "runtime/runtime_state.h"

namespace starrocks::pipeline {

Status AggregateDistinctStreamingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

bool AggregateDistinctStreamingSourceOperator::has_output() const {
// There are two cases where chunk buffer is not null
// case1:streaming mode is 'FORCE_STREAMING'
Expand Down Expand Up @@ -46,6 +56,7 @@ bool AggregateDistinctStreamingSourceOperator::is_finished() const {
}

Status AggregateDistinctStreamingSourceOperator::set_finished(RuntimeState* state) {
auto notify = _aggregator->defer_notify_sink();
return _aggregator->set_finished();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class AggregateDistinctStreamingSourceOperator : public SourceOperator {

~AggregateDistinctStreamingSourceOperator() override = default;

Status prepare(RuntimeState* state) override;
bool has_output() const override;
bool is_finished() const override;

Expand Down Expand Up @@ -60,6 +61,8 @@ class AggregateDistinctStreamingSourceOperatorFactory final : public SourceOpera

~AggregateDistinctStreamingSourceOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateDistinctStreamingSourceOperator>(
this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ Status AggregateStreamingSinkOperator::prepare(RuntimeState* state) {
if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) {
_limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size;
}
return _aggregator->open(state);
RETURN_IF_ERROR(_aggregator->open(state));
_aggregator->attach_sink_observer(state, this->_observer);
return Status::OK();
}

void AggregateStreamingSinkOperator::close(RuntimeState* state) {
Expand All @@ -40,8 +42,8 @@ void AggregateStreamingSinkOperator::close(RuntimeState* state) {
}

Status AggregateStreamingSinkOperator::set_finishing(RuntimeState* state) {
auto notify = _aggregator->defer_notify_source();
_is_finished = true;

// skip processing if cancelled
if (state->is_cancelled()) {
return Status::OK();
Expand Down Expand Up @@ -314,4 +316,12 @@ Status AggregateStreamingSinkOperator::reset_state(RuntimeState* state, const st
_is_finished = false;
return _aggregator->reset_state(state, refill_chunks, this);
}

std::string AggregateStreamingSinkOperator::get_name() const {
std::string finished = is_finished() ? "X" : "O";
auto full = _aggregator->is_chunk_buffer_full();
return fmt::format("{}_{}_{}({}) {{ full:{} has_output:{}}}", _name, _plan_node_id, (void*)this, finished, full,
has_output());
}

} // namespace starrocks::pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class AggregateStreamingSinkOperator : public Operator {
bool releaseable() const override { return true; }
void set_execute_mode(int performance_level) override;

std::string get_name() const override;

private:
// Invoked by push_chunk if current mode is TStreamingPreaggregationMode::FORCE_STREAMING
Status _push_chunk_by_force_streaming(const ChunkPtr& chunk);
Expand Down Expand Up @@ -85,6 +87,8 @@ class AggregateStreamingSinkOperatorFactory final : public OperatorFactory {

~AggregateStreamingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateStreamingSinkOperator>(this, _id, _plan_node_id, driver_sequence,
_aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,16 @@ bool AggregateStreamingSourceOperator::is_finished() const {
}

Status AggregateStreamingSourceOperator::set_finished(RuntimeState* state) {
auto notify = _aggregator->defer_notify_sink();
return _aggregator->set_finished();
}

Status AggregateStreamingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

void AggregateStreamingSourceOperator::close(RuntimeState* state) {
_aggregator->unref(state);
SourceOperator::close(state);
Expand Down Expand Up @@ -97,6 +104,7 @@ Status AggregateStreamingSourceOperator::_output_chunk_from_hash_map(ChunkPtr* c
}
});

// TODO: notify sink here
if (need_reset_aggregator) {
if (!_aggregator->is_sink_complete()) {
RETURN_IF_ERROR(_aggregator->reset_state(state, {}, nullptr, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class AggregateStreamingSourceOperator : public SourceOperator {

bool has_output() const override;
bool is_finished() const override;
Status prepare(RuntimeState* state) override;
Status set_finished(RuntimeState* state) override;

void close(RuntimeState* state) override;
Expand All @@ -58,6 +59,8 @@ class AggregateStreamingSourceOperatorFactory final : public SourceOperatorFacto

~AggregateStreamingSourceOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateStreamingSourceOperator>(this, _id, _plan_node_id, driver_sequence,
_aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class SpillableAggregateBlockingSinkOperatorFactory : public OperatorFactory {

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
ObjectPool _pool;
SortExecExprs _sort_exprs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class SpillableAggregateBlockingSourceOperatorFactory final : public SourceOpera

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
AggregatorFactoryPtr _hash_aggregator_factory;
// _stream_aggregatory_factory is only used when spilling happens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class SpillableAggregateDistinctBlockingSinkOperatorFactory final : public Opera

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
ObjectPool _pool;
SortExecExprs _sort_exprs;
Expand Down Expand Up @@ -144,6 +146,8 @@ class SpillableAggregateDistinctBlockingSourceOperatorFactory final : public Sou

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
AggregatorFactoryPtr _hash_aggregator_factory;
// _stream_aggregatory_factory is only used when spilling happens
Expand Down
Loading
Loading