fix: hub-side dedup for ESP32 offline status replay (CUB-239) #13

Merged
overseer merged 2 commits from agent/dex/CUB-239-hub-dedup-replay into dev 2026-05-28 07:00:36 -04:00
2 changed files with 48 additions and 12 deletions
+38 -10
View File
@@ -4,6 +4,7 @@ package db
import (
"database/sql"
_ "embed"
"fmt"
"log"
"os"
"path/filepath"
@@ -15,13 +16,16 @@ import (
//go:embed migrations/001_create_tables.sql
var migration001 string
//go:embed migrations/002_dedup_unique_index.sql
var migration002 string
// DB wraps the sql.DB with connection-level settings.
type DB struct {
*sql.DB
}
// Open opens the SQLite database at the given path, enables WAL mode,
// and runs all migrations if the tables don't exist yet.
// and runs all migrations using a schema_version table for tracking.
func Open(path string) (*DB, error) {
// Ensure the directory exists
dir := filepath.Dir(path)
@@ -46,22 +50,46 @@ func Open(path string) (*DB, error) {
return nil, err
}
// Check if tables already exist (idempotent migration)
var count int
if err := db.QueryRow(`
SELECT COUNT(*) FROM sqlite_master
WHERE type='table' AND name IN ('cameras', 'status_logs', 'recording_events', 'settings')
`).Scan(&count); err != nil {
// Ensure schema_version table exists for migration tracking
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)`); err != nil {
db.Close()
return nil, err
}
if count < 4 {
log.Printf("Running migrations for %s...", path)
if err := migrate(db, migration001); err != nil {
// Read current schema version (0 if table is empty)
var currentVersion int
if err := db.QueryRow(`SELECT COALESCE(MAX(version), 0) FROM schema_version`).Scan(&currentVersion); err != nil {
db.Close()
return nil, err
}
// Migration definitions: ordered list of (version, sql)
type migration struct {
version int
sql string
}
migrations := []migration{
{1, migration001},
{2, migration002},
}
for _, m := range migrations {
if currentVersion >= m.version {
continue
}
log.Printf("Running migration %d for %s...", m.version, path)
if err := migrate(db, m.sql); err != nil {
db.Close()
return nil, fmt.Errorf("migration %d: %w", m.version, err)
}
if _, err := db.Exec(`INSERT INTO schema_version (version) VALUES (?)`, m.version); err != nil {
db.Close()
return nil, fmt.Errorf("record migration %d: %w", m.version, err)
}
log.Printf("Migration %d complete", m.version)
}
if currentVersion < len(migrations) {
log.Println("Migrations complete")
}
@@ -0,0 +1,8 @@
-- Migration: 002_dedup_unique_index
-- Add a UNIQUE index on (camera_id, recorded_at) to enforce hub-side
-- deduplication for ESP32 offline status replay (CUB-239).
-- This prevents race-condition double-inserts that a pure SELECT COUNT(*)
-- check cannot guard against.
CREATE UNIQUE INDEX IF NOT EXISTS idx_status_logs_unique_entry
ON status_logs(camera_id, recorded_at);