All checks were successful
Dev Build / build-test (pull_request) Successful in 2m23s
- Create repository/ package with pgx-backed CRUD for agents, sessions, tasks, projects
- Define AgentRepo/SessionRepo/TaskRepo/ProjectRepo interfaces
- Update handler to use repository interfaces instead of in-memory stores
- Add SSE broker with GET /api/events endpoint (text/event-stream)
- Add gateway client that polls OpenClaw for agent states
- Add GATEWAY_URL and GATEWAY_POLL_INTERVAL config fields
- Seed 5 demo agents (Otto, Rex, Dex, Hex, Pip) on empty DB
- Update router to wire SSE broker
- All 21 handler tests pass with mock repos
187 lines
5.4 KiB
Go
187 lines
5.4 KiB
Go
// Package repository provides PostgreSQL-backed CRUD implementations
|
|
// for the Control Center domain entities. Each repository takes a
|
|
// *pgxpool.Pool in its constructor and uses pgx.CollectRows() for scanning.
|
|
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// AgentRepository provides PostgreSQL-backed CRUD for agents.
|
|
type AgentRepository struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// NewAgentRepository returns a repository wired to the given connection pool.
|
|
func NewAgentRepository(pool *pgxpool.Pool) *AgentRepository {
|
|
return &AgentRepository{pool: pool}
|
|
}
|
|
|
|
// Create inserts a new agent. It maps the models.AgentCardData fields onto
|
|
// the agents table columns (uuid id, text name, text status, text task,
|
|
// int progress, text session_key, text channel).
|
|
func (r *AgentRepository) Create(ctx context.Context, a models.AgentCardData) error {
|
|
prog := 0
|
|
if a.TaskProgress != nil {
|
|
prog = *a.TaskProgress
|
|
}
|
|
|
|
_, err := r.pool.Exec(ctx, `
|
|
INSERT INTO agents (id, name, status, task, progress, session_key, channel, last_activity)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
`, a.ID, a.DisplayName, string(a.Status), a.CurrentTask, prog,
|
|
a.SessionKey, a.Channel, time.Now().UTC())
|
|
|
|
return err
|
|
}
|
|
|
|
// Get returns a single agent by its string id.
|
|
func (r *AgentRepository) Get(ctx context.Context, id string) (models.AgentCardData, error) {
|
|
var a models.AgentCardData
|
|
var task *string
|
|
var prog int
|
|
var lastActivity time.Time
|
|
|
|
err := r.pool.QueryRow(ctx, `
|
|
SELECT id, name, status, task, progress, session_key, channel, last_activity
|
|
FROM agents WHERE id = $1
|
|
`, id).Scan(&a.ID, &a.DisplayName, &a.Status, &task, &prog,
|
|
&a.SessionKey, &a.Channel, &lastActivity)
|
|
|
|
if err != nil {
|
|
return a, err
|
|
}
|
|
|
|
a.CurrentTask = task
|
|
if prog > 0 || task != nil {
|
|
p := prog
|
|
a.TaskProgress = &p
|
|
}
|
|
a.LastActivity = lastActivity.UTC().Format(time.RFC3339)
|
|
|
|
// Role is not persisted in the current schema — set a sensible default.
|
|
a.Role = "agent"
|
|
|
|
return a, nil
|
|
}
|
|
|
|
// List returns all agents, optionally filtered by status.
|
|
// Results are ordered by name (display_name).
|
|
func (r *AgentRepository) List(ctx context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
|
|
var rows pgx.Rows
|
|
var err error
|
|
|
|
if statusFilter != "" {
|
|
rows, err = r.pool.Query(ctx, `
|
|
SELECT id, name, status, task, progress, session_key, channel, last_activity
|
|
FROM agents WHERE status = $1 ORDER BY name
|
|
`, string(statusFilter))
|
|
} else {
|
|
rows, err = r.pool.Query(ctx, `
|
|
SELECT id, name, status, task, progress, session_key, channel, last_activity
|
|
FROM agents ORDER BY name
|
|
`)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return pgx.CollectRows(rows, func(row pgx.CollectableRow) (models.AgentCardData, error) {
|
|
var a models.AgentCardData
|
|
var task *string
|
|
var prog int
|
|
var lastActivity time.Time
|
|
|
|
if err := row.Scan(&a.ID, &a.DisplayName, &a.Status, &task, &prog,
|
|
&a.SessionKey, &a.Channel, &lastActivity); err != nil {
|
|
return a, err
|
|
}
|
|
|
|
a.CurrentTask = task
|
|
if prog > 0 || task != nil {
|
|
p := prog
|
|
a.TaskProgress = &p
|
|
}
|
|
a.LastActivity = lastActivity.UTC().Format(time.RFC3339)
|
|
a.Role = "agent"
|
|
return a, nil
|
|
})
|
|
}
|
|
|
|
// Update applies partial updates to an agent. Returns the updated agent.
|
|
func (r *AgentRepository) Update(ctx context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
|
|
// Build dynamic SET clause.
|
|
setClauses := []string{"last_activity = $2"}
|
|
args := []any{id, time.Now().UTC()}
|
|
argIdx := 3
|
|
|
|
if req.Status != nil {
|
|
setClauses = append(setClauses, fmt.Sprintf("status = $%d", argIdx))
|
|
args = append(args, string(*req.Status))
|
|
argIdx++
|
|
}
|
|
if req.CurrentTask != nil {
|
|
setClauses = append(setClauses, fmt.Sprintf("task = $%d", argIdx))
|
|
args = append(args, *req.CurrentTask)
|
|
argIdx++
|
|
}
|
|
if req.TaskProgress != nil {
|
|
setClauses = append(setClauses, fmt.Sprintf("progress = $%d", argIdx))
|
|
args = append(args, *req.TaskProgress)
|
|
argIdx++
|
|
}
|
|
if req.Channel != nil {
|
|
setClauses = append(setClauses, fmt.Sprintf("channel = $%d", argIdx))
|
|
args = append(args, *req.Channel)
|
|
argIdx++
|
|
}
|
|
|
|
// Build and execute
|
|
query := "UPDATE agents SET "
|
|
for i, clause := range setClauses {
|
|
if i > 0 {
|
|
query += ", "
|
|
}
|
|
query += clause
|
|
}
|
|
query += " WHERE id = $1"
|
|
|
|
ct, err := r.pool.Exec(ctx, query, args...)
|
|
if err != nil {
|
|
return models.AgentCardData{}, err
|
|
}
|
|
if ct.RowsAffected() == 0 {
|
|
return models.AgentCardData{}, fmt.Errorf("agent not found: %s", id)
|
|
}
|
|
|
|
return r.Get(ctx, id)
|
|
}
|
|
|
|
// Delete removes an agent. Returns nil even if the agent doesn't exist
|
|
// (idempotent). Returns a wrapped error only on transport failures.
|
|
func (r *AgentRepository) Delete(ctx context.Context, id string) error {
|
|
_, err := r.pool.Exec(ctx, `DELETE FROM agents WHERE id = $1`, id)
|
|
return err
|
|
}
|
|
|
|
// Count returns the total number of agents.
|
|
func (r *AgentRepository) Count(ctx context.Context) (int, error) {
|
|
var n int
|
|
err := r.pool.QueryRow(ctx, `SELECT COUNT(*) FROM agents`).Scan(&n)
|
|
return n, err
|
|
}
|
|
|
|
// CountByStatus returns the number of agents with the given status.
|
|
func (r *AgentRepository) CountByStatus(ctx context.Context, status models.AgentStatus) (int, error) {
|
|
var n int
|
|
err := r.pool.QueryRow(ctx, `SELECT COUNT(*) FROM agents WHERE status = $1`, string(status)).Scan(&n)
|
|
return n, err
|
|
}
|