背景:
使用 conda 安装 PyTorch 和 deepctr-torch 时会出现依赖冲突,导致示例无法运行。
下面是经过验证的可行方案,在 VS Code 的终端中依次执行以下命令即可。
创建并激活虚拟环境:
conda create -n py311 python=3.11 -y && conda activate py311
仓库根目录执行:
bash scripts/setup_deepfm_venv.sh
脚本内容:
脚本作用(说明文字,不属于脚本本身):用系统已安装的 Python 3.11(或 3.10)创建一个隔离的 venv,安装与之兼容的 PyTorch 和 DeepFM 示例依赖,然后直接运行示例脚本。
#!/usr/bin/env bash
# Set up an isolated virtualenv for the DeepFM demo and run the demo.
#
# Steps: pick python3.11 (or python3.10), create .venv-<version> in the current
# directory, install PyTorch 2.2.2 plus the packages listed in
# requirements-deepfm.txt, then run test/DeepFMUse1.py.
# All paths are relative, so run this script from the repository root.
set -euo pipefail

# Prefer Python 3.11 (best compatibility); fall back to 3.10.
PY=""
if command -v python3.11 >/dev/null 2>&1; then
  PY=python3.11
elif command -v python3.10 >/dev/null 2>&1; then
  PY=python3.10
else
  # printf expands "\n" (bash's builtin echo prints it literally unless -e is
  # given), and both diagnostic lines go to stderr.
  printf 'No python3.11 or python3.10 found.\n\n' >&2
  printf 'Please install Python 3.11 (e.g. via pyenv or conda) or run:\n conda create -n deepfm python=3.11 -y && conda activate deepfm\n' >&2
  exit 1
fi

# Fail fast, before the slow PyTorch download, if the files used further down
# are not reachable from the current directory.
for required in requirements-deepfm.txt test/DeepFMUse1.py; do
  if [ ! -f "${required}" ]; then
    printf 'Missing %s; run this script from the repository root.\n' "${required}" >&2
    exit 1
  fi
done

# One venv per interpreter version, e.g. .venv-3.11
VENV_DIR=".venv-${PY#python}"
"${PY}" -m venv "${VENV_DIR}"
# shellcheck disable=SC1091
source "${VENV_DIR}/bin/activate"
python -m pip install -U pip

# Install PyTorch compatible with Python 3.10/3.11
OS=$(uname -s || echo Unknown)
if [ "${OS}" = "Darwin" ]; then
  # macOS: install from PyPI (universal CPU build)
  python -m pip install "torch==2.2.2"
else
  # Linux/Windows: use CPU wheels index
  python -m pip install --index-url https://download.pytorch.org/whl/cpu "torch==2.2.2"
fi

# Install Python deps for DeepFM demo
python -m pip install -r requirements-deepfm.txt

# Run the demo
python test/DeepFMUse1.py
test/DeepFMUse1.py 测试脚本内容(推荐系统 demo):
# DeepFMUse1.py - DeepFM recommendation demo built on deepctr-torch
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import torch
from deepctr_torch.inputs import SparseFeat, DenseFeat, VarLenSparseFeat, get_feature_names
from deepctr_torch.models import DeepFM
# =============== 1) Build a synthetic dataset (user / item / context / label) ===============
# Seed NumPy's global RNG so the random demo data is the same on every run.
np.random.seed(42)
n_users, n_items, n_samples = 10, 20, 200
# Larger-scale settings to experiment with:
#   n_users, n_items, n_samples = 1000, 2000, 20000

# The columns are drawn from the seeded global RNG in the order written below,
# so reordering them changes the generated values.
df = pd.DataFrame(dict(
    # User side
    user_id=np.random.randint(0, n_users, n_samples).astype(str),
    gender=np.random.choice(["M", "F"], n_samples),
    age=np.random.randint(18, 60, n_samples),  # dense
    occupation=np.random.choice(["student", "athlete", "engineer", "teacher"], n_samples),
    # Item side
    item_id=np.random.randint(0, n_items, n_samples).astype(str),
    category=np.random.choice(["sports", "electronics", "fashion", "beauty"], n_samples),
    price=np.random.uniform(5, 500, n_samples),  # dense
    # Context side
    hour=np.random.randint(0, 24, n_samples).astype(str),
    device=np.random.choice(["ios", "android", "web"], n_samples),
    # Label (clicked / purchased or not)
    label=np.random.binomial(1, 0.2, n_samples),
))

