TALOS v0.5 -- Data Resilience¶
Date: April 2026
Scope: TLE source redundancy, CelesTrak fallback, telemetry persistence with TimescaleDB, coverage gate
Author: Engineering (automated review)
1. TLE Source Redundancy¶
TALOS currently sources all TLE (Two-Line Element) data exclusively from the SatNOGS DB API. If SatNOGS DB is unavailable, the system falls back to a stale response cache, but there is no secondary live source. This section designs a multi-source TLE pipeline with automatic failover.
1.1 Current TLE Flow¶
SatNOGS DB API (/api/tle/)
|
v
SatNOGSClient (shared/satnogs_client.py)
|-- Response caching (TTL-based)
|-- Stale fallback on connection error
|
v
TLEManager (director/tle_manager.py)
|
v
Director propagation loop
If SatNOGS DB returns an error and the cache is empty (first startup, or cache cleared), the Director has no TLEs and cannot propagate. The stale fallback mitigates short outages but is not a substitute for a secondary source.
1.2 CelesTrak GP API¶
CelesTrak provides the General Perturbations (GP) API, which serves the same orbital data in multiple formats.
Endpoint: https://celestrak.org/NORAD/elements/gp.php
Query parameters:
| Parameter | Description | Example |
|---|---|---|
| CATNR | NORAD catalog number | CATNR=25544 (ISS) |
| NAME | Satellite name (partial match) | NAME=ISS |
| GROUP | Predefined group | GROUP=stations |
| FORMAT | Output format | FORMAT=json (OMM JSON), FORMAT=tle (legacy 2-line), FORMAT=xml (OMM XML) |
Example request:
https://celestrak.org/NORAD/elements/gp.php?CATNR=25544&FORMAT=json
Example OMM JSON response:
[
{
"OBJECT_NAME": "ISS (ZARYA)",
"OBJECT_ID": "1998-067A",
"EPOCH": "2026-04-03T12:00:00.000000",
"MEAN_MOTION": 15.50000000,
"ECCENTRICITY": 0.0001234,
"INCLINATION": 51.6400,
"RA_OF_ASC_NODE": 120.0000,
"ARG_OF_PERICENTER": 90.0000,
"MEAN_ANOMALY": 270.0000,
"NORAD_CAT_ID": 25544,
"ELEMENT_SET_NO": 999,
"REV_AT_EPOCH": 45000,
"BSTAR": 0.00001234,
"MEAN_MOTION_DOT": 0.00000000,
"MEAN_MOTION_DDOT": 0.0000000000000
}
]
1.3 Critical: Catalog Number Overflow (July 2026)¶
NORAD catalog numbers will exceed 5 digits (99999) in approximately July 2026. The legacy TLE format uses a fixed-width 5-character field for the catalog number and cannot represent 6-digit numbers.
Impact on TALOS:
- Any satellite cataloged after the overflow will have a catalog number > 99999.
- Legacy TLE format cannot encode these numbers.
- SatNOGS DB may adopt the alpha-5 encoding scheme or switch to OMM.
- CelesTrak's OMM JSON format already supports arbitrary catalog numbers via the NORAD_CAT_ID integer field.
Mitigation: Migrate TLE storage from raw 2-line strings to OMM JSON or structured objects that use integer catalog numbers. This must be completed before July 2026.
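A minimal sketch of such a structured record, assuming the OMM field names shown in the example response above. The alpha-5 decoder follows the published scheme (letters I and O are skipped, so A0000 maps to 100000); the class and helper names are illustrative, not existing TALOS code:

```python
from dataclasses import dataclass

# Alpha-5 letters: A-Z with I and O skipped, so A=10 ... Z=33.
_ALPHA5 = "ABCDEFGHJKLMNPQRSTUVWXYZ"

def decode_catalog_number(field: str) -> int:
    """Decode a 5-character catalog field: plain digits or alpha-5."""
    field = field.strip()
    if field[0].isalpha():
        # First character encodes the ten-thousands digit pair: A0000 -> 100000.
        return (_ALPHA5.index(field[0].upper()) + 10) * 10000 + int(field[1:])
    return int(field)

@dataclass
class OrbitalElements:
    """Structured replacement for raw 2-line TLE strings."""
    norad_cat_id: int   # plain integer, no 5-digit limit
    object_name: str
    epoch: str          # ISO-8601, as served by the GP API
    mean_motion: float
    eccentricity: float
    inclination: float

    @classmethod
    def from_omm(cls, omm: dict) -> "OrbitalElements":
        """Build a record from one OMM JSON object (gp.php FORMAT=json)."""
        return cls(
            norad_cat_id=int(omm["NORAD_CAT_ID"]),
            object_name=omm["OBJECT_NAME"],
            epoch=omm["EPOCH"],
            mean_motion=float(omm["MEAN_MOTION"]),
            eccentricity=float(omm["ECCENTRICITY"]),
            inclination=float(omm["INCLINATION"]),
        )
```

Storing this record instead of raw 2-line strings makes the July 2026 overflow a non-event for TALOS storage, regardless of which encoding upstream sources adopt.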
1.4 TLE Source Priority Cascade¶
Priority 1: SatNOGS DB API (/api/tle/)
|
|-- Success? Use this TLE.
|-- Failure? Try next source.
|
v
Priority 2: CelesTrak GP API (gp.php?FORMAT=json)
|
|-- Success? Use this TLE. Log warning about SatNOGS unavailability.
|-- Failure? Try next source.
|
v
Priority 3: Stale cache
|
|-- Cache hit? Use stale TLE. Log error about all sources unavailable.
|-- Cache miss? No TLE available. Log critical error.
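The cascade above can be sketched as a thin manager over both clients. The client method names here are assumptions based on the interfaces described elsewhere in this document; the `TLESourceManager` name matches §6.2 but no such class exists yet:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class TLESourceManager:
    """Priority cascade: SatNOGS DB -> CelesTrak -> stale cache."""

    def __init__(self, satnogs, celestrak):
        self._satnogs = satnogs        # assumed: async get_tle(norad_id)
        self._celestrak = celestrak    # assumed: async get_tle_by_norad_id(norad_id)
        self._stale: dict[int, dict] = {}

    async def get_tle(self, norad_id: int) -> dict | None:
        # Priority 1: SatNOGS DB.
        tle = await self._attempt(self._satnogs.get_tle, norad_id)
        if tle is None:
            # Priority 2: CelesTrak GP API.
            tle = await self._attempt(self._celestrak.get_tle_by_norad_id, norad_id)
            if tle is not None:
                logger.warning("SatNOGS unavailable; used CelesTrak for %d", norad_id)
        if tle is not None:
            self._stale[norad_id] = tle  # refresh the stale fallback on every success
            return tle
        # Priority 3: stale cache.
        stale = self._stale.get(norad_id)
        if stale is not None:
            logger.error("All live TLE sources down; using stale TLE for %d", norad_id)
            return stale
        logger.critical("No TLE available for %d", norad_id)
        return None

    @staticmethod
    async def _attempt(fetch, norad_id):
        """Treat any fetch error as a miss so the cascade falls through."""
        try:
            return await fetch(norad_id)
        except Exception:
            return None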
1.5 CelesTrak Client Implementation¶
import logging
from datetime import datetime

import httpx

logger = logging.getLogger(__name__)
class CelesTrakClient:
"""Fallback TLE source using CelesTrak GP API."""
BASE_URL = "https://celestrak.org/NORAD/elements/gp.php"
TIMEOUT = 10.0 # seconds
def __init__(self):
self._client = httpx.AsyncClient(timeout=self.TIMEOUT)
self._cache: dict[int, tuple[dict, datetime]] = {}
async def get_tle_by_norad_id(self, norad_id: int) -> dict | None:
"""Fetch OMM data for a single satellite by NORAD catalog number."""
try:
response = await self._client.get(
self.BASE_URL,
params={"CATNR": norad_id, "FORMAT": "json"},
)
response.raise_for_status()
data = response.json()
if data and len(data) > 0:
self._cache[norad_id] = (data[0], datetime.utcnow())
return data[0]
return None
except (httpx.HTTPError, ValueError) as exc:
logger.warning("CelesTrak fetch failed for NORAD %d: %s", norad_id, exc)
return None
async def get_tle_by_group(self, group: str) -> list[dict]:
"""Fetch OMM data for a predefined satellite group."""
try:
response = await self._client.get(
self.BASE_URL,
params={"GROUP": group, "FORMAT": "json"},
)
response.raise_for_status()
return response.json()
except (httpx.HTTPError, ValueError) as exc:
logger.warning("CelesTrak group fetch failed for %s: %s", group, exc)
return []
2. Public Tracker TLE Fix¶
The public satellite tracker page currently fetches TLEs through a dedicated route that calls CelesTrak directly. This bypasses the SatNOGSClient caching layer and creates a second, unmanaged TLE source.
2.1 Current Problem¶
Public tracker page
|
v
/api/tracker/tle/{norad_id} (direct CelesTrak fetch, no caching)
|
v
CelesTrak (unmanaged, no fallback, no rate limiting)
2.2 Fix¶
Route the public tracker through the same SatNOGSClient and CelesTrakClient pipeline:
Public tracker page
|
v
/api/tracker/tle/{norad_id}
|
v
TLE Source Manager (priority cascade)
|-- SatNOGSClient (cached)
|-- CelesTrakClient (fallback)
|-- Stale cache
This ensures consistent caching, rate limiting, and fallback behavior across all TLE consumers.
3. TimescaleDB for Telemetry Persistence¶
TALOS publishes tracking telemetry over MQTT (azimuth, elevation, frequency, Doppler) but does not persist it. Prometheus captures Director metrics (tick timing, message rates) but not per-station tracking data. There is no way to review a completed pass or analyze tracking accuracy over time.
3.1 Why TimescaleDB¶
TimescaleDB is a PostgreSQL extension optimized for time-series data. It provides:
- Hypertables -- Automatic partitioning of time-series tables by time.
- Continuous aggregates -- Materialized views that refresh incrementally.
- Compression -- 90%+ compression ratios on older data.
- SQL compatibility -- Standard PostgreSQL interface; works with SQLAlchemy.
TALOS already uses PostgreSQL. TimescaleDB can be added to the existing database instance or run as a separate database.
3.2 Python Integration¶
Package: sqlalchemy-timescaledb
Version: 0.4.1 (PyPI)
Compatibility: SQLAlchemy 2.0+, PostgreSQL 14+, TimescaleDB 2.x
3.3 Schema Design¶
from sqlalchemy import Column, Integer, Float, String, DateTime, Index
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class TrackingMeasurement(Base):
"""Per-tick tracking data from the Director."""
__tablename__ = "tracking_measurements"
timestamp = Column(DateTime(timezone=True), primary_key=True, nullable=False)
station_id = Column(String(64), primary_key=True, nullable=False)
campaign_id = Column(String(64), nullable=False)
satellite_id = Column(Integer, nullable=False)
organization_id = Column(String(64), nullable=False)
# Antenna pointing
azimuth = Column(Float, nullable=False) # degrees, 0-360
elevation = Column(Float, nullable=False) # degrees, 0-90
commanded_azimuth = Column(Float, nullable=True) # what Director commanded
commanded_elevation = Column(Float, nullable=True)
# Radio
frequency_hz = Column(Float, nullable=True)
doppler_shift_hz = Column(Float, nullable=True)
mode = Column(String(16), nullable=True) # FM, SSB, CW, etc.
# Signal quality
signal_strength_dbm = Column(Float, nullable=True)
snr_db = Column(Float, nullable=True)
# Satellite position (computed by Director)
sat_latitude = Column(Float, nullable=True)
sat_longitude = Column(Float, nullable=True)
sat_altitude_km = Column(Float, nullable=True)
range_km = Column(Float, nullable=True)
__table_args__ = (
Index("idx_tracking_station_time", "station_id", "timestamp"),
Index("idx_tracking_campaign_time", "campaign_id", "timestamp"),
{"timescaledb_hypertable": {"time_column_name": "timestamp"}},
)
3.4 Hypertable Configuration¶
-- Create hypertable with 1-hour chunks
SELECT create_hypertable(
'tracking_measurements',
'timestamp',
chunk_time_interval => INTERVAL '1 hour'
);
-- Enable compression on chunks older than 7 days
ALTER TABLE tracking_measurements
SET (timescaledb.compress,
timescaledb.compress_segmentby = 'station_id,campaign_id',
timescaledb.compress_orderby = 'timestamp DESC');
SELECT add_compression_policy('tracking_measurements', INTERVAL '7 days');
3.5 Continuous Aggregates¶
-- Rolling 1-minute statistics per station
CREATE MATERIALIZED VIEW tracking_1min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', timestamp) AS bucket,
station_id,
campaign_id,
AVG(azimuth) AS avg_azimuth,
AVG(elevation) AS avg_elevation,
AVG(signal_strength_dbm) AS avg_signal_dbm,
AVG(snr_db) AS avg_snr_db,
COUNT(*) AS sample_count
FROM tracking_measurements
GROUP BY bucket, station_id, campaign_id;
-- Refresh every 5 minutes, keeping 30 days of aggregates
SELECT add_continuous_aggregate_policy('tracking_1min',
start_offset => INTERVAL '30 days',
end_offset => INTERVAL '5 minutes',
schedule_interval => INTERVAL '5 minutes');
3.6 Data Ingestion Path¶
Director tick loop
|
+-- Compute tracking data (existing)
+-- Publish MQTT command (existing)
+-- Buffer measurement row (new)
|
Background Writer (5-second flush interval)
|
+-- Batch INSERT into tracking_measurements
+-- ~100 rows per flush at 10 stations (2 Hz * 10 * 5s = 100)
+-- ~1,000 rows per flush at 50 stations
Batch inserts amortize the write cost. At 50 stations, the Director generates ~360,000 rows per hour, or ~8.6 million rows per day. With TimescaleDB compression, this is approximately 50-100 MB per day of compressed storage.
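A sketch of the buffering half of this path. The `MeasurementBuffer` name follows §6.2, but the interface is an assumption; the flush callback stands in for the batch INSERT that the background writer would execute:

```python
import asyncio
from typing import Callable

class MeasurementBuffer:
    """Accumulates measurement rows; a background task flushes them in batches."""

    def __init__(self, flush_cb: Callable[[list[dict]], None], interval: float = 5.0):
        self._rows: list[dict] = []
        self._flush_cb = flush_cb   # e.g. executes one batch INSERT
        self._interval = interval
        self._lock = asyncio.Lock()

    def append(self, row: dict) -> None:
        # Called from the Director tick loop; cheap, no I/O.
        self._rows.append(row)

    async def flush(self) -> int:
        """Drain all pending rows and hand them to the writer in one batch."""
        async with self._lock:
            rows, self._rows = self._rows, []
        if rows:
            self._flush_cb(rows)    # one batch write amortizes the insert cost
        return len(rows)

    async def run(self) -> None:
        """Background writer loop (cancel the task to stop it)."""
        while True:
            await asyncio.sleep(self._interval)
            await self.flush()
```

Swapping the buffer list under a lock keeps `flush` safe even if a slow batch INSERT overlaps the next tick's `append` calls.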
3.7 Retention Policy¶
-- Drop raw data older than 90 days
SELECT add_retention_policy('tracking_measurements', INTERVAL '90 days');
-- Keep 1-minute aggregates for 1 year
SELECT add_retention_policy('tracking_1min', INTERVAL '1 year');
4. Coverage Gate CI Job¶
TALOS currently reports test coverage via Cobertura in CI but does not enforce a minimum threshold. A coverage gate prevents regressions by failing the pipeline when coverage drops.
4.1 Design¶
# .gitlab-ci.yml (partial)
coverage-gate:
stage: test
image: python:3.12-slim
script:
- pip install coverage
- coverage run -m pytest tests/ --tb=short
- coverage xml -o coverage.xml  # emit the artifact before the gate can fail the job
- coverage report --fail-under=75
coverage: '/^TOTAL\s+\d+\s+\d+\s+(\d+)%/'
artifacts:
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
- if: '$CI_COMMIT_BRANCH == "master"'
4.2 Threshold Selection¶
| Threshold | Rationale |
|---|---|
| 60% | Current estimated coverage (broad integration tests, shallow unit tests) |
| 75% | Target for v0.5 (adding unit tests for scheduler, predictor, telemetry) |
| 85% | Target for v0.6 (mature test suite with property-based tests) |
Start at 75% and ratchet upward. The coverage gate should run on merge requests and master branch pushes.
4.3 Per-Component Minimums¶
Eventually, enforce per-component minimums to prevent coverage hiding:
| Component | Minimum |
|---|---|
| core/ | 70% |
| director/ | 80% |
| shared/ | 90% |
| agent/ | 60% |
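One way to enforce these minimums is a standalone check over the Cobertura coverage.xml that CI already produces. This is a sketch: the per-component mapping mirrors the table above, and the assumption is that each `<class filename=...>` path starts with the component directory:

```python
import sys
import xml.etree.ElementTree as ET
from collections import defaultdict

# Minimum line coverage per top-level component, mirroring the table above.
MINIMUMS = {"core": 0.70, "director": 0.80, "shared": 0.90, "agent": 0.60}

def component_rates(cobertura_xml: str) -> dict[str, float]:
    """Aggregate Cobertura line hits by top-level component directory."""
    hits, total = defaultdict(int), defaultdict(int)
    root = ET.fromstring(cobertura_xml)
    for cls in root.iter("class"):
        component = cls.get("filename", "").split("/")[0]
        for line in cls.iter("line"):
            total[component] += 1
            if int(line.get("hits", "0")) > 0:
                hits[component] += 1
    return {c: hits[c] / total[c] for c in total if total[c]}

def check(cobertura_xml: str) -> list[str]:
    """Return failure messages; an empty list means the gate passes."""
    failures = []
    for component, rate in component_rates(cobertura_xml).items():
        minimum = MINIMUMS.get(component)
        if minimum is not None and rate < minimum:
            failures.append(f"{component}/: {rate:.0%} < required {minimum:.0%}")
    return failures

if __name__ == "__main__":
    problems = check(open(sys.argv[1]).read())
    for p in problems:
        print(p)
    sys.exit(1 if problems else 0)
```

Run as an extra CI script step after `coverage xml`; a nonzero exit fails the pipeline just like `--fail-under` does for the global threshold.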
5. Data Flow Summary¶
The complete data flow after v0.5 changes:
TLE Sources Telemetry Persistence
----------- ---------------------
SatNOGS DB API Director tick loop
| |
v v
TLE Source Manager Measurement buffer
|-- SatNOGSClient |
|-- CelesTrakClient v
|-- Stale cache Background writer
| |
v v
TLE Manager TimescaleDB
| |-- tracking_measurements (raw)
v |-- tracking_1min (aggregate)
Director propagation loop |-- 90-day retention (raw)
| |-- 1-year retention (aggregates)
v
MQTT commands -> Stations
6. Migration Path¶
6.1 Database Changes¶
- Add the TimescaleDB extension to PostgreSQL (CREATE EXTENSION IF NOT EXISTS timescaledb).
- Create the tracking_measurements table via Alembic migration.
- Convert it to a hypertable.
- Add compression and retention policies.
- Create continuous aggregates.
6.2 Application Changes¶
- Add CelesTrakClient to the shared/ package.
- Create a TLESourceManager that wraps both clients with the priority cascade.
- Update TLEManager to use TLESourceManager instead of calling SatNOGSClient directly.
- Update the public tracker route to use TLESourceManager.
- Add a MeasurementBuffer class to the Director for batched telemetry writes.
- Add a BackgroundWriter that flushes the buffer to TimescaleDB every 5 seconds.
6.3 Dependencies¶
| Package | Version | Purpose |
|---|---|---|
| sqlalchemy-timescaledb | >= 0.4.1 | Hypertable support in SQLAlchemy |
| httpx | >= 0.27 | Async HTTP client for CelesTrak (already in use) |
| timescaledb | >= 2.14 | PostgreSQL extension (infrastructure) |
7. Monitoring¶
7.1 New Prometheus Metrics¶
| Metric | Type | Description |
|---|---|---|
| talos_tle_source_requests_total | Counter | TLE fetch attempts by source (satnogs, celestrak, cache) |
| talos_tle_source_failures_total | Counter | Failed TLE fetches by source |
| talos_tle_age_seconds | Gauge | Age of the oldest active TLE |
| talos_telemetry_buffer_size | Gauge | Pending measurement rows in write buffer |
| talos_telemetry_write_duration_seconds | Histogram | Time to flush measurement buffer |
| talos_telemetry_rows_written_total | Counter | Total rows written to TimescaleDB |
7.2 Alerting Rules¶
| Alert | Condition | Severity |
|---|---|---|
| TLE sources exhausted | Both SatNOGS and CelesTrak failed for > 1 hour | Critical |
| Stale TLE in use | Oldest active TLE age > 48 hours | Warning |
| Telemetry write backlog | Buffer size > 10,000 rows | Warning |
| Telemetry write failure | Write failures > 0 in last 5 minutes | Warning |
Summary¶
Data resilience in v0.5 addresses two independent gaps: TLE source redundancy and telemetry persistence. The CelesTrak fallback provides a safety net for SatNOGS outages and positions TALOS for the July 2026 catalog number overflow by adopting OMM JSON format. TimescaleDB adds historical tracking data storage with minimal overhead (batch writes, automatic compression, SQL compatibility). The coverage gate ensures that new code maintains test quality as the codebase grows.