diff --git a/CHANGELOG.md b/CHANGELOG.md index 0672dcf8..4582ed8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # Changelog ## [Unreleased] +### Fixed +- Fixed http sink NEP issue when flink job restore from checkpoint. ## [0.23.0] - 2025-11-07 diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index de37faac..8fca3085 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -117,11 +117,7 @@ protected HttpSinkInternal( public StatefulSinkWriter> createWriter( InitContext context) throws IOException { - ElementConverter elementConverter = getElementConverter(); - if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { - // This cast is needed for Flink 1.15.3 build - ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); - } + ElementConverter elementConverter = initElementConverterOfSchema(context); return new HttpSinkWriter<>( elementConverter, @@ -144,12 +140,22 @@ public StatefulSinkWriter> cr ); } + private ElementConverter initElementConverterOfSchema(InitContext context) { + ElementConverter elementConverter = getElementConverter(); + if (elementConverter instanceof SchemaLifecycleAwareElementConverter) { + ((SchemaLifecycleAwareElementConverter) elementConverter).open(context); + } + return elementConverter; + } + @Override public StatefulSinkWriter> restoreWriter( InitContext context, Collection> recoveredState) throws IOException { + initElementConverterOfSchema(context); + return new HttpSinkWriter<>( getElementConverter(), context, diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkInternalTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkInternalTest.java new file mode 100644 index 00000000..74d080b6 --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkInternalTest.java @@ -0,0 +1,70 @@ +package com.getindata.connectors.http.internal.sink; + +import com.getindata.connectors.http.HttpPostRequestCallback; +import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter; +import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.SinkHttpClient; +import com.getindata.connectors.http.internal.SinkHttpClientBuilder; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Properties; + +import static org.mockito.Mockito.*; + +public class HttpSinkInternalTest { + + @Test + void testCreateWriterInitializesLifecycleAwareConverter() throws Exception { + SchemaLifecycleAwareElementConverter mockConverter = mock(SchemaLifecycleAwareElementConverter.class); + Sink.InitContext mockContext = mock(Sink.InitContext.class); + SinkWriterMetricGroup mockMetricGroup = mock(SinkWriterMetricGroup.class); + when(mockContext.metricGroup()).thenReturn(mockMetricGroup); + OperatorIOMetricGroup mockIOMetricGroup = mock(OperatorIOMetricGroup.class); + when(mockMetricGroup.getIOMetricGroup()).thenReturn(mockIOMetricGroup); + + HttpSinkInternal httpSink = createTestSink((ElementConverter) mockConverter); + httpSink.createWriter(mockContext); + // com.getindata.connectors.http.internal.sink.HttpSinkInternal.initElementConverterOfSchema + // org.apache.flink.connector.base.sink.writer.AsyncSinkWriter + verify(mockConverter, times(2)).open(mockContext); + } + + @Test + void testRestoreWriterInitializesLifecycleAwareConverter() throws Exception { + + SchemaLifecycleAwareElementConverter mockConverter = mock(SchemaLifecycleAwareElementConverter.class); + Sink.InitContext mockContext = mock(Sink.InitContext.class); + SinkWriterMetricGroup mockMetricGroup = mock(SinkWriterMetricGroup.class); + when(mockContext.metricGroup()).thenReturn(mockMetricGroup); + when(mockMetricGroup.getIOMetricGroup()).thenReturn(mock(OperatorIOMetricGroup.class)); + HttpSinkInternal httpSink = createTestSink((ElementConverter) mockConverter); + + httpSink.restoreWriter(mockContext, Collections.emptyList()); + // com.getindata.connectors.http.internal.sink.HttpSinkInternal.initElementConverterOfSchema + // org.apache.flink.connector.base.sink.writer.AsyncSinkWriter + verify(mockConverter, times(2)).open(mockContext); + } + + + private HttpSinkInternal createTestSink(ElementConverter converter) { + SinkHttpClientBuilder mockClientBuilder = mock(SinkHttpClientBuilder.class); + when(mockClientBuilder.build(any(), any(), any(), any())) + .thenReturn(mock(SinkHttpClient.class)); + + return new HttpSinkInternal<>( + converter, + 10, 1, 20, 1024L, 1000L, 1024L, + "http://test-endpoint.com", + mock(HttpPostRequestCallback.class), + mock(HeaderPreprocessor.class), + mockClientBuilder, + new Properties() + ); + } + +}