目录
- Python与Web3.py库交互实践
-
- 引言:连接Python与区块链的桥梁
- 1. 环境配置与基础连接
-
- 1.1 安装Web3.py
- 1.2 连接以太坊节点
- 2. 基础区块链交互
-
- 2.1 账户与余额查询
- 2.2 创建并发送交易
- 3. 智能合约交互
-
- 3.1 加载和部署合约
- 3.2 与已部署合约交互
- 4. 高级功能实践
-
- 4.1 事件监听
- 4.2 与ERC-20代币交互
- 5. 完整DApp示例:链上记事本
-
- 5.1 智能合约
- 5.2 Python后端
- 5.3 前端界面
- 6. 安全最佳实践
-
- 6.1 私钥管理
- 6.2 交易安全
- 7. 高级技巧与优化
-
- 7.1 批量查询
- 7.2 Gas优化策略
- 结论:成为区块链开发者
-
- 下一步:
Python与Web3.py库交互实践
引言:连接Python与区块链的桥梁
Web3.py是以太坊官方推荐的Python接口库,它使开发者能够通过Python与以太坊区块链进行交互。本指南将带你从基础到高级应用,全面掌握使用Web3.py进行区块链交互的核心技能,包括智能合约部署、交易签名、事件监听等关键功能。
1. 环境配置与基础连接
1.1 安装Web3.py
pip install web3
1.2 连接以太坊节点
from web3 import Web3
# 使用Infura连接主网
infura_url = "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID"
w3 = Web3(Web3.HTTPProvider(infura_url))
# 检查连接
if w3.is_connected():
print("Connected to Ethereum Mainnet")
print(f"Latest block: {w3.eth.block_number}")
else:
print("Connection failed")
# 连接本地开发节点(如Ganache)
ganache_url = "http://127.0.0.1:7545"
w3_ganache = Web3(Web3.HTTPProvider(ganache_url))
2. 基础区块链交互
2.1 账户与余额查询
# 查询账户余额(单位:wei)
balance_wei = w3.eth.get_balance("0x742d35Cc6634C0532925a3b844Bc454e4438f44e")
# 转换单位
balance_eth = w3.from_wei(balance_wei, 'ether')
print(f"Balance: {balance_eth} ETH")
# 获取账户列表(Ganache)
ganache_accounts = w3_ganache.eth.accounts
print(f"Ganache accounts: {ganache_accounts}")
2.2 创建并发送交易
def send_eth_transaction(w3, sender, receiver, amount_eth, private_key):
"""发送ETH交易"""
# 转换金额单位
amount_wei = w3.to_wei(amount_eth, 'ether')
# 获取nonce
nonce = w3.eth.get_transaction_count(sender)
# 构建交易字典
tx = {
'nonce': nonce,
'to': receiver,
'value': amount_wei,
'gas': 21000, # 标准ETH转账的gas limit
'gasPrice': w3.to_wei('50', 'gwei'),
'chainId': w3.eth.chain_id
}
# 签名交易
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
# 发送交易
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# 等待交易确认
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
return receipt
# 在Ganache上测试
sender = ganache_accounts[0]
receiver = ganache_accounts[1]
private_key = "0x..." # 替换为Ganache中第一个账户的私钥
receipt = send_eth_transaction(w3_ganache, sender, receiver, 1.0, private_key)
print(f"Transaction receipt: {receipt}")
3. 智能合约交互
3.1 加载和部署合约
from solcx import compile_source
# 编译合约
contract_source_code = """
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract SimpleStorage {
uint256 storedData;
function set(uint256 x) public {
storedData = x;
}
function get() public view returns (uint256) {
return storedData;
}
}
"""
# 编译合约
compiled_sol = compile_source(contract_source_code)
contract_interface = compiled_sol['<stdin>:SimpleStorage']
# 部署合约
def deploy_contract(w3, contract_interface, deployer_private_key):
# 获取部署者账户
deployer = w3.eth.account.from_key(deployer_private_key).address
# 创建合约对象
Contract = w3.eth.contract(
abi=contract_interface['abi'],
bytecode=contract_interface['bin']
)
# 构建部署交易
tx = Contract.constructor().build_transaction({
'from': deployer,
'nonce': w3.eth.get_transaction_count(deployer),
'gas': 2000000,
'gasPrice': w3.to_wei('50', 'gwei')
})
# 签名并发送
signed_tx = w3.eth.account.sign_transaction(tx, deployer_private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# 等待部署完成
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
return receipt.contractAddress
# 在Ganache上部署
contract_address = deploy_contract(w3_ganache, contract_interface, private_key)
print(f"Contract deployed at: {contract_address}")
3.2 与已部署合约交互
# 创建合约实例
contract = w3_ganache.eth.contract(
address=contract_address,
abi=contract_interface['abi']
)
# 调用只读函数
current_value = contract.functions.get().call()
print(f"Current stored value: {current_value}")
# 调用状态变更函数
def set_contract_value(w3, contract, new_value, caller_private_key):
caller = w3.eth.account.from_key(caller_private_key).address
# 构建交易
tx = contract.functions.set(new_value).build_transaction({
'from': caller,
'nonce': w3.eth.get_transaction_count(caller),
'gas': 2000000,
'gasPrice': w3.to_wei('50', 'gwei')
})
# 签名并发送
signed_tx = w3.eth.account.sign_transaction(tx, caller_private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# 等待交易确认
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
return receipt
# 更新合约状态
receipt = set_contract_value(w3_ganache, contract, 42, private_key)
print(f"Set value transaction receipt: {receipt}")
# 验证更新
updated_value = contract.functions.get().call()
print(f"Updated stored value: {updated_value}")
4. 高级功能实践
4.1 事件监听
# 增强合约以包含事件
contract_with_event = """
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract EventfulStorage {
uint256 storedData;
event ValueChanged(address indexed author, uint256 newValue);
function set(uint256 x) public {
storedData = x;
emit ValueChanged(msg.sender, x);
}
function get() public view returns (uint256) {
return storedData;
}
}
"""
# 部署增强版合约
compiled_event = compile_source(contract_with_event)
event_contract_interface = compiled_event['<stdin>:EventfulStorage']
event_contract_address = deploy_contract(w3_ganache, event_contract_interface, private_key)
event_contract = w3_ganache.eth.contract(
address=event_contract_address,
abi=event_contract_interface['abi']
)
# 创建事件过滤器
value_changed_filter = event_contract.events.ValueChanged.create_filter(
fromBlock='latest'
)
# 监听事件
def event_listener():
print("Listening for ValueChanged events...")
while True:
for event in value_changed_filter.get_new_entries():
print(f"New event: Author={event.args.author}, Value={event.args.newValue}")
time.sleep(2)
# 在后台线程运行监听器
import threading
thread = threading.Thread(target=event_listener, daemon=True)
thread.start()
# 触发事件(在另一个线程)
def trigger_events():
for i in range(5):
set_contract_value(w3_ganache, event_contract, i, private_key)
time.sleep(3)
trigger_thread = threading.Thread(target=trigger_events)
trigger_thread.start()
4.2 与ERC-20代币交互
# ERC-20代币ABI(简化版)
erc20_abi = [
{
"name": "balanceOf",
"outputs": [{"type": "uint256", "name": ""}],
"inputs": [{"type": "address", "name": "account"}],
"stateMutability": "view",
"type": "function"
},
{
"name": "transfer",
"outputs": [{"type": "bool", "name": ""}],
"inputs": [
{"type": "address", "name": "recipient"},
{"type": "uint256", "name": "amount"}
],
"stateMutability": "nonpayable",
"type": "function"
}
]
# 连接DAI合约
dai_address = "0x6B175474E89094C44Da98b954EedeAC495271d0F" # 主网DAI合约
dai_contract = w3.eth.contract(address=dai_address, abi=erc20_abi)
# 查询余额
wallet_address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
balance = dai_contract.functions.balanceOf(wallet_address).call()
print(f"DAI balance: {balance / (10 ** 18)} DAI")
# 转账函数(需要私钥)
def transfer_erc20(w3, contract, recipient, amount, sender_private_key):
sender = w3.eth.account.from_key(sender_private_key).address
# 构建交易
tx = contract.functions.transfer(
recipient,
amount
).build_transaction({
'from': sender,
'nonce': w3.eth.get_transaction_count(sender),
'gas': 100000,
'gasPrice': w3.to_wei('50', 'gwei'),
'chainId': w3.eth.chain_id
})
# 签名并发送
signed_tx = w3.eth.account.sign_transaction(tx, sender_private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
# 等待确认
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
return receipt
# 注意:实际主网操作需要真实资金和谨慎处理私钥
5. 完整DApp示例:链上记事本
5.1 智能合约
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract OnchainNotepad {
struct Note {
string content;
uint256 timestamp;
}
mapping(address => Note[]) private userNotes;
event NoteAdded(address indexed user, uint256 index, string content);
event NoteUpdated(address indexed user, uint256 index, string content);
function addNote(string memory content) public {
userNotes[msg.sender].push(Note(content, block.timestamp));
emit NoteAdded(msg.sender, userNotes[msg.sender].length - 1, content);
}
function updateNote(uint256 index, string memory content) public {
require(index < userNotes[msg.sender].length, "Note does not exist");
userNotes[msg.sender][index].content = content;
userNotes[msg.sender][index].timestamp = block.timestamp;
emit NoteUpdated(msg.sender, index, content);
}
function getNoteCount() public view returns (uint256) {
return userNotes[msg.sender].length;
}
function getNote(uint256 index) public view returns (string memory content, uint256 timestamp) {
require(index < userNotes[msg.sender].length, "Note does not exist");
Note storage note = userNotes[msg.sender][index];
return (note.content, note.timestamp);
}
}
5.2 Python后端
from flask import Flask, jsonify, request
from web3 import Web3
import json
app = Flask(__name__)
w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:7545"))
# 加载合约
with open('OnchainNotepad.json') as f:
contract_data = json.load(f)
contract_address = "0xYourDeployedContractAddress"
contract = w3.eth.contract(address=contract_address, abi=contract_data['abi'])
@app.route('/notes', methods=['GET'])
def get_notes():
address = request.args.get('address')
if not address or not w3.is_address(address):
return jsonify({"error": "Invalid address"}), 400
note_count = contract.functions.getNoteCount().call({'from': address})
notes = []
for i in range(note_count):
content, timestamp = contract.functions.getNote(i).call({'from': address})
notes.append({
"index": i,
"content": content,
"timestamp": timestamp
})
return jsonify(notes)
@app.route('/add', methods=['POST'])
def add_note():
data = request.json
address = data.get('address')
content = data.get('content')
private_key = data.get('private_key')
if not all([address, content, private_key]):
return jsonify({"error": "Missing parameters"}), 400
# 构建交易
tx = contract.functions.addNote(content).build_transaction({
'from': address,
'nonce': w3.eth.get_transaction_count(address),
'gas': 300000,
'gasPrice': w3.to_wei('50', 'gwei')
})
# 签名并发送
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
return jsonify({"tx_hash": tx_hash.hex()})
if __name__ == '__main__':
app.run(port=5000)
5.3 前端界面
import tkinter as tk
from web3 import Web3
import requests
class NotepadApp:
def __init__(self, root):
self.root = root
self.root.title("链上记事本")
# 配置
self.w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:7545"))
self.api_url = "http://localhost:5000"
# UI组件
self.address_label = tk.Label(root, text="你的地址:")
self.address_entry = tk.Entry(root, width=50)
self.private_key_label = tk.Label(root, text="私钥(仅本地):")
self.private_key_entry = tk.Entry(root, width=50, show="*")
self.note_label = tk.Label(root, text="笔记内容:")
self.note_text = tk.Text(root, height=10, width=50)
self.add_button = tk.Button(root, text="添加笔记", command=self.add_note)
self.load_button = tk.Button(root, text="加载笔记", command=self.load_notes)
self.notes_listbox = tk.Listbox(root, width=70, height=15)
self.notes_listbox.bind('<<ListboxSelect>>', self.on_note_select)
# 布局
self.address_label.grid(row=0, column=0, sticky="w")
self.address_entry.grid(row=0, column=1)
self.private_key_label.grid(row=1, column=0, sticky="w")
self.private_key_entry.grid(row=1, column=1)
self.note_label.grid(row=2, column=0, sticky="nw")
self.note_text.grid(row=2, column=1)
self.add_button.grid(row=3, column=1, sticky="e", pady=5)
self.load_button.grid(row=4, column=1, sticky="e", pady=5)
self.notes_listbox.grid(row=5, column=0, columnspan=2, padx=10, pady=10)
# 状态变量
self.current_notes = []
self.selected_index = None
def add_note(self):
address = self.address_entry.get()
private_key = self.private_key_entry.get()
content = self.note_text.get("1.0", "end-1c")
if not address or not private_key or not content:
tk.messagebox.showerror("错误", "请填写所有字段")
return
payload = {
"address": address,
"content": content,
"private_key": private_key
}
try:
response = requests.post(f"{self.api_url}/add", json=payload)
if response.status_code == 200:
tk.messagebox.showinfo("成功", f"笔记已添加!\n交易哈希: {response.json()['tx_hash']}")
self.note_text.delete("1.0", tk.END)
else:
tk.messagebox.showerror("错误", f"添加失败: {response.text}")
except Exception as e:
tk.messagebox.showerror("错误", f"连接失败: {str(e)}")
def load_notes(self):
address = self.address_entry.get()
if not address:
tk.messagebox.showerror("错误", "请输入地址")
return
try:
response = requests.get(f"{self.api_url}/notes?address={address}")
if response.status_code == 200:
self.current_notes = response.json()
self.notes_listbox.delete(0, tk.END)
for i, note in enumerate(self.current_notes):
preview = note['content'][:50] + "..." if len(note['content']) > 50 else note['content']
timestamp = note['timestamp']
self.notes_listbox.insert(tk.END, f"[{i}] {preview} (时间戳: {timestamp})")
else:
tk.messagebox.showerror("错误", f"加载失败: {response.text}")
except Exception as e:
tk.messagebox.showerror("错误", f"连接失败: {str(e)}")
def on_note_select(self, event):
selection = event.widget.curselection()
if selection:
self.selected_index = selection[0]
note = self.current_notes[self.selected_index]
self.note_text.delete("1.0", tk.END)
self.note_text.insert(tk.END, note['content'])
if __name__ == "__main__":
root = tk.Tk()
app = NotepadApp(root)
root.mainloop()
6. 安全最佳实践
6.1 私钥管理
import os
from dotenv import load_dotenv
from web3 import Web3
# 从环境变量加载私钥
load_dotenv()
private_key = os.getenv("PRIVATE_KEY")
# 加密存储方案
from cryptography.fernet import Fernet
def encrypt_private_key(key, private_key):
cipher_suite = Fernet(key)
return cipher_suite.encrypt(private_key.encode())
def decrypt_private_key(key, encrypted_key):
cipher_suite = Fernet(key)
return cipher_suite.decrypt(encrypted_key).decode()
# 生成加密密钥
encryption_key = Fernet.generate_key()
# 加密私钥
encrypted_pk = encrypt_private_key(encryption_key, "0xYourPrivateKey")
# 安全存储encryption_key和encrypted_pk
6.2 交易安全
# 安全发送交易
def safe_send_transaction(w3, tx, private_key, retries=3):
for attempt in range(retries):
try:
# 更新nonce(每次尝试都需要更新)
tx['nonce'] = w3.eth.get_transaction_count(
w3.eth.account.from_key(private_key).address
)
signed_tx = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
return w3.eth.wait_for_transaction_receipt(tx_hash)
except ValueError as e:
if 'nonce too low' in str(e) and attempt < retries - 1:
continue # 重试
elif 'insufficient funds' in str(e):
raise InsufficientFundsError("账户余额不足")
else:
raise TransactionError(f"交易失败: {str(e)}")
class TransactionError(Exception):
pass
class InsufficientFundsError(TransactionError):
pass
7. 高级技巧与优化
7.1 批量查询
# 批量查询余额
def batch_get_balances(w3, addresses):
"""使用批处理RPC请求优化多个余额查询"""
from web3 import Web3
from web3.middleware import geth_poa_middleware
# 创建本地提供者避免阻塞主连接
local_w3 = Web3(Web3.HTTPProvider(w3.provider.endpoint_uri))
if 'poa' in w3.provider.endpoint_uri:
local_w3.middleware_onion.inject(geth_poa_middleware, layer=0)
# 使用批处理请求
batch = []
for address in addresses:
batch.append(("eth_getBalance", [address, "latest"]))
responses = local_w3.provider.make_batch_request(batch)
# 处理响应
balances = {}
for i, response in enumerate(responses):
if response['error'] is not None:
balances[addresses[i]] = None
else:
balances[addresses[i]] = int(response['result'], 16)
return balances
7.2 Gas优化策略
# 动态Gas定价
def get_optimal_gas_price(w3):
"""获取推荐的Gas价格"""
# 方法1:使用历史数据
history = w3.eth.fee_history(10, 'latest', [10, 30, 50])
base_fee = history['baseFeePerGas'][-1]
# 计算优先级费用(小费)的加权平均
rewards = [tx[0] for block in history['reward'] for tx in block]
if rewards:
avg_priority = sum(rewards) / len(rewards)
else:
avg_priority = w3.to_wei('2', 'gwei')
# 方法2:使用预言机(如Etherchain)
try:
response = requests.get("https://www.etherchain.org/api/gasPriceOracle")
data = response.json()
fast_gas = w3.to_wei(data['fast'], 'gwei')
return min(fast_gas, base_fee * 2 + avg_priority)
except:
# 回退到基本方法
return base_fee * 2 + avg_priority
# 在交易中使用
tx = {
'to': recipient,
'value': amount,
'gas': 21000,
'maxFeePerGas': get_optimal_gas_price(w3),
'maxPriorityFeePerGas': w3.to_wei('2', 'gwei'),
'nonce': w3.eth.get_transaction_count(sender),
'chainId': w3.eth.chain_id,
'type': '0x2' # EIP-1559类型交易
}
结论:成为区块链开发者
通过本指南,您已掌握Web3.py的核心功能:
- 基础操作:连接节点、查询余额、发送交易
- 智能合约:编译、部署和交互
- 事件处理:监听和响应链上事件
- DApp开发:完整的前后端集成
- 安全实践:私钥管理和交易安全
- 高级技巧:批量查询和Gas优化
Web3.py的强大功能为开发者打开了区块链世界的大门:
- DeFi协议交互:集成Uniswap、Compound等协议
- NFT开发:创建和交易数字藏品
- DAO治理:参与去中心化组织决策
- 跨链应用:桥接不同区块链生态系统
在区块链的世界里,每一行代码都是构建去中心化未来的基石。掌握Web3.py,你就拥有了塑造这个未来的工具。
下一步:
- 探索Layer2解决方案(Optimism、Arbitrum)
- 学习智能合约安全审计
- 研究零知识证明应用
- 参与开源Web3项目
- 构建自己的去中心化应用