Skip to content

Commit

Permalink
refactor: separate read-write and read-only db handles
Browse files Browse the repository at this point in the history
  • Loading branch information
nalgeon committed Apr 28, 2024
1 parent ce5edee commit a1941e3
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 90 deletions.
24 changes: 12 additions & 12 deletions internal/rhash/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type DB struct {

// New connects to the hash repository.
// Does not create the database schema.
func New(db *sql.DB) *DB {
d := sqlx.New(db, NewTx)
func New(rw *sql.DB, ro *sql.DB) *DB {
d := sqlx.New(rw, ro, NewTx)
return &DB{d}
}

Expand All @@ -30,37 +30,37 @@ func New(db *sql.DB) *DB {
// Does nothing if the key does not exist or is not a hash.
// Does not delete the key if the hash becomes empty.
func (d *DB) Delete(key string, fields ...string) (int, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.Delete(key, fields...)
}

// Exists checks if a field exists in a hash.
// If the key does not exist or is not a hash, returns false.
func (d *DB) Exists(key, field string) (bool, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Exists(key, field)
}

// Fields returns all fields in a hash.
// If the key does not exist or is not a hash, returns an empty slice.
func (d *DB) Fields(key string) ([]string, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Fields(key)
}

// Get returns the value of a field in a hash.
// If the element does not exist, returns ErrNotFound.
// If the key does not exist or is not a hash, returns ErrNotFound.
func (d *DB) Get(key, field string) (core.Value, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Get(key, field)
}

// GetMany returns a map of values for given fields.
// Ignores fields that do not exist and do not return them in the map.
// If the key does not exist or is not a hash, returns an empty map.
func (d *DB) GetMany(key string, fields ...string) (map[string]core.Value, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.GetMany(key, fields...)
}

Expand Down Expand Up @@ -97,14 +97,14 @@ func (d *DB) IncrFloat(key, field string, delta float64) (float64, error) {
// Items returns a map of all fields and values in a hash.
// If the key does not exist or is not a hash, returns an empty map.
func (d *DB) Items(key string) (map[string]core.Value, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Items(key)
}

// Len returns the number of fields in a hash.
// If the key does not exist or is not a hash, returns 0.
func (d *DB) Len(key string) (int, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Len(key)
}

Expand All @@ -115,7 +115,7 @@ func (d *DB) Len(key string) (int, error) {
// If the key does not exist or is not a hash, returns a nil slice.
// Supports glob-style patterns. Set count = 0 for default page size.
func (d *DB) Scan(key string, cursor int, pattern string, count int) (ScanResult, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Scan(key, cursor, pattern, count)
}

Expand All @@ -125,7 +125,7 @@ func (d *DB) Scan(key string, cursor int, pattern string, count int) (ScanResult
// or an error occurs. If the key does not exist or is not a hash, stops immediately.
// Supports glob-style patterns. Set pageSize = 0 for default page size.
func (d *DB) Scanner(key, pattern string, pageSize int) *Scanner {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Scanner(key, pattern, pageSize)
}

Expand Down Expand Up @@ -171,6 +171,6 @@ func (d *DB) SetNotExists(key, field string, value any) (bool, error) {
// Values returns all values in a hash.
// If the key does not exist or is not a hash, returns an empty slice.
func (d *DB) Values(key string) ([]core.Value, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Values(key)
}
30 changes: 15 additions & 15 deletions internal/rkey/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,64 @@ type DB struct {

// New creates a new database-backed key repository.
// Does not create the database schema.
func New(db *sql.DB) *DB {
d := sqlx.New(db, NewTx)
func New(rw *sql.DB, ro *sql.DB) *DB {
d := sqlx.New(rw, ro, NewTx)
return &DB{d}
}

// Count returns the number of existing keys among specified.
func (db *DB) Count(keys ...string) (int, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RO)
return tx.Count(keys...)
}

// Delete deletes keys and their values, regardless of the type.
// Returns the number of deleted keys. Non-existing keys are ignored.
func (db *DB) Delete(keys ...string) (int, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RW)
return tx.Delete(keys...)
}

// DeleteAll deletes all keys and their values, effectively resetting
// the database. Should not be run inside a database transaction.
func (db *DB) DeleteAll() error {
tx := NewTx(db.SQL)
tx := NewTx(db.RW)
return tx.DeleteAll()
}

// DeleteExpired deletes keys with expired TTL, but no more than n keys.
// If n = 0, deletes all expired keys.
func (db *DB) DeleteExpired(n int) (count int, err error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RW)
return tx.deleteExpired(n)
}

// Exists reports whether the key exists.
func (db *DB) Exists(key string) (bool, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RO)
return tx.Exists(key)
}

// Expire sets a time-to-live (ttl) for the key using a relative duration.
// After the ttl passes, the key is expired and no longer exists.
// If the key does not exist, returns ErrNotFound.
func (db *DB) Expire(key string, ttl time.Duration) error {
tx := NewTx(db.SQL)
tx := NewTx(db.RW)
return tx.Expire(key, ttl)
}

// ExpireAt sets an expiration time for the key. After this time,
// the key is expired and no longer exists.
// If the key does not exist, returns ErrNotFound.
func (db *DB) ExpireAt(key string, at time.Time) error {
tx := NewTx(db.SQL)
tx := NewTx(db.RW)
return tx.ExpireAt(key, at)
}

// Get returns a specific key with all associated details.
// If the key does not exist, returns ErrNotFound.
func (db *DB) Get(key string) (core.Key, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RO)
return tx.Get(key)
}

Expand All @@ -89,21 +89,21 @@ func (db *DB) Get(key string) (core.Key, error) {
// Use this method only if you are sure that the number of keys is
// limited. Otherwise, use the [DB.Scan] or [DB.Scanner] methods.
func (db *DB) Keys(pattern string) ([]core.Key, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RO)
return tx.Keys(pattern)
}

// Persist removes the expiration time for the key.
// If the key does not exist, returns ErrNotFound.
func (db *DB) Persist(key string) error {
tx := NewTx(db.SQL)
tx := NewTx(db.RW)
return tx.Persist(key)
}

// Random returns a random key.
// If there are no keys, returns ErrNotFound.
func (db *DB) Random() (core.Key, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RO)
return tx.Random()
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (db *DB) RenameNotExists(key, newKey string) (bool, error) {
// Returns an empty slice when there are no more keys.
// Supports glob-style patterns. Set count = 0 for default page size.
func (db *DB) Scan(cursor int, pattern string, pageSize int) (ScanResult, error) {
tx := NewTx(db.SQL)
tx := NewTx(db.RO)
return tx.Scan(cursor, pattern, pageSize)
}

Expand All @@ -147,5 +147,5 @@ func (db *DB) Scan(cursor int, pattern string, pageSize int) (ScanResult, error)
// Stops when there are no more items or an error occurs.
// Supports glob-style patterns. Set pageSize = 0 for default page size.
func (db *DB) Scanner(pattern string, pageSize int) *Scanner {
return newScanner(NewTx(db.SQL), pattern, pageSize)
return newScanner(NewTx(db.RO), pattern, pageSize)
}
12 changes: 6 additions & 6 deletions internal/rstring/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ type DB struct {

// New connects to the string repository.
// Does not create the database schema.
func New(db *sql.DB) *DB {
d := sqlx.New(db, NewTx)
func New(rw *sql.DB, ro *sql.DB) *DB {
d := sqlx.New(rw, ro, NewTx)
return &DB{d}
}

// Get returns the value of the key.
// If the key does not exist or is not a string, returns ErrNotFound.
func (d *DB) Get(key string) (core.Value, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Get(key)
}

// GetMany returns a map of values for given keys.
// Ignores keys that do not exist or not strings,
// and does not return them in the map.
func (d *DB) GetMany(keys ...string) (map[string]core.Value, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.GetMany(keys...)
}

Expand Down Expand Up @@ -70,14 +70,14 @@ func (d *DB) IncrFloat(key string, delta float64) (float64, error) {
// Set sets the key value that will not expire.
// Overwrites the value if the key already exists.
func (d *DB) Set(key string, value any) error {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.Set(key, value)
}

// SetExpires sets the key value with an optional expiration time (if ttl > 0).
// Overwrites the value and ttl if the key already exists.
func (d *DB) SetExpires(key string, value any, ttl time.Duration) error {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.SetExpires(key, value, ttl)
}

Expand Down
34 changes: 17 additions & 17 deletions internal/rzset/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type DB struct {

// New connects to the sorted set repository.
// Does not create the database schema.
func New(db *sql.DB) *DB {
d := sqlx.New(db, NewTx)
func New(rw *sql.DB, ro *sql.DB) *DB {
d := sqlx.New(rw, ro, NewTx)
return &DB{d}
}

Expand Down Expand Up @@ -60,7 +60,7 @@ func (d *DB) AddMany(key string, items map[any]float64) (int, error) {
// min and max (inclusive). Exclusive ranges are not supported.
// Returns 0 if the key does not exist or is not a set.
func (d *DB) Count(key string, min, max float64) (int, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Count(key, min, max)
}

Expand All @@ -70,13 +70,13 @@ func (d *DB) Count(key string, min, max float64) (int, error) {
// Does nothing if the key does not exist or is not a set.
// Does not delete the key if the set becomes empty.
func (d *DB) Delete(key string, elems ...any) (int, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.Delete(key, elems...)
}

// DeleteWith removes elements from a set with additional options.
func (d *DB) DeleteWith(key string) DeleteCmd {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.DeleteWith(key)
}

Expand All @@ -86,7 +86,7 @@ func (d *DB) DeleteWith(key string) DeleteCmd {
// If the element does not exist, returns ErrNotFound.
// If the key does not exist or is not a set, returns ErrNotFound.
func (d *DB) GetRank(key string, elem any) (rank int, score float64, err error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.GetRank(key, elem)
}

Expand All @@ -96,15 +96,15 @@ func (d *DB) GetRank(key string, elem any) (rank int, score float64, err error)
// If the element does not exist, returns ErrNotFound.
// If the key does not exist or is not a set, returns ErrNotFound.
func (d *DB) GetRankRev(key string, elem any) (rank int, score float64, err error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.GetRankRev(key, elem)
}

// GetScore returns the score of an element in a set.
// If the element does not exist, returns ErrNotFound.
// If the key does not exist or is not a set, returns ErrNotFound.
func (d *DB) GetScore(key string, elem any) (float64, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.GetScore(key, elem)
}

Expand All @@ -127,20 +127,20 @@ func (d *DB) Incr(key string, elem any, delta float64) (float64, error) {
// The score of each element is the sum of its scores in the given sets.
// If any of the source keys do not exist or are not sets, returns an empty slice.
func (d *DB) Inter(keys ...string) ([]SetItem, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Inter(keys...)
}

// InterWith intersects multiple sets with additional options.
func (d *DB) InterWith(keys ...string) InterCmd {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.InterWith(keys...)
}

// Len returns the number of elements in a set.
// Returns 0 if the key does not exist or is not a set.
func (d *DB) Len(key string) (int, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Len(key)
}

Expand All @@ -150,13 +150,13 @@ func (d *DB) Len(key string) (int, error) {
// Start and stop are 0-based, inclusive. Negative values are not supported.
// If the key does not exist or is not a set, returns a nil slice.
func (d *DB) Range(key string, start, stop int) ([]SetItem, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Range(key, start, stop)
}

// RangeWith ranges elements from a set with additional options.
func (d *DB) RangeWith(key string) RangeCmd {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.RangeWith(key)
}

Expand All @@ -167,7 +167,7 @@ func (d *DB) RangeWith(key string) RangeCmd {
// If the key does not exist or is not a set, returns a nil slice.
// Supports glob-style patterns. Set count = 0 for default page size.
func (d *DB) Scan(key string, cursor int, pattern string, count int) (ScanResult, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Scan(key, cursor, pattern, count)
}

Expand All @@ -177,7 +177,7 @@ func (d *DB) Scan(key string, cursor int, pattern string, count int) (ScanResult
// or an error occurs. If the key does not exist or is not a set, stops immediately.
// Supports glob-style patterns. Set pageSize = 0 for default page size.
func (d *DB) Scanner(key, pattern string, pageSize int) *Scanner {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Scanner(key, pattern, pageSize)
}

Expand All @@ -187,12 +187,12 @@ func (d *DB) Scanner(key, pattern string, pageSize int) *Scanner {
// Ignores the keys that do not exist or are not sets.
// If no keys exist, returns a nil slice.
func (d *DB) Union(keys ...string) ([]SetItem, error) {
tx := NewTx(d.SQL)
tx := NewTx(d.RO)
return tx.Union(keys...)
}

// UnionWith unions multiple sets with additional options.
func (d *DB) UnionWith(keys ...string) UnionCmd {
tx := NewTx(d.SQL)
tx := NewTx(d.RW)
return tx.UnionWith(keys...)
}

0 comments on commit a1941e3

Please sign in to comment.