# (Optional) A "behaviour history" is added as a variable-length feature example.
# It is generated randomly here; in a real system it would be aggregated from logs.
# `maxlen` is the maximum history length.
maxlen = 10
def make_hist_row(max_len=None, num_items=None):
    """Draw one random behaviour history.

    Args:
        max_len: Largest history length that may be drawn (inclusive).
            Defaults to the module-level ``maxlen``.
        num_items: Item ids are drawn from ``[0, num_items)``.
            Defaults to the module-level ``n_items``.

    Returns:
        A ``(seq, length)`` tuple: ``seq`` is a list of ``length`` random item
        ids and ``length`` is drawn uniformly from ``[0, max_len]``.
    """
    if max_len is None:
        max_len = maxlen
    if num_items is None:
        num_items = n_items
    # The length is drawn before the items, so the sequence of values taken
    # from the global NumPy RNG stays fixed.
    length = np.random.randint(0, max_len + 1)
    seq = np.random.randint(0, num_items, size=length).tolist()
    return seq, length
# Attach a randomly generated history (item list + its true length) to every sample.
hist = [make_hist_row() for _ in range(n_samples)]
df["hist_item_id"] = [row[0] for row in hist]
df["hist_len"] = [row[1] for row in hist]

# =============== 2) Declare the feature columns (sparse / dense / sequence) ===============
sparse_cols_user = ["user_id", "gender", "occupation"]
sparse_cols_item = ["item_id", "category"]
sparse_cols_ctx = ["hour", "device"]
sparse_features = sparse_cols_user + sparse_cols_item + sparse_cols_ctx
dense_features = ["age", "price"]  # numeric columns

# Label-encode every sparse column in place. Each fitted encoder is kept in
# `label_encoders` so the same string -> index mapping can be reapplied to
# new data later instead of being re-fitted.
label_encoders = {}
for col in sparse_features:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col].astype(str))
    label_encoders[col] = le

# History sequence: its elements are meant to live in the same index space as
# item_id. The history is assumed to already hold item indices; the modulo
# below only guarantees that every value is a legal index (< vocab_size_item).
# NOTE(review): the history holds raw item numbers, while df["item_id"] now
# holds LabelEncoder indices of the *string* ids, so the two spaces are not
# guaranteed to coincide. Real pipelines must map the history through the same
# encoder - confirm before reusing this on real data.
vocab_size_item = df["item_id"].max() + 1
df["hist_item_id"] = df["hist_item_id"].apply(lambda seq: [int(x) % int(vocab_size_item) for x in seq])
# Per the original note, deepctr-torch expects sequence features as a
# fixed-length 2-D input padded with 0, together with the true length (hist_len).
def pad_seq(seq, maxlen, pad_value=0):
    """Return ``seq`` as a list of exactly ``maxlen`` items.

    Longer sequences are truncated on the right; shorter ones are right-padded
    with ``pad_value``.

    Args:
        seq: Sliceable sequence of item indices (list, tuple, 1-D array, ...).
        maxlen: Target length.
        pad_value: Filler used for padding; defaults to 0.

    Returns:
        A new list; ``seq`` itself is not modified.
    """
    # list(...) makes the concatenation below well defined for any sliceable
    # input: tuple + list raises, and ndarray + list adds element-wise.
    items = list(seq[:maxlen])
    return items + [pad_value] * (maxlen - len(items))
# Pad / truncate every history to `maxlen` so the column can later be stacked
# into a single 2-D array.
df["hist_item_padded"] = df["hist_item_id"].apply(lambda history: pad_seq(history, maxlen))

# =============== 3) Describe the features for DeepFM ===============
# Sparse (embedded) features, in the order of `sparse_features`: user side,
# item side, then context side. Every column's vocabulary size is its largest
# encoded index + 1. `item_id` is registered under embedding_name="item" so it
# shares one embedding table with the history sequence declared further down.
fixlen_sparse_feat = [
    SparseFeat(name, vocabulary_size=vocab_size_item, embedding_dim=16, embedding_name="item")
    if name == "item_id"
    else SparseFeat(name, vocabulary_size=df[name].max() + 1, embedding_dim=16)
    for name in sparse_features
]

# Numeric features, one dimension each.
fixlen_dense_feat = [DenseFeat(name, 1) for name in dense_features]

# Variable-length sequence feature (list of historical item ids); it shares
# the "item" embedding with item_id and reads its true length from "hist_len".
seq_feat = VarLenSparseFeat(
    SparseFeat(
        "hist_item_id",
        vocabulary_size=vocab_size_item,
        embedding_dim=16,
        embedding_name="item",
    ),
    maxlen=maxlen,
    combiner="mean",
    length_name="hist_len",
)

