diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreTimeoutException.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreTimeoutException.java new file mode 100644 index 0000000000..be861f94e6 --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreTimeoutException.java @@ -0,0 +1,56 @@ +/* + * RecordCoreTimeoutException.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record; + +import com.apple.foundationdb.annotation.API; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * This exception extends {@link RecordCoreException} and is specifically used to indicate + * timeout-related failures. + */ +@API(API.Status.UNSTABLE) +@SuppressWarnings("serial") +public class RecordCoreTimeoutException extends RecordCoreException { + public RecordCoreTimeoutException(@Nonnull String msg, @Nullable Object ... keyValues) { + super(msg, keyValues); + } + + public RecordCoreTimeoutException(Throwable cause) { + super(cause); + } + + public RecordCoreTimeoutException(@Nonnull String msg, @Nullable Throwable cause) { + super(msg, cause); + } + + public RecordCoreTimeoutException(@Nonnull String msg) { + super(msg); + } + + protected RecordCoreTimeoutException(String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java index 5095076d57..b23228f837 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingBase.java @@ -20,7 +20,6 @@ package com.apple.foundationdb.record.provider.foundationdb; -import com.apple.foundationdb.FDBError; import com.apple.foundationdb.FDBException; import com.apple.foundationdb.MutationType; import com.apple.foundationdb.annotation.API; @@ -59,7 +58,6 @@ import java.time.DateTimeException; import java.time.Instant; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1213,22 +1211,6 @@ protected static T findException(@Nullable Throwable ex, Class classT) { } return null; } - - protected static boolean shouldLessenWork(@Nullable FDBException ex) { - // These error codes represent a list of errors that can occur if there is too much work to be done - // in a single transaction. - if (ex == null) { - return false; - } - final Set lessenWorkCodes = new HashSet<>(Arrays.asList( - FDBError.TIMED_OUT.code(), - FDBError.TRANSACTION_TOO_OLD.code(), - FDBError.NOT_COMMITTED.code(), - FDBError.TRANSACTION_TIMED_OUT.code(), - FDBError.COMMIT_READ_INCOMPLETE.code(), - FDBError.TRANSACTION_TOO_LARGE.code())); - return lessenWorkCodes.contains(ex.getCode()); - } } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java index b3a4cb6dc6..994a7613ab 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingMerger.java @@ -22,6 +22,7 @@ import com.apple.foundationdb.FDBException; import com.apple.foundationdb.async.AsyncUtil; +import com.apple.foundationdb.record.RecordCoreTimeoutException; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.metadata.Index; @@ -37,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -148,11 +150,10 @@ private CompletableFuture handleFailure(final IndexDeferredMaintenanceC // merges. Not perfect, but as long as it's rare the impact should be minimal. mergeControl.mergeHadFailed(); // report to adjust stats - final FDBException ex = IndexingBase.findException(e, FDBException.class); - final IndexDeferredMaintenanceControl.LastStep lastStep = mergeControl.getLastStep(); - if (!IndexingBase.shouldLessenWork(ex)) { + if (shouldAbort(e)) { giveUpMerging(mergeControl, e); } + final IndexDeferredMaintenanceControl.LastStep lastStep = mergeControl.getLastStep(); switch (lastStep) { case REPARTITION: // Here: this exception might be resolved by reducing the number of documents to move during repartitioning @@ -175,6 +176,21 @@ private CompletableFuture handleFailure(final IndexDeferredMaintenanceC return AsyncUtil.READY_TRUE; // and retry } + private boolean shouldAbort(@Nullable Throwable e) { + if (e == null) { + return true; + } + final FDBException fdbException = IndexingBase.findException(e, FDBException.class); + if (fdbException != null) { + // abort for certain fdb codes + return fdbException.getCode() == 1051 || // batch_transaction_throttled + fdbException.getCode() == 1213 ; // tag_throttled + } + // abort of not a timeout error + return (IndexingBase.findException(e, RecordCoreTimeoutException.class) == null && + IndexingBase.findException(e, TimeoutException.class) == null); + } + private void handleRepartitioningFailure(final IndexDeferredMaintenanceControl mergeControl, Throwable e) { repartitionDocumentCount = mergeControl.getRepartitionDocumentCount(); if (repartitionDocumentCount == -1) { diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java index c9e76ee1ef..e687857f49 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/IndexingThrottle.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.provider.foundationdb; +import com.apple.foundationdb.FDBError; import com.apple.foundationdb.FDBException; import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.async.AsyncUtil; @@ -60,6 +61,13 @@ public class IndexingThrottle { @Nonnull private static final Logger LOGGER = LoggerFactory.getLogger(IndexingThrottle.class); + @Nonnull private static final Set lessenWorkCodes = new HashSet<>(Arrays.asList( + FDBError.TIMED_OUT.code(), + FDBError.TRANSACTION_TOO_OLD.code(), + FDBError.NOT_COMMITTED.code(), + FDBError.TRANSACTION_TIMED_OUT.code(), + FDBError.COMMIT_READ_INCOMPLETE.code(), + FDBError.TRANSACTION_TOO_LARGE.code())); @Nonnull private final IndexingCommon common; @Nonnull private final Booker booker; private final boolean isScrubber; @@ -133,7 +141,7 @@ boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException, @Nullable List additionalLogMessageKeyValues, int currTries, final boolean adjustLimits) { - if (currTries >= common.config.getMaxRetries() || !IndexingBase.shouldLessenWork(fdbException)) { + if (currTries >= common.config.getMaxRetries() || !shouldLessenWork(fdbException)) { // Here: should not retry or no more retries. There is no real need to handle limits. return false; } @@ -144,6 +152,15 @@ boolean mayRetryAfterHandlingException(@Nullable FDBException fdbException, return true; } + private static boolean shouldLessenWork(@Nullable FDBException ex) { + // These error codes represent a list of errors that can occur if there is too much work to be done + // in a single transaction. + if (ex == null) { + return false; + } + return lessenWorkCodes.contains(ex.getCode()); + } + void decreaseLimit(@Nonnull FDBException fdbException, @Nullable List additionalLogMessageKeyValues) { // TODO: decrease the limit only for certain errors diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/TestHelpers.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/TestHelpers.java index f07520e62f..9170496d25 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/TestHelpers.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/TestHelpers.java @@ -37,8 +37,11 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -245,6 +248,19 @@ public static void consistently(String description, Supplier underTest, M } } + public static T findCause(@Nullable Throwable ex, Class classT) { + Set seenSet = Collections.newSetFromMap(new IdentityHashMap<>()); + for (Throwable current = ex; + current != null && !seenSet.contains(current); + current = current.getCause()) { + if (classT.isInstance(current)) { + return classT.cast(current); + } + seenSet.add(current); + } + return null; + } + /** * A matcher that an exception's message contains a given string. */ diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java index 4aded5dcc7..f2258434e7 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/OnlineIndexerMergeTest.java @@ -28,10 +28,12 @@ import com.apple.foundationdb.record.IndexEntry; import com.apple.foundationdb.record.IndexScanType; import com.apple.foundationdb.record.IsolationLevel; +import com.apple.foundationdb.record.RecordCoreTimeoutException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordMetaData; import com.apple.foundationdb.record.RecordMetaDataBuilder; import com.apple.foundationdb.record.ScanProperties; +import com.apple.foundationdb.record.TestHelpers; import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.TupleRange; import com.apple.foundationdb.record.metadata.Index; @@ -44,12 +46,14 @@ import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage; import com.apple.foundationdb.record.query.QueryToKeyMatcher; import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; import com.apple.test.Tags; import com.google.auto.service.AutoService; import com.google.protobuf.Message; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -58,6 +62,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -67,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests specifically of {@link OnlineIndexer#mergeIndex()}. @@ -197,6 +203,166 @@ void testMergeTimeout() { assertEquals(repeat(0, mergeLimits.size()), repartitionLimits); } + /** + * Test that a RuntimeException (non-FDB, non-timeout) causes the merger to abort immediately. + */ + @Test + void testNonRetriableExceptionAborts() { + final String indexType = "nonRetriableExceptionIndex"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + adjustMergeControl(state); + attemptCount.incrementAndGet(); + + // Throw a RuntimeException that is not retriable + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new IllegalStateException("Non-retriable error")); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(10) + .build()) { + Assertions.assertThrows(IllegalStateException.class, indexer::mergeIndex); + } + + // Should only attempt once, no retries + assertEquals(1, attemptCount.get()); + } + + /** + * Test that a TimeoutException causes retry behavior (not abort). + */ + @ParameterizedTest + @BooleanSource + void testTimeoutExceptionRetries(boolean customTimeout) { + final String indexType = "timeoutExceptionIndex"; + final String exceptionMessage = "my timeout"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + adjustMergeControl(state); + + attemptCount.incrementAndGet(); + + // Throw a TimeoutException (not FDB timeout) + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( + customTimeout ? + new CustomOperationTimeoutException(exceptionMessage) : + new TimeoutException(exceptionMessage)); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .build()) { + Exception thrownException = Assertions.assertThrows(Exception.class, indexer::mergeIndex); + // Assert that the timeout exception is in the cause chain + var timeoutCause = + customTimeout ? + TestHelpers.findCause(thrownException, CustomOperationTimeoutException.class) : + TestHelpers.findCause(thrownException, TimeoutException.class); + Assertions.assertNotNull(timeoutCause); + Assertions.assertEquals(exceptionMessage, timeoutCause.getMessage()); + } + // Assert multiple retries + assertTrue(1 < attemptCount.get()); + } + + private static void adjustMergeControl(final IndexMaintainerState state) { + final IndexDeferredMaintenanceControl mergeControl = state.store.getIndexDeferredMaintenanceControl(); + mergeControl.setLastStep(IndexDeferredMaintenanceControl.LastStep.MERGE); + if (mergeControl.getMergesLimit() == 0) { + mergeControl.setMergesTried(10); + mergeControl.setMergesFound(10); + } else { + mergeControl.setMergesTried(mergeControl.getMergesLimit()); + mergeControl.setMergesFound(10); + } + } + + /** + * Test that an FDBException wrapped in another exception still triggers retry behavior. + */ + @Test + void testWrappedFDBExceptionRetries() { + final String indexType = "wrappedFdbExceptionIndex"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + adjustMergeControl(state); + + attemptCount.incrementAndGet(); + + // Wrap FDBException in multiple layers to test deep unwrapping + final CompletableFuture future = new CompletableFuture<>(); + FDBException fdbEx = new FDBException("Transaction too old", FDBError.TRANSACTION_TOO_OLD.code()); + RuntimeException wrapper = new RuntimeException("Wrapper level 1", + new IllegalStateException("Wrapper level 2", fdbEx)); + future.completeExceptionally(wrapper); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(5) + .build()) { + + Exception thrownException = Assertions.assertThrows(Exception.class, indexer::mergeIndex); + + // Assert that FDBException is in the cause chain + FDBException fdbCause = TestHelpers.findCause(thrownException, FDBException.class); + Assertions.assertNotNull(fdbCause); + assertEquals(FDBError.TRANSACTION_TOO_OLD.code(), fdbCause.getCode()); + } + + // Assert multiple retries + assertTrue(1 < attemptCount.get()); + } + + /** + * Test that a non-retriable exception during REPARTITION phase causes immediate abort. + */ + @Test + void testNonRetriableExceptionDuringRepartitionAborts() { + final String indexType = "nonRetriableRepartitionIndex"; + AtomicInteger attemptCount = new AtomicInteger(0); + + TestFactory.register(indexType, state -> { + final IndexDeferredMaintenanceControl mergeControl = state.store.getIndexDeferredMaintenanceControl(); + mergeControl.setLastStep(IndexDeferredMaintenanceControl.LastStep.REPARTITION); + mergeControl.setRepartitionDocumentCount(20); + + attemptCount.incrementAndGet(); + + // Throw a non-retriable exception during repartition + final CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new NullPointerException("Unexpected null during repartition")); + return future; + }); + + final FDBRecordStore.Builder storeBuilder = createStore(indexType); + try (OnlineIndexer indexer = OnlineIndexer.newBuilder() + .setRecordStoreBuilder(storeBuilder) + .setTargetIndexesByName(List.of(INDEX_NAME)) + .setMaxAttempts(10) + .build()) { + Assertions.assertThrows(NullPointerException.class, indexer::mergeIndex); + } + + // Should only attempt once, no retries + assertEquals(1, attemptCount.get()); + } + @Nonnull private FDBRecordStore.Builder createStore(@Nonnull final String indexType) { Index index = new Index(INDEX_NAME, Key.Expressions.field("num_value_2"), @@ -333,7 +499,15 @@ public CompletableFuture mergeIndex() { @Nonnull @Override public Iterable getIndexTypes() { - return List.of("repartitionTimeoutIndex", "mergeTimeoutIndex", "mergeLimitedIndex"); + return List.of( + "repartitionTimeoutIndex", + "mergeLimitedIndex", + "nonRetriableExceptionIndex", + "wrappedFdbExceptionIndex", + "mergeTimeoutIndex", + "timeoutExceptionIndex", + "nonRetriableRepartitionIndex" + ); } @Nonnull @@ -348,4 +522,11 @@ public IndexMaintainer getIndexMaintainer(@Nonnull IndexMaintainerState state) { return maintainers.get(state.index.getType()).apply(state); } } + + @SuppressWarnings("serial") + private static class CustomOperationTimeoutException extends RecordCoreTimeoutException { + public CustomOperationTimeoutException(String message) { + super(message); + } + } } diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java index 35de0b7cb5..7d5da16b86 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneConcurrency.java @@ -22,7 +22,7 @@ import com.apple.foundationdb.annotation.API; import com.apple.foundationdb.async.MoreAsyncUtil; -import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCoreTimeoutException; import com.apple.foundationdb.record.logging.LogMessageKeys; import com.apple.foundationdb.record.provider.common.StoreTimer; import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase; @@ -119,7 +119,7 @@ private LuceneConcurrency() { /** * An exception that is thrown when the async to sync operation times out. */ - public static class AsyncToSyncTimeoutException extends RecordCoreException { + public static class AsyncToSyncTimeoutException extends RecordCoreTimeoutException { private static final long serialVersionUID = -1L; public AsyncToSyncTimeoutException(final String message, final Throwable cause) { diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java index cf20e7ade8..2fbe10a946 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/FDBLuceneIndexFailureTest.java @@ -192,10 +192,11 @@ void repartitionGroupedTestWithExceptionMapping(boolean useLegacyAsyncToSync, bo } // run partitioning without failure - make sure the index is still in good shape + timer.reset(); explicitMergeIndex(index, contextProps, schemaSetup, false, 0); try (FDBRecordContext context = openContext(contextProps)) { schemaSetup.accept(context); - assertEquals(2, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount()); + assertEquals(1, getCounter(context, LuceneEvents.Counts.LUCENE_REPARTITION_CALLS).getCount()); } partitionInfos = getPartitionMeta(index, groupingKey, contextProps, schemaSetup); // It should first move 6 from the most-recent to a new, older partition, then move 6 again into a partition diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java index 7fbe62d057..449133bb73 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java @@ -403,6 +403,10 @@ void flakyMergeQuick() throws IOException { /** * Test that the index is in a good state if the merge operation has errors. + * The test will validate that if Lucene merge fails randomly in the middle it will still be usable, and + * not corrupted. Having retries in the IndexingMerger means that it could heal, but we want to make + * sure that requests coming in while the merge is ongoing don't get a corrupted view. + * * @param isGrouped whether the index has a grouping key * @param isSynthetic whether the index is on a synthetic type * @param primaryKeySegmentIndexEnabled whether to enable the primaryKeySegmentIndex @@ -461,14 +465,18 @@ void flakyMerge(boolean isGrouped, return oldAsyncToSyncTimeout == null ? Duration.ofDays(1L) : oldAsyncToSyncTimeout.apply(wait); } }; - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20; i++) { fdb.setAsyncToSyncTimeout(asyncToSyncTimeout); waitCounts.set(i); boolean success = false; try { LOGGER.info(KeyValueLogMessage.of("Merge started", "iteration", i)); - explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore recordStore = Objects.requireNonNull(dataModel.schemaSetup.apply(context)); + final CompletableFuture future = recordStore.getIndexMaintainer(Objects.requireNonNull(dataModel.index)).mergeIndex(); + fdb.asyncToSync(timer, FDBStoreTimer.Waits.WAIT_ONLINE_MERGE_INDEX, future); + } LOGGER.info(KeyValueLogMessage.of("Merge completed", "iteration", i)); assertFalse(requireFailure && i < 15, i + " merge should have failed");