TALOS v0.5 -- SatNOGS Coexistence¶
Date: April 2026
Scope: SatNOGS integration, hardware mutex design, IQ capture pipeline, observation submission
Author: Engineering (automated review)
1. Coexistence Problem¶
Many TALOS ground stations also participate in the SatNOGS Network -- a global open-source satellite observation network. A typical station runs satnogs-client to accept observation jobs from the SatNOGS scheduler and talos-agent for real-time tracking campaigns. Both need exclusive access to the same hardware (rotator controller and radio).
If both clients command the antenna simultaneously:
- The rotator receives conflicting pointing commands.
- The radio receives conflicting frequency/mode settings.
- Observations are corrupted for both systems.
A hardware mutex is required to ensure only one client controls the antenna at any given time.
2. SatNOGS API v2 Endpoints¶
The SatNOGS Network API v2 provides the following endpoints relevant to coexistence:
2.1 Station Configuration¶
| Endpoint | Method | Description |
|---|---|---|
| /api/stations/ | GET | List registered stations |
| /api/stations/{id}/ | GET | Station details (lat, lon, alt, capabilities) |
2.2 Observation Management¶
| Endpoint | Method | Description |
|---|---|---|
| /api/observations/ | GET | List observations (filterable by station, satellite, status) |
| /api/observations/ | POST | Submit a new observation |
| /api/observations/{id}/ | GET | Observation details |
2.3 Job Scheduling¶
| Endpoint | Method | Description |
|---|---|---|
| /api/jobs/ | GET | Pending jobs for a station (filter by ground_station param) |
2.4 Artifacts (New in v2)¶
| Endpoint | Method | Description |
|---|---|---|
| /api/artifacts/ | POST | Upload observation artifacts (audio, waterfall, decoded data) |
2.5 Authentication¶
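SatNOGS API v2 uses token-based authentication. Every request carries the station token in its Authorization header:
Authorization: Token <satnogs_token>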
The token is configured per station in the SatNOGS client settings.
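A minimal sketch of building that header in Python, assuming the "Token <value>" scheme used in the submission flow of section 5.1. The helper name satnogs_auth_headers is hypothetical and not part of satnogs-client or TALOS.

```python
def satnogs_auth_headers(token: str) -> dict[str, str]:
    """Build the Authorization header for a SatNOGS API request.

    Hypothetical helper: the "Token <value>" scheme follows section 5.1.
    """
    token = token.strip()
    if not token:
        # Fail early instead of sending an unauthenticated request
        raise ValueError("SatNOGS API token is empty")
    return {"Authorization": f"Token {token}"}
```

Rejecting an empty token up front avoids a round trip that would only fail with an authentication error.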
3. Hardware Mutex Design¶
The mutex must coordinate between talos-agent and satnogs-client on the same Raspberry Pi. Two mechanisms work in combination: a local lock file and MQTT-based coordination for remote visibility.
3.1 Lock File Protocol¶
A lock file at a well-known path indicates which client currently holds hardware control.
Lock file path: /var/run/talos/hardware.lock
Lock file contents:
{
  "owner": "talos-agent",
  "campaign_id": "campaign-abc-123",
  "acquired_at": "2026-04-03T10:15:00Z",
  "expires_at": "2026-04-03T10:30:00Z",
  "pid": 12345
}
3.2 Lock Acquisition¶
import fcntl
import json
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path

LOCK_PATH = Path("/var/run/talos/hardware.lock")


class HardwareMutex:
    """Exclusive hardware access coordination between TALOS and SatNOGS."""

    def __init__(self, owner: str, lock_path: Path = LOCK_PATH):
        self._owner = owner
        self._lock_path = lock_path
        self._lock_fd: int | None = None

    def acquire(self, campaign_id: str | None = None,
                duration_minutes: int = 15) -> bool:
        """Attempt to acquire the hardware lock.

        Returns True if acquired, False if another client holds the lock.
        """
        fd = None
        try:
            self._lock_path.parent.mkdir(parents=True, exist_ok=True)
            # 0o644 keeps the file readable by the SatNOGS wrapper
            fd = os.open(str(self._lock_path), os.O_CREAT | os.O_RDWR, 0o644)
            # Non-blocking advisory lock: raises BlockingIOError if another
            # open descriptor currently holds it
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # Check if an existing lock is still valid
            existing = self._read_lock(fd)
            if (existing and not self._is_expired(existing)
                    and existing.get("owner") != self._owner):
                os.close(fd)  # closing the descriptor also drops the flock
                return False

            # Write our lock
            now = datetime.now(timezone.utc)
            lock_data = {
                "owner": self._owner,
                "campaign_id": campaign_id,
                "acquired_at": self._format_utc(now),
                "expires_at": self._format_utc(now + timedelta(minutes=duration_minutes)),
                "pid": os.getpid(),
            }
            os.ftruncate(fd, 0)
            os.lseek(fd, 0, os.SEEK_SET)
            os.write(fd, json.dumps(lock_data).encode())
            self._lock_fd = fd  # keep the descriptor open to hold the flock
            return True
        except OSError:
            # Covers BlockingIOError (lock held elsewhere) and filesystem errors
            if fd is not None:
                os.close(fd)  # do not leak the descriptor on failure
            return False

    def release(self) -> None:
        """Release the hardware lock."""
        if self._lock_fd is not None:
            try:
                os.ftruncate(self._lock_fd, 0)
                fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
                os.close(self._lock_fd)
            except OSError:
                pass
            self._lock_fd = None

    def is_locked(self) -> bool:
        """Check if the hardware is currently locked by any client."""
        if not self._lock_path.exists():
            return False
        try:
            with open(self._lock_path) as f:
                data = json.load(f)
            return not self._is_expired(data)
        except (json.JSONDecodeError, OSError):
            # An empty (released) or unreadable file counts as unlocked
            return False

    @staticmethod
    def _format_utc(ts: datetime) -> str:
        return ts.isoformat(timespec="seconds").replace("+00:00", "Z")

    @staticmethod
    def _is_expired(lock_data: dict) -> bool:
        try:
            expires = datetime.fromisoformat(
                lock_data["expires_at"].replace("Z", "+00:00"))
        except (KeyError, TypeError, ValueError, AttributeError):
            return True  # malformed records are treated as expired
        if expires.tzinfo is None:
            expires = expires.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) > expires

    @staticmethod
    def _read_lock(fd: int) -> dict | None:
        os.lseek(fd, 0, os.SEEK_SET)
        data = os.read(fd, 4096)
        if not data:
            return None
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None
3.3 MQTT Lock Coordination¶
In addition to the local lock file, the agent publishes lock status over MQTT for remote monitoring:
# Topic: talos/{org_slug}/gs/{station_id}/hardware/lock
{
  "owner": "talos-agent",
  "campaign_id": "campaign-abc-123",
  "acquired_at": "2026-04-03T10:15:00Z",
  "expires_at": "2026-04-03T10:30:00Z",
  "status": "locked"
}

# When released:
{
  "owner": null,
  "campaign_id": null,
  "status": "unlocked"
}
The Core API can display hardware lock status in the station management dashboard.
3.4 SatNOGS Client Wrapper¶
A wrapper script checks the hardware lock before satnogs-client starts an observation:
#!/bin/bash
# /usr/local/bin/satnogs-observe-wrapper.sh
# Called by satnogs-client before starting an observation

LOCK_FILE="/var/run/talos/hardware.lock"

if [ -f "$LOCK_FILE" ]; then
    EXPIRES=$(python3 -c "
import json, sys
from datetime import datetime
with open('$LOCK_FILE') as f:
    data = json.load(f)
expires = datetime.fromisoformat(data['expires_at'].rstrip('Z'))
if datetime.utcnow() < expires:
    print('locked')
    sys.exit(1)
print('expired')
" 2>/dev/null)
    if [ "$EXPIRES" = "locked" ]; then
        echo "Hardware locked by TALOS. Skipping observation."
        exit 1
    fi
fi

# Hardware is free; proceed with SatNOGS observation
exec satnogs-client "$@"
3.5 Coordination Timeline¶
Time Event
----- --------------------------------------------------
10:00 TALOS campaign starts. Agent acquires hardware lock.
10:05 SatNOGS job scheduled for 10:05 comes due. Wrapper checks lock -> locked. Job deferred.
10:15 TALOS campaign pass ends. Agent releases lock.
10:15 Lock status published to MQTT: "unlocked"
10:15 SatNOGS job retried. Wrapper checks lock -> unlocked. Observation proceeds.
10:25 SatNOGS observation complete. Hardware free.
4. IQ Capture Pipeline¶
IQ (In-phase/Quadrature) capture records the raw radio signal for offline analysis -- waterfall generation, signal decoding, and submission to SatNOGS as observation artifacts. TALOS needs a minimal IQ pipeline that works without GNU Radio.
4.1 Architecture¶
SoapySDR (device abstraction)
    |
    v
NumPy buffer (IQ samples)
    |
    +-- FFT -> Power spectrum
    |           |
    |           v
    |       Waterfall accumulator
    |           |
    |           v
    |       matplotlib -> waterfall PNG
    |
    +-- Raw IQ -> compressed file (optional)
4.2 SoapySDR Device Abstraction¶
SoapySDR provides a vendor-neutral API for SDR hardware. Python bindings are available via SoapySDR (pip/conda).
Supported devices:
| Device | SoapySDR Driver | Typical Use |
|---|---|---|
| RTL-SDR | rtlsdr | Budget receiver (~$25) |
| USRP B200mini | uhd | Research-grade transceiver |
| HackRF One | hackrf | Wideband experimentation |
| Airspy Mini | airspy | VHF/UHF receiver |
| LimeSDR | lime | Full-duplex transceiver |
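SoapySDR selects a device from a comma-separated key=value argument string such as driver=rtlsdr. The sketch below maps the table's device names to such strings; the helper and the extra serial key in the usage note are illustrative assumptions, not TALOS configuration.

```python
# Driver keys copied from the table above
SOAPY_DRIVERS = {
    "RTL-SDR": "rtlsdr",
    "USRP B200mini": "uhd",
    "HackRF One": "hackrf",
    "Airspy Mini": "airspy",
    "LimeSDR": "lime",
}


def device_args(device: str, **extra: str) -> str:
    """Build a SoapySDR device argument string for a known device name."""
    if device not in SOAPY_DRIVERS:
        raise KeyError(f"unknown device: {device}")
    parts = [f"driver={SOAPY_DRIVERS[device]}"]
    # Extra keys (for example a serial number) narrow the match when
    # several devices of the same type are attached
    parts += [f"{key}={value}" for key, value in sorted(extra.items())]
    return ",".join(parts)
```

For example, device_args("HackRF One", serial="0001") yields a string that could be passed as device_args to the capture class in section 4.3.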
4.3 Capture Implementation¶
import numpy as np
import SoapySDR


class IQCapture:
    """Minimal IQ capture using SoapySDR."""

    def __init__(self, device_args: str = "", sample_rate: float = 2.048e6,
                 center_freq: float = 437.0e6, gain: float = 40.0):
        self._device_args = device_args
        self._sample_rate = sample_rate
        self._center_freq = center_freq
        self._gain = gain
        self._sdr = None
        self._stream = None

    def start(self) -> None:
        """Initialize SDR device and start streaming."""
        self._sdr = SoapySDR.Device(self._device_args)
        self._sdr.setSampleRate(SoapySDR.SOAPY_SDR_RX, 0, self._sample_rate)
        self._sdr.setFrequency(SoapySDR.SOAPY_SDR_RX, 0, self._center_freq)
        self._sdr.setGain(SoapySDR.SOAPY_SDR_RX, 0, self._gain)
        # CF32 = complex float32, matching the np.complex64 read buffer
        self._stream = self._sdr.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32)
        self._sdr.activateStream(self._stream)

    def read_samples(self, num_samples: int = 1024) -> np.ndarray:
        """Read a buffer of IQ samples."""
        buf = np.zeros(num_samples, dtype=np.complex64)
        sr = self._sdr.readStream(self._stream, [buf], num_samples)
        if sr.ret > 0:
            return buf[:sr.ret]
        # sr.ret <= 0 means no samples were read (timeout or an error code)
        return np.array([], dtype=np.complex64)

    def stop(self) -> None:
        """Stop streaming and close device."""
        if self._sdr is not None and self._stream is not None:
            self._sdr.deactivateStream(self._stream)
            self._sdr.closeStream(self._stream)
        self._sdr = None
        self._stream = None
4.4 Waterfall Generation¶
import numpy as np
import matplotlib
matplotlib.use("Agg")  # Non-interactive backend
import matplotlib.pyplot as plt


class WaterfallGenerator:
    """Generate waterfall spectrograms from IQ data."""

    def __init__(self, fft_size: int = 1024, sample_rate: float = 2.048e6):
        self._fft_size = fft_size
        self._sample_rate = sample_rate
        self._window = np.blackman(fft_size)  # computed once, reused per chunk
        self._spectra: list[np.ndarray] = []

    def add_samples(self, iq_samples: np.ndarray) -> None:
        """Process IQ samples into frequency-domain spectra."""
        # Split into FFT-sized chunks; a trailing partial chunk is dropped
        num_chunks = len(iq_samples) // self._fft_size
        for i in range(num_chunks):
            chunk = iq_samples[i * self._fft_size : (i + 1) * self._fft_size]
            # Windowed FFT, shifted so zero offset sits in the middle
            spectrum = np.fft.fftshift(np.fft.fft(chunk * self._window))
            # 20*log10(|X|) is power in dB; the epsilon avoids log10(0)
            power_db = 20 * np.log10(np.abs(spectrum) + 1e-12)
            self._spectra.append(power_db)

    def save_png(self, path: str, title: str = "TALOS Waterfall") -> None:
        """Save waterfall as a PNG image."""
        if not self._spectra:
            return
        waterfall_data = np.array(self._spectra)
        fig, ax = plt.subplots(figsize=(10, 6))
        freqs = np.linspace(
            -self._sample_rate / 2,
            self._sample_rate / 2,
            self._fft_size
        ) / 1e3  # kHz
        ax.imshow(
            waterfall_data,
            aspect="auto",
            # Time runs downward: first frame at the top of the image
            extent=[freqs[0], freqs[-1], len(self._spectra), 0],
            cmap="viridis",
            # Clip the colour scale to the 5th-95th percentile to suppress outliers
            vmin=np.percentile(waterfall_data, 5),
            vmax=np.percentile(waterfall_data, 95),
        )
        ax.set_xlabel("Frequency Offset (kHz)")
        ax.set_ylabel("Time (FFT frames)")
        ax.set_title(title)
        fig.savefig(path, dpi=150, bbox_inches="tight")
        plt.close(fig)
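One way to sanity-check the FFT stage without hardware is to feed it a synthetic tone and confirm where the peak lands. The sketch below uses the same Blackman window and fftshift as the generator; make_tone and peak_offset_hz are hypothetical test helpers. With a 2.048 MHz sample rate and 1024 bins the bin spacing is 2 kHz, so a 100 kHz tone falls exactly on bin 50.

```python
import numpy as np


def make_tone(offset_hz: float, sample_rate: float, n: int) -> np.ndarray:
    """Complex exponential at offset_hz from the centre frequency."""
    t = np.arange(n) / sample_rate
    return np.exp(2j * np.pi * offset_hz * t).astype(np.complex64)


def peak_offset_hz(iq: np.ndarray, sample_rate: float) -> float:
    """Frequency offset (Hz) of the strongest bin in one windowed FFT frame."""
    n = len(iq)
    spectrum = np.fft.fftshift(np.fft.fft(iq * np.blackman(n)))
    # fftfreq gives the per-bin frequencies; shift them the same way
    freqs = np.fft.fftshift(np.fft.fftfreq(n, d=1.0 / sample_rate))
    return float(freqs[np.argmax(np.abs(spectrum))])
```

A negative offset should land on the left half of the shifted spectrum, which is an easy check that the frequency axis of the waterfall is not mirrored.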
4.5 Integration with TALOS Agent¶
The IQ capture runs as an optional component within talos-agent:
class AgentIQCapture:
    """IQ capture integration for talos-agent."""

    def __init__(self, config: AgentConfig):
        self._enabled = config.iq_capture_enabled
        self._capture = IQCapture(
            device_args=config.sdr_device_args,
            sample_rate=config.sdr_sample_rate,
        ) if self._enabled else None
        # Use the same sample rate as the capture so the frequency axis is correct
        self._waterfall = WaterfallGenerator(
            sample_rate=config.sdr_sample_rate,
        ) if self._enabled else None

    async def start_capture(self, frequency_hz: float, campaign_id: str) -> None:
        """Begin IQ capture for an active campaign."""
        if not self._enabled or self._capture is None:
            return
        self._capture._center_freq = frequency_hz
        self._capture.start()
        # NOTE: this only opens the stream. Something must still call
        # read_samples() repeatedly and pass the result to
        # WaterfallGenerator.add_samples(); that reader loop is not shown here.

    async def stop_capture(self, campaign_id: str) -> str | None:
        """Stop capture and generate waterfall. Returns PNG path."""
        if not self._enabled or self._capture is None:
            return None
        self._capture.stop()
        # Without any spectra (no samples were fed in) there is nothing to save
        if self._waterfall and self._waterfall._spectra:
            path = f"/tmp/talos-waterfall-{campaign_id}.png"
            self._waterfall.save_png(path, title=f"Campaign {campaign_id}")
            return path
        return None
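Before capturing a full pass it is worth estimating how much memory the accumulated spectra need, since the generator in section 4.4 keeps one float64 row per FFT frame. The sketch below does that arithmetic; the averaging factor is a suggested mitigation and an assumption, not part of the design above.

```python
def waterfall_rows(duration_s: float, sample_rate: float,
                   fft_size: int, average: int = 1) -> int:
    """Rows kept for a capture when every `average` FFT frames become one row."""
    frames = int(duration_s * sample_rate / fft_size)
    return frames // average


def waterfall_bytes(rows: int, fft_size: int, bytes_per_bin: int = 8) -> int:
    """Memory for the stored spectra (float64 bins by default)."""
    return rows * fft_size * bytes_per_bin


# 10-minute pass at 2.048 Msps with 1024-point FFTs, every frame kept
full_rows = waterfall_rows(600, 2.048e6, 1024)
full_bytes = waterfall_bytes(full_rows, 1024)

# Same pass, averaging 2000 frames into one row (about one row per second)
avg_rows = waterfall_rows(600, 2.048e6, 1024, average=2000)
avg_bytes = waterfall_bytes(avg_rows, 1024)
```

Under these numbers the unaveraged case comes to roughly 9.8 GB, well beyond a Raspberry Pi, while one averaged row per second needs about 4.9 MB.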
5. SatNOGS Observation Submission¶
When TALOS captures data during a pass, it can optionally submit the observation to the SatNOGS Network for community benefit.
5.1 Submission Flow¶
TALOS pass complete
    |
    +-- Waterfall PNG generated
    +-- Pass metadata collected (station, satellite, start/end, frequency)
    |
    v
POST /api/observations/
    Headers: Authorization: Token <satnogs_token>
    Body: {
        "ground_station": 1234,
        "transmitter_uuid": "abc-def-...",
        "start": "2026-04-03T10:15:00Z",
        "end": "2026-04-03T10:25:00Z"
    }
    |
    v
POST /api/artifacts/
    Headers: Authorization: Token <satnogs_token>
    Body: multipart/form-data with waterfall PNG
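The request body above uses UTC timestamps with a trailing Z. A sketch of building that body from Python datetimes follows; it assumes naive datetimes are already in UTC, and the function names are hypothetical. Appending "Z" to isoformat() directly would produce an invalid "+00:00Z" suffix for timezone-aware values, so the conversion is done explicitly.

```python
from datetime import datetime, timezone


def to_utc_z(ts: datetime) -> str:
    """Format a datetime as ISO 8601 UTC with a trailing Z."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    ts = ts.astimezone(timezone.utc)
    return ts.isoformat(timespec="seconds").replace("+00:00", "Z")


def observation_payload(station_id: int, transmitter_uuid: str,
                        start: datetime, end: datetime) -> dict:
    """Build the JSON body for POST /api/observations/ as shown above."""
    start_s, end_s = to_utc_z(start), to_utc_z(end)
    # Fixed-width UTC strings compare in chronological order
    if end_s <= start_s:
        raise ValueError("observation end must be after start")
    return {
        "ground_station": station_id,
        "transmitter_uuid": transmitter_uuid,
        "start": start_s,
        "end": end_s,
    }
```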
5.2 Submission Client¶
import logging
from datetime import datetime, timezone

import httpx

logger = logging.getLogger(__name__)


def _utc_z(ts: datetime) -> str:
    """ISO 8601 UTC with a trailing Z; naive datetimes are assumed to be UTC."""
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")


class SatNOGSSubmitter:
    """Submit TALOS observations to SatNOGS Network."""

    BASE_URL = "https://network.satnogs.org"

    def __init__(self, api_token: str):
        self._client = httpx.AsyncClient(
            base_url=self.BASE_URL,
            headers={"Authorization": f"Token {api_token}"},
            timeout=30.0,
        )

    async def submit_observation(self, station_id: int, transmitter_uuid: str,
                                 start: datetime, end: datetime) -> int | None:
        """Submit an observation. Returns observation ID on success."""
        try:
            response = await self._client.post("/api/observations/", json={
                "ground_station": station_id,
                "transmitter_uuid": transmitter_uuid,
                # Appending "Z" to isoformat() breaks for timezone-aware values
                "start": _utc_z(start),
                "end": _utc_z(end),
            })
            response.raise_for_status()
            return response.json().get("id")
        except httpx.HTTPError as exc:
            logger.warning("SatNOGS submission failed: %s", exc)
            return None

    async def upload_waterfall(self, observation_id: int, png_path: str) -> bool:
        """Upload a waterfall PNG as an observation artifact."""
        try:
            with open(png_path, "rb") as f:
                response = await self._client.post(
                    "/api/artifacts/",
                    data={"observation": observation_id},
                    files={"file": ("waterfall.png", f, "image/png")},
                )
            response.raise_for_status()
            return True
        except (httpx.HTTPError, OSError) as exc:
            logger.warning("Waterfall upload failed: %s", exc)
            return False

    async def aclose(self) -> None:
        """Close the underlying HTTP client and its connection pool."""
        await self._client.aclose()
6. Idle-Time SatNOGS Job Acceptance¶
When TALOS has no active campaigns on a station, the station hardware sits idle. During these windows, TALOS can accept SatNOGS observation jobs on behalf of the station, contributing to the global network.
6.1 Idle Detection¶
async def check_station_idle(station_id: str, org_slug: str) -> bool:
    """Check if a station has no active or upcoming TALOS campaigns."""
    assignments = await get_active_assignments(station_id, org_slug)
    scheduled = await get_upcoming_assignments(station_id, org_slug, hours=1)
    return len(assignments) == 0 and len(scheduled) == 0
6.2 Job Polling¶
import httpx


async def poll_satnogs_jobs(station_id: int, api_token: str) -> list[dict]:
    """Check for pending SatNOGS jobs for this station."""
    # Explicit timeout so a stalled request cannot block the polling loop
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(
            "https://network.satnogs.org/api/jobs/",
            params={"ground_station": station_id},
            headers={"Authorization": f"Token {api_token}"},
        )
        response.raise_for_status()
        return response.json()
6.3 Acceptance Logic¶
Every 60 seconds:
1. Check if station is idle (no TALOS campaigns).
2. If idle, poll SatNOGS for pending jobs.
3. For each job:
   a. Verify hardware lock is free.
   b. Verify no TALOS campaign starts within the job's time window + 2 min buffer.
   c. If safe, accept the job:
      - Acquire hardware lock with expiry = job end time.
      - Configure rotator and radio per job parameters.
      - Start IQ capture.
      - On completion, release lock, submit observation + waterfall.
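Step 3b can be sketched as a pure function. The interpretation used here is an assumption: a job is unsafe if any TALOS campaign starts between the job's start and its end plus the buffer (inclusive). Campaigns already running before the job starts are expected to be caught by the lock check in step 3a. The function name is hypothetical.

```python
from collections.abc import Iterable
from datetime import datetime, timedelta


def job_is_safe(job_start: datetime, job_end: datetime,
                campaign_starts: Iterable[datetime],
                buffer: timedelta = timedelta(minutes=2)) -> bool:
    """True if no campaign starts inside [job_start, job_end + buffer]."""
    window_end = job_end + buffer
    return not any(job_start <= start <= window_end for start in campaign_starts)
```

For a job running 10:05 to 10:15 with the default 2-minute buffer, a campaign starting at 10:16 blocks the job, while one starting at 10:18 does not.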
6.4 Configuration¶
# talos-agent environment variables
TALOS_SATNOGS_ENABLED=true
TALOS_SATNOGS_STATION_ID=1234
TALOS_SATNOGS_API_TOKEN=<token>
TALOS_SATNOGS_IDLE_POLL_INTERVAL=60 # seconds
TALOS_SATNOGS_BUFFER_MINUTES=2 # gap before TALOS campaign
7. Priority and Preemption¶
TALOS campaigns always take priority over SatNOGS observations. If a TALOS campaign is scheduled to start and a SatNOGS observation is in progress:
- The agent sends a graceful stop to the SatNOGS observation (if possible).
- The IQ capture is stopped and a partial waterfall is generated.
- The hardware lock is released and immediately reacquired for TALOS.
- The partial observation is submitted to SatNOGS with a note indicating early termination.
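This keeps SatNOGS observations from delaying TALOS campaigns. Preemption should be rare in practice, because the acceptance logic in section 6.3 already rejects jobs that overlap a known campaign start plus the 2-minute buffer; it is needed mainly for campaigns scheduled after a SatNOGS job was accepted.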
8. Testing Strategy¶
8.1 Unit Tests¶
- Hardware mutex acquire/release/expiry logic.
- Lock file read/write/corruption handling.
- SatNOGS API client mocking (jobs, submissions, artifacts).
- IQ capture sample processing and FFT correctness.
- Waterfall generation with synthetic data.
8.2 Integration Tests¶
- Two processes competing for the hardware lock.
- MQTT lock status publication and consumption.
- SatNOGS wrapper script lock checking.
- End-to-end idle detection and job acceptance (mocked API).
8.3 Hardware-in-the-Loop Tests¶
- Actual SoapySDR device open/close on RTL-SDR.
- Rotator lock contention with real Hamlib.
- Full capture-to-waterfall pipeline on Raspberry Pi.
9. Implementation Plan¶
| Phase | Scope | Effort |
|---|---|---|
| Phase 1 | Hardware mutex (lock file + MQTT) | 2-3 days |
| Phase 2 | SatNOGS wrapper script + integration test | 1-2 days |
| Phase 3 | IQ capture (SoapySDR + NumPy FFT + waterfall) | 3-5 days |
| Phase 4 | SatNOGS observation submission | 2-3 days |
| Phase 5 | Idle-time job acceptance | 2-3 days |
| Total | | 10-16 days |
Phases 1-2 solve the coexistence problem. Phases 3-5 add value by contributing TALOS observations back to the SatNOGS community.
Summary¶
SatNOGS coexistence is solved by a two-layer hardware mutex (lock file + MQTT) that gives TALOS campaigns priority while allowing SatNOGS observations during idle time. The IQ capture pipeline (SoapySDR + NumPy FFT) avoids the GNU Radio dependency while providing waterfall generation for observation artifacts. The idle-time job acceptance loop lets any TALOS station with SatNOGS enabled contribute to the SatNOGS network when it is not actively tracking, improving hardware utilization across both systems.