如何使用Redis实现分布式锁?(含代码示例)

使用 Redis 实现分布式锁:原理、方法与代码示例

在分布式系统中,多个服务或进程可能需要同时访问共享资源。为了防止数据不一致和竞态条件,我们需要一种机制来确保在任何给定时刻,只有一个客户端可以访问该资源。这就是分布式锁发挥作用的地方。

Redis,作为一个高性能的键值存储数据库,经常被用作实现分布式锁的工具。它提供了原子操作、过期时间等特性,非常适合构建可靠且高效的分布式锁。

1. 分布式锁的基本概念

在深入了解如何使用 Redis 实现分布式锁之前,让我们先回顾一下分布式锁需要满足的关键特性:

  • 互斥性 (Mutual Exclusion): 这是最基本的要求。在任何时刻,只有一个客户端能够持有锁。
  • 无死锁 (Deadlock Freedom): 即使持有锁的客户端崩溃或无法正常释放锁,系统也必须能够保证最终锁会被释放,避免死锁。
  • 容错性 (Fault Tolerance): 只要大多数 Redis 节点正常运行,客户端就应该能够获取和释放锁。
  • 可重入性(Reentrancy)(可选): 同一个客户端(或线程)可以多次获取同一把锁,而不会导致死锁。这对于某些场景非常有用。
  • 高性能 (High Performance): 获取和释放锁的操作应该尽可能快,对系统性能的影响最小。

2. 基于 Redis 的分布式锁实现方法

2.1. 简单的 SETNX/DEL 方法 (不推荐)

最简单的方法是使用 Redis 的 SETNX (SET if Not eXists) 命令。如果键不存在,SETNX 会设置键的值并返回 1;如果键已经存在,则不做任何操作并返回 0。客户端可以通过检查返回值来判断是否成功获取锁。释放锁则使用 DEL 命令删除对应的键。

Python 代码示例 (不推荐):

```python
import redis
import time

def acquire_lock(conn, lock_name):
"""尝试获取锁"""
return conn.setnx(lock_name, 1)

def release_lock(conn, lock_name):
"""释放锁"""
conn.delete(lock_name)

使用示例

conn = redis.StrictRedis(host='localhost', port=6379, db=0)
if acquire_lock(conn, 'my_lock'):
try:
# 执行需要保护的代码
print("Acquired the lock! Doing some work...")
time.sleep(5) # 模拟工作
finally:
release_lock(conn, 'my_lock')
print("Released the lock.")
else:
print("Failed to acquire the lock.")

```

问题:

这种方法虽然简单,但存在严重的缺陷:

  1. 缺乏过期时间: 如果客户端在获取锁后崩溃,而没有执行 DEL 操作,锁将永远不会被释放,导致死锁。
  2. 非原子操作: SETNX 和后续的设置过期时间操作(例如 EXPIRE)不是一个原子操作。如果在 SETNX 成功后,但在 EXPIRE 执行之前客户端崩溃,同样会导致锁无法释放。
  3. 误删其他客户端的锁: 如果客户端A获取锁后执行时间过长,导致锁自动过期,此时客户端B获取锁,然后客户端A执行完毕,会误删客户端B的锁。

2.2. SETNX + EXPIRE 方法 (仍然不推荐)

为了解决上述第一个问题,可以尝试在 SETNX 之后立即使用 EXPIRE 命令设置锁的过期时间。

Python 代码示例 (仍然不推荐):

```python
import redis
import time

def acquire_lock(conn, lock_name, lock_timeout):
"""尝试获取锁"""
if conn.setnx(lock_name, 1):
conn.expire(lock_name, lock_timeout) # 设置过期时间
return True
return False

release_lock 函数与之前相同

使用示例 (与之前类似,只是 acquire_lock 多了一个 lock_timeout 参数)

```

问题:
虽然增加了过期时间,但这种方法解决了问题1,但依然存在2,3两个问题,还是不可靠。

2.3. SET 命令与选项 (推荐)

Redis 2.6.12 及更高版本提供了一个更好的解决方案:SET 命令本身可以携带多个选项,从而在一个原子操作中完成锁的获取。

SET key value [EX seconds] [PX milliseconds] [NX|XX]

  • key: 锁的名称。
  • value: 锁的值(通常是一个唯一的标识符,用于防止误删)。
  • EX seconds: 设置键的过期时间(秒)。
  • PX milliseconds: 设置键的过期时间(毫秒)。
  • NX: 仅当键不存在时才设置键(相当于 SETNX)。
  • XX: 仅当键已经存在时才设置键。

