funcmain() { // Create a pool with go-redis (or redigo) which is the pool redisync will // use while communicating with Redis. This can also be any pool that // implements the `redis.Pool` interface. client := goredislib.NewClient(&goredislib.Options{ Addr: "localhost:6379", }) pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Create an instance of redisync to be used to obtain a mutual exclusion // lock. rs := redsync.New(pool)
// Obtain a new mutex by using the same name for all instances wanting the // same lock. mutexname := "my-global-mutex" mutex := rs.NewMutex(mutexname)
// Obtain a lock for our given mutex. After this is successful, no one else // can obtain the same lock (the same mutex name) until we unlock it. if err := mutex.Lock(); err != nil { panic(err) }
// Do your work that requires the lock.
// Release the lock so other processes or threads can obtain a lock. if ok, err := mutex.Unlock(); !ok || err != nil { panic("unlock failed") } }
// Create a pool with go-redis (or redigo) which is the pool redisync will // use while communicating with Redis. This can also be any pool that // implements the `redis.Pool` interface. client := goredislib.NewClient(&goredislib.Options{ Addr: "localhost:6379", }) pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Create an instance of redisync to be used to obtain a mutual exclusion // lock. rs := redsync.New(pool)
wg := sync.WaitGroup{} wg.Add(100) for i := 0; i < 100; i++ { err := RedisSync(db, &wg, rs, 1) if err != nil { log.Println(err) } } wg.Wait() }
funcRedisSync(db *gorm.DB, wg *sync.WaitGroup, rs *redsync.Redsync, id uint)error { defer wg.Done()
mutexname := fmt.Sprintf("goods:id:%d", id) // Obtain a new mutex by using the same name for all instances wanting the // same lock. mutex := rs.NewMutex(mutexname)
// Obtain a lock for our given mutex. After this is successful, no one else // can obtain the same lock (the same mutex name) until we unlock it. err := mutex.Lock() if err != nil { return err }
UNLOCK: // Release the lock so other processes or threads can obtain a lock. if ok, err := mutex.Unlock(); !ok || err != nil { return fmt.Errorf("something wrong: %w", err) }
returnnil }
通过查看源码，可以看到 redsync 的实现逻辑，其思想与自己实现的方式很相似。
1 2 3 4 5 6 7 8 9
// 创建客户端和资源池,通过go-redis的包内的client对象 client := goredislib.NewClient(&goredislib.Options{ Addr: "localhost:6379", }) pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Create an instance of redisync to be used to obtain a mutual exclusion // lock. rs := redsync.New(pool)
// 重试次数 for i := 0; i < m.tries; i++ { if i != 0 { select { case <-ctx.Done(): // Exit early if the context is done. return ErrFailed case <-time.After(m.delayFunc(i)): // Fall-through when the delay timer completes. } }
start := time.Now()
n, err := func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { // 加锁的核心逻辑 return m.acquire(ctx, pool, value) }) }() if n == 0 && err != nil { return err }
now := time.Now() // driftFactor 解决时钟漂移的问题 // 当前时间,加上(超时时间扣除获取pools中的锁花费的时间,以及漂移时钟的时间)为一个过期时间节点 until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor))) // 大于大多数,并且没有超时,则认为拿到了锁 if n >= m.quorum && now.Before(until) { m.value = value m.until = until returnnil } // 没有拿到锁,则需要将其他的加上了锁的节点上的锁释放掉 _, err = func() (int, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor))) defer cancel() return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { // 解锁的逻辑 return m.release(ctx, pool, value) }) }() if i == m.tries-1 && err != nil { return err } }
// UnlockContext unlocks m and returns the status of unlock. func(m *Mutex) UnlockContext(ctx context.Context) (bool, error) { n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) { return m.release(ctx, pool, m.value) }) if n < m.quorum { returnfalse, err } returntrue, nil }
// deleteScript is a Lua script that uses the stored value to decide whether
// the lock belongs to the caller: it deletes KEYS[1] only when the key's
// current value equals ARGV[1], and returns 0 otherwise.
// NOTE(review): the first argument to NewScript (1) is presumably the number
// of keys the script takes — confirm against the redis package.
var deleteScript = redis.NewScript(1, ` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end `)
func(m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) { conn, err := pool.Get(ctx) if err != nil { returnfalse, err } defer conn.Close() status, err := conn.Eval(deleteScript, m.name, value) if err != nil { returnfalse, err } return status != int64(0), nil }