@codeflash-ai codeflash-ai bot commented Nov 13, 2025

📄 21% (0.21x) speedup for AsyncLoggingQueue._fetch_batch_from_queue in mlflow/utils/async_logging/async_logging_queue.py

⏱️ Runtime: 929 microseconds → 769 microseconds (best of 106 runs)

📝 Explanation and details

The optimized code achieves a 20% speedup by replacing inefficient queue polling with a more performant batch draining approach and eliminating redundant computations.

Key optimizations:

  1. Queue draining strategy: Instead of using a `queue.qsize()` estimate and iterative `queue.get()` calls, the code now drains the entire queue upfront using `get_nowait()` in a try/except loop (a sketch of this approach follows the list). This eliminates the unreliable queue size estimation and reduces queue locking overhead from multiple individual operations to a single batch drain.

  2. Precomputed length calculations: The original code repeatedly computed `len(merged_batch.metrics + merged_batch.params + merged_batch.tags)`, which creates temporary concatenated lists. The optimization precomputes the individual lengths once per iteration and reuses them, avoiding expensive list concatenation operations.

  3. Reduced queue interaction: The original approach made multiple `queue.empty()` and `queue.get()` calls within the loop, each requiring thread synchronization. The optimized version minimizes this to a single draining phase, reducing locking contention.
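
A minimal, self-contained sketch of what such a drain-and-merge approach can look like is shown below. It is illustrative only: the function name, the `RunBatch`-style attributes (`run_id`, `metrics`, `params`, `tags`, `add_child_batch`), and the limit constants mirror the test stubs further down this page rather than the actual MLflow source.

```python
from queue import Empty, Queue

# Illustrative limits, matching the constants used in the test stubs below;
# the real values live in mlflow/utils/async_logging/async_logging_queue.py.
_MAX_ITEMS_PER_BATCH = 1000
_MAX_PARAMS_PER_BATCH = 100
_MAX_TAGS_PER_BATCH = 100


def fetch_batch_from_queue(queue: Queue) -> list:
    """Drain the queue in one pass, then merge consecutive batches per run_id."""
    # 1. Drain everything currently queued with get_nowait() instead of
    #    relying on a qsize() estimate and per-item get() calls.
    pending = []
    while True:
        try:
            pending.append(queue.get_nowait())
        except Empty:
            break

    merged_batches = []
    merged = None
    # 2. Cache the running sizes of the batch being merged into, so the
    #    limit checks never re-concatenate lists just to measure length.
    n_items = n_params = n_tags = 0

    for batch in pending:
        b_metrics, b_params, b_tags = len(batch.metrics), len(batch.params), len(batch.tags)
        fits = (
            merged is not None
            and merged.run_id == batch.run_id
            and n_items + b_metrics + b_params + b_tags <= _MAX_ITEMS_PER_BATCH
            and n_params + b_params <= _MAX_PARAMS_PER_BATCH
            and n_tags + b_tags <= _MAX_TAGS_PER_BATCH
        )
        if fits:
            merged.metrics.extend(batch.metrics)
            merged.params.extend(batch.params)
            merged.tags.extend(batch.tags)
            merged.add_child_batch(batch)
            n_items += b_metrics + b_params + b_tags
            n_params += b_params
            n_tags += b_tags
        else:
            # Start a new output batch (also covers the very first batch).
            merged = batch
            merged_batches.append(merged)
            n_items = b_metrics + b_params + b_tags
            n_params, n_tags = b_params, b_tags

    return merged_batches
```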

Performance characteristics from test results:

  • Large batch scenarios show the biggest gains (33-118% faster) when merging many small batches, as the optimization eliminates the roughly O(n²) overhead of repeated queue polling and list re-concatenation in the original loop (a sketch of that replaced pattern follows this list)
  • Small batch cases may be slightly slower (3-30%) due to the overhead of the try/except mechanism, but this is negligible in absolute terms
  • The optimization is particularly effective for workloads with high queue throughput where batches frequently hit size limits
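
For contrast, below is a rough reconstruction of the pattern being replaced, pieced together from the description above; the function name and structure are assumptions, not the actual MLflow implementation, and it reuses the illustrative limit constants from the previous sketch.

```python
def fetch_batch_from_queue_original(queue):
    """Approximation of the pre-optimization loop described above."""
    merged_batches = []
    for _ in range(queue.qsize()):      # qsize() is only an estimate under concurrency
        if queue.empty():               # extra synchronized check every iteration
            break
        batch = queue.get()             # one lock acquisition per batch
        merged = merged_batches[-1] if merged_batches else None
        if merged is not None and merged.run_id == batch.run_id:
            # Concatenating three lists just to measure their combined length costs
            # O(current merged size) on every iteration, which is where the roughly
            # quadratic behaviour on many small batches comes from.
            if (
                len(merged.metrics + merged.params + merged.tags)
                + len(batch.metrics) + len(batch.params) + len(batch.tags)
                <= _MAX_ITEMS_PER_BATCH
                and len(merged.params) + len(batch.params) <= _MAX_PARAMS_PER_BATCH
                and len(merged.tags) + len(batch.tags) <= _MAX_TAGS_PER_BATCH
            ):
                merged.metrics.extend(batch.metrics)
                merged.params.extend(batch.params)
                merged.tags.extend(batch.tags)
                merged.add_child_batch(batch)
                continue
        merged_batches.append(batch)
    return merged_batches
```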

The changes maintain identical functionality while significantly improving performance for typical async logging scenarios where multiple batches need to be processed and merged efficiently.

Correctness verification report:

| Test | Status |
|------|--------|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 72 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
import threading
from queue import Queue
from typing import Callable

# imports
import pytest
from mlflow.utils.async_logging.async_logging_queue import AsyncLoggingQueue


# --- Minimal stubs for dependencies ---
class Metric:
    def __init__(self, key, value, step, timestamp):
        self.key = key
        self.value = value
        self.step = step
        self.timestamp = timestamp

class Param:
    def __init__(self, key, value):
        self.key = key
        self.value = value

class RunTag:
    def __init__(self, key, value):
        self.key = key
        self.value = value

class RunBatch:
    def __init__(self, run_id, metrics=None, params=None, tags=None):
        self.run_id = run_id
        self.metrics = metrics if metrics is not None else []
        self.params = params if params is not None else []
        self.tags = tags if tags is not None else []
        self.child_batches = []

    def add_child_batch(self, batch):
        self.child_batches.append(batch)

class QueueStatus:
    IDLE = 0
    BUSY = 1

# --- Constants ---
_MAX_ITEMS_PER_BATCH = 1000
_MAX_PARAMS_PER_BATCH = 100
_MAX_TAGS_PER_BATCH = 100
from mlflow.utils.async_logging.async_logging_queue import AsyncLoggingQueue


# --- Helper functions for tests ---
def make_metric(i):
    return Metric(f"metric_{i}", i, i, i)

def make_param(i):
    return Param(f"param_{i}", str(i))

def make_tag(i):
    return RunTag(f"tag_{i}", str(i))

def make_batch(run_id, num_metrics=0, num_params=0, num_tags=0):
    return RunBatch(
        run_id,
        metrics=[make_metric(i) for i in range(num_metrics)],
        params=[make_param(i) for i in range(num_params)],
        tags=[make_tag(i) for i in range(num_tags)],
    )

# --- Unit tests ---
# 1. Basic Test Cases
def test_empty_queue_returns_empty_list():
    # Scenario: Queue is empty
    queue = AsyncLoggingQueue(lambda *a: None)
    codeflash_output = queue._fetch_batch_from_queue() # 1.64μs -> 1.70μs (3.53% slower)

def test_single_batch_in_queue():
    # Scenario: One batch in the queue
    queue = AsyncLoggingQueue(lambda *a: None)
    batch = make_batch("run_1", 2, 1, 1)
    queue._queue.put(batch)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 4.64μs -> 6.70μs (30.7% slower)

def test_multiple_batches_same_run_id_under_limits():
    # Scenario: Multiple batches, same run_id, total under all limits
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 2, 1, 1)
    b2 = make_batch("run_1", 3, 2, 2)
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 7.50μs -> 8.70μs (13.8% slower)
    merged = result[0]

def test_multiple_batches_different_run_ids():
    # Scenario: Multiple batches with different run_ids
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 2, 1, 1)
    b2 = make_batch("run_2", 3, 2, 2)
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 5.78μs -> 7.80μs (25.8% slower)

def test_batch_merging_stops_at_max_items():
    # Scenario: Batches merged until _MAX_ITEMS_PER_BATCH is reached
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 500, 200, 200)  # 500 metrics, 200 params, 200 tags = 900 items
    b2 = make_batch("run_1", 99, 0, 0)      # 99 metrics, total now 999
    b3 = make_batch("run_1", 2, 0, 0)       # Would push over 1000
    queue._queue.put(b1)
    queue._queue.put(b2)
    queue._queue.put(b3)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 13.7μs -> 10.2μs (33.8% faster)
    # First batch should have 999 items, second batch 2 items

# 2. Edge Test Cases
def test_batch_merging_stops_at_max_params():
    # Scenario: Merging stops when params exceed _MAX_PARAMS_PER_BATCH
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 1, 99, 1)
    b2 = make_batch("run_1", 1, 2, 1)  # Would push params to 101 (>100)
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 7.01μs -> 7.97μs (12.0% slower)

def test_batch_merging_stops_at_max_tags():
    # Scenario: Merging stops when tags exceed _MAX_TAGS_PER_BATCH
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 1, 1, 99)
    b2 = make_batch("run_1", 1, 1, 2)  # Would push tags to 101 (>100)
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 6.87μs -> 7.77μs (11.6% slower)

def test_batch_merging_exactly_at_limits():
    # Scenario: Merging stops exactly at limits
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 500, 50, 50)
    b2 = make_batch("run_1", 500, 50, 50)  # Would push to 1000 items, 100 params, 100 tags
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 10.9μs -> 8.16μs (33.6% faster)
    merged = result[0]

def test_batch_merging_zero_items():
    # Scenario: Batches with zero metrics/params/tags
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 0, 0, 0)
    b2 = make_batch("run_1", 0, 0, 0)
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 7.09μs -> 8.65μs (18.0% slower)

def test_batch_merging_with_one_batch_exceeding_limits():
    # Scenario: One batch itself exceeds limits, should not merge
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 1001, 101, 101)
    b2 = make_batch("run_1", 1, 1, 1)
    queue._queue.put(b1)
    queue._queue.put(b2)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 12.1μs -> 8.34μs (44.6% faster)

def test_batch_merging_with_run_id_switch():
    # Scenario: Run ID changes in the middle
    queue = AsyncLoggingQueue(lambda *a: None)
    b1 = make_batch("run_1", 2, 2, 2)
    b2 = make_batch("run_2", 2, 2, 2)
    b3 = make_batch("run_1", 2, 2, 2)
    queue._queue.put(b1)
    queue._queue.put(b2)
    queue._queue.put(b3)
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 6.57μs -> 8.74μs (24.8% slower)

# 3. Large Scale Test Cases
def test_large_number_of_batches_merging():
    # Scenario: Many small batches, same run_id, should merge until limits
    queue = AsyncLoggingQueue(lambda *a: None)
    # Each batch: 10 metrics, 2 params, 2 tags
    num_batches = 50
    for i in range(num_batches):
        queue._queue.put(make_batch("run_1", 10, 2, 2))
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 136μs -> 62.4μs (118% faster)
    merged = result[0]

def test_large_batches_with_run_id_switches():
    # Scenario: Large batches with run_id switches
    queue = AsyncLoggingQueue(lambda *a: None)
    for i in range(10):
        queue._queue.put(make_batch(f"run_{i}", 100, 10, 10))
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 12.8μs -> 15.3μs (16.4% slower)
    for i, batch in enumerate(result):
        pass

def test_large_batches_exceeding_limits():
    # Scenario: Many batches, merging should split into multiple batches due to limits
    queue = AsyncLoggingQueue(lambda *a: None)
    # Each batch: 200 metrics, 20 params, 20 tags
    for i in range(6):  # 6*200=1200 metrics, 120 params, 120 tags
        queue._queue.put(make_batch("run_1", 200, 20, 20))
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 28.9μs -> 16.6μs (74.5% faster)
    total_metrics = sum(len(b.metrics) for b in result)
    total_params = sum(len(b.params) for b in result)
    total_tags = sum(len(b.tags) for b in result)
    # Each batch should not exceed limits
    for b in result:
        pass

def test_queue_size_estimate_smaller_than_actual():
    # Scenario: queue.qsize() is smaller than actual, so loop ends early and leaves items in queue
    # Simulate by putting batches in queue, then removing one before calling _fetch_batch_from_queue
    queue = AsyncLoggingQueue(lambda *a: None)
    batches = [make_batch("run_1", 1, 1, 1) for _ in range(5)]
    for b in batches:
        queue._queue.put(b)
    # Remove one batch to make qsize estimate off
    queue._queue.get()
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 9.16μs -> 9.70μs (5.55% slower)

def test_queue_size_estimate_larger_than_actual():
    # Scenario: queue.qsize() is larger than actual, but queue empties before loop ends
    queue = AsyncLoggingQueue(lambda *a: None)
    batches = [make_batch("run_1", 1, 1, 1) for _ in range(3)]
    for b in batches:
        queue._queue.put(b)
    # Monkeypatch qsize to return higher value
    orig_qsize = queue._queue.qsize
    queue._queue.qsize = lambda: 10
    codeflash_output = queue._fetch_batch_from_queue(); result = codeflash_output # 9.00μs -> 9.81μs (8.27% slower)
    # Restore original method
    queue._queue.qsize = orig_qsize
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import threading
from queue import Queue
from typing import Callable

# imports
import pytest
from mlflow.utils.async_logging.async_logging_queue import AsyncLoggingQueue


# Dummy classes to mimic mlflow entities
class Metric:
    def __init__(self, key, value, step, timestamp):
        self.key = key
        self.value = value
        self.step = step
        self.timestamp = timestamp

class Param:
    def __init__(self, key, value):
        self.key = key
        self.value = value

class RunTag:
    def __init__(self, key, value):
        self.key = key
        self.value = value

class RunBatch:
    def __init__(self, run_id, metrics=None, params=None, tags=None):
        self.run_id = run_id
        self.metrics = metrics if metrics is not None else []
        self.params = params if params is not None else []
        self.tags = tags if tags is not None else []
        self.child_batches = []

    def add_child_batch(self, batch):
        self.child_batches.append(batch)

class QueueStatus:
    IDLE = "idle"

# Constants
_MAX_ITEMS_PER_BATCH = 1000
_MAX_PARAMS_PER_BATCH = 100
_MAX_TAGS_PER_BATCH = 100
from mlflow.utils.async_logging.async_logging_queue import AsyncLoggingQueue


# Helper to enqueue batches
def enqueue_batches(queue_obj, batches):
    for batch in batches:
        queue_obj._queue.put(batch)

# ---------------------- UNIT TESTS ----------------------

# 1. Basic Test Cases

def test_empty_queue_returns_empty_list():
    # Test that an empty queue returns an empty list
    q = AsyncLoggingQueue(lambda *_: None)
    codeflash_output = q._fetch_batch_from_queue() # 1.67μs -> 1.68μs (0.596% slower)

def test_single_batch_returns_single_batch():
    # Test that a single batch in the queue is returned as a single-element list
    q = AsyncLoggingQueue(lambda *_: None)
    batch = RunBatch("run_1", [Metric("m", 1, 1, 1)], [Param("p", "v")], [RunTag("t", "v")])
    enqueue_batches(q, [batch])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 4.89μs -> 6.47μs (24.4% slower)

def test_two_batches_same_runid_merges():
    # Two batches with same run_id should merge if under all limits
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [Metric("m1", 1, 1, 1)], [Param("p1", "v")], [RunTag("t1", "v")])
    batch2 = RunBatch("run_1", [Metric("m2", 2, 2, 2)], [Param("p2", "v")], [RunTag("t2", "v")])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.80μs -> 8.51μs (8.33% slower)

def test_two_batches_different_runid_not_merged():
    # Two batches with different run_id should not merge
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [Metric("m1", 1, 1, 1)], [Param("p1", "v")], [RunTag("t1", "v")])
    batch2 = RunBatch("run_2", [Metric("m2", 2, 2, 2)], [Param("p2", "v")], [RunTag("t2", "v")])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 6.11μs -> 7.62μs (19.9% slower)

def test_batch_merging_stops_at_max_items():
    # Batches should not merge if total items exceed _MAX_ITEMS_PER_BATCH
    q = AsyncLoggingQueue(lambda *_: None)
    # First batch: 999 metrics, second batch: 2 metrics
    batch1 = RunBatch("run_1", [Metric(f"m{i}", i, i, i) for i in range(999)], [], [])
    batch2 = RunBatch("run_1", [Metric("m1000", 1000, 1000, 1000), Metric("m1001", 1001, 1001, 1001)], [], [])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 11.5μs -> 8.22μs (39.9% faster)

def test_batch_merging_stops_at_max_params():
    # Batches should not merge if total params exceed _MAX_PARAMS_PER_BATCH
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [], [Param(f"p{i}", "v") for i in range(99)], [])
    batch2 = RunBatch("run_1", [], [Param("p100", "v"), Param("p101", "v")], [])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.35μs -> 8.06μs (8.82% slower)

def test_batch_merging_stops_at_max_tags():
    # Batches should not merge if total tags exceed _MAX_TAGS_PER_BATCH
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [], [], [RunTag(f"t{i}", "v") for i in range(99)])
    batch2 = RunBatch("run_1", [], [], [RunTag("t100", "v"), RunTag("t101", "v")])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.08μs -> 7.91μs (10.5% slower)

# 2. Edge Test Cases

def test_batch_merging_exactly_at_limits():
    # Batches merge up to exactly the limit, but not over
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1",
                      [Metric(f"m{i}", i, i, i) for i in range(500)],
                      [Param(f"p{i}", "v") for i in range(50)],
                      [RunTag(f"t{i}", "v") for i in range(50)])
    batch2 = RunBatch("run_1",
                      [Metric(f"m{i+500}", i+500, i+500, i+500) for i in range(500)],
                      [Param(f"p{i+50}", "v") for i in range(50)],
                      [RunTag(f"t{i+50}", "v") for i in range(50)])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 11.1μs -> 8.41μs (31.7% faster)

def test_batch_merging_over_items_limit():
    # If adding batch2 would exceed _MAX_ITEMS_PER_BATCH, should not merge
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [Metric(f"m{i}", i, i, i) for i in range(1000)], [], [])
    batch2 = RunBatch("run_1", [Metric("m1001", 1001, 1001, 1001)], [], [])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 11.6μs -> 8.21μs (40.8% faster)

def test_batch_merging_over_params_limit():
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [], [Param(f"p{i}", "v") for i in range(100)], [])
    batch2 = RunBatch("run_1", [], [Param("p101", "v")], [])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.23μs -> 7.90μs (8.48% slower)

def test_batch_merging_over_tags_limit():
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [], [], [RunTag(f"t{i}", "v") for i in range(100)])
    batch2 = RunBatch("run_1", [], [], [RunTag("t101", "v")])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.22μs -> 7.94μs (9.05% slower)

def test_merging_stops_when_queue_empty_before_expected():
    # If queue_size is overestimated, should not crash
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [Metric("m1", 1, 1, 1)], [], [])
    enqueue_batches(q, [batch1])
    # Manually fudge qsize to be higher (simulate race condition)
    q._queue.qsize = lambda: 5  # monkeypatch
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 4.94μs -> 6.20μs (20.4% slower)

def test_merging_multiple_runids_and_limits():
    # Mix of run_ids and limits
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [Metric("m1", 1, 1, 1)], [], [])
    batch2 = RunBatch("run_2", [Metric("m2", 2, 2, 2)], [], [])
    batch3 = RunBatch("run_1", [Metric("m3", 3, 3, 3)], [], [])
    batch4 = RunBatch("run_1", [Metric("m4", 4, 4, 4)], [], [])
    batch5 = RunBatch("run_2", [Metric("m5", 5, 5, 5)], [], [])
    enqueue_batches(q, [batch1, batch2, batch3, batch4, batch5])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 10.6μs -> 11.2μs (4.86% slower)

# 3. Large Scale Test Cases

def test_large_batch_merging_under_limits():
    # Merge 10 batches of 100 metrics each (total 1000, at limit)
    q = AsyncLoggingQueue(lambda *_: None)
    batches = [RunBatch("run_1", [Metric(f"m{j}", j, j, j) for j in range(i*100, (i+1)*100)], [], []) for i in range(10)]
    enqueue_batches(q, batches)
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 41.6μs -> 19.9μs (109% faster)

def test_large_batch_merging_exceeds_items_limit():
    # 11 batches of 100 metrics each (total 1100, should split into 1000 + 100)
    q = AsyncLoggingQueue(lambda *_: None)
    batches = [RunBatch("run_1", [Metric(f"m{j}", j, j, j) for j in range(i*100, (i+1)*100)], [], []) for i in range(11)]
    enqueue_batches(q, batches)
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 43.6μs -> 21.3μs (105% faster)

def test_large_batch_merging_exceeds_params_limit():
    # 2 batches, first with 99 params, second with 2 params
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [], [Param(f"p{i}", "v") for i in range(99)], [])
    batch2 = RunBatch("run_1", [], [Param("p99", "v"), Param("p100", "v")], [])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.44μs -> 7.55μs (1.40% slower)

def test_large_batch_merging_exceeds_tags_limit():
    # 2 batches, first with 99 tags, second with 2 tags
    q = AsyncLoggingQueue(lambda *_: None)
    batch1 = RunBatch("run_1", [], [], [RunTag(f"t{i}", "v") for i in range(99)])
    batch2 = RunBatch("run_1", [], [], [RunTag("t99", "v"), RunTag("t100", "v")])
    enqueue_batches(q, [batch1, batch2])
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 7.02μs -> 7.87μs (10.8% slower)

def test_large_batch_many_runids():
    # 500 batches with alternating run_ids, should not merge across run_ids
    q = AsyncLoggingQueue(lambda *_: None)
    batches = []
    for i in range(500):
        batches.append(RunBatch(f"run_{i%2}", [Metric(f"m{i}", i, i, i)], [], []))
    enqueue_batches(q, batches)
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 397μs -> 377μs (5.41% faster)
    for i, batch in enumerate(result):
        pass

def test_large_batch_merging_with_mixed_limits():
    # 10 batches, each with 10 metrics, 10 params, 10 tags, total 100 each, should merge up to limits
    q = AsyncLoggingQueue(lambda *_: None)
    batches = []
    for i in range(10):
        batches.append(RunBatch("run_1",
                                [Metric(f"m{i*10+j}", i*10+j, i*10+j, i*10+j) for j in range(10)],
                                [Param(f"p{i*10+j}", "v") for j in range(10)],
                                [RunTag(f"t{i*10+j}", "v") for j in range(10)]))
    enqueue_batches(q, batches)
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 25.9μs -> 18.5μs (40.1% faster)

def test_large_batch_merging_exceeds_mixed_limits():
    # 11 batches, each with 10 metrics, 10 params, 10 tags, total 110 each, should split
    q = AsyncLoggingQueue(lambda *_: None)
    batches = []
    for i in range(11):
        batches.append(RunBatch("run_1",
                                [Metric(f"m{i*10+j}", i*10+j, i*10+j, i*10+j) for j in range(10)],
                                [Param(f"p{i*10+j}", "v") for j in range(10)],
                                [RunTag(f"t{i*10+j}", "v") for j in range(10)]))
    enqueue_batches(q, batches)
    codeflash_output = q._fetch_batch_from_queue(); result = codeflash_output # 27.6μs -> 20.1μs (37.1% faster)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
from mlflow.utils.async_logging.async_logging_queue import AsyncLoggingQueue

def test_AsyncLoggingQueue__fetch_batch_from_queue():
    AsyncLoggingQueue._fetch_batch_from_queue(AsyncLoggingQueue(lambda *a: None))

To edit these changes, run `git checkout codeflash/optimize-AsyncLoggingQueue._fetch_batch_from_queue-mhx3f6we` and push.


@codeflash-ai codeflash-ai bot requested a review from mashraf-222 November 13, 2025 07:14
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash and 🎯 Quality: High labels Nov 13, 2025