使用 SET 命令结合 NXEX 选项,我们可以原子性地获取锁并设置过期时间。

Python 代码示例 (推荐):

```python
import redis
import time
import uuid

def acquire_lock(conn, lock_name, lock_timeout, acquire_timeout=10):
"""尝试获取锁

Args:
    conn: Redis 连接对象。
    lock_name: 锁的名称。
    lock_timeout: 锁的过期时间(秒)。
    acquire_timeout: 获取锁的超时时间(秒)。

Returns:
    如果成功获取锁,返回锁的唯一标识符(用于释放锁);否则返回 None。
"""
identifier = str(uuid.uuid4())  # 生成唯一标识符
end = time.time() + acquire_timeout

while time.time() < end:
    # 原子性地获取锁并设置过期时间
    if conn.set(lock_name, identifier, ex=lock_timeout, nx=True):
        return identifier
    time.sleep(0.001)  # 稍作等待,避免过于频繁的循环

return None

def release_lock(conn, lock_name, identifier):
"""释放锁

Args:
    conn: Redis 连接对象。
    lock_name: 锁的名称。
    identifier: 锁的唯一标识符。

Returns:
    如果成功释放锁,返回 True;否则返回 False。
"""
pipe = conn.pipeline(True)  # 使用流水线,保证原子性

while True:
    try:
        pipe.watch(lock_name)  # 监视锁,确保在释放锁期间没有其他客户端修改它
        lock_value = pipe.get(lock_name)
        if lock_value and lock_value.decode() == identifier:
            pipe.multi()
            pipe.delete(lock_name)  # 释放锁
            pipe.execute()
            return True
        else: # 锁不存在,或者锁的值已经不匹配
            pipe.unwatch() # 没有获取到锁,或者锁的值不匹配,取消watch命令
            return False
    except redis.exceptions.WatchError:
        # 锁被其他客户端修改,重试
        continue

使用示例

conn = redis.StrictRedis(host='localhost', port=6379, db=0)
identifier = acquire_lock(conn, 'my_lock', lock_timeout=10)

if identifier:
try:
# 执行需要保护的代码
print(f"Acquired the lock with identifier: {identifier}")
time.sleep(5)
finally:
released = release_lock(conn, 'my_lock', identifier)
if released:
print("Successfully released the lock.")
else:
print("Failed to release the lock.")
else:
print("Failed to acquire the lock.")

```

解释:

  • acquire_lock 函数:
    • 使用 uuid.uuid4() 生成一个唯一的标识符,作为锁的值。
    • 使用 conn.set(lock_name, identifier, ex=lock_timeout, nx=True) 原子性地尝试获取锁。如果锁不存在(nx=True),则设置锁的值为标识符,并设置过期时间(ex=lock_timeout)。
    • 使用循环和 acquire_timeout 参数来控制获取锁的超时时间。
  • release_lock 函数:
    • 使用 Redis 的 WATCH 命令来监视锁。WATCH 保证了在执行 MULTIEXEC 之间的操作期间,如果被监视的键被其他客户端修改,事务将失败。
    • 获取锁的值(pipe.get(lock_name))。
    • 检查锁的值是否与获取锁时生成的标识符匹配。只有匹配时才执行删除操作(pipe.delete(lock_name))。
    • 使用 pipe.multi()pipe.execute() 来执行事务。

这种方法通过以下方式解决了之前的问题:

  1. 原子性: SET 命令与 NXEX 选项一起使用,保证了获取锁和设置过期时间是原子操作。
  2. 防止误删: 只有当锁的值与客户端持有的标识符匹配时,才允许释放锁。这防止了客户端 A 释放客户端 B 获取的锁。
  3. 避免死锁: 通过设置过期时间,即使客户端崩溃,锁也会在一段时间后自动释放。

2.4. Redlock 算法 (更严格的分布式锁)

Redlock 是 Redis 的作者 Salvatore Sanfilippo 提出的一种更严格的分布式锁算法,它旨在解决单实例 Redis 存在的单点故障问题。Redlock 需要使用多个独立的 Redis 实例(例如 5 个),并且只有在大多数实例上成功获取锁时,才认为客户端成功获取了锁。

