面试复盘6.0

发布于:2025-06-30 ⋅ 阅读:(23) ⋅ 点赞:(0)

1. get和post的区别 在http上和在TCP上

一、HTTP 层面的区别(应用层协议)

HTTP 是基于请求 - 响应模式的应用层协议,GET 和 POST 是其中最常用的两种请求方法,二者在设计目标、数据传输方式等方面存在显著差异:

对比维度 GET 方法 POST 方法
设计用途 用于获取资源(读取数据),例如获取网页、查询信息。 用于提交资源(创建或更新数据),例如提交表单、上传文件。
参数位置 参数附加在 URL 中(如 ?key1=value1&key2=value2),通过 URL 传递。 参数放在请求体(Request Body)中,URL 仅包含资源路径。
URL 长度限制 受浏览器和服务器限制(通常浏览器限制 URL 长度在 2000 字符以内),不适合传输大量数据。 无固定长度限制,可传输大量数据(受服务器配置影响)。
幂等性 具有幂等性:多次请求同一 URL 时,结果相同(不会修改服务器状态)。 不具有幂等性:多次提交可能导致资源重复创建或数据多次修改。
安全性 参数暴露在 URL 中,可能被缓存、记录或篡改,不适合传递敏感数据(如密码)。 参数在请求体中,相对隐蔽,但安全性仍依赖 HTTP 协议本身(如未加密时仍可被监听)。
缓存机制 通常可被浏览器缓存,响应结果可通过缓存直接返回。 默认不被缓存,如需缓存需手动设置响应头。
HTTP 规范约束 RFC 7231 规定:GET 应仅用于获取资源,不应产生副作用。 RFC 7231 规定:POST 用于提交数据,可修改服务器状态。
常见场景 搜索查询、获取文章内容、API 数据查询等。 提交表单、上传文件、创建用户账户、支付操作等。
二、TCP 层面的区别(传输层协议)

TCP 是面向连接的可靠传输协议,负责将 HTTP 数据封装成数据包并确保传输可靠性。GET 和 POST 在 TCP 层面的差异主要体现在 数据组织形式 和 传输行为 上:

  1. TCP 数据包结构差异

    • GET 请求
      • URL 包含完整参数(如 GET /api/data?name=test HTTP/1.1),请求行和请求头后直接跟空行(无请求体)。
      • TCP 数据包中的数据部分仅包含请求行、请求头和空行,数据量通常较小。
    • POST 请求
      • URL 仅包含资源路径(如 POST /api/submit HTTP/1.1),参数放在请求体中。
      • 请求头中需包含 Content-Type 和 Content-Length 字段,指明请求体的类型和长度。
      • TCP 数据包中的数据部分包含请求行、请求头、空行和请求体,数据量可较大(如文件上传时)。
  2. TCP 传输行为影响

    • 数据分段
      • GET 的 URL 和请求头若超过 TCP 最大段大小(MSS),会被拆分为多个 TCP 段传输。
      • POST 的请求体若较大(如超过 MSS),也会被分段传输,但请求头中的 Content-Length 可让服务器明确数据总长度。
    • 流量控制与拥塞控制
      • TCP 层的流量控制(滑动窗口)和拥塞控制(慢启动、拥塞避免)机制对 GET 和 POST 一视同仁,不因 HTTP 方法而改变。
    • 连接状态
      • 二者均基于 TCP 连接传输,可复用同一 TCP 连接(如 HTTP/1.1 的持久连接)或新建连接。
  3. 底层无本质协议差异

    • TCP 作为传输层协议,不关心应用层数据的具体含义(如 GET/POST 方法),仅负责按字节流传输数据。
    • 二者的差异本质上是 应用层协议(HTTP)对 TCP 数据的组织方式不同,而非 TCP 协议本身的行为差异。
三、总结:分层视角下的核心差异
  • HTTP 层:关注 “如何使用数据”,体现在方法语义、参数位置、安全性等应用逻辑差异。
  • TCP 层:关注 “如何传输数据”,体现在数据包结构、数据分段等传输形式差异,但不改变 TCP 协议的基本机制(如连接建立、可靠性保证)。

