Skip to content

Commit

Permalink
refactor: remove db transactions for single-query operations
Browse files Browse the repository at this point in the history
  • Loading branch information
nalgeon committed Apr 28, 2024
1 parent 1b55ad8 commit ce5edee
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 160 deletions.
9 changes: 2 additions & 7 deletions internal/rhash/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ func New(db *sql.DB) *DB {
// Does nothing if the key does not exist or is not a hash.
// Does not delete the key if the hash becomes empty.
func (d *DB) Delete(key string, fields ...string) (int, error) {
var count int
err := d.Update(func(tx *Tx) error {
var err error
count, err = tx.Delete(key, fields...)
return err
})
return count, err
tx := NewTx(d.SQL)
return tx.Delete(key, fields...)
}

// Exists checks if a field exists in a hash.
Expand Down
34 changes: 10 additions & 24 deletions internal/rkey/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,8 @@ func (db *DB) Count(keys ...string) (int, error) {
// Delete deletes keys and their values, regardless of the type.
// Returns the number of deleted keys. Non-existing keys are ignored.
func (db *DB) Delete(keys ...string) (int, error) {
var count int
err := db.Update(func(tx *Tx) error {
var err error
count, err = tx.Delete(keys...)
return err
})
return count, err
tx := NewTx(db.SQL)
return tx.Delete(keys...)
}

// DeleteAll deletes all keys and their values, effectively resetting
Expand All @@ -53,11 +48,8 @@ func (db *DB) DeleteAll() error {
// DeleteExpired deletes keys with expired TTL, but no more than n keys.
// If n = 0, deletes all expired keys.
func (db *DB) DeleteExpired(n int) (count int, err error) {
err = db.Update(func(tx *Tx) error {
count, err = tx.deleteExpired(n)
return err
})
return count, err
tx := NewTx(db.SQL)
return tx.deleteExpired(n)
}

// Exists reports whether the key exists.
Expand All @@ -70,20 +62,16 @@ func (db *DB) Exists(key string) (bool, error) {
// After the ttl passes, the key is expired and no longer exists.
// If the key does not exist, returns ErrNotFound.
func (db *DB) Expire(key string, ttl time.Duration) error {
err := db.Update(func(tx *Tx) error {
return tx.Expire(key, ttl)
})
return err
tx := NewTx(db.SQL)
return tx.Expire(key, ttl)
}

// ExpireAt sets an expiration time for the key. After this time,
// the key is expired and no longer exists.
// If the key does not exist, returns ErrNotFound.
func (db *DB) ExpireAt(key string, at time.Time) error {
err := db.Update(func(tx *Tx) error {
return tx.ExpireAt(key, at)
})
return err
tx := NewTx(db.SQL)
return tx.ExpireAt(key, at)
}

// Get returns a specific key with all associated details.
Expand All @@ -108,10 +96,8 @@ func (db *DB) Keys(pattern string) ([]core.Key, error) {
// Persist removes the expiration time for the key.
// If the key does not exist, returns ErrNotFound.
func (db *DB) Persist(key string) error {
err := db.Update(func(tx *Tx) error {
return tx.Persist(key)
})
return err
tx := NewTx(db.SQL)
return tx.Persist(key)
}

// Random returns a random key.
Expand Down
12 changes: 4 additions & 8 deletions internal/rstring/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,15 @@ func (d *DB) IncrFloat(key string, delta float64) (float64, error) {
// Set sets the key value that will not expire.
// Overwrites the value if the key already exists.
func (d *DB) Set(key string, value any) error {
err := d.Update(func(tx *Tx) error {
return tx.Set(key, value)
})
return err
tx := NewTx(d.SQL)
return tx.Set(key, value)
}

// SetExpires sets the key value with an optional expiration time (if ttl > 0).
// Overwrites the value and ttl if the key already exists.
func (d *DB) SetExpires(key string, value any, ttl time.Duration) error {
err := d.Update(func(tx *Tx) error {
return tx.SetExpires(key, value, ttl)
})
return err
tx := NewTx(d.SQL)
return tx.SetExpires(key, value, ttl)
}

// SetMany sets the values of multiple keys.
Expand Down
18 changes: 8 additions & 10 deletions internal/rzset/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,14 @@ func (d *DB) Count(key string, min, max float64) (int, error) {
// Does nothing if the key does not exist or is not a set.
// Does not delete the key if the set becomes empty.
func (d *DB) Delete(key string, elems ...any) (int, error) {
var count int
err := d.Update(func(tx *Tx) error {
var err error
count, err = tx.Delete(key, elems...)
return err
})
return count, err
tx := NewTx(d.SQL)
return tx.Delete(key, elems...)
}

// DeleteWith removes elements from a set with additional options.
func (d *DB) DeleteWith(key string) DeleteCmd {
return DeleteCmd{db: d, key: key}
tx := NewTx(d.SQL)
return tx.DeleteWith(key)
}

// GetRank returns the rank and score of an element in a set.
Expand Down Expand Up @@ -137,7 +133,8 @@ func (d *DB) Inter(keys ...string) ([]SetItem, error) {

// InterWith intersects multiple sets with additional options.
func (d *DB) InterWith(keys ...string) InterCmd {
return InterCmd{db: d, keys: keys, aggregate: sqlx.Sum}
tx := NewTx(d.SQL)
return tx.InterWith(keys...)
}

// Len returns the number of elements in a set.
Expand Down Expand Up @@ -196,5 +193,6 @@ func (d *DB) Union(keys ...string) ([]SetItem, error) {

// UnionWith unions multiple sets with additional options.
func (d *DB) UnionWith(keys ...string) UnionCmd {
return UnionCmd{db: d, keys: keys, aggregate: sqlx.Sum}
tx := NewTx(d.SQL)
return tx.UnionWith(keys...)
}
32 changes: 7 additions & 25 deletions internal/rzset/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const (

// DeleteCmd removes elements from a set.
type DeleteCmd struct {
db *DB
tx *Tx
tx sqlx.Tx
key string
byRank *byRank
byScore *byScore
Expand Down Expand Up @@ -63,34 +62,17 @@ func (c DeleteCmd) ByScore(start, stop float64) DeleteCmd {
// Does nothing if the key does not exist or is not a set.
// Does not delete the key if the set becomes empty.
func (c DeleteCmd) Run() (int, error) {
if c.db != nil {
var count int
err := c.db.Update(func(tx *Tx) error {
var err error
count, err = c.delete(tx.tx)
return err
})
return count, err
}
if c.tx != nil {
return c.delete(c.tx.tx)
}
return 0, nil
}

// delete removes elements from a set in a transaction.
func (c DeleteCmd) delete(tx sqlx.Tx) (int, error) {
if c.byRank != nil {
return c.deleteRank(tx)
return c.deleteRank()
}
if c.byScore != nil {
return c.deleteScore(tx)
return c.deleteScore()
}
return 0, nil
}

// deleteRank removes elements from a set by rank.
func (c DeleteCmd) deleteRank(tx sqlx.Tx) (int, error) {
func (c DeleteCmd) deleteRank() (int, error) {
// Check start and stop values.
if c.byRank.start < 0 || c.byRank.stop < 0 {
return 0, nil
Expand All @@ -103,7 +85,7 @@ func (c DeleteCmd) deleteRank(tx sqlx.Tx) (int, error) {
c.byRank.start, // start
c.byRank.stop - c.byRank.start + 1, // count
}
res, err := tx.Exec(sqlDeleteRank, args...)
res, err := c.tx.Exec(sqlDeleteRank, args...)
if err != nil {
return 0, err
}
Expand All @@ -112,14 +94,14 @@ func (c DeleteCmd) deleteRank(tx sqlx.Tx) (int, error) {
}

// deleteScore removes elements from a set by score.
func (c DeleteCmd) deleteScore(tx sqlx.Tx) (int, error) {
func (c DeleteCmd) deleteScore() (int, error) {
args := []any{
c.key,
time.Now().UnixMilli(),
c.byScore.start,
c.byScore.stop,
}
res, err := tx.Exec(sqlDeleteScore, args...)
res, err := c.tx.Exec(sqlDeleteScore, args...)
if err != nil {
return 0, err
}
Expand Down
49 changes: 10 additions & 39 deletions internal/rzset/inter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ const (

// InterCmd intersects multiple sets.
type InterCmd struct {
db *DB
tx *Tx
tx sqlx.Tx
dest string
keys []string
aggregate string
Expand Down Expand Up @@ -84,39 +83,6 @@ func (c InterCmd) Max() InterCmd {
// The score of each element is the aggregate of its scores in the given sets.
// If any of the source keys do not exist or are not sets, returns an empty slice.
func (c InterCmd) Run() ([]SetItem, error) {
if c.db != nil {
return c.inter(c.db.SQL)
}
if c.tx != nil {
return c.inter(c.tx.tx)
}
return nil, nil
}

// Store intersects multiple sets and stores the result in a new set.
// Returns the number of elements in the resulting set.
// If the destination key already exists, it is fully overwritten
// (all old elements are removed and the new ones are inserted).
// If any of the source keys do not exist or are not sets, does nothing,
// except deleting the destination key if it exists.
func (c InterCmd) Store() (int, error) {
if c.db != nil {
var count int
err := c.db.Update(func(tx *Tx) error {
var err error
count, err = c.store(tx.tx)
return err
})
return count, err
}
if c.tx != nil {
return c.store(c.tx.tx)
}
return 0, nil
}

// inter returns the intersection of multiple sets.
func (c InterCmd) inter(tx sqlx.Tx) ([]SetItem, error) {
// Prepare query arguments.
query := sqlInter
if c.aggregate != sqlx.Sum {
Expand All @@ -131,7 +97,7 @@ func (c InterCmd) inter(tx sqlx.Tx) ([]SetItem, error) {

// Execute the query.
var rows *sql.Rows
rows, err := tx.Query(query, args...)
rows, err := c.tx.Query(query, args...)
if err != nil {
return nil, err
}
Expand All @@ -153,8 +119,13 @@ func (c InterCmd) inter(tx sqlx.Tx) ([]SetItem, error) {
return items, nil
}

// store intersects multiple sets and stores the result in a new set.
func (c InterCmd) store(tx sqlx.Tx) (int, error) {
// Store intersects multiple sets and stores the result in a new set.
// Returns the number of elements in the resulting set.
// If the destination key already exists, it is fully overwritten
// (all old elements are removed and the new ones are inserted).
// If any of the source keys do not exist or are not sets, does nothing,
// except deleting the destination key if it exists.
func (c InterCmd) Store() (int, error) {
// Insert the destination key and get its ID.
now := time.Now().UnixMilli()
args := []any{
Expand All @@ -179,7 +150,7 @@ func (c InterCmd) store(tx sqlx.Tx) (int, error) {
query, keyArgs := sqlx.ExpandIn(query, ":keys", c.keys)
args = slices.Concat(args, keyArgs, []any{len(c.keys)})

res, err := tx.Exec(query, args...)
res, err := c.tx.Exec(query, args...)
if err != nil {
return 0, err
}
Expand Down
12 changes: 5 additions & 7 deletions internal/rzset/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (tx *Tx) Delete(key string, elems ...any) (int, error) {

// DeleteWith removes elements from a set with additional options.
func (tx *Tx) DeleteWith(key string) DeleteCmd {
return DeleteCmd{tx: tx, key: key}
return DeleteCmd{tx: tx.tx, key: key}
}

// GetRank returns the rank and score of an element in a set.
Expand Down Expand Up @@ -259,13 +259,12 @@ func (tx *Tx) Incr(key string, elem any, delta float64) (float64, error) {
// The score of each element is the sum of its scores in the given sets.
// If any of the source keys do not exist or are not sets, returns an empty slice.
func (tx *Tx) Inter(keys ...string) ([]SetItem, error) {
cmd := InterCmd{tx: tx, keys: keys, aggregate: sqlx.Sum}
return cmd.Run()
return tx.InterWith(keys...).Run()
}

// InterWith intersects multiple sets with additional options.
func (tx *Tx) InterWith(keys ...string) InterCmd {
return InterCmd{tx: tx, keys: keys, aggregate: sqlx.Sum}
return InterCmd{tx: tx.tx, keys: keys, aggregate: sqlx.Sum}
}

// Len returns the number of elements in a set.
Expand Down Expand Up @@ -346,13 +345,12 @@ func (tx *Tx) Scanner(key, pattern string, pageSize int) *Scanner {
// Ignores the keys that do not exist or are not sets.
// If no keys exist, returns a nil slice.
func (tx *Tx) Union(keys ...string) ([]SetItem, error) {
cmd := UnionCmd{tx: tx, keys: keys, aggregate: sqlx.Sum}
return cmd.Run()
return tx.UnionWith(keys...).Run()
}

// UnionWith unions multiple sets with additional options.
func (tx *Tx) UnionWith(keys ...string) UnionCmd {
return UnionCmd{tx: tx, keys: keys, aggregate: sqlx.Sum}
return UnionCmd{tx: tx.tx, keys: keys, aggregate: sqlx.Sum}
}

// add adds or updates the element in a set.
Expand Down

0 comments on commit ce5edee

Please sign in to comment.