From d0aceb61fc966a6de3edcec5a34e31622877165b Mon Sep 17 00:00:00 2001
From: Giacomo Bergami
Date: Fri, 6 Dec 2024 18:49:04 +0000
Subject: [PATCH 1/2] Some of this code did not compile at all on G++

---
 util/xxhash.h                                       | 13 +++++++++++++
 .../transactions/optimistic_transaction_db_impl.cc  |  2 +-
 utilities/transactions/transaction_base.cc          |  4 ++--
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/util/xxhash.h b/util/xxhash.h
index cdbb18840f7..293a331d994 100644
--- a/util/xxhash.h
+++ b/util/xxhash.h
@@ -2066,6 +2066,19 @@ static int XXH_isLittleEndian(void)
 
 # define XXH_HAS_BUILTIN(x) 0
 #endif
+namespace std{
+[[noreturn]] inline void unreachable()
+{
+    // Uses compiler specific extensions if possible.
+    // Even if no extension is used, undefined behavior is still raised by
+    // an empty function body and the noreturn attribute.
+#if defined(_MSC_VER) && !defined(__clang__) // MSVC
+    __assume(false);
+#else // GCC, Clang
+    __builtin_unreachable();
+#endif
+}
+}
 
 #if defined(__STDC_VERSION__) && (__STDC_VERSION__ > 201710L)
 /* C23 and future versions have standard "unreachable()" */
diff --git a/utilities/transactions/optimistic_transaction_db_impl.cc b/utilities/transactions/optimistic_transaction_db_impl.cc
index 817cbdd688e..e47d52dcd4d 100644
--- a/utilities/transactions/optimistic_transaction_db_impl.cc
+++ b/utilities/transactions/optimistic_transaction_db_impl.cc
@@ -102,7 +102,7 @@ Status OptimisticTransactionDB::Open(
 void OptimisticTransactionDBImpl::ReinitializeTransaction(
     Transaction* txn, const WriteOptions& write_options,
     const OptimisticTransactionOptions& txn_options) {
-  assert(dynamic_cast<OptimisticTransaction*>(txn) != nullptr);
+// assert(dynamic_cast<OptimisticTransaction*>(txn) != nullptr);
   auto txn_impl = static_cast<OptimisticTransaction*>(txn);
 
   txn_impl->Reinitialize(this, write_options, txn_options);
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index 063cc02b44e..f0e16225fc0 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -73,7 +73,7 @@ TransactionBaseImpl::TransactionBaseImpl(
                    write_options.protection_bytes_per_key,
                    0 /* default_cf_ts_sz */),
       indexing_enabled_(true) {
-  assert(dynamic_cast<DBImpl*>(db_) != nullptr);
+// assert(dynamic_cast<DBImpl*>(db_) != nullptr);
   log_number_ = 0;
   if (dbimpl_->allow_2pc()) {
     InitWriteBatch();
@@ -897,7 +897,7 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
     DBImpl* db_;
     IndexedWriteBatchBuilder(Transaction* txn, DBImpl* db)
         : txn_(txn), db_(db) {
-      assert(dynamic_cast<WriteCommittedTxn*>(txn_) != nullptr);
+// assert(dynamic_cast<WriteCommittedTxn*>(txn_) != nullptr);
    }
 
     Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {

From f13e211f5adf66a167f13d192f4bf2986dd737c2 Mon Sep 17 00:00:00 2001
From: Giacomo Bergami
Date: Sat, 7 Dec 2024 12:15:18 +0000
Subject: [PATCH 2/2] Concurrency changes: atomic_store_explicit are deprecated

---
 db/memtable.cc                                      |  28 +-
 .../block_based/block_based_table_builder.cc        | 618 +++++++++---------
 util/xxhash.h                                       |  14 -
 3 files changed, 327 insertions(+), 333 deletions(-)

diff --git a/db/memtable.cc b/db/memtable.cc
index d51a85350c2..0bd861cbfcf 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -138,11 +138,15 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
         cached_range_tombstone_.AccessAtCore(i);
     auto new_local_cache_ref = std::make_shared<
         const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
-    std::atomic_store_explicit(
-        local_cache_ref_ptr,
-        std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref,
-                                                           new_cache.get()),
-        std::memory_order_relaxed);
+    std::atomic<std::shared_ptr<FragmentedRangeTombstoneListCache>>
r(*local_cache_ref_ptr); + r.store(std::shared_ptr(new_local_cache_ref, + new_cache.get()), + std::memory_order_relaxed); +// std::atomic_store_explicit( +// local_cache_ref_ptr, +// std::shared_ptr(new_local_cache_ref, +// new_cache.get()), +// std::memory_order_relaxed); } const Comparator* ucmp = cmp.user_comparator(); assert(ucmp); @@ -784,9 +788,12 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( } // takes current cache - std::shared_ptr cache = - std::atomic_load_explicit(cached_range_tombstone_.Access(), - std::memory_order_relaxed); + std::atomic> r(*cached_range_tombstone_.Access()); + auto cache= r.load(std::memory_order_relaxed); + +// std::shared_ptr cache = +// std::atomic_load_explicit(cached_range_tombstone_.Access(), +// std::memory_order_relaxed); // construct fragmented tombstone list if necessary if (!cache->initialized.load(std::memory_order_acquire)) { cache->reader_mutex.lock(); @@ -1063,9 +1070,8 @@ Status MemTable::Add(SequenceNumber s, ValueType type, // Each core will have a shared_ptr to a shared_ptr to the cached // fragmented range tombstones, so that ref count is maintianed locally // per-core using the per-core shared_ptr. - std::atomic_store_explicit( - local_cache_ref_ptr, - std::shared_ptr( + std::atomic> r(*local_cache_ref_ptr); + r.store(std::shared_ptr( new_local_cache_ref, new_cache.get()), std::memory_order_relaxed); } diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 6845e515e85..cd07c327b3c 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -267,6 +267,316 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector bool decoupled_partitioned_filters_; }; + struct BlockBasedTableBuilder::ParallelCompressionRep { + // TODO: consider replacing with autovector or similar + // Keys is a wrapper of vector of strings avoiding + // releasing string memories during vector clear() + // in order to save memory allocation overhead + class Keys { + public: + Keys() : keys_(kKeysInitSize), size_(0) {} + void PushBack(const Slice& key) { + if (size_ == keys_.size()) { + keys_.emplace_back(key.data(), key.size()); + } else { + keys_[size_].assign(key.data(), key.size()); + } + size_++; + } + void SwapAssign(std::vector& keys) { + size_ = keys.size(); + std::swap(keys_, keys); + } + void Clear() { size_ = 0; } + size_t Size() { return size_; } + std::string& Back() { return keys_[size_ - 1]; } + std::string& operator[](size_t idx) { + assert(idx < size_); + return keys_[idx]; + } + + private: + const size_t kKeysInitSize = 32; + std::vector keys_; + size_t size_; + }; + std::unique_ptr curr_block_keys; + + class BlockRepSlot; + + // BlockRep instances are fetched from and recycled to + // block_rep_pool during parallel compression. + struct BlockRep { + Slice contents; + Slice compressed_contents; + std::unique_ptr data; + std::unique_ptr compressed_data; + CompressionType compression_type; + std::unique_ptr first_key_in_next_block; + std::unique_ptr keys; + std::unique_ptr slot; + Status status; + }; + // Use a vector of BlockRep as a buffer for a determined number + // of BlockRep structures. All data referenced by pointers in + // BlockRep will be freed when this vector is destructed. + using BlockRepBuffer = std::vector; + BlockRepBuffer block_rep_buf; + // Use a thread-safe queue for concurrent access from block + // building thread and writer thread. 
+ using BlockRepPool = WorkQueue; + BlockRepPool block_rep_pool; + + // Use BlockRepSlot to keep block order in write thread. + // slot_ will pass references to BlockRep + class BlockRepSlot { + public: + BlockRepSlot() : slot_(1) {} + template + void Fill(T&& rep) { + slot_.push(std::forward(rep)); + } + void Take(BlockRep*& rep) { slot_.pop(rep); } + + private: + // slot_ will pass references to BlockRep in block_rep_buf, + // and those references are always valid before the destruction of + // block_rep_buf. + WorkQueue slot_; + }; + + // Compression queue will pass references to BlockRep in block_rep_buf, + // and those references are always valid before the destruction of + // block_rep_buf. + using CompressQueue = WorkQueue; + CompressQueue compress_queue; + std::vector compress_thread_pool; + + // Write queue will pass references to BlockRep::slot in block_rep_buf, + // and those references are always valid before the corresponding + // BlockRep::slot is destructed, which is before the destruction of + // block_rep_buf. + using WriteQueue = WorkQueue; + WriteQueue write_queue; + std::unique_ptr write_thread; + + // Estimate output file size when parallel compression is enabled. This is + // necessary because compression & flush are no longer synchronized, + // and BlockBasedTableBuilder::FileSize() is no longer accurate. + // memory_order_relaxed suffices because accurate statistics is not required. + class FileSizeEstimator { + public: + explicit FileSizeEstimator() + : uncomp_bytes_compressed(0), + uncomp_bytes_curr_block(0), + uncomp_bytes_curr_block_set(false), + uncomp_bytes_inflight(0), + blocks_inflight(0), + curr_compression_ratio(0), + estimated_file_size(0) {} + + // Estimate file size when a block is about to be emitted to + // compression thread + void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { + uint64_t new_uncomp_bytes_inflight = + uncomp_bytes_inflight.fetch_add(uncomp_block_size, + std::memory_order_relaxed) + + uncomp_block_size; + + uint64_t new_blocks_inflight = + blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; + + estimated_file_size.store( + curr_file_size + + static_cast( + static_cast(new_uncomp_bytes_inflight) * + curr_compression_ratio.load(std::memory_order_relaxed)) + + new_blocks_inflight * kBlockTrailerSize, + std::memory_order_relaxed); + } + + // Estimate file size when a block is already reaped from + // compression thread + void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { + assert(uncomp_bytes_curr_block_set); + + uint64_t new_uncomp_bytes_compressed = + uncomp_bytes_compressed + uncomp_bytes_curr_block; + assert(new_uncomp_bytes_compressed > 0); + + curr_compression_ratio.store( + (curr_compression_ratio.load(std::memory_order_relaxed) * + uncomp_bytes_compressed + + compressed_block_size) / + static_cast(new_uncomp_bytes_compressed), + std::memory_order_relaxed); + uncomp_bytes_compressed = new_uncomp_bytes_compressed; + + uint64_t new_uncomp_bytes_inflight = + uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, + std::memory_order_relaxed) - + uncomp_bytes_curr_block; + + uint64_t new_blocks_inflight = + blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; + + estimated_file_size.store( + curr_file_size + + static_cast( + static_cast(new_uncomp_bytes_inflight) * + curr_compression_ratio.load(std::memory_order_relaxed)) + + new_blocks_inflight * kBlockTrailerSize, + std::memory_order_relaxed); + + uncomp_bytes_curr_block_set = false; + } + + void SetEstimatedFileSize(uint64_t 
size) { + estimated_file_size.store(size, std::memory_order_relaxed); + } + + uint64_t GetEstimatedFileSize() { + return estimated_file_size.load(std::memory_order_relaxed); + } + + void SetCurrBlockUncompSize(uint64_t size) { + uncomp_bytes_curr_block = size; + uncomp_bytes_curr_block_set = true; + } + + private: + // Input bytes compressed so far. + uint64_t uncomp_bytes_compressed; + // Size of current block being appended. + uint64_t uncomp_bytes_curr_block; + // Whether uncomp_bytes_curr_block has been set for next + // ReapBlock call. + bool uncomp_bytes_curr_block_set; + // Input bytes under compression and not appended yet. + std::atomic uncomp_bytes_inflight; + // Number of blocks under compression and not appended yet. + std::atomic blocks_inflight; + // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. + std::atomic curr_compression_ratio; + // Estimated SST file size. + std::atomic estimated_file_size; + }; + FileSizeEstimator file_size_estimator; + + // Facilities used for waiting first block completion. Need to Wait for + // the completion of first block compression and flush to get a non-zero + // compression ratio. + std::atomic first_block_processed; + std::condition_variable first_block_cond; + std::mutex first_block_mutex; + + explicit ParallelCompressionRep(uint32_t parallel_threads) + : curr_block_keys(new Keys()), + block_rep_buf(parallel_threads), + block_rep_pool(parallel_threads), + compress_queue(parallel_threads), + write_queue(parallel_threads), + first_block_processed(false) { + for (uint32_t i = 0; i < parallel_threads; i++) { + block_rep_buf[i].contents = Slice(); + block_rep_buf[i].compressed_contents = Slice(); + block_rep_buf[i].data.reset(new std::string()); + block_rep_buf[i].compressed_data.reset(new std::string()); + block_rep_buf[i].compression_type = CompressionType(); + block_rep_buf[i].first_key_in_next_block.reset(new std::string()); + block_rep_buf[i].keys.reset(new Keys()); + block_rep_buf[i].slot.reset(new BlockRepSlot()); + block_rep_buf[i].status = Status::OK(); + block_rep_pool.push(&block_rep_buf[i]); + } + } + + ~ParallelCompressionRep() { block_rep_pool.finish(); } + + // Make a block prepared to be emitted to compression thread + // Used in non-buffered mode + BlockRep* PrepareBlock(CompressionType compression_type, + const Slice* first_key_in_next_block, + BlockBuilder* data_block) { + BlockRep* block_rep = + PrepareBlockInternal(compression_type, first_key_in_next_block); + assert(block_rep != nullptr); + data_block->SwapAndReset(*(block_rep->data)); + block_rep->contents = *(block_rep->data); + std::swap(block_rep->keys, curr_block_keys); + curr_block_keys->Clear(); + return block_rep; + } + + // Used in EnterUnbuffered + BlockRep* PrepareBlock(CompressionType compression_type, + const Slice* first_key_in_next_block, + std::string* data_block, + std::vector* keys) { + BlockRep* block_rep = + PrepareBlockInternal(compression_type, first_key_in_next_block); + assert(block_rep != nullptr); + std::swap(*(block_rep->data), *data_block); + block_rep->contents = *(block_rep->data); + block_rep->keys->SwapAssign(*keys); + return block_rep; + } + + // Emit a block to compression thread + void EmitBlock(BlockRep* block_rep) { + assert(block_rep != nullptr); + assert(block_rep->status.ok()); + if (!write_queue.push(block_rep->slot.get())) { + return; + } + if (!compress_queue.push(block_rep)) { + return; + } + + if (!first_block_processed.load(std::memory_order_relaxed)) { + std::unique_lock lock(first_block_mutex); + 
first_block_cond.wait(lock, [this] { + return first_block_processed.load(std::memory_order_relaxed); + }); + } + } + + // Reap a block from compression thread + void ReapBlock(BlockRep* block_rep) { + assert(block_rep != nullptr); + block_rep->compressed_data->clear(); + block_rep_pool.push(block_rep); + + if (!first_block_processed.load(std::memory_order_relaxed)) { + std::lock_guard lock(first_block_mutex); + first_block_processed.store(true, std::memory_order_relaxed); + first_block_cond.notify_one(); + } + } + + private: + BlockRep* PrepareBlockInternal(CompressionType compression_type, + const Slice* first_key_in_next_block) { + BlockRep* block_rep = nullptr; + block_rep_pool.pop(block_rep); + assert(block_rep != nullptr); + + assert(block_rep->data); + + block_rep->compression_type = compression_type; + + if (first_key_in_next_block == nullptr) { + block_rep->first_key_in_next_block.reset(nullptr); + } else { + block_rep->first_key_in_next_block->assign( + first_key_in_next_block->data(), first_key_in_next_block->size()); + } + + return block_rep; + } + }; + + struct BlockBasedTableBuilder::Rep { const ImmutableOptions ioptions; // BEGIN from MutableCFOptions @@ -667,314 +977,6 @@ struct BlockBasedTableBuilder::Rep { IOStatus io_status; }; -struct BlockBasedTableBuilder::ParallelCompressionRep { - // TODO: consider replacing with autovector or similar - // Keys is a wrapper of vector of strings avoiding - // releasing string memories during vector clear() - // in order to save memory allocation overhead - class Keys { - public: - Keys() : keys_(kKeysInitSize), size_(0) {} - void PushBack(const Slice& key) { - if (size_ == keys_.size()) { - keys_.emplace_back(key.data(), key.size()); - } else { - keys_[size_].assign(key.data(), key.size()); - } - size_++; - } - void SwapAssign(std::vector& keys) { - size_ = keys.size(); - std::swap(keys_, keys); - } - void Clear() { size_ = 0; } - size_t Size() { return size_; } - std::string& Back() { return keys_[size_ - 1]; } - std::string& operator[](size_t idx) { - assert(idx < size_); - return keys_[idx]; - } - - private: - const size_t kKeysInitSize = 32; - std::vector keys_; - size_t size_; - }; - std::unique_ptr curr_block_keys; - - class BlockRepSlot; - - // BlockRep instances are fetched from and recycled to - // block_rep_pool during parallel compression. - struct BlockRep { - Slice contents; - Slice compressed_contents; - std::unique_ptr data; - std::unique_ptr compressed_data; - CompressionType compression_type; - std::unique_ptr first_key_in_next_block; - std::unique_ptr keys; - std::unique_ptr slot; - Status status; - }; - // Use a vector of BlockRep as a buffer for a determined number - // of BlockRep structures. All data referenced by pointers in - // BlockRep will be freed when this vector is destructed. - using BlockRepBuffer = std::vector; - BlockRepBuffer block_rep_buf; - // Use a thread-safe queue for concurrent access from block - // building thread and writer thread. - using BlockRepPool = WorkQueue; - BlockRepPool block_rep_pool; - - // Use BlockRepSlot to keep block order in write thread. - // slot_ will pass references to BlockRep - class BlockRepSlot { - public: - BlockRepSlot() : slot_(1) {} - template - void Fill(T&& rep) { - slot_.push(std::forward(rep)); - } - void Take(BlockRep*& rep) { slot_.pop(rep); } - - private: - // slot_ will pass references to BlockRep in block_rep_buf, - // and those references are always valid before the destruction of - // block_rep_buf. 
- WorkQueue slot_; - }; - - // Compression queue will pass references to BlockRep in block_rep_buf, - // and those references are always valid before the destruction of - // block_rep_buf. - using CompressQueue = WorkQueue; - CompressQueue compress_queue; - std::vector compress_thread_pool; - - // Write queue will pass references to BlockRep::slot in block_rep_buf, - // and those references are always valid before the corresponding - // BlockRep::slot is destructed, which is before the destruction of - // block_rep_buf. - using WriteQueue = WorkQueue; - WriteQueue write_queue; - std::unique_ptr write_thread; - - // Estimate output file size when parallel compression is enabled. This is - // necessary because compression & flush are no longer synchronized, - // and BlockBasedTableBuilder::FileSize() is no longer accurate. - // memory_order_relaxed suffices because accurate statistics is not required. - class FileSizeEstimator { - public: - explicit FileSizeEstimator() - : uncomp_bytes_compressed(0), - uncomp_bytes_curr_block(0), - uncomp_bytes_curr_block_set(false), - uncomp_bytes_inflight(0), - blocks_inflight(0), - curr_compression_ratio(0), - estimated_file_size(0) {} - - // Estimate file size when a block is about to be emitted to - // compression thread - void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { - uint64_t new_uncomp_bytes_inflight = - uncomp_bytes_inflight.fetch_add(uncomp_block_size, - std::memory_order_relaxed) + - uncomp_block_size; - - uint64_t new_blocks_inflight = - blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; - - estimated_file_size.store( - curr_file_size + - static_cast( - static_cast(new_uncomp_bytes_inflight) * - curr_compression_ratio.load(std::memory_order_relaxed)) + - new_blocks_inflight * kBlockTrailerSize, - std::memory_order_relaxed); - } - - // Estimate file size when a block is already reaped from - // compression thread - void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { - assert(uncomp_bytes_curr_block_set); - - uint64_t new_uncomp_bytes_compressed = - uncomp_bytes_compressed + uncomp_bytes_curr_block; - assert(new_uncomp_bytes_compressed > 0); - - curr_compression_ratio.store( - (curr_compression_ratio.load(std::memory_order_relaxed) * - uncomp_bytes_compressed + - compressed_block_size) / - static_cast(new_uncomp_bytes_compressed), - std::memory_order_relaxed); - uncomp_bytes_compressed = new_uncomp_bytes_compressed; - - uint64_t new_uncomp_bytes_inflight = - uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, - std::memory_order_relaxed) - - uncomp_bytes_curr_block; - - uint64_t new_blocks_inflight = - blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; - - estimated_file_size.store( - curr_file_size + - static_cast( - static_cast(new_uncomp_bytes_inflight) * - curr_compression_ratio.load(std::memory_order_relaxed)) + - new_blocks_inflight * kBlockTrailerSize, - std::memory_order_relaxed); - - uncomp_bytes_curr_block_set = false; - } - - void SetEstimatedFileSize(uint64_t size) { - estimated_file_size.store(size, std::memory_order_relaxed); - } - - uint64_t GetEstimatedFileSize() { - return estimated_file_size.load(std::memory_order_relaxed); - } - - void SetCurrBlockUncompSize(uint64_t size) { - uncomp_bytes_curr_block = size; - uncomp_bytes_curr_block_set = true; - } - - private: - // Input bytes compressed so far. - uint64_t uncomp_bytes_compressed; - // Size of current block being appended. 
- uint64_t uncomp_bytes_curr_block; - // Whether uncomp_bytes_curr_block has been set for next - // ReapBlock call. - bool uncomp_bytes_curr_block_set; - // Input bytes under compression and not appended yet. - std::atomic uncomp_bytes_inflight; - // Number of blocks under compression and not appended yet. - std::atomic blocks_inflight; - // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. - std::atomic curr_compression_ratio; - // Estimated SST file size. - std::atomic estimated_file_size; - }; - FileSizeEstimator file_size_estimator; - - // Facilities used for waiting first block completion. Need to Wait for - // the completion of first block compression and flush to get a non-zero - // compression ratio. - std::atomic first_block_processed; - std::condition_variable first_block_cond; - std::mutex first_block_mutex; - - explicit ParallelCompressionRep(uint32_t parallel_threads) - : curr_block_keys(new Keys()), - block_rep_buf(parallel_threads), - block_rep_pool(parallel_threads), - compress_queue(parallel_threads), - write_queue(parallel_threads), - first_block_processed(false) { - for (uint32_t i = 0; i < parallel_threads; i++) { - block_rep_buf[i].contents = Slice(); - block_rep_buf[i].compressed_contents = Slice(); - block_rep_buf[i].data.reset(new std::string()); - block_rep_buf[i].compressed_data.reset(new std::string()); - block_rep_buf[i].compression_type = CompressionType(); - block_rep_buf[i].first_key_in_next_block.reset(new std::string()); - block_rep_buf[i].keys.reset(new Keys()); - block_rep_buf[i].slot.reset(new BlockRepSlot()); - block_rep_buf[i].status = Status::OK(); - block_rep_pool.push(&block_rep_buf[i]); - } - } - - ~ParallelCompressionRep() { block_rep_pool.finish(); } - - // Make a block prepared to be emitted to compression thread - // Used in non-buffered mode - BlockRep* PrepareBlock(CompressionType compression_type, - const Slice* first_key_in_next_block, - BlockBuilder* data_block) { - BlockRep* block_rep = - PrepareBlockInternal(compression_type, first_key_in_next_block); - assert(block_rep != nullptr); - data_block->SwapAndReset(*(block_rep->data)); - block_rep->contents = *(block_rep->data); - std::swap(block_rep->keys, curr_block_keys); - curr_block_keys->Clear(); - return block_rep; - } - - // Used in EnterUnbuffered - BlockRep* PrepareBlock(CompressionType compression_type, - const Slice* first_key_in_next_block, - std::string* data_block, - std::vector* keys) { - BlockRep* block_rep = - PrepareBlockInternal(compression_type, first_key_in_next_block); - assert(block_rep != nullptr); - std::swap(*(block_rep->data), *data_block); - block_rep->contents = *(block_rep->data); - block_rep->keys->SwapAssign(*keys); - return block_rep; - } - - // Emit a block to compression thread - void EmitBlock(BlockRep* block_rep) { - assert(block_rep != nullptr); - assert(block_rep->status.ok()); - if (!write_queue.push(block_rep->slot.get())) { - return; - } - if (!compress_queue.push(block_rep)) { - return; - } - - if (!first_block_processed.load(std::memory_order_relaxed)) { - std::unique_lock lock(first_block_mutex); - first_block_cond.wait(lock, [this] { - return first_block_processed.load(std::memory_order_relaxed); - }); - } - } - - // Reap a block from compression thread - void ReapBlock(BlockRep* block_rep) { - assert(block_rep != nullptr); - block_rep->compressed_data->clear(); - block_rep_pool.push(block_rep); - - if (!first_block_processed.load(std::memory_order_relaxed)) { - std::lock_guard lock(first_block_mutex); - 
first_block_processed.store(true, std::memory_order_relaxed); - first_block_cond.notify_one(); - } - } - - private: - BlockRep* PrepareBlockInternal(CompressionType compression_type, - const Slice* first_key_in_next_block) { - BlockRep* block_rep = nullptr; - block_rep_pool.pop(block_rep); - assert(block_rep != nullptr); - - assert(block_rep->data); - - block_rep->compression_type = compression_type; - - if (first_key_in_next_block == nullptr) { - block_rep->first_key_in_next_block.reset(nullptr); - } else { - block_rep->first_key_in_next_block->assign( - first_key_in_next_block->data(), first_key_in_next_block->size()); - } - - return block_rep; - } -}; BlockBasedTableBuilder::BlockBasedTableBuilder( const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo, diff --git a/util/xxhash.h b/util/xxhash.h index 293a331d994..abd1f90daad 100644 --- a/util/xxhash.h +++ b/util/xxhash.h @@ -2066,20 +2066,6 @@ static int XXH_isLittleEndian(void) # define XXH_HAS_BUILTIN(x) 0 #endif -namespace std{ -[[noreturn]] inline void unreachable() -{ - // Uses compiler specific extensions if possible. - // Even if no extension is used, undefined behavior is still raised by - // an empty function body and the noreturn attribute. -#if defined(_MSC_VER) && !defined(__clang__) // MSVC - __assume(false); -#else // GCC, Clang - __builtin_unreachable(); -#endif -} -} - #if defined(__STDC_VERSION__) && (__STDC_VERSION__ > 201710L) /* C23 and future versions have standard "unreachable()" */ # include
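
Note on the unreachable() shim added in PATCH 1/2: its body is the usual compiler-specific dispatch (__assume(false) on MSVC, __builtin_unreachable() on GCC/Clang), matching the reference implementation of C++23's std::unreachable(). The sketch below is illustrative only and not part of the patch: it shows the same dispatch placed in a project namespace rather than namespace std (adding new declarations to namespace std is formally undefined behavior), plus a typical call site. The names xxh_port and BitsForMode are invented for the example.

    // Illustrative sketch (not from the patch): same dispatch as the shim above,
    // but in a project namespace; "xxh_port" and BitsForMode() are invented names.
    namespace xxh_port {
    [[noreturn]] inline void unreachable() {
    #if defined(_MSC_VER) && !defined(__clang__)  // MSVC
      __assume(false);
    #else  // GCC, Clang
      __builtin_unreachable();
    #endif
    }
    }  // namespace xxh_port

    // Typical call site: promise the optimizer that the default branch is dead.
    inline int BitsForMode(int mode) {
      switch (mode) {
        case 0: return 8;
        case 1: return 16;
        case 2: return 32;
        default: xxh_port::unreachable();
      }
    }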
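
Note on the memtable.cc changes in PATCH 2/2: the std::atomic_load_explicit / std::atomic_store_explicit overloads for std::shared_ptr are deprecated in C++20 in favour of std::atomic<std::shared_ptr<T>>. A minimal sketch of that migration follows; Cache, Publish and Acquire are invented names, and this is not the RocksDB data structure. Note that the migration only takes effect when the shared slot itself is declared std::atomic<std::shared_ptr<T>>; storing into a freshly constructed local std::atomic copy updates that copy, not the pointer other threads read.

    // Illustrative sketch (C++20), not RocksDB code.
    #include <atomic>
    #include <memory>

    struct Cache { int value = 0; };  // stand-in payload

    // The shared, concurrently accessed slot is itself atomic.
    std::atomic<std::shared_ptr<Cache>> g_cache{std::make_shared<Cache>()};

    void Publish(std::shared_ptr<Cache> fresh) {
      // Pre-C++20 equivalent on a plain shared_ptr slot:
      //   std::atomic_store_explicit(&plain_slot, fresh, std::memory_order_relaxed);
      g_cache.store(std::move(fresh), std::memory_order_relaxed);
    }

    std::shared_ptr<Cache> Acquire() {
      // Pre-C++20 equivalent:
      //   std::atomic_load_explicit(&plain_slot, std::memory_order_relaxed);
      return g_cache.load(std::memory_order_relaxed);
    }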
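
Note on the ParallelCompressionRep block that PATCH 2/2 moves ahead of Rep: its EmitBlock()/ReapBlock() pair implements a small "wait for the first compressed block" handshake, so that a real compression ratio exists before further file-size estimates are trusted. A condensed, self-contained sketch of that handshake is below; FirstBlockGate and the method names are simplified stand-ins, not the RocksDB class.

    // Illustrative sketch of the first-block handshake, not RocksDB code.
    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    struct FirstBlockGate {
      std::atomic<bool> processed{false};
      std::condition_variable cond;
      std::mutex mutex;

      // Called on the emitting side: block until one block has been reaped.
      void WaitForFirstBlock() {
        if (!processed.load(std::memory_order_relaxed)) {
          std::unique_lock<std::mutex> lock(mutex);
          cond.wait(lock, [this] {
            return processed.load(std::memory_order_relaxed);
          });
        }
      }

      // Called on the reaping side: publish completion and wake the emitter.
      void MarkFirstBlockDone() {
        if (!processed.load(std::memory_order_relaxed)) {
          std::lock_guard<std::mutex> lock(mutex);
          processed.store(true, std::memory_order_relaxed);
          cond.notify_one();
        }
      }
    };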