Skip to content

Commit f214977

Browse files
committed
Bump Magnolify to 0.9.5
1 parent dbf5b5c commit f214977

2 files changed

Lines changed: 202 additions & 1 deletion

File tree

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ val kantanCodecsVersion = "0.6.0"
107107
val kantanCsvVersion = "0.8.0"
108108
val kryoVersion = "4.0.3"
109109
val magnoliaVersion = "1.1.10"
110-
val magnolifyVersion = "0.9.4"
110+
val magnolifyVersion = "0.9.5"
111111
val metricsVersion = "4.2.38"
112112
val munitVersion = "1.2.4"
113113
val neo4jDriverVersion = "4.4.22"
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright 2026 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.parquet
19+
20+
import com.spotify.scio.avro._
21+
import com.spotify.scio.io.TapSpec
22+
import com.spotify.scio.testing.PipelineSpec
23+
import com.spotify.scio.parquet.avro._
24+
import com.spotify.scio.parquet.types._
25+
import magnolify.parquet.{ArrayEncoding, MagnolifyParquetProperties, ParquetType}
26+
import org.apache.avro.Schema
27+
import org.apache.avro.generic.{GenericData, GenericRecord}
28+
import org.apache.beam.sdk.Pipeline.PipelineExecutionException
29+
import org.apache.parquet.avro.AvroWriteSupport
30+
31+
import scala.jdk.CollectionConverters._
32+
import java.io.File
33+
34+
/** Element type of `TestRecordScala.c`; wraps a single `Int` field named `i`. */
private case class Nested(i: Int)
35+
/**
 * Scala-side record used by the typed Parquet reads/writes in this test.
 *
 * Its field names and types line up with the fields of
 * `ParquetFormatInteropTest.AvroSchema`: `a` (int), `b` (array of string),
 * `c` (array of a record holding an int `i`) and `d` (map with int values).
 */
private case class TestRecordScala(a: Int, b: List[String], c: List[Nested], d: Map[String, Int])
36+
37+
/** Companion holding the Avro schema shared by the tests in `ParquetFormatInteropTest`. */
object ParquetFormatInteropTest {
38+
// Avro record `com.spotify.scio.parquet.TestRecord` with fields a (int), b (array<string>),
// c (array of a record with a single int field `i`) and d (map<int>).
// NOTE(review): the element record of `c` is named "array" with an empty namespace —
// presumably so it matches the schema produced on the typed (Magnolify) side; confirm.
val AvroSchema = new Schema.Parser().parse(s"""|{
39+
| "type":"record",
40+
| "name":"TestRecord",
41+
| "namespace":"com.spotify.scio.parquet",
42+
| "fields":[
43+
| {"name":"a","type":"int"},
44+
| {"name":"b","type":{"type":"array","items":"string"}},
45+
| {"name":"c","type":{"type":"array","items":{
46+
| "type":"record","name":"array","namespace":"","fields":[{"name":"i","type":"int"}]
47+
| }}},
48+
| {"name":"d","type":{"type":"map","values":"int"}}]}
49+
| """.stripMargin)
50+
}
51+
/**
 * Interop tests between the Avro-based Parquet IO (`saveAsParquetAvroFile` /
 * `parquetAvroFile`) and the typed Parquet IO (`saveAsTypedParquetFile` /
 * `typedParquetFile`).
 *
 * The same ten logical records are expressed both as Avro `GenericRecord`s and as
 * `TestRecordScala` values. Tests write with one API, read back with the other, and
 * also check that reading with an `ArrayEncoding` different from the one used for
 * writing fails with an `InvalidRecordException`.
 */