理解这种分层差异有助于更清晰地分析网络请求的行为 —— 例如,GET 的参数暴露问题本质是 HTTP 设计导致,而 TCP 层仅负责传输字节流,无法解决应用层的安全隐患(需通过 HTTPS 加密或规范参数使用来弥补)。

2.python的垃圾回收机制,引用计数中什么情况会+1什么情况会-1

一、Python 垃圾回收的核心机制

Python 的垃圾回收(Garbage Collection, GC)主要依赖三种机制:

  1. 引用计数(Reference Counting):最核心的机制,实时追踪对象引用数,为 0 时立即回收。
  2. 分代回收(Generational GC):周期性检查并回收长生命周期对象。
  3. 循环引用检测:通过标记 - 清除算法处理对象间的循环引用。

其中,引用计数 是最基础且直观的机制,下面详细解释其计数规则。

二、引用计数增加(+1)的场景

当对象被创建或被其他变量 / 数据结构引用时,引用计数增加:

  1. 对象被创建

    a = [1, 2, 3]  # 列表对象 [1, 2, 3] 的引用计数初始为 1(被 a 引用)
    
  2. 对象被赋值给其他变量

    b = a  # 引用计数 +1,变为 2(a 和 b 同时引用该列表)
    
  3. 对象作为元素被放入容器(如列表、字典)

    c = [a, 4]  # 列表对象 [1, 2, 3] 的引用计数 +1,变为 3(a、b、c[0] 引用它)
    
  4. 对象作为参数传递给函数 / 方法

    def func(obj):
        print(obj)
    
    func(a)  # 函数调用时,参数 obj 引用该对象,计数 +1(函数内部临时引用)
    
  5. 返回值引用对象

    def return_obj():
        return a
    
    d = return_obj()  # 返回值被 d 接收,引用计数 +1
    
三、引用计数减少(-1)的场景

当引用被删除或超出作用域时,引用计数减少:

  1. 变量被显式删除

    del a  # 引用计数 -1,变为 2(b 和 c[0] 仍引用该对象)
    
  2. 变量被赋值为其他对象

    b = 100  # b 不再引用原列表,引用计数 -1,变为 1(仅 c[0] 引用)
    
  3. 容器对象被销毁

    del c  # 列表 c 被销毁,其内部元素的引用也被释放,原列表引用计数 -1,变为 0
    
  4. 函数 / 方法调用结束

    def func(obj):
        pass  # 函数执行结束后,参数 obj 对对象的引用消失,计数 -1
    
  5. 局部变量超出作用域

    def create_list():
        x = [1, 2, 3]  # x 的引用计数为 1
        return x       # 返回后 x 被释放,但对象被返回值引用,计数仍为 1
    
    # 函数调用结束后,局部变量 x 失效,但返回值可能被接收(如 y = create_list())
    
四、特殊情况:循环引用

当两个或多个对象互相引用时,会形成 循环引用,导致引用计数永远不为 0,此时需要 Python 的 分代回收 和 标记 - 清除 机制介入:

a = []
b = []
a.append(b)  # a 引用 b
b.append(a)  # b 引用 a
# a 和 b 的引用计数均为 2,但互相引用形成循环
# 若 del a 和 del b,引用计数变为 1(互相引用),需 GC 机制处理
五、查看引用计数的方法

可使用 sys.getrefcount() 查看对象的引用计数(注意:函数调用本身会临时增加一次引用):

import sys

a = [1, 2, 3]
print(sys.getrefcount(a))  # 输出 2(a 和函数参数各引用一次)

六、总结

引用计数的增减规则可简化为:

  • +1:对象被创建、赋值、放入容器、作为参数传递、作为返回值。
  • -1:变量被删除、重新赋值、容器销毁、函数调用结束、局部变量超出作用域。

循环引用是引用计数的局限性,需依赖其他 GC 机制解决。理解这些规则有助于编写高效且无内存泄漏的 Python 代码。

3.多线程和协程的区别,python项目中用到的多线程和协程的场景

多线程与协程的区别及应用场景

