Skip to content

Commit 8690a21

Browse files
authored
Introduce a document summary for collapsing and a way to safely roll it out (#1333)
1 parent b4bae28 commit 8690a21

File tree

17 files changed

+397
-396
lines changed

17 files changed

+397
-396
lines changed

src/marqo/core/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
MARQO_COLLAPSE_FIELDS_MINIMUM_VERSION = semver.VersionInfo.parse('2.23.0')
3232
MARQO_TYPEAHEAD_SCHEMA_MINIMUM_VERSION = semver.VersionInfo.parse('2.23.0')
3333
MARQO_UPDATE_SCHEMA_MINIMUM_VERSION = semver.VersionInfo.parse('2.23.0')
34+
MARQO_COLLAPSE_MINIMAL_SUMMARY_MINIMUM_VERSION = semver.VersionInfo.parse('2.24.6')
3435

3536
# For score modifiers
3637
QUERY_INPUT_SCORE_MODIFIERS_MULT_WEIGHTS_2_9 = 'marqo__mult_weights'

src/marqo/core/index_management/index_management.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,24 @@ def apply_latest_schema_template(self, index_name: str, force: bool = False, dry
313313
f"Please recreate the index with a newer version of Marqo to use this feature."
314314
)
315315

316+
# Validate that index's marqo_version is not greater than current version
317+
current_version_parsed = semver.VersionInfo.parse(version.get_version())
318+
if existing_index.parsed_marqo_version() > current_version_parsed:
319+
raise InternalError(
320+
f"Cannot update schema for index '{index_name}' created with Marqo version "
321+
f"{existing_index.marqo_version} using current Marqo version {version.get_version()}. "
322+
f"The index was created with a newer version of Marqo than is currently running."
323+
)
324+
325+
# Early return if schema is already at current version
326+
if existing_index.schema_template_version == version.get_version():
327+
logger.info(f'Index {index_name} schema is already at version {version.get_version()}')
328+
return {
329+
"updated": False,
330+
"schemaChanged": False,
331+
"reason": f"Schema is already at current Marqo version {version.get_version()}"
332+
}
333+
316334
# Generate new schema from current settings using latest template
317335
new_schema = SemiStructuredVespaSchema.generate_vespa_schema(existing_index)
318336

@@ -343,7 +361,6 @@ def apply_latest_schema_template(self, index_name: str, force: bool = False, dry
343361
"newSchema": new_schema,
344362
"schemaDiff": schema_diff,
345363
"configChangeActions": {},
346-
"reason": ""
347364
}
348365

349366
# Scenario 1: Schemas are identical - no changes needed

src/marqo/core/index_management/vespa_application_package.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from marqo.core.models import MarqoIndex
2121
from marqo.core.typeahead.typeahead_vespa_schema import TypeaheadVespaSchema
2222
import marqo.logging
23-
from marqo.vespa.exceptions import VespaError
23+
from marqo import version as marqo_version
2424
from marqo.vespa.vespa_client import VespaClient
2525

2626

@@ -772,7 +772,10 @@ def update_index_setting_and_schema(self, index: MarqoIndex, schema: str,
772772

773773
version = index.version + 1 if index.version is not None else 1
774774
self._store.save_file(schema, 'schemas', f'{index.schema_name}.sd')
775-
self._index_setting_store.save_index_setting(index.copy(update={'version': version}))
775+
self._index_setting_store.save_index_setting(index.copy(update={
776+
'version': version,
777+
'schema_template_version': marqo_version.get_version()
778+
}))
776779
self._persist_index_settings()
777780

778781
if prepare_only:

src/marqo/core/models/marqo_index.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ class SemiStructuredMarqoIndex(UnstructuredMarqoIndex):
540540
string_array_fields: Optional[List[
541541
StringArrayField]] # This is required so that when saving a document containing string array fields, we can make changes to the schema on the fly. Ref: https://github.com/marqo-ai/marqo/blob/cfea70adea7039d1586c94e36adae8e66cabe306/src/marqo/core/semi_structured_vespa_index/semi_structured_vespa_schema_template_2_16.sd.jinja2#L83
542542
collapse_fields: Optional[List[CollapseField]] = None
543+
schema_template_version: Optional[str] = None
543544

544545
def __init__(self, **data):
545546
super().__init__(**data)
@@ -560,6 +561,16 @@ def is_collapse_field(self, field_name: str) -> bool:
560561
return False
561562
return field_name in [field.name for field in self.collapse_fields]
562563

564+
def parsed_schema_template_version(self) -> semver.VersionInfo:
565+
"""
566+
Get the schema template version as a semver object.
567+
Falls back to marqo_version if schema_template_version is not set (backward compatibility).
568+
"""
569+
if self.schema_template_version is not None:
570+
return semver.VersionInfo.parse(self.schema_template_version)
571+
# Backward compatibility: old indexes don't have schema_template_version
572+
return self.parsed_marqo_version()
573+
563574
@property
564575
def field_map(self) -> Dict[str, Field]:
565576
"""
@@ -651,6 +662,11 @@ def generate():
651662

652663
return self._cache_or_get('tensor_subfield_map', generate)
653664

665+
# TODO: Update index_supports_* properties to use parsed_schema_template_version()
666+
# instead of parsed_marqo_version() for more accurate feature detection
667+
# after schema updates. This will allow features to be detected based on
668+
# the deployed schema template version rather than index creation version.
669+
654670
@property
655671
def index_supports_partial_updates(self) -> bool:
656672
"""
@@ -687,6 +703,17 @@ def index_supports_sorty_by(self) -> bool:
687703
'index_supports_sort_by',
688704
lambda: self.parsed_marqo_version() >= constants.MARQO_SORT_BY_MINIMUM_VERSION)
689705

706+
@property
707+
def index_supports_collapse_minimal_summary(self) -> bool:
708+
"""
709+
Check if the index schema supports collapse-minimal-summary.
710+
This summary class was added in version 2.24.6 to optimize
711+
collapse query performance by returning minimal fields.
712+
"""
713+
return self._cache_or_get(
714+
'index_supports_collapse_minimal_summary',
715+
lambda: self.parsed_schema_template_version() >= constants.MARQO_COLLAPSE_MINIMAL_SUMMARY_MINIMUM_VERSION)
716+
690717

691718
_PROTECTED_FIELD_NAMES = ['_id', '_tensor_facets', '_highlights', '_score', '_found']
692719
_VESPA_NAME_PATTERN = r'[a-zA-Z_][a-zA-Z0-9_]*'

src/marqo/core/models/marqo_index_request.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class UnstructuredMarqoIndexRequest(MarqoIndexRequest):
4242
treat_urls_and_pointers_as_media: bool
4343
filter_string_max_length: int
4444
collapse_fields: Optional[List[marqo_index.CollapseField]] = None
45+
schema_template_version: Optional[str] = None # For testing: allows simulating older schema template versions
4546

4647
@root_validator
4748
def validate_collapse_fields(cls, values):

src/marqo/core/semi_structured_vespa_index/semi_structured_vespa_index.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def _to_vespa_hybrid_query(self, marqo_query):
122122
return query
123123

124124
def _generate_collapse_query_params(self, collapse_field_name: str):
125-
return {
125+
params = {
126126
'collapsefield': collapse_field_name,
127127
'collapsesize': 1, # currently fixed to 1, will support multiple if needed in the future
128128

@@ -133,6 +133,12 @@ def _generate_collapse_query_params(self, collapse_field_name: str):
133133
'marqo__ranking.tensor.lexical': common.RANK_PROFILE_HYBRID_EMBEDDING_SIMILARITY_THEN_BM25 + '_diversity',
134134
}
135135

136+
# Only use minimal summary if the schema supports it (version check)
137+
if self.get_marqo_index().index_supports_collapse_minimal_summary:
138+
params['collapse.summary'] = 'collapse-minimal-summary'
139+
140+
return params
141+
136142
def _add_relevance_cutoff_and_sort_by_params(self, marqo_query, query):
137143
if marqo_query.relevance_cutoff:
138144
query["marqo__hybrid.relevanceCutoff.method"] = marqo_query.relevance_cutoff.method

src/marqo/core/semi_structured_vespa_index/semi_structured_vespa_schema.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def _generate_marqo_index(self, schema_name: str) -> SemiStructuredMarqoIndex:
5555
vector_numeric_type=self._index_request.vector_numeric_type,
5656
hnsw_config=self._index_request.hnsw_config,
5757
marqo_version=self._index_request.marqo_version,
58+
schema_template_version=self._index_request.schema_template_version or self._index_request.marqo_version,
5859
created_at=self._index_request.created_at,
5960
updated_at=self._index_request.updated_at,
6061
lexical_fields=[],

src/marqo/core/semi_structured_vespa_index/semi_structured_vespa_schema_template_2_16.sd.jinja2

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,4 +403,10 @@ schema {{ index.schema_name }} {
403403
summary {{ field.embeddings_field_name }} type tensor<float>(p{}, x[{{ dimension }}]) {}
404404
{%- endfor %}
405405
}
406+
407+
{% if collapse_field -%}
408+
document-summary collapse-minimal-summary {
409+
summary {{ collapse_field.name }} type string {}
410+
}
411+
{%- endif %}
406412
}

src/marqo/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "2.24.5"
1+
__version__ = "2.24.6"
22

33
def get_version() -> str:
44
return f"{__version__}"

tests/integ_tests/core/index_management/test_index_management_schema_update.py

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import datetime, timedelta
44
from unittest.mock import patch
55

6+
from marqo import version
67
from marqo.core.exceptions import IndexNotFoundError, InternalError, UnsupportedFeatureError
78
from marqo.core.models.add_docs_params import AddDocsParams
89
from marqo.core.models.marqo_index import FieldType
@@ -41,12 +42,16 @@ def setUpClass(cls) -> None:
4142
# Bootstrap Vespa
4243
cls.index_management.bootstrap_vespa()
4344

45+
cls.original_marqo_version = '2.23.0'
46+
cls.original_schema_template_version = '2.24.0'
47+
4448
# pre-create all indexes for tests
4549
index_requests = [
46-
cls.unstructured_marqo_index_request(name='test_schema_update_index'),
47-
cls.unstructured_marqo_index_request(name='test_config_change_restart'),
48-
cls.unstructured_marqo_index_request(name='test_config_change_reindex'),
49-
cls.unstructured_marqo_index_request(name='test_config_change_refeed'),
50+
cls.unstructured_marqo_index_request(name='test_schema_update_index', marqo_version=cls.original_marqo_version, schema_template_version=cls.original_schema_template_version),
51+
cls.unstructured_marqo_index_request(name='test_config_change_restart', marqo_version=cls.original_marqo_version, schema_template_version=cls.original_schema_template_version),
52+
cls.unstructured_marqo_index_request(name='test_config_change_reindex', marqo_version=cls.original_marqo_version, schema_template_version=cls.original_schema_template_version),
53+
cls.unstructured_marqo_index_request(name='test_config_change_refeed', marqo_version=cls.original_marqo_version, schema_template_version=cls.original_schema_template_version),
54+
cls.unstructured_marqo_index_request(name='test_schema_template_version_current'),
5055
cls.unstructured_marqo_index_request(name='old_version_index', marqo_version='2.22.0'),
5156
cls.unstructured_marqo_index_request(name='legacy_unstructured_index', marqo_version='2.12.0'),
5257
cls.structured_marqo_index_request(
@@ -57,12 +62,13 @@ def setUpClass(cls) -> None:
5762
]
5863
cls.create_indexes(index_requests)
5964

60-
# feed in a doc to add a 'title' lexical field to 'test_config_change_restart'
61-
cls.add_documents(cls.config, AddDocsParams(
62-
index_name='test_config_change_restart',
63-
docs=[{'_id': '1', 'title': 'hello'}],
64-
tensor_fields=[]
65-
))
65+
# Feed in a doc to add a 'title' lexical field to 'test_config_change_restart'; patch get_version() so the index's schema_template_version stays at the original value
66+
with patch('marqo.version.get_version', return_value=cls.original_schema_template_version):
67+
cls.add_documents(cls.config, AddDocsParams(
68+
index_name='test_config_change_restart',
69+
docs=[{'_id': '1', 'title': 'hello'}],
70+
tensor_fields=[]
71+
))
6672

6773
def _get_validation_overrides(self) -> str:
6874
app = self.vespa_client.download_application()
@@ -96,7 +102,10 @@ def test_update_schema_no_changes(self):
96102
self.assertEqual(original_schema, result['oldSchema'])
97103
self.assertEqual(original_schema, result['newSchema'])
98104

99-
self.assertEqual(original_version, self.index_management.get_index(test_index_name).version)
105+
# Verify version and schema_template_version are not updated (both keep their original values when there are no schema changes)
106+
updated_index = self.index_management.get_index(test_index_name)
107+
self.assertEqual(self.original_schema_template_version, updated_index.schema_template_version)
108+
self.assertEqual(original_version, updated_index.version)
100109

101110
@patch('marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema.SemiStructuredVespaSchema.generate_vespa_schema')
102111
def test_update_schema_successful(self, mock_generate_schema):
@@ -126,8 +135,11 @@ def test_update_schema_successful(self, mock_generate_schema):
126135

127136
# Verify schema was actually deployed to Vespa
128137
self.assertEqual(modified_schema, self._get_schema_from_vespa(saved_index.schema_name))
129-
# Verify the index version is also updated
130-
self.assertEqual(original_version + 1, self.index_management.get_index(test_index_name).version)
138+
139+
# Verify schema_template_version is updated to the current Marqo version and the index version is incremented by 1
140+
updated_index = self.index_management.get_index(test_index_name)
141+
self.assertEqual(version.get_version(), updated_index.schema_template_version)
142+
self.assertEqual(original_version + 1, updated_index.version)
131143

132144
@patch('marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema.SemiStructuredVespaSchema.generate_vespa_schema')
133145
def test_update_schema_dry_run_prevents_deployment(self, mock_generate_schema):
@@ -157,6 +169,10 @@ def test_update_schema_dry_run_prevents_deployment(self, mock_generate_schema):
157169
# Verify schema was NOT deployed to Vespa (original schema still present)
158170
self.assertEqual(original_schema, self._get_schema_from_vespa(saved_index.schema_name))
159171

172+
# Verify schema_template_version not updated in dry run
173+
updated_index = self.index_management.get_index(test_index_name)
174+
self.assertEqual(self.original_schema_template_version, updated_index.schema_template_version)
175+
160176
# ============================================================================
161177
# Error Case Tests
162178
# ============================================================================
@@ -182,7 +198,7 @@ def test_update_schema_wrong_index_type_legacy_unstructured(self):
182198

183199
self.assertIn('only semi-structured indexes support schema updates', str(ctx.exception))
184200

185-
def test_update_schema_version_too_old(self):
201+
def test_update_schema_template_version_too_old(self):
186202
"""Should raise UnsupportedFeatureError for indexes created with Marqo < 2.23.0."""
187203
with self.assertRaisesStrict(UnsupportedFeatureError) as ctx:
188204
self.index_management.apply_latest_schema_template('old_version_index')
@@ -191,6 +207,22 @@ def test_update_schema_version_too_old(self):
191207
str(ctx.exception))
192208
self.assertIn('created with Marqo 2.22.0', str(ctx.exception))
193209

210+
def test_shortcut_when_schema_template_version_current(self):
211+
"""When schema_template_version matches current version, shortcut is triggered."""
212+
current_version = version.get_version()
213+
index_name = 'test_schema_template_version_current'
214+
215+
result = self.index_management.apply_latest_schema_template(index_name)
216+
217+
# Verify shortcut response
218+
self.assertFalse(result['updated'])
219+
self.assertFalse(result['schemaChanged'])
220+
self.assertIn(f'already at current Marqo version {current_version}', result['reason'])
221+
222+
# Verify schema_template_version unchanged
223+
index = self.index_management.get_index(index_name)
224+
self.assertEqual(current_version, index.schema_template_version)
225+
194226
# ============================================================================
195227
# configChangeActions Tests
196228
# ============================================================================
@@ -244,6 +276,10 @@ def test_update_schema_with_restart_actions(self, mock_generate_schema):
244276
# Verify schema was NOT deployed
245277
self.assertEqual(original_schema, self._get_schema_from_vespa(saved_index.schema_name))
246278

279+
# Verify schema_template_version not updated when blocked
280+
blocked_index = self.index_management.get_index(test_index_name)
281+
self.assertEqual(self.original_schema_template_version, blocked_index.schema_template_version)
282+
247283
# Verify it updates the schema when forced set to true
248284
result_forced = self.index_management.apply_latest_schema_template(
249285
test_index_name,
@@ -254,6 +290,10 @@ def test_update_schema_with_restart_actions(self, mock_generate_schema):
254290
self.assertIn('Update forced despite required actions', result_forced['reason'])
255291
self.assertEqual(modified_schema, self._get_schema_from_vespa(saved_index.schema_name))
256292

293+
# Verify schema_template_version updated when forced
294+
forced_index = self.index_management.get_index(test_index_name)
295+
self.assertEqual(version.get_version(), forced_index.schema_template_version)
296+
257297
@patch('marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema.SemiStructuredVespaSchema.generate_vespa_schema')
258298
def test_update_schema_with_refeed_actions(self, mock_generate_schema):
259299
"""When re-feed actions are required and force=False, should block deployment.
@@ -296,6 +336,10 @@ def test_update_schema_with_refeed_actions(self, mock_generate_schema):
296336

297337
self.assertEqual(original_schema, self._get_schema_from_vespa(saved_index.schema_name))
298338

339+
# Verify schema_template_version not updated when blocked
340+
blocked_index = self.index_management.get_index(test_index_name)
341+
self.assertEqual(self.original_schema_template_version, blocked_index.schema_template_version)
342+
299343
# Force update
300344
result_forced = self.index_management.apply_latest_schema_template(
301345
test_index_name,
@@ -309,6 +353,10 @@ def test_update_schema_with_refeed_actions(self, mock_generate_schema):
309353

310354
self.assertEqual(modified_schema, self._get_schema_from_vespa(saved_index.schema_name))
311355

356+
# Verify schema_template_version updated when forced
357+
forced_index = self.index_management.get_index(test_index_name)
358+
self.assertEqual(version.get_version(), forced_index.schema_template_version)
359+
312360
@patch('marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema.SemiStructuredVespaSchema.generate_vespa_schema')
313361
def test_update_schema_with_reindex_actions(self, mock_generate_schema):
314362
"""When reindex actions are required and force=False, should block deployment.
@@ -357,6 +405,10 @@ def test_update_schema_with_reindex_actions(self, mock_generate_schema):
357405

358406
self.assertEqual(original_schema, self._get_schema_from_vespa(saved_index.schema_name))
359407

408+
# Verify schema_template_version not updated when blocked
409+
blocked_index = self.index_management.get_index(test_index_name)
410+
self.assertEqual(self.original_schema_template_version, blocked_index.schema_template_version)
411+
360412
# Force update
361413
result_forced = self.index_management.apply_latest_schema_template(
362414
test_index_name,
@@ -369,3 +421,7 @@ def test_update_schema_with_reindex_actions(self, mock_generate_schema):
369421
self.assertIn('Update forced despite required actions', result_forced['reason'])
370422
self.assertIn('reindex', result_forced['configChangeActions'])
371423
self.assertEqual(modified_schema, self._get_schema_from_vespa(saved_index.schema_name))
424+
425+
# Verify schema_template_version updated when forced
426+
forced_index = self.index_management.get_index(test_index_name)
427+
self.assertEqual(version.get_version(), forced_index.schema_template_version)

0 commit comments

Comments
 (0)