Skip to content

Commit cd61927

Browse files
authored
Merge pull request #1026 from cartpauj/feat/faster-balloon-detection-with-ka9q
feat: faster balloon detection with ka9q using async
2 parents 8f890f0 + 2b3023d commit cd61927

File tree

6 files changed

+607
-39
lines changed

6 files changed

+607
-39
lines changed

auto_rx/auto_rx.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import traceback
2828
import os
2929
from dateutil.parser import parse
30-
from queue import Queue
30+
from queue import Queue, Empty
3131

3232
if sys.version_info < (3, 6):
3333
print("CRITICAL - radiosonde_auto_rx requires Python 3.6 or newer!")
@@ -183,6 +183,7 @@ def start_scanner():
183183
wideband_sondes=config["wideband_sondes"],
184184
temporary_block_list=temporary_block_list,
185185
temporary_block_time=config["temporary_block_time"],
186+
max_async_scan_workers=config["max_async_scan_workers"],
186187
)
187188

188189
# Add a reference into the sdr_list entry
@@ -1103,14 +1104,28 @@ def main():
11031104
if args.type != None:
11041105
handle_scan_results()
11051106

1106-
# Loop.
1107+
# Loop - Event-driven processing with periodic maintenance
11071108
while True:
1108-
# Check for finished tasks.
1109-
clean_task_list()
1110-
# Handle any new scan results.
1109+
# Wait for new scan results or timeout after 2 seconds
1110+
# This allows immediate response when results arrive while still doing periodic cleanup
1111+
try:
1112+
# Block until result arrives or timeout
1113+
result = autorx.scan_results.get(timeout=2.0)
1114+
# Put it back for handle_scan_results() to process
1115+
# Note: This get/put pattern allows us to block-wait for queue activity while
1116+
# maintaining handle_scan_results()'s existing logic (which checks qsize and processes all items).
1117+
# Alternative approach would be to refactor handle_scan_results to accept items directly.
1118+
autorx.scan_results.put(result)
1119+
except Empty:
1120+
# Timeout - normal condition when no scan results for 2s
1121+
pass
1122+
1123+
# Process any scan results in the queue (including the one we just got, if any)
1124+
# handle_scan_results() checks qsize and processes all available results
11111125
handle_scan_results()
1112-
# Sleep a little bit.
1113-
time.sleep(2)
1126+
1127+
# Check for finished tasks
1128+
clean_task_list()
11141129

11151130
if len(autorx.sdr_list) == 0:
11161131
# No Functioning SDRs!

auto_rx/autorx/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ def read_auto_rx_config(filename, no_sdr_test=False):
139139
"temporary_block_time": 60,
140140
"rs41_drift_tweak": False,
141141
"decoder_stats": False,
142+
"max_async_scan_workers": 4,
142143
"ngp_tweak": False,
143144
# Rotator Settings
144145
"enable_rotator": False,
@@ -370,6 +371,16 @@ def read_auto_rx_config(filename, no_sdr_test=False):
370371
"advanced", "synchronous_upload"
371372
)
372373

374+
# Max async scan workers - validate and cap to reasonable limits
375+
_max_workers = config.getint("advanced", "max_async_scan_workers")
376+
if _max_workers < 1:
377+
logging.warning(f"Config - max_async_scan_workers must be at least 1, setting to 1")
378+
_max_workers = 1
379+
elif _max_workers > 32:
380+
logging.warning(f"Config - max_async_scan_workers capped at 32 (was {_max_workers})")
381+
_max_workers = 32
382+
auto_rx_config["max_async_scan_workers"] = _max_workers
383+
373384
# Rotator Settings
374385
auto_rx_config["rotator_enabled"] = config.getboolean(
375386
"rotator", "rotator_enabled"

auto_rx/autorx/scan.py

Lines changed: 96 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@
2525
)
2626
from .sdr_wrappers import test_sdr, reset_sdr, get_sdr_name, get_sdr_iq_cmd, get_sdr_fm_cmd, get_power_spectrum, shutdown_sdr
2727

28+
# Import async scanning for concurrent peak detection
29+
try:
30+
from .scan_async import run_async_scan
31+
ASYNC_SCAN_AVAILABLE = True
32+
except ImportError:
33+
ASYNC_SCAN_AVAILABLE = False
34+
logging.warning("Async scanning not available - falling back to sequential scanning")
35+
2836

