diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
index 03c1c465a..18dc50387 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
@@ -149,7 +149,9 @@ private String serializeContent(DocumentWriteOperation doc) {
 			jc = new JsonCanonicalizer(content);
 			return jc.getEncodedString();
 		} catch (IOException e) {
-			// Going to improve this in the next PR, as I think we can throw an exception if Format = JSON.
+			// If the Format is actually JSON, then the write to MarkLogic should ultimately fail, which is the
+			// error message the user would want to see via a batch failure listener. So in all cases, if we cannot
+			// canonicalize something that appears to be JSON, we log a warning and return the original content for hashing.
 			logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
 				doc.getUri(), e.getMessage());
 		}
diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
index 89c1417ee..438784f8e 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
@@ -5,12 +5,10 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.marklogic.client.DatabaseClient;
-import com.marklogic.client.datamovement.DataMovementManager;
-import com.marklogic.client.datamovement.WriteBatcher;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.impl.DocumentWriteOperationImpl;
 import com.marklogic.client.io.DocumentMetadataHandle;
+import com.marklogic.client.io.Format;
 import com.marklogic.client.io.JacksonHandle;
 import com.marklogic.client.io.StringHandle;
 import com.marklogic.client.test.AbstractClientTest;
@@ -21,9 +19,9 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
 
 class IncrementalWriteTest extends AbstractClientTest {
 
@@ -33,23 +31,26 @@ class IncrementalWriteTest extends AbstractClientTest {
 	AtomicInteger writtenCount = new AtomicInteger();
 	AtomicInteger skippedCount = new AtomicInteger();
+	AtomicReference<Throwable> batchFailure = new AtomicReference<>();
 	ObjectMapper objectMapper = new ObjectMapper();
+	List<DocumentWriteOperation> docs = new ArrayList<>();
 	IncrementalWriteFilter filter;
 
 	@BeforeEach
 	void setup() {
 		// Need a user with eval privileges so that the eval filter can be tested.
 		Common.client = Common.newEvalClient();
-	}
 
-	@Test
-	void opticFilter() {
+		// Default filter implementation, should be suitable for most tests.
 		filter = IncrementalWriteFilter.newBuilder()
 			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
 			.build();
+	}
 
-		runTest();
+	@Test
+	void opticFilter() {
+		verifyIncrementalWriteWorks();
 	}
 
 	@Test
@@ -59,35 +60,11 @@ void evalFilter() {
 			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
 			.build();
 
-		runTest();
-	}
-
-	@Test
-	void filterRemovesAllDocuments() {
-		new WriteBatcherTemplate(Common.client).runWriteJob(
-			writeBatcher -> writeBatcher
-				.withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet())
-				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
-
-			writeBatcher -> {
-				for (int i = 1; i <= 10; i++) {
-					writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle(""));
-				}
-			}
-		);
-
-		assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. " +
-			"This test is verifying that no error will occur either when the filter doesn't return any documents.");
-		assertCollectionSize("incremental-test", 0);
+		verifyIncrementalWriteWorks();
 	}
 
 	@Test
 	void jsonKeysOutOfOrder() {
-		filter = IncrementalWriteFilter.newBuilder()
-			.onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
-			.build();
-
-		List<DocumentWriteOperation> docs = new ArrayList<>();
 		for (int i = 1; i <= 10; i++) {
 			ObjectNode doc = objectMapper.createObjectNode();
 			doc.put("number", i);
@@ -146,7 +123,34 @@ void jsonKeysOutOfOrderWithNoCanonicalization() {
 		assertEquals(0, skippedCount.get());
 	}
 
-	private void runTest() {
+	@Test
+	void invalidJsonWithNoFormat() {
+		docs.add(new DocumentWriteOperationImpl("/not-json.txt", METADATA, new StringHandle("{\"not actually json")));
+		writeDocs(docs);
+
+		assertEquals(1, writtenCount.get(), "When the format is not specified and the content looks like JSON " +
+			"because it starts with a '{', the JSON canonicalization should fail and log a warning. The " +
+			"document should still be written with a hash generated based on the text in the document.");
+
+		assertNull(batchFailure.get(), "No failure should have been thrown since the format on the content is " +
+			"not JSON, and thus the content should be hashed as text.");
+	}
+
+	@Test
+	void invalidJsonWithFormat() {
+		docs.add(new DocumentWriteOperationImpl("/not.json", METADATA, new StringHandle("not actually json").withFormat(Format.JSON)));
+		writeDocs(docs);
+
+		assertNotNull(batchFailure.get(), "A failure should have been thrown by the server since the content is not " +
+			"JSON. But the failure to canonicalize should still be logged, as the user will be far more interested " +
+			"in the error from the server.");
+
+		String message = batchFailure.get().getMessage();
+		assertTrue(message.contains("failed to apply resource at documents"),
+			"Expecting the server to throw an error. Actual message: " + message);
+	}
+
+	private void verifyIncrementalWriteWorks() {
 		writeTenDocuments();
 		assertEquals(10, writtenCount.get());
 		assertEquals(0, skippedCount.get(), "No docs should have been skipped on the first write.");
@@ -201,26 +205,10 @@ private void writeDocs(List<DocumentWriteOperation> docs) {
 		new WriteBatcherTemplate(Common.client).runWriteJob(
 			writeBatcher -> writeBatcher
 				.withDocumentWriteSetFilter(filter)
-				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
+				.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
+				.onBatchFailure((batch, failure) -> batchFailure.set(failure)),
 
 			writeBatcher -> docs.forEach(writeBatcher::add)
 		);
 	}
-
-	// Experimenting with a template that gets rid of some annoying DMSDK boilerplate.
-	private record WriteBatcherTemplate(DatabaseClient databaseClient) {
-
-		public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) {
-			try (DataMovementManager dmm = databaseClient.newDataMovementManager()) {
-				WriteBatcher writeBatcher = dmm.newWriteBatcher();
-				writeBatcherConfigurer.accept(writeBatcher);
-
-				dmm.startJob(writeBatcher);
-				writeBatcherUser.accept(writeBatcher);
-				writeBatcher.flushAndWait();
-				writeBatcher.awaitCompletion();
-				dmm.stopJob(writeBatcher);
-			}
-		}
-	}
 }