# The sequence feature feeds the DNN part only; the linear part leaves it out.
linear_feature_columns = [*fixlen_sparse_feat, *fixlen_dense_feat]
dnn_feature_columns = [*fixlen_sparse_feat, *fixlen_dense_feat, seq_feat]
feature_names = get_feature_names(dnn_feature_columns + linear_feature_columns)
# =============== 4) Assemble the model inputs ===============
# A padded sequence column could be split into several columns or passed as
# one 2-D NumPy array; the 2-D array form is used here.
model_input = {name: df[name].values for name in feature_names if name in df.columns}
# Bind the sequence inputs explicitly. If the comprehension above picked up the
# raw list column "hist_item_id", it is replaced by the stacked padded
# histories: one row per sample, `maxlen` columns -> (N, maxlen).
model_input["hist_item_id"] = np.vstack(df["hist_item_padded"].values)
model_input["hist_len"] = df["hist_len"].values

# =============== 4.1) Normalise dtypes ===============
# sparse features -> int64, dense features -> float32,
# sequence indices -> int64, sequence lengths -> int32
for col in sparse_features:
    if col in model_input:
        model_input[col] = model_input[col].astype('int64')
for col in dense_features:
    if col in model_input:
        model_input[col] = model_input[col].astype('float32')
if "hist_item_id" in model_input:
    model_input["hist_item_id"] = model_input["hist_item_id"].astype('int64')
if "hist_len" in model_input:
    model_input["hist_len"] = model_input["hist_len"].astype('int32')

labels = df["label"].values
# The model input is a dict of per-feature arrays, so taking a row subset
# (train / validation) means indexing every array with the same indices.
def split_dict(d, idx):
    """Return a new dict in which every value of ``d`` is indexed by ``idx``.

    Args:
        d: Mapping of feature name to an indexable value (e.g. a NumPy array).
        idx: Index applied to each value as ``value[idx]`` (an index array,
            a slice, an int, ...).

    Returns:
        A new dict with the same keys; ``d`` is left unchanged.
    """
    return {key: values[idx] for key, values in d.items()}
# Stratified 80/20 split over row positions, so the train and validation sets
# keep the same label ratio.
idx_all = np.arange(len(labels))
idx_tr, idx_va = train_test_split(
    idx_all,
    test_size=0.2,
    stratify=labels,
    random_state=2024,
)
train_y, val_y = labels[idx_tr], labels[idx_va]
train_input, val_input = split_dict(model_input, idx_tr), split_dict(model_input, idx_va)

# =============== 5) Build and train DeepFM (CTR prediction: task='binary') ===============
# Use the GPU when one is visible to torch, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