一、核心概念对比
维度 多线程(Thread) 协程(Coroutine)
执行单位 系统级线程,由操作系统调度 用户级协程,由程序(或事件循环)调度
切换机制 内核态切换,涉及上下文保存 / 恢复,开销大 用户态切换,仅保存 / 恢复寄存器和栈信息,开销小
并发性 可实现真正的并行(多核 CPU) 单线程内的并发(逻辑上的并行)
阻塞影响 一个线程阻塞会导致内核调度其他线程 一个协程阻塞会主动让出控制权给其他协程
资源占用 每个线程约占用 MB 级内存(视系统而定) 每个协程仅占用 KB 级内存
编程难度 需处理锁、同步、竞态条件等问题 无需锁机制,代码结构更简洁
适用场景 I/O 密集型和 CPU 密集型任务 高度 I/O 密集型任务(如网络爬虫、Web 服务)
二、Python 中的实现差异
特性 多线程(threading 模块) 协程(asyncio 模块)
调度方式 操作系统抢占式调度 协作式调度(通过 await 主动让出控制权)
GIL 限制 受全局解释器锁(GIL)限制,同一时刻仅一个线程执行 Python 代码 无 GIL 限制,单线程内可高效处理多任务
阻塞处理 阻塞操作(如 time.sleep())会导致整个线程暂停 需使用 asyncio.sleep() 等非阻塞操作
并发库支持 兼容传统同步库(如 requests 需使用异步库(如 aiohttpasyncpg
异常处理 线程间异常独立,需手动捕获和传播 协程异常会向上传播,可统一处理
三、典型应用场景
1. 多线程适用场景
  • 混合 I/O 和 CPU 密集型任务
    • 例:图像处理程序中,主线程处理用户界面,子线程执行图像算法。
  • 需要真正并行的计算任务(需绕过 GIL):
    • 例:使用 multiprocessing 结合线程池处理科学计算。
  • 阻塞操作无法避免的场景
    • 例:调用第三方同步 API(如 requests 发送 HTTP 请求)。

示例代码(多线程下载文件)

import threading
import requests

def download(url):
    response = requests.get(url)
    print(f"下载完成: {url}, 大小: {len(response.content)}")

urls = ["http://example.com/file1", "http://example.com/file2"]
threads = [threading.Thread(target=download, args=(url,)) for url in urls]

for t in threads:
    t.start()
for t in threads:
    t.join()
2. 协程适用场景
  • 高并发 I/O 密集型任务
    • 例:Web 服务器(如 FastAPI、Sanic)处理大量短连接请求。
  • 需要高效利用单线程资源的场景
    • 例:网络爬虫(如 aiohttp 并发抓取 thousands 个页面)。
  • 异步数据流处理
    • 例:实时日志分析、消息队列消费者。

示例代码(协程下载文件)

import asyncio
import aiohttp

async def download(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.read()
            print(f"下载完成: {url}, 大小: {len(content)}")

urls = ["http://example.com/file1", "http://example.com/file2"]
asyncio.run(asyncio.gather(*[download(url) for url in urls]))
3. 混合使用场景
  • I/O 密集型 + CPU 密集型任务
    • 例:Web 服务器处理请求时,将计算密集型任务交给进程池,I/O 任务由协程处理。
  • 多阶段异步工作流
    • 例:爬虫先使用协程批量抓取页面,再用线程池解析 HTML。

示例代码(混合使用)

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def fetch_urls(urls):
    # 协程处理 I/O 密集型任务
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        return await asyncio.gather(*tasks)

def process_data(data):
    # 进程池处理 CPU 密集型任务
    return [len(d) for d in data]

async def main():
    urls = ["http://example.com"] * 100
    responses = await fetch_urls(urls)
    data = [await r.text() for r in responses]
    
    with ProcessPoolExecutor() as pool:
        results = await asyncio.get_running_loop().run_in_executor(pool, process_data, data)
    print(results)
四、选择策略
  1. 优先考虑协程
    若任务为纯 I/O 密集型,且可使用异步库(如 aiohttp),协程是首选(高效、低资源消耗)。

  2. 考虑多线程
    若任务涉及阻塞操作(如调用同步 API),或需要并行执行计算,使用多线程。

  3. 混合使用
    复杂场景中,可结合协程(处理 I/O)和多进程 / 线程(处理计算)。

五、性能对比
场景 单线程 多线程 协程
1000 次 HTTP 请求 中等
1000 次 CPU 密集计算 中等 中等
10000 并发连接 不可用 资源耗尽
六、总结
  • 多线程:适合需要并行执行、无法避免阻塞的场景,但需处理线程安全问题。
  • 协程:适合高并发 I/O 场景,代码简洁且资源消耗少,但需依赖异步库生态。
  • Python 最佳实践
    • I/O 密集型 → 协程(如 asyncio + aiohttp
    • CPU 密集型 → 多进程(如 multiprocessing
    • 混合场景 → 协程 + 线程池 / 进程池

4.https的工作原理

HTTPS 工作原理详解

HTTPS(Hypertext Transfer Protocol Secure)是 HTTP 的安全版本,通过加密和身份验证机制保护数据传输安全。其核心原理是 TLS/SSL 协议,下面从分层架构、加密过程、证书验证三个方面详细解析。

一、HTTPS 协议栈架构

HTTPS = HTTP(应用层) + TLS/SSL(传输层安全协议) + TCP(传输层)

  • HTTP:负责数据传输格式(如请求 / 响应头、JSON/HTML 内容)。
  • TLS/SSL:负责加密和身份验证。
  • TCP:负责可靠连接和数据包传输。
二、TLS 握手过程(核心加密流程)

HTTPS 通信前需通过 TLS 握手 建立安全通道,主要步骤如下:

  1. 客户端问候(Client Hello)

    • 客户端发送支持的 TLS 版本、加密算法列表(如 RSA、ECDHE)、随机数(Client Random)。
  2. 服务器响应(Server Hello)

    • 服务器选择 TLS 版本和加密算法,返回证书(含公钥)和随机数(Server Random)。
  3. 证书验证

    • 客户端验证证书有效性(签名、有效期、域名匹配等),确保服务器身份合法。
  4. 生成会话密钥

    • 客户端使用服务器证书中的公钥加密一个新随机数(Premaster Secret),发送给服务器。
    • 双方通过 Client Random、Server Random 和 Premaster Secret 生成 会话密钥(Session Key)
  5. 握手完成

    • 双方使用会话密钥进行对称加密通信,结束 TLS 握手。

简化流程图

Client                                        Server
  |                                             |
  |  Client Hello                              |
  |  (TLS版本、加密算法、Client Random)       |
  +------------------------------------------->|
  |                                             |
  |          Server Hello                     |
  |          (选择加密算法、Server Random)    |
  |          证书                               |
  |<-------------------------------------------+
  |                                             |
  |  验证证书有效性                             |
  |  生成 Premaster Secret                      |
  |  使用证书公钥加密 Premaster Secret          |
  |                                             |
  |  Client Key Exchange                       |
  |  (加密后的 Premaster Secret)               |
  +------------------------------------------->|
  |                                             |
  |  双方通过三个随机数生成会话密钥              |
  |                                             |
  |  Finished                                  |
  |  (使用会话密钥加密握手摘要)                |
  +------------------------------------------->|
  |                                             |
  |          Finished                         |
  |          (使用会话密钥加密握手摘要)        |
  |<-------------------------------------------+
  |                                             |
  |  开始使用会话密钥进行对称加密通信            |
  +                                             +
三、加密算法组合

TLS 握手使用 非对称加密(如 RSA、ECDHE) 交换会话密钥,后续通信使用 对称加密(如 AES)

  • 非对称加密
    • 优点:无需提前共享密钥,安全性高。
    • 缺点:计算开销大,速度慢。
  • 对称加密
    • 优点:速度快,适合大量数据加密。
    • 缺点:需提前共享密钥,密钥传输易被截获。

组合优势:非对称加密用于安全交换对称加密的会话密钥,后续通信使用高效的对称加密。

四、数字证书与身份验证

数字证书是 HTTPS 的核心安全机制,作用:

  1. 绑定公钥与域名:证书包含服务器公钥、域名、颁发机构等信息。
  2. 身份验证:通过证书链验证服务器身份,防止中间人攻击。
  3. 完整性保证:证书由 CA(证书颁发机构)签名,确保未被篡改。

证书验证流程

  1. 客户端检查证书是否由受信任的 CA 颁发。
  2. 验证证书有效期和域名匹配情况。
  3. 通过 CA 根证书验证证书签名的有效性。
五、HTTPS 通信示例

以下是一个简化的 HTTPS 请求 / 响应流程:

# 伪代码示例,演示 HTTPS 通信过程
import socket
from cryptography import x509
from cryptography.hazmat.backends import default_backend

# 1. 建立 TCP 连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("example.com", 443))

# 2. 开始 TLS 握手
# 发送 Client Hello
client_hello = create_tls_client_hello()
sock.send(client_hello)

# 接收 Server Hello 和证书
server_response = sock.recv(4096)
certificate = extract_certificate(server_response)

# 3. 验证证书
try:
    ca_cert = load_trusted_ca_certificate()
    verify_certificate(certificate, ca_cert)
except Exception as e:
    raise ValueError("证书验证失败:", e)

# 4. 生成会话密钥并完成握手
premaster_secret = generate_premaster_secret()
encrypted_premaster = encrypt_with_certificate_public_key(premaster_secret, certificate)
sock.send(encrypted_premaster)

# 5. 使用会话密钥加密 HTTP 请求
session_key = derive_session_key(premaster_secret)
http_request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
encrypted_request = encrypt_with_session_key(http_request, session_key)
sock.send(encrypted_request)

# 6. 接收并解密 HTTP 响应
encrypted_response = sock.recv(4096)
response = decrypt_with_session_key(encrypted_response, session_key)
print(response)
六、HTTPS 的安全性保障
  1. 数据加密:防止传输过程中被窃听(如 Wi-Fi 热点监听)。
  2. 身份验证:确保通信对方是真实服务器(如防止钓鱼网站)。
  3. 完整性校验:防止数据被篡改(如运营商注入广告)。
七、常见问题与优化
  1. 性能开销:TLS 握手会引入 1-2 个 RTT(往返延迟),可通过 TLS 会话复用 或 0-RTT 优化。
  2. 证书费用:可使用免费证书(如 Let's Encrypt)降低成本。
  3. 兼容性:需支持现代 TLS 版本(如 TLS 1.3),避免旧版本漏洞(如 SSLv3 POODLE 攻击)。
八、总结

HTTPS 通过 TLS 握手建立安全通道,结合非对称加密和对称加密,确保数据传输的机密性、完整性和身份真实性。理解其工作原理有助于排查网络安全问题(如证书错误)和优化性能(如 TLS 配置)。

5.redis在python中的应用

Redis 是一个高性能的键值对存储数据库,在 Python 中有广泛的应用场景。下面介绍 Redis 在 Python 中的常见应用及示例代码。

安装依赖

首先需要安装 Redis 服务器和 Python 的 Redis 客户端库:

pip install redis

基础操作示例

以下是 Redis 在 Python 中的一些基础应用示例:

import redis

# 连接到 Redis 服务器
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    password=None,  # 如果 Redis 有密码,需要在这里设置
    decode_responses=True  # 自动解码 Redis 返回的字节数据为字符串
)

# 1. 字符串操作
r.set('name', 'Alice')
print(r.get('name'))  # 输出: Alice

# 2. 哈希操作
r.hset('user:1', 'age', 30)
r.hset('user:1', 'city', 'New York')
print(r.hgetall('user:1'))  # 输出: {'age': '30', 'city': 'New York'}

# 3. 列表操作
r.rpush('tasks', 'task1')
r.rpush('tasks', 'task2')
print(r.lrange('tasks', 0, -1))  # 输出: ['task1', 'task2']

# 4. 集合操作
r.sadd('tags', 'python')
r.sadd('tags', 'redis')
print(r.smembers('tags'))  # 输出: {'python', 'redis'}

# 5. 有序集合操作
r.zadd('scores', {'Alice': 100, 'Bob': 85})
print(r.zrevrange('scores', 0, 1, withscores=True))  # 输出: [('Alice', 100.0), ('Bob', 85.0)]

实际应用场景

1. 缓存系统

Redis 最常见的应用是作为缓存层,减少对数据库的访问压力:

def get_data_from_cache_or_db(key):
    # 先尝试从 Redis 缓存中获取数据
    data = r.get(key)
    if data:
        print(f"从缓存中获取数据: {key}")
        return data
    
    # 如果缓存中没有,则从数据库获取
    print(f"缓存未命中,从数据库获取数据: {key}")
    data = fetch_data_from_database(key)  # 假设这是一个数据库查询函数
    
    # 将数据存入 Redis 缓存,设置过期时间为 60 秒
    r.setex(key, 60, data)
    return data
2. 计数器

利用 Redis 的原子性操作实现计数器:

# 文章阅读量计数
def increment_article_views(article_id):
    return r.incr(f'article:{article_id}:views')

# 获取文章阅读量
def get_article_views(article_id):
    return r.get(f'article:{article_id}:views')
3. 分布式锁

使用 Redis 实现分布式锁,确保在分布式系统中同一时间只有一个进程可以执行关键代码:

import time

def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=10):
    end = time.time() + acquire_timeout
    while time.time() < end:
        # 使用 setnx 命令尝试获取锁
        if r.setnx(lock_name, 'locked'):
            # 设置锁的过期时间,防止死锁
            r.expire(lock_name, lock_timeout)
            return True
        # 等待一段时间后重试
        time.sleep(0.1)
    return False

def release_lock(lock_name):
    # 释放锁
    r.delete(lock_name)
4. 消息队列

使用 Redis 的列表结构实现简单的消息队列:

# 生产者:将消息放入队列
def enqueue_message(queue_name, message):
    r.rpush(queue_name, message)

# 消费者:从队列中获取消息
def dequeue_message(queue_name, timeout=0):
    # 使用 blpop 阻塞获取消息,timeout=0 表示永久阻塞
    result = r.blpop(queue_name, timeout=timeout)
    if result:
        # result 是一个元组 (队列名, 消息)
        return result[1]
    return None

连接池管理

在实际应用中,建议使用连接池来管理 Redis 连接,避免频繁创建和销毁连接:

# 创建连接池
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=100
)

# 通过连接池获取 Redis 客户端
r = redis.Redis(connection_pool=pool)

以上是 Redis 在 Python 中的一些常见应用场景和示例代码。根据具体需求,你可以进一步扩展和优化这些代码。

6.如何在Python中使用Redis进行分布式锁?

在分布式系统中,Redis 是实现分布式锁的常用工具。下面详细介绍如何在 Python 中使用 Redis 实现分布式锁,并提供完整的代码示例。

基本实现原理

分布式锁需要满足以下特性:

  • 互斥性:同一时间只有一个客户端可以持有锁
  • 安全性:锁只能被持有该锁的客户端释放
  • 容错性:在 Redis 节点故障时,锁仍能被正确释放
  • 可重入性(可选):同一客户端可以多次获取同一把锁

基础分布式锁实现

下面是一个基于 Redis 的基础分布式锁实现:

import redis
import time
import uuid

class RedisLock:
    def __init__(self, redis_client, lock_key, lock_timeout=10):
        """
        初始化分布式锁
        
        参数:
            redis_client: Redis 客户端实例
            lock_key: 锁的键名
            lock_timeout: 锁的自动释放时间(秒)
        """
        self.redis_client = redis_client
        self.lock_key = lock_key
        self.lock_timeout = lock_timeout
        self.lock_value = str(uuid.uuid4())  # 唯一标识锁的持有者

    def acquire(self, acquire_timeout=10):
        """
        获取锁
        
        参数:
            acquire_timeout: 获取锁的超时时间(秒)
            
        返回:
            bool: 是否成功获取锁
        """
        end_time = time.time() + acquire_timeout
        
        while time.time() < end_time:
            # 使用 SETNX 命令尝试获取锁
            result = self.redis_client.set(
                self.lock_key, 
                self.lock_value, 
                nx=True, 
                ex=self.lock_timeout
            )
            
            if result:
                return True
                
            # 短暂休眠,避免频繁重试
            time.sleep(0.1)
            
        return False

    def release(self):
        """
        释放锁
        
        返回:
            bool: 是否成功释放锁
        """
        # 使用 Lua 脚本确保操作的原子性
        lua_script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """
        
        result = self.redis_client.eval(
            lua_script, 
            1,          # KEYS 数量
            self.lock_key, 
            self.lock_value
        )
        
        return bool(result)

    def __enter__(self):
        """支持 with 语句获取锁"""
        if self.acquire():
            return self
        raise RuntimeError("获取锁超时")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """支持 with 语句释放锁"""
        self.release()

使用示例

下面是使用上述分布式锁的示例:

# 创建 Redis 客户端
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True
)

# 使用示例 1:普通方式
lock = RedisLock(redis_client, "distributed_lock", lock_timeout=30)

if lock.acquire(acquire_timeout=5):
    try:
        # 执行需要加锁的操作
        print("获取到锁,执行关键代码")
        time.sleep(5)  # 模拟耗时操作
    finally:
        lock.release()
        print("释放锁")
else:
    print("获取锁失败")

# 使用示例 2:with 语句方式
with RedisLock(redis_client, "distributed_lock", lock_timeout=30) as lock:
    print("获取到锁,执行关键代码")
    time.sleep(5)  # 模拟耗时操作

优化与扩展

可重入锁实现

可重入锁允许同一客户端多次获取同一把锁,需要记录获取锁的次数:

import threading

class ReentrantRedisLock(RedisLock):
    def __init__(self, redis_client, lock_key, lock_timeout=10):
        super().__init__(redis_client, lock_key, lock_timeout)
        self.local_lock = threading.Lock()
        self.acquire_count = 0

    def acquire(self, acquire_timeout=10):
        with self.local_lock:
            if self.acquire_count > 0 and self._is_held():
                self.acquire_count += 1
                return True
                
            if super().acquire(acquire_timeout):
                self.acquire_count = 1
                return True
                
            return False

    def release(self):
        with self.local_lock:
            if not self._is_held():
                return False
                
            self.acquire_count -= 1
            
            if self.acquire_count == 0:
                return super().release()
                
            return True

    def _is_held(self):
        current_value = self.redis_client.get(self.lock_key)
        return current_value == self.lock_value
红锁算法(RedLock)

当使用 Redis 集群时,可以使用红锁算法提高可靠性:

class RedLock:
    def __init__(self, redis_clients, lock_key, lock_timeout=10):
        """
        初始化红锁
        
        参数:
            redis_clients: Redis 客户端列表(多个独立节点)
            lock_key: 锁的键名
            lock_timeout: 锁的自动释放时间(秒)
        """
        self.redis_clients = redis_clients
        self.lock_key = lock_key
        self.lock_timeout = lock_timeout
        self.lock_value = str(uuid.uuid4())
        self.quorum = len(redis_clients) // 2 + 1  # 多数节点

    def acquire(self, acquire_timeout=10):
        start_time = time.time()
        end_time = start_time + acquire_timeout
        acquired_locks = []
        
        while time.time() < end_time:
            acquired_count = 0
            acquired_locks = []
            
            for client in self.redis_clients:
                try:
                    if client.set(self.lock_key, self.lock_value, nx=True, ex=self.lock_timeout):
                        acquired_count += 1
                        acquired_locks.append(client)
                except Exception:
                    pass
                    
            if acquired_count >= self.quorum:
                return True
                
            # 释放已获取的锁
            for client in acquired_locks:
                try:
                    client.delete(self.lock_key)
                except Exception:
                    pass
                    
            # 短暂休眠后重试
            time.sleep(0.05)
            
        return False

    def release(self):
        lua_script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
        """
        
        for client in self.redis_clients:
            try:
                client.eval(lua_script, 1, self.lock_key, self.lock_value)
            except Exception:
                pass

注意事项

  1. 锁的超时时间:需要根据业务逻辑合理设置锁的超时时间,避免长时间持有锁导致系统性能下降
  2. 异常处理:确保在获取锁和释放锁时处理可能的异常情况
  3. 性能考虑:分布式锁会增加系统开销,需要根据实际情况权衡使用
  4. 集群环境:在 Redis 集群环境中,建议使用红锁算法提高可靠性

通过上述代码和说明,你可以在 Python 中使用 Redis 实现可靠的分布式锁机制。


网站公告

今日签到

点亮在社区的每一天
去签到