Redlock 的基本步骤:

  1. 客户端获取当前时间戳。
  2. 客户端按顺序尝试在所有 Redis 实例上获取锁,使用相同的键名和随机值,并设置一个较短的超时时间(例如几毫秒)。
  3. 客户端计算获取锁所花费的时间。如果客户端在大多数实例上成功获取锁,并且总耗时小于锁的有效时间,则认为获取锁成功。
  4. 如果获取锁成功,锁的实际有效时间是 originally 设置的 TTL 减去获取锁所花费的时间。
  5. 如果客户端获取锁失败(未能在足够多的实例上获取锁,或者总耗时超过了锁的有效时间),它将尝试在所有实例上释放锁(即使它认为自己在某些实例上从未成功获取锁)。

Python 代码示例 (Redlock):

```python
import redis
import time
import uuid

class Redlock:
def init(self, redis_connections, lock_name, lock_timeout):
"""
初始化 Redlock 对象

    Args:
        redis_connections: Redis 连接列表 (例如 [{'host': 'localhost', 'port': 6379, 'db': 0}, ...])。
        lock_name: 锁的名称。
        lock_timeout: 锁的过期时间(秒)。
    """
    self.redis_connections = redis_connections
    self.lock_name = lock_name
    self.lock_timeout = lock_timeout
    self.clients = []
    for conn_info in redis_connections:
        self.clients.append(redis.StrictRedis(**conn_info))
    self.quorum = len(self.clients) // 2 + 1  # 多数派数量


def acquire(self, acquire_timeout=10):
  """
  尝试获取锁
  Args:
        acquire_timeout: 获取锁的超时时间(秒)。
  """

  identifier = str(uuid.uuid4())
  end = time.time() + acquire_timeout
  lock_timeout_ms = int(self.lock_timeout * 1000) # 过期时间,毫秒

  while time.time() < end:
      success_count = 0
      for client in self.clients:
          try:
              if client.set(self.lock_name, identifier, px=lock_timeout_ms, nx=True):
                  success_count += 1
          except redis.RedisError:
              pass  # 忽略连接错误等

      if success_count >= self.quorum:
          return identifier  # 成功获取到多数派的锁

      # 如果没有获取到多数派的锁,尝试释放已经获取到的锁
      for client in self.clients:
          try:
              self._release_single(client, identifier)
          except redis.RedisError:
              pass
      time.sleep(0.01)  # 等待一段时间,避免频繁尝试
  return False

def _release_single(self, client, identifier):
    """
    释放单个 Redis 实例上的锁(内部使用)
    """
    script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    """
    client.eval(script, 1, self.lock_name, identifier)

def release(self, identifier):
  """释放锁"""
  for client in self.clients:
      try:
          self._release_single(client, identifier)
      except redis.RedisError:
          pass  # 忽略错误

使用示例

redis_connections = [
{'host': 'localhost', 'port': 6379, 'db': 0},
{'host': 'localhost', 'port': 6380, 'db': 0},
{'host': 'localhost', 'port': 6381, 'db': 0},
]

redlock = Redlock(redis_connections, 'my_resource_lock', lock_timeout=10)
identifier = redlock.acquire()

if identifier:
try:
# 执行需要保护的代码
print(f"Acquired the Redlock with identifier: {identifier}")
time.sleep(5)
finally:
redlock.release(identifier)
print("Released the Redlock.")
else:
print("Failed to acquire the Redlock.")

```

解释:

  • Redlock 类封装了 Redlock 算法的逻辑。
  • __init__ 方法初始化 Redis 连接列表、锁的名称、锁的超时时间,并计算多数派数量(quorum)。
  • acquire 方法尝试在所有 Redis 实例上获取锁。只有当成功获取锁的实例数量达到多数派时,才认为获取锁成功。
  • _release_single 使用 Lua 脚本原子性地释放单个 Redis 实例上的锁 (防止误删)。
  • release 方法尝试在所有 Redis 实例上释放锁。

Redlock 的优点:

  • 更高的容错性: 即使部分 Redis 实例宕机,只要多数派实例正常运行,Redlock 仍然可以正常工作。

Redlock 的缺点:

  • 更复杂: Redlock 的实现比单实例 Redis 锁更复杂。
  • 性能开销: 需要与多个 Redis 实例通信,性能开销更大。
  • 仍然可能存在的问题: Redlock 算法虽然比单实例 Redis 锁更可靠,但它并不能保证在所有情况下都完美无缺。例如,如果发生严重网络分区,或者时钟漂移过大,Redlock 仍然可能出现问题。