model = DeepFM(
    linear_feature_columns,
    dnn_feature_columns,
    task='binary',
    l2_reg_embedding=1e-6,
    dnn_hidden_units=(256, 128, 64),
    dnn_dropout=0.2,
    device=device,
)
model.compile("adam", "binary_crossentropy", metrics=['auc'])
model.fit(
    x=train_input,
    y=train_y,
    batch_size=1024,
    epochs=3,
    verbose=2,
    validation_data=(val_input, val_y),
)
# =============== 6) Online / offline inference: score candidates for one user, return Top-K ===============
def recommend_for_user(user_row, candidate_items, context_row, topk=10,
                       hour_encoder=None, device_encoder=None):
    """Score every candidate item for one user and return the best ``topk``.

    Args:
        user_row: One user record (Series) holding the already-encoded
            ``user_id``, ``gender``, ``occupation``, plus ``age``,
            ``hist_len`` and ``hist_item_padded``.
        candidate_items: DataFrame with ``item_id``, ``category``, ``price``.
        context_row: One context record (Series) holding the raw ``hour`` and
            ``device`` values; they are encoded inside this function.
        topk: Maximum number of rows to return.
        hour_encoder: Optional fitted encoder exposing ``transform`` for
            ``hour``. When omitted, a LabelEncoder fitted on "0".."23" is used.
        device_encoder: Optional fitted encoder exposing ``transform`` for
            ``device``. When omitted, a LabelEncoder fitted on
            android / ios / web is used.

    Returns:
        A copy of ``candidate_items`` with an extra ``ctr_pred`` column,
        sorted by ``ctr_pred`` descending, cut to ``topk`` rows, index reset.
    """
    n = len(candidate_items)
    if n == 0:
        # Nothing to score: skip the model call on an empty batch and return
        # an empty frame that already has the result column.
        out = candidate_items.copy()
        out["ctr_pred"] = np.array([], dtype="float64")
        return out.reset_index(drop=True)

    # Encode the context features into the index space used for training
    # (per the original note, LabelEncoder orders string labels
    # lexicographically). The fallback encoders only reproduce the training
    # mapping if the training data contained every hour and every device;
    # pass the encoders fitted at training time to remove that assumption.
    if hour_encoder is None:
        hour_encoder = LabelEncoder().fit([str(i) for i in range(24)])
    if device_encoder is None:
        device_encoder = LabelEncoder().fit(["android", "ios", "web"])
    hour_idx = int(hour_encoder.transform([str(context_row["hour"])])[0])
    device_idx = int(device_encoder.transform([str(context_row["device"])])[0])

    # Build the batch: user and context values are repeated once per candidate.
    pred_df = pd.DataFrame({
        # User side (repeated to the candidate count)
        "user_id": np.repeat(user_row["user_id"], n),
        "gender": np.repeat(user_row["gender"], n),
        "occupation": np.repeat(user_row["occupation"], n),
        "age": np.repeat(user_row["age"], n),
        # Item side
        "item_id": candidate_items["item_id"].values,
        "category": candidate_items["category"].values,
        "price": candidate_items["price"].values,
        # Context
        "hour": np.repeat(hour_idx, n),
        "device": np.repeat(device_idx, n),
        # Sequence length (the demo reuses the user's stored history; a real
        # online system would read it from a feature store)
        "hist_len": np.repeat(user_row["hist_len"], n),
    })
    # Reuse the user's padded history for every candidate: one row per candidate.
    hist_padded = np.array(user_row["hist_item_padded"])[None, :].repeat(n, axis=0)

    # Assemble the deepctr input dict.
    pred_input = {name: pred_df[name].values for name in feature_names if name in pred_df.columns}
    # Normalise dtypes so that no object arrays reach the model.
    for col in sparse_features:
        if col in pred_input:
            pred_input[col] = pred_input[col].astype('int64')
    for col in dense_features:
        if col in pred_input:
            pred_input[col] = pred_input[col].astype('float32')
    pred_input["hist_item_id"] = hist_padded.astype('int64')
    if "hist_len" in pred_input:
        pred_input["hist_len"] = pred_input["hist_len"].astype('int32')

    # Predicted CTR probability, flattened to one value per candidate.
    preds = model.predict(pred_input, batch_size=2048).reshape(-1)

    # Rank the candidates by predicted CTR.
    out = candidate_items.copy()
    out["ctr_pred"] = preds
    out = out.sort_values("ctr_pred", ascending=False).head(topk).reset_index(drop=True)
    return out
# -- Demo: pick one sample user, a candidate set and a context, then recommend --
u_idx = 0
user_row = df.loc[
    u_idx,
    ["user_id", "gender", "occupation", "age", "hist_len", "hist_item_padded"],
]

# Candidates: sample 50 rows and keep the first row per item_id, so at most
# 50 candidates remain (a real system would take them from a recall stage).
cand_idx = np.random.choice(df.index, size=50, replace=False)
candidate_items = df.loc[cand_idx, ["item_id", "category", "price"]]
candidate_items = candidate_items.drop_duplicates("item_id")
candidate_items = candidate_items.head(50).reset_index(drop=True)

# Context: e.g. it is 20:00 and the device is ios.
context_row = pd.Series(["20", "ios"], index=["hour", "device"])

topk_rec = recommend_for_user(user_row, candidate_items, context_row, topk=10)
print(topk_rec)
最终输出:
cpu
Train on 160 samples, validate on 40 samples, 1 steps per epoch
Epoch 1/3
0s - loss: 0.6866 - auc: 0.5416 - val_auc: 0.5980
Epoch 2/3
0s - loss: 0.5992 - auc: 0.4456 - val_auc: 0.5980
Epoch 3/3
0s - loss: 0.5394 - auc: 0.4465 - val_auc: 0.5980
item_id category price ctr_pred
0 7 0 55.975832 0.413617
1 9 1 113.948121 0.372060
2 4 0 117.538737 0.369552
3 6 1 165.375080 0.336808
4 10 1 201.672467 0.312906
5 16 2 213.743448 0.305202
6 12 2 278.102685 0.265882
7 0 3 280.577184 0.264432
8 19 1 288.780717 0.259607
9 5 2 303.582714 0.251277