Skip to content

Commit 3bbcdc2

Browse files
committed
Test Parquet format interop with old and new list encodings
1 parent 2935047 commit 3bbcdc2

1 file changed

Lines changed: 149 additions & 0 deletions

File tree

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2026 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.parquet
19+
20+
import com.spotify.scio.avro._
21+
import com.spotify.scio.io.TapSpec
22+
import com.spotify.scio.testing.PipelineSpec
23+
import com.spotify.scio.parquet.avro._
24+
import com.spotify.scio.parquet.types._
25+
import magnolify.parquet.{ArrayEncoding, MagnolifyParquetProperties, ParquetType}
26+
import org.apache.avro.Schema
27+
import org.apache.avro.generic.{GenericData, GenericRecord}
28+
import org.apache.parquet.avro.AvroWriteSupport
29+
30+
import scala.jdk.CollectionConverters._
31+
32+
private case class Nested(i: Int)
33+
private case class TestRecordScala(a: Int, b: List[String], c: List[Nested], d: Map[String, Int])
34+
35+
object ParquetFormatInteropTest {
36+
val AvroSchema = new Schema.Parser().parse(s"""|{
37+
| "type":"record",
38+
| "name":"TestRecord",
39+
| "namespace":"com.spotify.scio.parquet",
40+
| "fields":[
41+
| {"name":"a","type":"int"},
42+
| {"name":"b","type":{"type":"array","items":"string"}},
43+
| {"name":"c","type":{"type":"array","items":{
44+
| "type":"record","name":"array","namespace":"","fields":[{"name":"i","type":"int"}]
45+
| }}},
46+
| {"name":"d","type":{"type":"map","values":"int"}}]}
47+
| """.stripMargin)
48+
}
49+
class ParquetFormatInteropTest extends PipelineSpec with TapSpec {
50+
import ParquetFormatInteropTest.AvroSchema
51+
52+
private val nestedSchema = AvroSchema.getField("c").schema().getElementType
53+
54+
private val genericRecords: Seq[GenericRecord] = (1 to 10).map { i =>
55+
val nested = new GenericData.Record(nestedSchema)
56+
nested.put("i", i)
57+
val record = new GenericData.Record(AvroSchema)
58+
record.put("a", i)
59+
record.put("b", List(i, i * 2).map(_.toString).asJava)
60+
record.put("c", List(nested).asJava)
61+
record.put("d", Map("x" -> Integer.valueOf(i)).asJava)
62+
record
63+
}
64+
65+
private val typedRecords = (1 to 10).map { i =>
66+
TestRecordScala(i, List(i, i * 2).map(_.toString), List(Nested(i)), Map("x" -> i))
67+
}
68+
69+
implicit val grCoder: com.spotify.scio.coders.Coder[GenericRecord] =
70+
avroGenericRecordCoder(AvroSchema)
71+
72+
private val ptOldListEncoding = ParquetType[TestRecordScala](
73+
new MagnolifyParquetProperties {
74+
override def writeArrayEncoding: ArrayEncoding = ArrayEncoding.ThreeLevelArray
75+
}
76+
)
77+
78+
private val ptNewListEncoding = ParquetType[TestRecordScala](
79+
new MagnolifyParquetProperties {
80+
override def writeArrayEncoding: ArrayEncoding = ArrayEncoding.ThreeLevelList
81+
}
82+
)
83+
84+
".typedParquetFile" should "be able to read data written with .saveAsParquetAvroFile" in withTempDir {
85+
dir =>
86+
implicit val pt: ParquetType[TestRecordScala] = ptOldListEncoding
87+
88+
runWithRealContext()(
89+
_.parallelize(genericRecords)
90+
.saveAsParquetAvroFile(dir.toString, schema = AvroSchema)
91+
)
92+
93+
runWithRealContext()(
94+
_.typedParquetFile[TestRecordScala](s"$dir/*.parquet")
95+
.map(identity) should containInAnyOrder(typedRecords)
96+
)
97+
}
98+
99+
it should "be able to read data written with .saveAsParquetAvroFile with new list encoding" in withTempDir {
100+
dir =>
101+
implicit val pt: ParquetType[TestRecordScala] = ptNewListEncoding
102+
val listConf = ParquetConfiguration.of(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE -> false)
103+
104+
runWithRealContext()(
105+
_.parallelize(genericRecords)
106+
.saveAsParquetAvroFile(dir.toString, schema = AvroSchema, conf = listConf)
107+
)
108+
109+
runWithRealContext()(
110+
_.typedParquetFile[TestRecordScala](s"$dir/*.parquet")
111+
.map(identity) should containInAnyOrder(typedRecords)
112+
)
113+
}
114+
115+
".parquetAvroFile" should "be able to read data written with .saveAsTypedParquetFile" in withTempDir {
116+
dir =>
117+
implicit val pt: ParquetType[TestRecordScala] = ptOldListEncoding
118+
119+
runWithRealContext()(
120+
_.parallelize(typedRecords)
121+
.saveAsTypedParquetFile(dir.toString)
122+
)
123+
124+
runWithRealContext()(
125+
_.parquetAvroFile[GenericRecord](s"$dir/*.parquet", projection = pt.avroSchema)
126+
.map(identity) should containInAnyOrder(genericRecords)
127+
)
128+
}
129+
130+
it should "be able to read data written with .saveAsTypedParquetFile with new list encoding" in withTempDir {
131+
dir =>
132+
implicit val pt: ParquetType[TestRecordScala] = ptNewListEncoding
133+
val listConf = ParquetConfiguration.of(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE -> false)
134+
135+
runWithRealContext()(
136+
_.parallelize(typedRecords)
137+
.saveAsTypedParquetFile(dir.toString)
138+
)
139+
140+
runWithRealContext()(
141+
_.parquetAvroFile[GenericRecord](
142+
s"$dir/*.parquet",
143+
projection = pt.avroSchema,
144+
conf = listConf
145+
)
146+
.map(identity) should containInAnyOrder(genericRecords)
147+
)
148+
}
149+
}

0 commit comments

Comments
 (0)