2.5 可重入锁

可重入锁是指同一个线程可以多次获取同一把锁,而不会造成死锁。在Redis分布式锁中,实现可重入性需要对锁的value进行改造,不能只存储一个UUID了,可以考虑使用UUID:次数的形式

Python 代码示例 (可重入锁):

```python
import redis
import time
import uuid

def acquire_reentrant_lock(conn, lock_name, lock_timeout, acquire_timeout=10):
"""尝试获取可重入锁"""
identifier = str(uuid.uuid4())
end = time.time() + acquire_timeout
lock_value = identifier + ':1' #初始值
lock_timeout_ms = int(lock_timeout * 1000)

while time.time() < end:
    if conn.set(lock_name, lock_value, px=lock_timeout_ms, nx=True):
        return identifier

    current_value = conn.get(lock_name)
    if current_value:
        current_identifier, count_str = current_value.decode().split(':')
        if current_identifier == identifier:
            new_count = int(count_str) + 1
            new_lock_value = f"{identifier}:{new_count}"
            # 虽然这里没有使用NX,但由于我们使用了WATCH,也能保证原子性
            if conn.watch(lock_name) == [True]: # watch成功
                try:
                    conn.multi()
                    conn.set(lock_name, new_lock_value, px=lock_timeout_ms)
                    conn.execute()
                    return identifier
                except redis.exceptions.WatchError:
                    continue  # WATCH 失败,重试
    time.sleep(0.001)
return None

def release_reentrant_lock(conn, lock_name, identifier):
"""释放可重入锁"""
pipe = conn.pipeline(True)
while True:
try:
pipe.watch(lock_name)
current_value = pipe.get(lock_name)
if current_value:
current_identifier, count_str = current_value.decode().split(':')
if current_identifier == identifier:
count = int(count_str)
if count > 1:
new_count = count - 1
new_lock_value = f"{identifier}:{new_count}"
pipe.multi()
pipe.set(lock_name, new_lock_value)
pipe.execute()
else:
pipe.multi()
pipe.delete(lock_name)
pipe.execute()
return True
pipe.unwatch()
return False # 锁不存在或标识符不匹配
except redis.exceptions.WatchError:
continue
```
这个示例与2.3节的非可重入锁主要区别在于:

  1. 锁的值: 锁的值现在是 identifier:count 的形式,其中 identifier 是客户端的唯一标识符,count 是重入计数。
  2. 获取锁:
    • 如果锁不存在,则创建锁,并将计数设置为 1。
    • 如果锁已存在,检查标识符是否与当前客户端的标识符匹配。如果匹配,则增加计数。注意,这里为了保证增加计数的原子性,使用了WATCH命令。
  3. 释放锁:
    • 检查标识符是否匹配。
    • 如果计数大于 1,则减少计数。
    • 如果计数为 1,则删除锁。

3. 总结与最佳实践

  • 选择合适的实现:
    • 如果你的应用对分布式锁的要求不高,可以使用基于 SET 命令与选项的单实例 Redis 锁。
    • 如果需要更高的容错性,可以考虑 Redlock 算法。
    • 如果需要可重入性,需要修改锁的value结构。
  • 设置合理的超时时间: 锁的超时时间应该根据业务逻辑的执行时间来设置。既不能太短(导致锁过早释放),也不能太长(导致客户端长时间等待)。
  • 使用唯一的标识符: 确保每个客户端使用唯一的标识符来获取和释放锁,防止误删其他客户端的锁。
  • 处理异常: 在获取和释放锁的过程中,可能会发生各种异常(例如网络错误、Redis 宕机等)。务必妥善处理这些异常,避免死锁或其他问题。
  • 考虑使用 Lua 脚本: 对于更复杂的锁操作,可以考虑使用 Lua 脚本来保证操作的原子性。
  • 监控: 监控你的分布式锁系统。跟踪锁的获取和释放时间、失败次数等指标,以便及时发现和解决问题。
  • 不要过度依赖: 分布式锁会增加系统的复杂性和开销。 只有在绝对必要时才使用它们.

希望这篇文章能够帮助你理解如何使用 Redis 实现分布式锁。记住,没有一种完美的分布式锁解决方案,你需要根据你的具体需求选择最合适的实现方式。

THE END