把 visionA-backend 6 個 in-memory store 接到資料庫持久化,範圍=完整 (PG 全接 + session 接 Redis + 交易韌性)。interface / handler 不動, 只加 DB 實作 + 換 wiring,config 未設 DB 時保留 in-memory fallback。 - 塊 0 基礎建設:pgx/v5 連線池 + DatabaseConfig/RedisConfig + golang-migrate runner(embed)+ cmd/migrate + testcontainers 測試基礎建設 - 塊 1 model → Postgres:array 映射、upsert 保留 CreatedAt、faa_object_key、 三維 filter(owner/chip/source)、soft-delete partial index - 塊 2 device → Postgres:partial unique(已刪 serial 可重註冊)、雙狀態欄位 - 塊 3 token → Postgres:pairing_tokens + session_tokens 分表、token_hash 當 PK - 塊 4 userSession → Redis:idle + absolute 雙 TTL 取代 cleanup goroutine (tunnel session 維持 in-memory,yamux handle 不可序列化) - 塊 5 交易/韌性:WithTx helper + 刪 device cascade 撤銷 token(同 tx 原子) + /healthz ping PG/Redis(fail-fast 503)+ pgx error 統一映射(不洩漏 raw error) 降級策略(fail-fast):PG 掉 → 持久資料 API 回 503;Redis 掉 → session 失敗 不自動 fallback in-memory(避免多機 session 不同步)。 DB:PostgreSQL 14.23(gen_random_uuid 內建、無 citext → email 用 lower() unique index)。每塊經 Reviewer 審查 + 真 PG/Redis testcontainers 全量 dbtest 綠燈, in-memory fallback 未受影響。 docs: 同步更新 database.md(schema/config/migration 清單)+ api-spec.md (409/503 錯誤碼、/healthz 新行為、device unpair cascade)。 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
329 lines
10 KiB
Go
329 lines
10 KiB
Go
// Package model 的 Postgres 持久層實作(DB 接入塊 1)。
|
||
//
|
||
// PostgresRepository 實作與 InMemoryRepository 完全相同的 Repository interface,
|
||
// 讓 main.go 在 cfg.Database.Enabled() 時無痛切換、handler 與呼叫端一行都不需改。
|
||
//
|
||
// 對齊:
|
||
// - database.md §2.3(Model 欄位)、§4(models 表 schema:faa_object_key / uploaded_at +
|
||
// owner/chip/source filter index + soft-delete partial index)
|
||
// - migrations/0001_create_users_models.up.sql(models 表已含全部欄位,塊 1 不需新 migration)
|
||
//
|
||
// 語意對齊 in-memory(見 model.go):
|
||
// - Get / List 略過 deleted_at IS NOT NULL 的紀錄。
|
||
// - Save 為 upsert by ID;existing 且未刪除時保留原 created_at(in-memory model.go ~line 209)。
|
||
// - Delete 為軟刪除(寫 deleted_at = now());已刪除或不存在回 ErrNotFound。
|
||
//
|
||
// array 映射(pgx v5):
|
||
// - input_shape INT[] <-> []int32(pgx 對 INT[] 預設 decode 成 []int32,Model.InputShape 為 []int)
|
||
// - classes TEXT[] <-> []string
|
||
package model
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"strings"
|
||
|
||
"github.com/jackc/pgx/v5"
|
||
"github.com/jackc/pgx/v5/pgxpool"
|
||
)
|
||
|
||
// PostgresRepository 是 Model 的 PostgreSQL 持久層實作。
|
||
type PostgresRepository struct {
|
||
pool *pgxpool.Pool
|
||
}
|
||
|
||
// NewPostgresRepository 建立一個以 pgxpool 為後端的 Repository。
|
||
//
|
||
// pool 由 internal/db 的 NewPool 建立並注入;本套件不持有建池 / 關閉責任。
|
||
func NewPostgresRepository(pool *pgxpool.Pool) *PostgresRepository {
|
||
return &PostgresRepository{pool: pool}
|
||
}
|
||
|
||
// 編譯時檢查:確保 PostgresRepository 實作 Repository。
|
||
var _ Repository = (*PostgresRepository)(nil)
|
||
|
||
// modelColumns 是 SELECT / RETURNING 共用的欄位清單(順序必須與 scanModel 對齊)。
|
||
const modelColumns = `id, owner_user_id, name, description, storage_key, file_size,
|
||
file_checksum, faa_object_key, target_chip, input_shape, classes, framework,
|
||
source, source_job_id, created_at, updated_at, uploaded_at, deleted_at`
|
||
|
||
// Get 取得單一 Model;不存在或已軟刪除回 ErrNotFound。
|
||
func (r *PostgresRepository) Get(ctx context.Context, id string) (*Model, error) {
|
||
const q = `SELECT ` + modelColumns + `
|
||
FROM models
|
||
WHERE id = $1 AND deleted_at IS NULL`
|
||
|
||
row := r.pool.QueryRow(ctx, q, id)
|
||
m, err := scanModel(row)
|
||
if errors.Is(err, pgx.ErrNoRows) {
|
||
return nil, ErrNotFound
|
||
}
|
||
if err != nil {
|
||
return nil, fmt.Errorf("model: pg Get: %w", err)
|
||
}
|
||
return m, nil
|
||
}
|
||
|
||
// List 依 filter 列出未刪除的 Model。
|
||
//
|
||
// filter 三維(OwnerUserID / TargetChip / Source)皆為可選;空字串表示不過濾該維度。
|
||
// 條件以動態 WHERE 拼接(參數化,無字串拼接使用者輸入),對齊 in-memory List 行為。
|
||
// 結果以 created_at DESC 排序(最新在前),提供穩定且對前端友善的順序。
|
||
func (r *PostgresRepository) List(ctx context.Context, filter ListFilter) ([]*Model, error) {
|
||
var (
|
||
conds = []string{"deleted_at IS NULL"}
|
||
args []any
|
||
)
|
||
if filter.OwnerUserID != "" {
|
||
args = append(args, filter.OwnerUserID)
|
||
conds = append(conds, fmt.Sprintf("owner_user_id = $%d", len(args)))
|
||
}
|
||
if filter.TargetChip != "" {
|
||
args = append(args, filter.TargetChip)
|
||
conds = append(conds, fmt.Sprintf("target_chip = $%d", len(args)))
|
||
}
|
||
if filter.Source != "" {
|
||
args = append(args, filter.Source)
|
||
conds = append(conds, fmt.Sprintf("source = $%d", len(args)))
|
||
}
|
||
|
||
q := `SELECT ` + modelColumns + `
|
||
FROM models
|
||
WHERE ` + strings.Join(conds, " AND ") + `
|
||
ORDER BY created_at DESC`
|
||
|
||
rows, err := r.pool.Query(ctx, q, args...)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("model: pg List query: %w", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
out := make([]*Model, 0)
|
||
for rows.Next() {
|
||
m, scanErr := scanModel(rows)
|
||
if scanErr != nil {
|
||
return nil, fmt.Errorf("model: pg List scan: %w", scanErr)
|
||
}
|
||
out = append(out, m)
|
||
}
|
||
if err := rows.Err(); err != nil {
|
||
return nil, fmt.Errorf("model: pg List rows: %w", err)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
// Save 新增或更新 Model(upsert by id)。
|
||
//
|
||
// 語意對齊 in-memory(model.go ~line 209):
|
||
// - 既有且未刪除(deleted_at IS NULL)→ 保留原 created_at;
|
||
// - 不存在 / 已刪除 → 以傳入 created_at(zero 時用 now())為準。
|
||
//
|
||
// updated_at 一律設為 now()。透過 ON CONFLICT (id) DO UPDATE 的 GREATEST/COALESCE 無法
|
||
// 表達「保留原值僅當未刪除」,故 created_at 用 CASE:當 conflict 既有列未刪除時保留
|
||
// models.created_at,否則用 EXCLUDED.created_at。
|
||
func (r *PostgresRepository) Save(ctx context.Context, m *Model) error {
|
||
if m == nil || m.ID == "" {
|
||
return errors.New("model: Save requires non-nil model with ID")
|
||
}
|
||
|
||
// created_at:zero 時交給 DB now()(用 NULL 觸發 COALESCE)。
|
||
var createdAt any
|
||
if !m.CreatedAt.IsZero() {
|
||
createdAt = m.CreatedAt.UTC()
|
||
} // else: 留 nil → COALESCE($n, now())
|
||
|
||
// input_shape:Model.InputShape 為 []int,DB 為 INT[]。pgx encode []int 可行;
|
||
// 為與 decode([]int32)對稱、避免型別歧義,這裡顯式轉 []int32。
|
||
inputShape := toInt32Slice(m.InputShape)
|
||
|
||
// nullable 欄位以指標 / 空值交給 pgx 處理;空字串對 nullable TEXT 欄位寫入空字串(非 NULL),
|
||
// 對齊 in-memory「zero value 即空字串」語意(faa_object_key 等查詢端以 != '' 判斷)。
|
||
const q = `
|
||
INSERT INTO models (
|
||
id, owner_user_id, name, description, storage_key, file_size,
|
||
file_checksum, faa_object_key, target_chip, input_shape, classes, framework,
|
||
source, source_job_id, created_at, updated_at, uploaded_at, deleted_at
|
||
) VALUES (
|
||
$1, $2, $3, $4, $5, $6,
|
||
$7, $8, $9, $10, $11, $12,
|
||
$13, $14, COALESCE($15, now()), now(), $16, $17
|
||
)
|
||
ON CONFLICT (id) DO UPDATE SET
|
||
owner_user_id = EXCLUDED.owner_user_id,
|
||
name = EXCLUDED.name,
|
||
description = EXCLUDED.description,
|
||
storage_key = EXCLUDED.storage_key,
|
||
file_size = EXCLUDED.file_size,
|
||
file_checksum = EXCLUDED.file_checksum,
|
||
faa_object_key = EXCLUDED.faa_object_key,
|
||
target_chip = EXCLUDED.target_chip,
|
||
input_shape = EXCLUDED.input_shape,
|
||
classes = EXCLUDED.classes,
|
||
framework = EXCLUDED.framework,
|
||
source = EXCLUDED.source,
|
||
source_job_id = EXCLUDED.source_job_id,
|
||
-- 保留原 created_at 僅當既有列未刪除;已刪除(復活)或值不同則用新值。
|
||
created_at = CASE
|
||
WHEN models.deleted_at IS NULL THEN models.created_at
|
||
ELSE EXCLUDED.created_at
|
||
END,
|
||
updated_at = now(),
|
||
uploaded_at = EXCLUDED.uploaded_at,
|
||
deleted_at = EXCLUDED.deleted_at`
|
||
|
||
_, err := r.pool.Exec(ctx, q,
|
||
m.ID, // $1
|
||
m.OwnerUserID, // $2
|
||
m.Name, // $3
|
||
m.Description, // $4
|
||
m.StorageKey, // $5
|
||
m.FileSize, // $6
|
||
m.FileChecksum, // $7
|
||
m.FAAObjectKey, // $8
|
||
m.TargetChip, // $9
|
||
inputShape, // $10
|
||
m.Classes, // $11
|
||
m.Framework, // $12
|
||
string(m.Source), // $13
|
||
nullableUUID(m.SourceJobID), // $14
|
||
createdAt, // $15
|
||
m.UploadedAt, // $16
|
||
m.DeletedAt, // $17
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("model: pg Save upsert: %w", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Delete 軟刪除:寫 deleted_at = now()。已刪除或不存在回 ErrNotFound。
|
||
func (r *PostgresRepository) Delete(ctx context.Context, id string) error {
|
||
const q = `UPDATE models
|
||
SET deleted_at = now(), updated_at = now()
|
||
WHERE id = $1 AND deleted_at IS NULL`
|
||
|
||
tag, err := r.pool.Exec(ctx, q, id)
|
||
if err != nil {
|
||
return fmt.Errorf("model: pg Delete: %w", err)
|
||
}
|
||
if tag.RowsAffected() == 0 {
|
||
return ErrNotFound
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ==========================================================================
|
||
// scan / 型別 helper
|
||
// ==========================================================================
|
||
|
||
// rowScanner 抽象 pgx.Row 與 pgx.Rows 的共同 Scan 介面,讓 scanModel 同時服務 Get 與 List。
|
||
type rowScanner interface {
|
||
Scan(dest ...any) error
|
||
}
|
||
|
||
// scanModel 從一列掃出 *Model。欄位順序必須與 modelColumns 對齊。
|
||
//
|
||
// nullable 欄位(faa_object_key / target_chip / description / framework / file_checksum)
|
||
// 在 DB 為 NULL 時掃進空字串;source_job_id(UUID, NULL)以 *string 接再轉空字串;
|
||
// input_shape(NULL 或空陣列)掃成 nil/空,再轉回 []int。
|
||
func scanModel(row rowScanner) (*Model, error) {
|
||
var (
|
||
m Model
|
||
description *string
|
||
fileChecksum *string
|
||
faaObjectKey *string
|
||
targetChip *string
|
||
inputShape []int32
|
||
framework *string
|
||
sourceJobID *string
|
||
)
|
||
|
||
err := row.Scan(
|
||
&m.ID,
|
||
&m.OwnerUserID,
|
||
&m.Name,
|
||
&description,
|
||
&m.StorageKey,
|
||
&m.FileSize,
|
||
&fileChecksum,
|
||
&faaObjectKey,
|
||
&targetChip,
|
||
&inputShape,
|
||
&m.Classes,
|
||
&framework,
|
||
&m.Source,
|
||
&sourceJobID,
|
||
&m.CreatedAt,
|
||
&m.UpdatedAt,
|
||
&m.UploadedAt,
|
||
&m.DeletedAt,
|
||
)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
m.Description = derefString(description)
|
||
m.FileChecksum = derefString(fileChecksum)
|
||
m.FAAObjectKey = derefString(faaObjectKey)
|
||
m.TargetChip = derefString(targetChip)
|
||
m.Framework = derefString(framework)
|
||
m.SourceJobID = derefString(sourceJobID)
|
||
m.InputShape = toIntSlice(inputShape)
|
||
|
||
// 正規化時間為 UTC,對齊 in-memory(time.Now().UTC())。
|
||
m.CreatedAt = m.CreatedAt.UTC()
|
||
m.UpdatedAt = m.UpdatedAt.UTC()
|
||
if m.UploadedAt != nil {
|
||
u := m.UploadedAt.UTC()
|
||
m.UploadedAt = &u
|
||
}
|
||
if m.DeletedAt != nil {
|
||
d := m.DeletedAt.UTC()
|
||
m.DeletedAt = &d
|
||
}
|
||
|
||
return &m, nil
|
||
}
|
||
|
||
// derefString 解指標字串,nil 視為空字串(對齊 in-memory zero value)。
|
||
func derefString(s *string) string {
|
||
if s == nil {
|
||
return ""
|
||
}
|
||
return *s
|
||
}
|
||
|
||
// nullableUUID 把可能為空的 source_job_id(UUID 欄位)轉成寫入值:
|
||
// 空字串 → nil(寫 NULL,避免空字串無法 cast 成 UUID 報錯);否則原樣傳入。
|
||
func nullableUUID(s string) any {
|
||
if s == "" {
|
||
return nil
|
||
}
|
||
return s
|
||
}
|
||
|
||
// toInt32Slice 把 []int 轉 []int32(pgx INT[] encode 用)。nil 維持 nil(寫 NULL)。
|
||
func toInt32Slice(in []int) []int32 {
|
||
if in == nil {
|
||
return nil
|
||
}
|
||
out := make([]int32, len(in))
|
||
for i, v := range in {
|
||
out[i] = int32(v)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// toIntSlice 把 DB decode 出的 []int32 轉回 []int。nil/空維持 nil(對齊 in-memory omitempty)。
|
||
func toIntSlice(in []int32) []int {
|
||
if len(in) == 0 {
|
||
return nil
|
||
}
|
||
out := make([]int, len(in))
|
||
for i, v := range in {
|
||
out[i] = int(v)
|
||
}
|
||
return out
|
||
}
|