Skip to content

Commit aa57b0e

Browse files
authored
Add integration test that should fail (#5950)
1 parent fe571f6 commit aa57b0e

2 files changed

Lines changed: 79 additions & 1 deletion

File tree

build.sbt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1749,6 +1749,7 @@ lazy val integration = project
17491749
`scio-jdbc` % "compile;test->test",
17501750
`scio-managed` % "test->test",
17511751
`scio-neo4j` % "test->test",
1752+
`scio-parquet` % "test->test",
17521753
`scio-smb` % "test->provided,test"
17531754
)
17541755
.settings(commonSettings)
@@ -1802,7 +1803,8 @@ lazy val integration = project
18021803
"org.apache.iceberg" % "iceberg-core" % icebergVersion % Test,
18031804
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion % Test,
18041805
"org.apache.parquet" % "parquet-common" % parquetVersion % Test,
1805-
"org.apache.parquet" % "parquet-column" % parquetVersion % Test
1806+
"org.apache.parquet" % "parquet-column" % parquetVersion % Test,
1807+
"com.google.cloud.bigdataoss" % "gcs-connector" % bigdataossVersion % Test
18061808
)
18071809
)
18081810

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2026 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.parquet
19+
20+
import com.spotify.scio.ContextAndArgs
21+
import com.spotify.scio.parquet.ParquetConfiguration
22+
import com.spotify.scio.parquet.read.ParquetReadConfiguration
23+
import com.spotify.scio.parquet.types._
24+
import com.spotify.scio.testing.PipelineSpec
25+
import com.spotify.scio.testing.util.ItUtils
26+
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
27+
import org.apache.beam.sdk.io.FileSystems
28+
import org.apache.beam.sdk.options.PipelineOptionsFactory
29+
30+
import scala.jdk.CollectionConverters._
31+
32+
/**
 * Record type written to and read back from Parquet by `ParquetReadIT`.
 *
 * Declared `final` so it cannot be subclassed, which keeps the generated
 * `equals`/`hashCode` sound.
 *
 * @param id
 *   integer field of the record
 * @param value
 *   string field of the record
 */
final case class ParquetITRecord(id: Int, value: String)
33+
34+
/**
 * Integration test that writes typed Parquet records to a GCP temp location and reads them back
 * with `ParquetReadConfiguration.UseSplittableDoFn` set to `false`.
 */
class ParquetReadIT extends PipelineSpec {

  /**
   * Directory used for the Parquet files, obtained from `ItUtils.gcpTempLocation`.
   *
   * This is a `def`, so `ItUtils.gcpTempLocation` is invoked on every access; the test captures
   * the result once in a local `val` and reuses it for write, read and cleanup.
   */
  private def gcsDir: String = ItUtils.gcpTempLocation("parquet-read-it")

  /** Pipeline arguments passed to `ContextAndArgs`: the IT project and a temp location. */
  private def gcpOptions: Array[String] = {
    val tempLocation = ItUtils.gcpTempLocation("parquet-read-it-tmp")
    Array(s"--project=${ItUtils.project}", s"--tempLocation=$tempLocation")
  }

  /**
   * Deletes every resource matched by the glob `dir` + `/**`.
   *
   * Installs default pipeline options carrying the IT project on `FileSystems` before matching.
   * `FileSystems.delete` is skipped when nothing matched.
   *
   * @param dir
   *   directory whose contents are removed
   */
  private def deleteDir(dir: String): Unit = {
    val opts = PipelineOptionsFactory.create()
    opts.as(classOf[GcpOptions]).setProject(ItUtils.project)
    FileSystems.setDefaultPipelineOptions(opts)
    val files = FileSystems.`match`(s"$dir/**").metadata().asScala.map(_.resourceId())
    if (files.nonEmpty) {
      FileSystems.delete(files.asJava)
    }
  }

  "typedParquetFile" should "round-trip through GCS" in {
    val dir = gcsDir
    val records = (1 to 10).map(i => ParquetITRecord(i, s"value-$i")).toList

    try {
      // Write phase: block until the files exist before starting the read pipeline.
      val (sc1, _) = ContextAndArgs(gcpOptions)
      sc1.parallelize(records).saveAsTypedParquetFile(dir)
      sc1.run().waitUntilDone()

      // Read with the splittable DoFn option switched off.
      val legacyReadConf = ParquetConfiguration.of(
        ParquetReadConfiguration.UseSplittableDoFn -> false
      )

      val (sc2, _) = ContextAndArgs(gcpOptions)
      sc2.typedParquetFile[ParquetITRecord](
        path = s"$dir/*.parquet",
        conf = legacyReadConf
      ) should containInAnyOrder(records)
      // Wait for the read pipeline, mirroring the write phase, so that the cleanup in
      // `finally` cannot delete the input files while the read is still running.
      sc2.run().waitUntilDone()
    } finally {
      deleteDir(dir)
    }
  }
}

0 commit comments

Comments
 (0)