2937
try:
3038
from .web import flask_emit_event
@@ -680,7 +688,8 @@ def __init__(
680688
temporary_block_list={},
681689
temporary_block_time=60,
682690
ngp_tweak=False,
683-
wideband_sondes=False
691+
wideband_sondes=False,
692+
max_async_scan_workers=4
684693
):
685694
"""Initialise a Sonde Scanner Object.
686695
@@ -770,6 +779,7 @@ def __init__(
770779
self.callback = callback
771780
self.save_detection_audio = save_detection_audio
772781
self.wideband_sondes = wideband_sondes
782+
self.max_async_scan_workers = max_async_scan_workers
773783

774784
# Temporary block list.
775785
self.temporary_block_list = temporary_block_list.copy()
@@ -1118,44 +1128,98 @@ def sonde_search(self, first_only=False):
11181128
)
11191129

11201130
# Run rs_detect on each peak frequency, to determine if there is a sonde there.
1121-
for freq in peak_frequencies:
1131+
# OPTIMIZATION: Use concurrent async scanning ONLY with KA9Q-radio
1132+
# KA9Q provides virtual SDR channels that can actually scan concurrently
1133+
if ASYNC_SCAN_AVAILABLE and self.sdr_type == "KA9Q" and len(peak_frequencies) > 1:
1134+
try:
1135+
import os
1136+
cpu_count = os.cpu_count() or 1
1137+
1138+
# With KA9Q, we can use multiple virtual channels concurrently
1139+
# The scanner creates temporary KA9Q channels that don't consume task slots
1140+
# Cap concurrency to min of: configured max, CPU cores, and number of peaks
1141+
max_concurrent = min(
1142+
self.max_async_scan_workers,
1143+
cpu_count,
1144+
len(peak_frequencies)
1145+
)
11221146

1123-
_freq = float(freq)
1147+
self.log_info(f"KA9Q: Using concurrent peak detection with {max_concurrent} workers")
1148+
1149+
detections = run_async_scan(
1150+
peak_frequencies=peak_frequencies,
1151+
max_concurrent=max_concurrent,
1152+
rs_path=self.rs_path,
1153+
dwell_time=self.detect_dwell_time,
1154+
sdr_type=self.sdr_type,
1155+
sdr_hostname=self.sdr_hostname,
1156+
sdr_port=self.sdr_port,
1157+
ss_iq_path=self.ss_iq_path,
1158+
rtl_fm_path=self.rtl_fm_path,
1159+
rtl_device_idx=self.rtl_device_idx,
1160+
ppm=self.ppm,
1161+
gain=self.gain,
1162+
bias=self.bias,
1163+
save_detection_audio=self.save_detection_audio,
1164+
wideband_sondes=self.wideband_sondes
1165+
)
11241166

1125-
# Exit opportunity.
1126-
if self.sonde_scanner_running == False:
1127-
return []
1167+
# Process results
1168+
for _freq, detected in detections:
1169+
if self.sonde_scanner_running == False:
1170+
return []
11281171

1129-
(detected, offset_est) = detect_sonde(
1130-
_freq,
1131-
sdr_type=self.sdr_type,
1132-
sdr_hostname=self.sdr_hostname,
1133-
sdr_port=self.sdr_port,
1134-
ss_iq_path = self.ss_iq_path,
1135-
rtl_fm_path=self.rtl_fm_path,
1136-
rtl_device_idx=self.rtl_device_idx,
1137-
ppm=self.ppm,
1138-
gain=self.gain,
1139-
bias=self.bias,
1140-
dwell_time=self.detect_dwell_time,
1141-
save_detection_audio=self.save_detection_audio,
1142-
wideband_sondes=self.wideband_sondes
1143-
)
1172+
_search_results.append([_freq, detected])
1173+
self.send_to_callback([[_freq, detected]])
1174+
1175+
if first_only:
1176+
return _search_results
1177+
1178+
except Exception as e:
1179+
import traceback
1180+
self.log_error(f"Async scanning failed: {e}, falling back to sequential")
1181+
self.log_debug(f"Async scan traceback: {traceback.format_exc()}")
1182+
1183+
# Standard sequential scanning (for RTLSDR, SpyServer, or single peaks)
1184+
else:
1185+
for freq in peak_frequencies:
1186+
1187+
_freq = float(freq)
1188+
1189+
# Exit opportunity.
1190+
if self.sonde_scanner_running == False:
1191+
return []
1192+
1193+
(detected, offset_est) = detect_sonde(
1194+
_freq,
1195+
sdr_type=self.sdr_type,
1196+
sdr_hostname=self.sdr_hostname,
1197+
sdr_port=self.sdr_port,
1198+
ss_iq_path = self.ss_iq_path,
1199+
rtl_fm_path=self.rtl_fm_path,
1200+
rtl_device_idx=self.rtl_device_idx,
1201+
ppm=self.ppm,
1202+
gain=self.gain,
1203+
bias=self.bias,
1204+
dwell_time=self.detect_dwell_time,
1205+
save_detection_audio=self.save_detection_audio,
1206+
wideband_sondes=self.wideband_sondes
1207+
)
11441208

1145-
if detected != None:
1146-
# Quantize the detected frequency (with offset) to 1 kHz
1147-
_freq = round((_freq + offset_est) / 1000.0) * 1000.0
1209+
if detected != None:
1210+
# Quantize the detected frequency (with offset) to 1 kHz
1211+
_freq = round((_freq + offset_est) / 1000.0) * 1000.0
11481212

1149-
# Add a detected sonde to the output array
1150-
_search_results.append([_freq, detected])
1213+
# Add a detected sonde to the output array
1214+
_search_results.append([_freq, detected])
11511215

1152-
# Immediately send this result to the callback.
1153-
self.send_to_callback([[_freq, detected]])
1154-
# If we only want the first detected sonde, then return now.
1155-
if first_only:
1156-
return _search_results
1216+
# Immediately send this result to the callback.
1217+
self.send_to_callback([[_freq, detected]])
1218+
# If we only want the first detected sonde, then return now.
1219+
if first_only:
1220+
return _search_results
11571221

1158-
# Otherwise, we continue....
1222+
# Otherwise, we continue....
11591223

11601224
if len(_search_results) == 0:
11611225
self.log_debug("No sondes detected.")

0 commit comments

Comments
 (0)