Skip to content
Open
4 changes: 2 additions & 2 deletions src/turtle_kv/core/algo/compute_running_total.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ namespace turtle_kv {

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
template <bool kDecayValue = false>
template <bool kDecayValue>
inline batt::RunningTotal compute_running_total(
batt::WorkerPool& worker_pool,
const MergeCompactor::ResultSet</*decay_to_items=*/false>& result_set,
const MergeCompactor::ResultSet<kDecayValue>& result_set,
DecayToItem<kDecayValue> decay_to_item [[maybe_unused]] = {})
{
auto merged_edits = result_set.get();
Expand Down
8 changes: 8 additions & 0 deletions src/turtle_kv/core/merge_compactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ template <bool kDecayToItems>
/*static*/ auto MergeCompactor::ResultSet<kDecayToItems>::concat(ResultSet&& first,
ResultSet&& second) -> ResultSet
{
if (first.size() > 0 && second.size() > 0) {
BATT_CHECK_LT(first.get_max_key(), second.get_min_key())
<< "All elements in the first ResultSet should be strictly less than the elements in the "
"second ResultSet!";
}

ResultSet ans;

//----- --- -- - - - -
Expand Down Expand Up @@ -495,6 +501,8 @@ template <bool kDecayToItems>
chunk_from_second.offset += first_size;
});

ans.chunks_.back().offset = first_size + second.chunks_.back().offset;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add regression test for this fix.


first.clear();
second.clear();

Expand Down
178 changes: 178 additions & 0 deletions src/turtle_kv/core/merge_compactor.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <turtle_kv/core/testing/generate.hpp>
#include <turtle_kv/import/env.hpp>

#include <batteries/stream_util.hpp>
Expand All @@ -23,7 +24,10 @@ using namespace batt::int_types;
using batt::as_seq;
using batt::WorkerPool;

using llfs::StableStringStore;

using turtle_kv::CInterval;
using turtle_kv::DecayToItem;
using turtle_kv::EditSlice;
using turtle_kv::EditView;
using turtle_kv::getenv_as;
Expand All @@ -39,6 +43,8 @@ using turtle_kv::Status;
using turtle_kv::StatusOr;
using turtle_kv::ValueView;

using turtle_kv::testing::RandomStringGenerator;

namespace seq = turtle_kv::seq;

constexpr usize kNumKeys = 16;
Expand Down Expand Up @@ -482,4 +488,176 @@ TEST(MergeCompactor, ResultSetDropKeyRange)
}
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
class ResultSetConcatTest : public ::testing::Test
{
public:
void generate_edits(usize num_edits, bool needs_sort = true)
{
std::unordered_set<KeyView> keys_set;

std::default_random_engine rng{/*seed=*/30};
RandomStringGenerator generate_key;
while (this->all_edits_.size() < num_edits) {
KeyView key = generate_key(rng, this->store_);
if (keys_set.contains(key)) {
continue;
}
keys_set.emplace(key);
this->all_edits_.emplace_back(key,
ValueView::from_str(this->store_.store(std::string(100, 'a'))));
}

if (needs_sort) {
std::sort(this->all_edits_.begin(), this->all_edits_.end(), KeyOrder{});
} else {
if (std::is_sorted(this->all_edits_.begin(), this->all_edits_.end(), KeyOrder{})) {
std::swap(this->all_edits_.front(), this->all_edits_.back());
}
}
}

template <bool kDecayToItems>
MergeCompactor::ResultSet<kDecayToItems> concat(std::vector<EditView>&& first,
std::vector<EditView>&& second,
DecayToItem<kDecayToItems> decay_to_item)
{
usize first_size = first.size();
usize second_size = second.size();

MergeCompactor::ResultSet<kDecayToItems> first_result_set;
first_result_set.append(std::move(first));
MergeCompactor::ResultSet<kDecayToItems> second_result_set;
second_result_set.append(std::move(second));

EXPECT_EQ(first_result_set.size(), first_size);
EXPECT_EQ(second_result_set.size(), second_size);

MergeCompactor::ResultSet<kDecayToItems> concatenated_result_set =
MergeCompactor::ResultSet<kDecayToItems>::concat(std::move(first_result_set),
std::move(second_result_set));

return concatenated_result_set;
}

template <bool kDecayToItems>
void verify_result_set(const MergeCompactor::ResultSet<kDecayToItems>& result_set,
const std::vector<EditView>& edits)
{
EXPECT_EQ(result_set.size(), edits.size());

usize i = 0;
for (const EditView& edit : result_set.get()) {
EXPECT_EQ(edit, edits[i]);
++i;
}
}

llfs::StableStringStore store_;
std::vector<EditView> all_edits_;
};

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
TEST_F(ResultSetConcatTest, Concat)
{
// Generate an edit batch of size 200.
//
usize n = 200;
this->generate_edits(n);

// Divide the edit batch in half, and create ResultSet objects out of each half.
//
std::vector<EditView> first{this->all_edits_.begin(), this->all_edits_.begin() + (n / 2)};
std::vector<EditView> second{this->all_edits_.begin() + (n / 2), this->all_edits_.end()};

MergeCompactor::ResultSet<false> concatenated_result_set =
this->concat(std::move(first), std::move(second), DecayToItem<false>{});

// Concatenated ResultSet should have the same size as the original edit batch, and should
// also contain the same items in the same order.
//
this->verify_result_set(concatenated_result_set, this->all_edits_);

// Now, repeat the process qith unequal sized inputs
//
first.assign(this->all_edits_.begin(), this->all_edits_.begin() + (n / 4));
second.assign(this->all_edits_.begin() + (n / 4), this->all_edits_.end());

concatenated_result_set = this->concat(std::move(first), std::move(second), DecayToItem<false>{});

this->verify_result_set(concatenated_result_set, this->all_edits_);

// Finally, test with empty input.
//
first = {};
second.assign(this->all_edits_.begin(), this->all_edits_.begin() + (n / 4));

concatenated_result_set = this->concat(std::move(first), std::move(second), DecayToItem<false>{});

this->verify_result_set(concatenated_result_set,
{this->all_edits_.begin(), this->all_edits_.begin() + (n / 4)});

first = {};
second = {};
concatenated_result_set = this->concat(std::move(first), std::move(second), DecayToItem<false>{});
EXPECT_EQ(concatenated_result_set.size(), 0);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
TEST_F(ResultSetConcatTest, FragmentedConcat)
{
usize n = 200;
this->generate_edits(n);

std::vector<EditView> first{this->all_edits_.begin(), this->all_edits_.begin() + (n / 2)};
std::vector<EditView> second{this->all_edits_.begin() + (n / 2), this->all_edits_.end()};

MergeCompactor::ResultSet<false> first_result_set;
first_result_set.append(std::move(first));
MergeCompactor::ResultSet<false> second_result_set;
second_result_set.append(std::move(second));

// Drop some keys fron the beginning of the ResultSet.
//
first_result_set.drop_before_n(n / 10);

// Drop some keys in the middle of the ResultSet.
//
auto second_range_begin = this->all_edits_.begin() + (3 * n / 5);
auto second_range_end = this->all_edits_.begin() + (3 * n / 4);
Interval<KeyView> second_range{second_range_begin->key, second_range_end->key};
second_result_set.drop_key_range_half_open(second_range);

MergeCompactor::ResultSet<false> concatenated_result_set =
MergeCompactor::ResultSet<false>::concat(std::move(first_result_set),
std::move(second_result_set));

std::vector<EditView> concat_edits{this->all_edits_.begin() + (n / 10),
this->all_edits_.begin() + (3 * n / 5)};
concat_edits.insert(concat_edits.end(),
this->all_edits_.begin() + (3 * n / 4),
this->all_edits_.end());
this->verify_result_set(concatenated_result_set, concat_edits);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
TEST_F(ResultSetConcatTest, ConcatDeath)
{
usize n = 200;
this->generate_edits(n, /*needs_sort*/ false);

std::vector<EditView> first{this->all_edits_.begin(), this->all_edits_.begin() + (n / 2)};
std::vector<EditView> second{this->all_edits_.begin() + (n / 2), this->all_edits_.end()};

// We should panic since first and second have overlapping key ranges.
//
EXPECT_DEATH(this->concat(std::move(first), std::move(second), DecayToItem<false>{}),
"All elements in the first ResultSet should be strictly less than the elements in "
"the second ResultSet!");
}

} // namespace
23 changes: 19 additions & 4 deletions src/turtle_kv/core/testing/generate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <random>
#include <string>
#include <string_view>
#include <unordered_set>
#include <vector>

namespace turtle_kv {
Expand Down Expand Up @@ -184,21 +185,35 @@ class RandomResultSetGenerator : public MinMaxSize<usize{1} << 24>
}

template <bool kDecayToItems, typename Rng>
MergeCompactor::ResultSet</*kDecayToItems=*/kDecayToItems>
operator()(DecayToItem<kDecayToItems>, Rng& rng, llfs::StableStringStore& store)
MergeCompactor::ResultSet</*kDecayToItems=*/kDecayToItems> operator()(
DecayToItem<kDecayToItems>,
Rng& rng,
llfs::StableStringStore& store,
const std::vector<KeyView>& to_delete)
{
using ResultSet = MergeCompactor::ResultSet</*kDecayToItems=*/kDecayToItems>;
using Item = typename ResultSet::value_type;

const usize n = this->Super::pick_size(rng);
std::vector<EditView> items;

for (const KeyView& delete_key : to_delete) {
items.emplace_back(delete_key, ValueView::deleted());
}

std::unordered_set<KeyView> deleted_items_set{to_delete.begin(), to_delete.end()};
while (items.size() < n) {
for (usize i = items.size(); i < n; ++i) {
for (usize i = items.size(); i < n;) {
char ch = '_' + (i & 31);
items.emplace_back(this->key_generator_(rng, store),
KeyView key = this->key_generator_(rng, store);
if (deleted_items_set.count(key)) {
continue;
}
items.emplace_back(key,
ValueView::from_str(store.store(std::string(this->value_size_, ch))));
++i;
}

std::sort(items.begin(), items.end(), KeyOrder{});
items.erase(std::unique(items.begin(),
items.end(),
Expand Down
25 changes: 24 additions & 1 deletion src/turtle_kv/core/testing/generate.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@

namespace {

using batt::int_types::usize;

using turtle_kv::DecayToItem;
using turtle_kv::ItemView;
using turtle_kv::KeyOrder;
using turtle_kv::KeyView;
using turtle_kv::StatusOr;
using turtle_kv::ValueView;
using turtle_kv::testing::RandomResultSetGenerator;

template <bool kDecayToItems>
Expand All @@ -24,10 +29,28 @@ TEST(GenerateTest, Test)

g.set_size(200);

ResultSet<true> result_set = g(DecayToItem<true>{}, rng, store);
std::vector<KeyView> to_delete;
ResultSet<true> result_set = g(DecayToItem<true>{}, rng, store, to_delete);

EXPECT_TRUE(std::is_sorted(result_set.get().begin(), result_set.get().end(), KeyOrder{}));
EXPECT_EQ(result_set.get().size(), 200u);

auto result_set_slice = result_set.get();
usize i = 0;
for (const ItemView& edit : result_set_slice) {
if (i % 2) {
to_delete.emplace_back(edit.key);
}
++i;
}

ResultSet<false> result_set_with_deletes = g(DecayToItem<false>{}, rng, store, to_delete);
for (const KeyView& deleted_key : to_delete) {
StatusOr<ValueView> deleted_value = result_set_with_deletes.find_key(deleted_key);
EXPECT_TRUE(deleted_value.ok());
EXPECT_EQ(*deleted_value, ValueView::deleted());
}
EXPECT_EQ(to_delete.size(), result_set_with_deletes.size() / 2);
}

} // namespace
29 changes: 18 additions & 11 deletions src/turtle_kv/kv_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,11 +650,19 @@ StatusOr<ValueView> KVStore::get(const KeyView& key) noexcept /*override*/
this->metrics_.mem_table_get_latency,
observed_state->mem_table_->get(key));

const auto return_memtable_value =
[](Optional<ValueView> mem_table_value,
FastCountMetric<u64>& get_count_metric) -> StatusOr<ValueView> {
get_count_metric.add(1);
if (mem_table_value->is_delete()) {
return {batt::StatusCode::kNotFound};
}
return *mem_table_value;
};

if (value) {
if (!value->needs_combine()) {
this->metrics_.mem_table_get_count.add(1);
// VLOG(1) << "found key " << batt::c_str_literal(key) << " in active MemTable";
return *value;
return return_memtable_value(value, this->metrics_.mem_table_get_count);
}
}

Expand All @@ -676,13 +684,15 @@ StatusOr<ValueView> KVStore::get(const KeyView& key) noexcept /*override*/
if (value) {
*value = combine(*value, *delta_value);
if (!value->needs_combine()) {
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1);
return *value;
return return_memtable_value(
value,
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]);
}
} else {
if (!delta_value->needs_combine()) {
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)].add(1);
return *delta_value;
return return_memtable_value(
delta_value,
this->metrics_.delta_log2_get_count[batt::log2_ceil(observed_deltas_size - i)]);
}
value = delta_value;
}
Expand Down Expand Up @@ -757,7 +767,6 @@ StatusOr<usize> KVStore::scan_keys(const KeyView& min_key,
this->metrics_.scan_count.add(1);

KVStoreScanner scanner{*this, min_key};
scanner.set_keys_only(true);
BATT_REQUIRE_OK(scanner.start());

return scanner.read_keys(items_out);
Expand All @@ -766,9 +775,7 @@ StatusOr<usize> KVStore::scan_keys(const KeyView& min_key,
//
Status KVStore::remove(const KeyView& key) noexcept /*override*/
{
(void)key;

return batt::StatusCode::kUnimplemented;
return this->put(key, ValueView::deleted());
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down
Loading