class ParquetFormatInteropTest extends PipelineSpec with TapSpec {
52+
import ParquetFormatInteropTest.AvroSchema
53+
54+
// Schema of the elements of the array field "c" in AvroSchema.
private val nestedSchema = AvroSchema.getField("c").schema().getElementType
55+
56+
// Avro representation of the fixture: for i in 1..10,
// a = i, b = [i, 2*i] as strings, c = [one nested record with i], d = {"x" -> i}.
private val genericRecords: Seq[GenericRecord] = (1 to 10).map { i =>
57+
val nested = new GenericData.Record(nestedSchema)
58+
nested.put("i", i)
59+
val record = new GenericData.Record(AvroSchema)
60+
record.put("a", i)
61+
record.put("b", List(i, i * 2).map(_.toString).asJava)
62+
record.put("c", List(nested).asJava)
63+
record.put("d", Map("x" -> Integer.valueOf(i)).asJava)
64+
record
65+
}
66+
67+
// Case-class representation of the same ten records as genericRecords.
private val typedRecords = (1 to 10).map { i =>
68+
TestRecordScala(i, List(i, i * 2).map(_.toString), List(Nested(i)), Map("x" -> i))
69+
}
70+
71+
// Coder for GenericRecord built from AvroSchema, made implicit for the pipelines below.
implicit val grCoder: com.spotify.scio.coders.Coder[GenericRecord] =
72+
avroGenericRecordCoder(AvroSchema)
73+
74+
// ParquetType whose properties set writeArrayEncoding to ArrayEncoding.Ungrouped.
private val ptUngroupedListEncoding = ParquetType[TestRecordScala](
75+
new MagnolifyParquetProperties {
76+
override def writeArrayEncoding: ArrayEncoding = ArrayEncoding.Ungrouped
77+
}
78+
)
79+
80+
// ParquetType whose properties set writeArrayEncoding to ArrayEncoding.ThreeLevelArray;
// the tests below refer to this as the "old list encoding".
private val ptOldListEncoding = ParquetType[TestRecordScala](
81+
new MagnolifyParquetProperties {
82+
override def writeArrayEncoding: ArrayEncoding = ArrayEncoding.ThreeLevelArray
83+
}
84+
)
85+
86+
// ParquetType whose properties set writeArrayEncoding to ArrayEncoding.ThreeLevelList;
// the tests below refer to this as the "new list encoding".
private val ptNewListEncoding = ParquetType[TestRecordScala](
87+
new MagnolifyParquetProperties {
88+
override def writeArrayEncoding: ArrayEncoding = ArrayEncoding.ThreeLevelList
89+
}
90+
)
91+
92+
// Avro write (no extra configuration passed) -> typed read using ptOldListEncoding.
".typedParquetFile" should "be able to read data written with .saveAsParquetAvroFile with old list encoding" in withTempDir {
93+
dir =>
94+
implicit val pt: ParquetType[TestRecordScala] = ptOldListEncoding
95+
96+
runWithRealContext()(
97+
_.parallelize(genericRecords)
98+
.saveAsParquetAvroFile(dir.toString, schema = AvroSchema)
99+
)
100+
101+
runWithRealContext()(
102+
_.typedParquetFile[TestRecordScala](s"$dir/*.parquet")
103+
.map(identity) should containInAnyOrder(typedRecords)
104+
)
105+
}
106+
107+
// Avro write with WRITE_OLD_LIST_STRUCTURE set to false -> typed read using ptNewListEncoding.
it should "be able to read data written with .saveAsParquetAvroFile with new list encoding" in withTempDir {
108+
dir =>
109+
implicit val pt: ParquetType[TestRecordScala] = ptNewListEncoding
110+
val listConf = ParquetConfiguration.of(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE -> false)
111+
112+
runWithRealContext()(
113+
_.parallelize(genericRecords)
114+
.saveAsParquetAvroFile(dir.toString, schema = AvroSchema, conf = listConf)
115+
)
116+
117+
runWithRealContext()(
118+
_.typedParquetFile[TestRecordScala](s"$dir/*.parquet")
119+
.map(identity) should containInAnyOrder(typedRecords)
120+
)
121+
}
122+
123+
/**
 * Writes `typedRecords` with one `ParquetType` and asserts that reading them back with
 * another one fails.
 *
 * The read pipeline is expected to throw a `PipelineExecutionException` whose cause is
 * exactly an `org.apache.parquet.io.InvalidRecordException`.
 *
 * @param writeEncoding `ParquetType` put in implicit scope for `saveAsTypedParquetFile`
 * @param readEncoding  `ParquetType` put in implicit scope for `typedParquetFile`
 * @param dir           directory the files are written to and read back from
 */
def testFailOnMismatchedReadWriteEncodings(
124+
writeEncoding: ParquetType[TestRecordScala],
125+
readEncoding: ParquetType[TestRecordScala],
126+
dir: File
127+
): Unit = {
128+
// Separate braces keep the write-side and read-side implicits from being in scope together.
{
129+
implicit val ptWrite: ParquetType[TestRecordScala] = writeEncoding
130+
runWithRealContext()(
131+
_.parallelize(typedRecords)
132+
.saveAsTypedParquetFile(dir.toString)
133+
)
134+
}
135+
{
136+
val e = the[PipelineExecutionException] thrownBy {
137+
implicit val ptRead: ParquetType[TestRecordScala] = readEncoding
138+
runWithRealContext()(
139+
_.typedParquetFile[TestRecordScala](s"$dir/*.parquet")
140+
.map(identity) should containInAnyOrder(typedRecords)
141+
)
142+
}
143+
// Compares the cause's runtime class for equality, so a subclass would not satisfy this.
e.getCause.getClass shouldBe classOf[org.apache.parquet.io.InvalidRecordException]
144+
}
145+
}
146+
147+
// Write: ThreeLevelList, read: ThreeLevelArray.
it should "fail quickly when reading data written with new list encoding if read ArrayEncoding is set to 2 level" in withTempDir {
148+
dir =>
149+
testFailOnMismatchedReadWriteEncodings(ptNewListEncoding, ptOldListEncoding, dir)
150+
}
151+
152+
// Write: ThreeLevelList, read: Ungrouped.
it should "fail quickly when reading data written with new list encoding if read ArrayEncoding is set to ungrouped" in withTempDir {
153+
dir =>
154+
testFailOnMismatchedReadWriteEncodings(ptNewListEncoding, ptUngroupedListEncoding, dir)
155+
}
156+
157+
// Write: ThreeLevelArray, read: ThreeLevelList.
it should "fail quickly when reading data written with old list encoding if read ArrayEncoding is set to new encoding" in withTempDir {
158+
dir =>
159+
testFailOnMismatchedReadWriteEncodings(ptOldListEncoding, ptNewListEncoding, dir)
160+
}
161+
162+
// Write: ThreeLevelArray, read: Ungrouped.
it should "fail quickly when reading data written with old list encoding if read ArrayEncoding is set to ungrouped" in withTempDir {
163+
dir =>
164+
testFailOnMismatchedReadWriteEncodings(ptOldListEncoding, ptUngroupedListEncoding, dir)
165+
}
166+
167+
// Typed write using ptOldListEncoding -> Avro read projected onto pt.avroSchema.
".parquetAvroFile" should "be able to read data written with .saveAsTypedParquetFile with old list encoding" in withTempDir {
168+
dir =>
169+
implicit val pt: ParquetType[TestRecordScala] = ptOldListEncoding
170+
171+
runWithRealContext()(
172+
_.parallelize(typedRecords)
173+
.saveAsTypedParquetFile(dir.toString)
174+
)
175+
176+
runWithRealContext()(
177+
_.parquetAvroFile[GenericRecord](s"$dir/*.parquet", projection = pt.avroSchema)
178+
.map(identity) should containInAnyOrder(genericRecords)
179+
)
180+
}
181+
182+
// Typed write using ptNewListEncoding -> Avro read projected onto pt.avroSchema.
// Here listConf is passed to the read, not to the typed write.
// NOTE(review): WRITE_OLD_LIST_STRUCTURE is a write-side setting by name — confirm it is
// what the Avro read path needs in order to interpret the new list layout.
it should "be able to read data written with .saveAsTypedParquetFile with new list encoding" in withTempDir {
183+
dir =>
184+
implicit val pt: ParquetType[TestRecordScala] = ptNewListEncoding
185+
val listConf = ParquetConfiguration.of(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE -> false)
186+
187+
runWithRealContext()(
188+
_.parallelize(typedRecords)
189+
.saveAsTypedParquetFile(dir.toString)
190+
)
191+
192+
runWithRealContext()(
193+
_.parquetAvroFile[GenericRecord](
194+
s"$dir/*.parquet",
195+
projection = pt.avroSchema,
196+
conf = listConf
197+
)
198+
.map(identity) should containInAnyOrder(genericRecords)
199+
)
200+
}
201+
}

0 commit comments

Comments
 (0)