fix: hub-side dedup for ESP32 offline status replay (CUB-239)
CI/CD / lint-and-typecheck (pull_request) Failing after 11m33s
CI/CD / test (pull_request) Has been cancelled
CI/CD / build (pull_request) Has been cancelled
CI/CD / deploy (pull_request) Has been cancelled

- Add migration 002: UNIQUE index on status_logs(camera_id, recorded_at)
- Upgrade migration system to version-tracked (schema_version table)
- Prevents race-condition double-inserts that application-level COUNT(*) check cannot guard against
- Complements existing application-level dedup from CUB-230
This commit is contained in:
2026-05-23 09:01:21 -04:00
parent fe193701ae
commit 74c8697e57
2 changed files with 48 additions and 12 deletions
+40 -12
View File
@@ -4,6 +4,7 @@ package db
import (
"database/sql"
_ "embed"
"fmt"
"log"
"os"
"path/filepath"
@@ -14,13 +15,16 @@ import (
//go:embed migrations/001_create_tables.sql
var migration001 string
//go:embed migrations/002_dedup_unique_index.sql
var migration002 string
// DB wraps the sql.DB with connection-level settings.
type DB struct {
*sql.DB
}
// Open opens the SQLite database at the given path, enables WAL mode,
// and runs all migrations if the tables don't exist yet.
// and runs all migrations using a schema_version table for tracking.
func Open(path string) (*DB, error) {
// Ensure the directory exists
dir := filepath.Dir(path)
@@ -45,22 +49,46 @@ func Open(path string) (*DB, error) {
return nil, err
}
// Check if tables already exist (idempotent migration)
var count int
if err := db.QueryRow(`
SELECT COUNT(*) FROM sqlite_master
WHERE type='table' AND name IN ('cameras', 'status_logs', 'recording_events', 'settings')
`).Scan(&count); err != nil {
// Ensure schema_version table exists for migration tracking
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)`); err != nil {
db.Close()
return nil, err
}
if count < 4 {
log.Printf("Running migrations for %s...", path)
if err := migrate(db, migration001); err != nil {
db.Close()
return nil, err
// Read current schema version (0 if table is empty)
var currentVersion int
if err := db.QueryRow(`SELECT COALESCE(MAX(version), 0) FROM schema_version`).Scan(&currentVersion); err != nil {
db.Close()
return nil, err
}
// Migration definitions: ordered list of (version, sql)
type migration struct {
version int
sql string
}
migrations := []migration{
{1, migration001},
{2, migration002},
}
for _, m := range migrations {
if currentVersion >= m.version {
continue
}
log.Printf("Running migration %d for %s...", m.version, path)
if err := migrate(db, m.sql); err != nil {
db.Close()
return nil, fmt.Errorf("migration %d: %w", m.version, err)
}
if _, err := db.Exec(`INSERT INTO schema_version (version) VALUES (?)`, m.version); err != nil {
db.Close()
return nil, fmt.Errorf("record migration %d: %w", m.version, err)
}
log.Printf("Migration %d complete", m.version)
}
if currentVersion < len(migrations) {
log.Println("Migrations complete")
}