在当今内容爆炸的时代,个性化推荐系统已成为短视频平台的核心竞争力之一。本文将详细介绍如何利用Elasticsearch(ES)构建一个高效、可扩展的短视频个性化推荐系统。
一、系统架构概述
我们的推荐系统将采用混合推荐策略,结合协同过滤、内容相似度和热度推荐等多种方法。Elasticsearch作为核心搜索引擎和数据存储,将承担以下职责:
- 用户画像存储与查询
- 视频内容索引与检索
- 实时行为日志分析
- 推荐结果计算与排序
二、数据模型设计
1. 用户数据模型
{
"mappings": {
"properties": {
"user_id": {"type": "keyword"},
"age": {"type": "integer"},
"gender": {"type": "keyword"},
"location": {"type": "geo_point"},
"interests": {"type": "keyword"},
"watch_history": {
"type": "nested",
"properties": {
"video_id": {"type": "keyword"},
"watch_time": {"type": "date"},
"duration": {"type": "float"},
"interaction": {
"type": "nested",
"properties": {
"type": {"type": "keyword"}, // like, share, comment, etc.
"timestamp": {"type": "date"}
}
}
}
},
"followers": {"type": "keyword"},
"following": {"type": "keyword"},
"created_at": {"type": "date"}
}
}
}
2. 视频数据模型
{
"mappings": {
"properties": {
"video_id": {"type": "keyword"},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"description": {
"type": "text",
"analyzer": "ik_max_word"
},
"tags": {"type": "keyword"},
"category": {"type": "keyword"},
"creator_id": {"type": "keyword"},
"duration": {"type": "integer"},
"created_at": {"type": "date"},
"location": {"type": "geo_point"},
"stats": {
"properties": {
"views": {"type": "integer"},
"likes": {"type": "integer"},
"shares": {"type": "integer"},
"comments": {"type": "integer"},
"watch_time_avg": {"type": "float"}
}
},
"embedding": {
"type": "dense_vector",
"dims": 384
}
}
}
}
三、核心推荐算法实现
1. 基于用户画像的内容推荐
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
# Shared Elasticsearch client used by every function below.
# The host is given as a full URL (scheme://host:port): elasticsearch-py 8.x
# rejects bare "host:port" strings, and 7.x clients accept this form too.
es = Elasticsearch(["http://localhost:9200"])
def get_content_based_recommendations(user_id, size=10):
    """Recommend unwatched videos that match the user's profile.

    A video scores higher when its ``tags`` overlap the user's ``interests``,
    its ``category`` equals the user's ``primary_interest``, or its
    ``location`` lies within 100 km of the user's ``location``.  Videos
    already present in the user's ``watch_history`` are excluded.  An
    exponential time decay on ``created_at`` (1.0 for a brand-new video,
    0.5 once it is 7 days old) is added to the relevance score so fresher
    content ranks higher.

    Profile fields that are missing are skipped.  Sending ``None`` as a
    ``term`` value or as a ``geo_distance`` origin makes Elasticsearch reject
    the whole request.  If the profile provides no signal at all, every
    unwatched video is a candidate and ranking falls back to recency alone.

    Args:
        user_id: ``_id`` of the user's document in the ``user_profiles`` index.
        size: Maximum number of videos to return.

    Returns:
        A list of ``_source`` dicts from the ``videos`` index, best first.
    """
    user_profile = es.get(index="user_profiles", id=user_id)['_source']

    should = []
    interests = user_profile.get("interests") or []
    if isinstance(interests, str):
        # A keyword field may hold a single string instead of a list.
        interests = [interests]
    if interests:
        should.append({"terms": {"tags": interests}})
    primary_interest = user_profile.get("primary_interest")
    if primary_interest:
        should.append({"term": {"category": primary_interest}})
    location = user_profile.get("location")
    if location:
        should.append({
            "geo_distance": {"distance": "100km", "location": location}
        })

    watched_ids = [
        entry['video_id']
        for entry in user_profile.get('watch_history') or []
        if entry.get('video_id')
    ]
    query = {"bool": {"must_not": [{"terms": {"video_id": watched_ids}}]}}
    if should:
        # Without must/filter clauses, at least one should clause must match.
        query["bool"]["should"] = should

    response = es.search(
        index="videos",
        body={
            "query": {
                "function_score": {
                    "query": query,
                    "functions": [{
                        # Built-in exponential decay on the video's age:
                        # 1.0 at "now", 0.5 at 7 days, approaching 0 after.
                        "exp": {
                            "created_at": {
                                "origin": "now",
                                "scale": "7d",
                                "offset": "0d",
                                "decay": 0.5,
                            }
                        }
                    }],
                    "score_mode": "sum",
                    # Add the decay to the relevance score (the default
                    # boost_mode would multiply them instead).
                    "boost_mode": "sum",
                }
            },
            "size": size,
        },
    )
    return [hit['_source'] for hit in response['hits']['hits']]
2. 基于协同过滤的相似用户推荐
def find_similar_users(user_id, size=5):
    """Return the ids of users whose recent viewing overlaps most with ``user_id``'s.

    Similarity counts how many of the target user's watched videos (at most
    100 of them) another user's ``watch_history`` contains with a
    ``watch_time`` in the last 30 days.

    ``watch_history`` is a ``nested`` field.  Every condition on it must
    therefore live inside one ``nested`` query: a ``range`` on
    ``watch_history.watch_time`` placed outside it matches nothing.  Nested
    values are also not reachable through ``doc['watch_history']`` in a
    root-level script.  Instead, each matching nested entry contributes a
    constant 1, and ``score_mode: sum`` adds those up, so a user's score is
    the overlap count.  Entries repeated for the same video would be counted
    once per entry.

    Args:
        user_id: ``_id`` of the target user's document in ``user_profiles``.
        size: Maximum number of similar users to return.

    Returns:
        ``user_id`` values of the most similar users, most similar first.
        Returns an empty list when the target user has no watch history.
    """
    target_user = es.get(index="user_profiles", id=user_id)['_source']
    target_videos = list({
        entry['video_id']
        for entry in target_user.get('watch_history') or []
        if entry.get('video_id')
    })[:100]  # cap the terms list to keep the query small
    if not target_videos:
        return []

    query = {
        "bool": {
            "must": [{
                "nested": {
                    "path": "watch_history",
                    "score_mode": "sum",
                    "query": {
                        "constant_score": {
                            "filter": {
                                "bool": {
                                    "filter": [
                                        {"terms": {"watch_history.video_id": target_videos}},
                                        {"range": {"watch_history.watch_time": {"gte": "now-30d/d"}}},
                                    ]
                                }
                            },
                            "boost": 1.0,
                        }
                    },
                }
            }],
            "must_not": [
                {"term": {"user_id": user_id}}
            ],
        }
    }

    response = es.search(
        index="user_profiles",
        body={
            "query": query,
            "size": size,
            # Only the id is needed; avoid shipping whole histories back.
            "_source": ["user_id"],
        },
    )
    return [hit['_source']['user_id'] for hit in response['hits']['hits']]
def get_collaborative_recommendations(user_id, size=10):
    """Recommend videos that similar users watched but ``user_id`` has not.

    Viewing history is stored on user profiles (``user_profiles`` mapping),
    not on videos.  The ``videos`` mapping has no ``watch_history`` field to
    query.  Candidates are therefore gathered from the similar users'
    profiles, then looked up in the ``videos`` index by ``video_id``.  The
    results are ranked by likes, then by average watch time.

    Args:
        user_id: ``_id`` of the target user's document in ``user_profiles``.
        size: Maximum number of videos to return.

    Returns:
        A list of ``_source`` dicts from the ``videos`` index.  Returns an
        empty list when there are no similar users or no new candidates.
    """
    similar_users = find_similar_users(user_id)
    if not similar_users:
        return []

    target_user = es.get(index="user_profiles", id=user_id)['_source']
    watched = {
        entry.get('video_id')
        for entry in target_user.get('watch_history') or []
    }

    # NOTE(review): assumes user_profiles documents use user_id as their
    # _id, as the es.get(index="user_profiles", id=user_id) calls here do.
    profiles = es.mget(index="user_profiles", body={"ids": similar_users})
    candidates = []
    seen = set()
    for doc in profiles['docs']:
        if not doc.get('found'):
            continue
        for entry in doc['_source'].get('watch_history') or []:
            vid = entry.get('video_id')
            if vid and vid not in watched and vid not in seen:
                seen.add(vid)
                candidates.append(vid)
    if not candidates:
        return []

    response = es.search(
        index="videos",
        body={
            "query": {
                "bool": {"filter": [{"terms": {"video_id": candidates}}]}
            },
            "sort": [
                {"stats.likes": {"order": "desc"}},
                {"stats.watch_time_avg": {"order": "desc"}},
            ],
            "size": size,
        },
    )
    return [hit['_source'] for hit in response['hits']['hits']]
3. 基于向量相似度的深度推荐
import numpy as np
from sentence_transformers import SentenceTransformer
# Initialize the sentence-embedding model once, at import time.
# NOTE(review): get_video_embeddings writes this model's vectors into the
# `embedding` dense_vector field of the videos index, so the model's output
# dimension must equal that field's `dims`. The model card for
# paraphrase-multilingual-MiniLM-L12-v2 lists 384 dimensions; confirm it
# against the mapping.
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
def get_video_embeddings(video_ids):
    """Encode the given videos' text, store the vectors in ES, and return them.

    Each video's text is ``"<title> <description> <tag1> <tag2> ..."``.  Each
    vector is written back to the video's ``embedding`` field.

    Args:
        video_ids: ``_id`` values of documents in the ``videos`` index.

    Returns:
        A dict mapping each *found* video id to its embedding (a numpy row).
        Ids that do not exist in the index are omitted, so callers must not
        assume every requested id is present.
    """
    if not video_ids:
        # mget rejects an empty id list, and there is nothing to encode.
        return {}

    response = es.mget(index="videos", body={"ids": list(video_ids)})
    # Keep each found document's own id next to its source. Vectors are then
    # written to (and returned for) the right video even when some
    # requested ids are missing from the index.
    found = [
        (doc['_id'], doc['_source'])
        for doc in response['docs']
        if doc.get('found')
    ]
    if not found:
        return {}

    texts = []
    for _, source in found:
        tags = source.get('tags') or []
        if isinstance(tags, str):
            # A keyword field may hold a single tag instead of a list.
            tags = [tags]
        title = source.get('title') or ''
        description = source.get('description') or ''
        texts.append(f"{title} {description} {' '.join(tags)}")

    embeddings = model.encode(texts, convert_to_tensor=False)

    result = {}
    for (vid, _), vector in zip(found, embeddings):
        es.update(
            index="videos",
            id=vid,
            body={"doc": {"embedding": vector.tolist()}},
        )
        result[vid] = vector
    return result
def get_semantic_recommendations(user_id, size=10):
    """Recommend videos whose embedding is close to the user's recent taste.

    The user's interest vector is the mean embedding of their 5 most recently
    watched videos.  Unwatched videos that have an ``embedding`` are ranked
    by cosine similarity to that vector.

    Args:
        user_id: ``_id`` of the user's document in ``user_profiles``.
        size: Maximum number of videos to return.

    Returns:
        A list of ``_source`` dicts (``video_id``, ``title``, ``description``,
        ``tags``, ``creator_id``, ``stats``, ``created_at``).  Returns an
        empty list when the user has no history, or none of the recent
        videos could be embedded.
    """
    user = es.get(index="user_profiles", id=user_id)['_source']
    history = user.get('watch_history') or []

    def _watched_at(entry):
        # Compare as strings so that missing timestamps ('') are never
        # compared with str values; with reverse=True they sort last.
        return str(entry.get('watch_time') or entry.get('last_watched') or '')

    recent_watched = sorted(history, key=_watched_at, reverse=True)[:5]
    if not recent_watched:
        return []

    video_ids = [h['video_id'] for h in recent_watched if h.get('video_id')]
    video_embeddings = get_video_embeddings(video_ids)
    vectors = [video_embeddings[vid] for vid in video_ids if vid in video_embeddings]
    if not vectors:
        return []
    user_vector = np.mean(vectors, axis=0)

    watched_ids = [h['video_id'] for h in history if h.get('video_id')]
    script_query = {
        "script_score": {
            "query": {
                "bool": {
                    # cosineSimilarity errors on documents without a vector.
                    "filter": [{"exists": {"field": "embedding"}}],
                    # Exclude watched videos; the seed videos would
                    # otherwise be the closest matches.
                    "must_not": [{"terms": {"video_id": watched_ids}}],
                }
            },
            "script": {
                # script_score must not produce negative scores, so shift
                # cosine similarity from [-1, 1] into [0, 2].
                "source": "cosineSimilarity(params.user_vector, 'embedding') + 1.0",
                "params": {
                    "user_vector": user_vector.tolist()
                },
            },
        }
    }
    response = es.search(
        index="videos",
        body={
            "query": script_query,
            "size": size,
            # personalize_ranking reads creator_id, stats and created_at.
            "_source": [
                "video_id", "title", "description", "tags",
                "creator_id", "stats", "created_at",
            ],
        },
    )
    return [hit['_source'] for hit in response['hits']['hits']]
四、混合推荐策略
def hybrid_recommendation(user_id, size=20):
    """Blend several recommenders and return the top ``size`` videos.

    Four sources (content-based, collaborative, semantic and popular) each
    contribute up to ``ceil(size / 4)`` candidates.  Duplicates are dropped,
    keeping the first occurrence in that source order, and the merged list
    is re-ranked by ``personalize_ranking``.

    Args:
        user_id: ``_id`` of the user's document in ``user_profiles``.
        size: Maximum number of videos to return.

    Returns:
        Up to ``size`` video dicts, each carrying a ``personal_score``.
    """
    # Round up so the four sources together can fill ``size`` slots; floor
    # division (e.g. 10 // 4 == 2) could never reach ``size``.
    per_source = -(-size // 4)
    sources = [
        get_content_based_recommendations(user_id, per_source),
        get_collaborative_recommendations(user_id, per_source),
        get_semantic_recommendations(user_id, per_source),
        get_popular_videos(per_source),
    ]

    merged = {}
    for rec_list in sources:
        for rec in rec_list:
            vid = rec.get('video_id')
            if vid is not None:
                merged.setdefault(vid, rec)

    ranked = personalize_ranking(user_id, list(merged.values()))
    return ranked[:size]
def _as_list(value):
    """Normalize an ES field value (missing, scalar or list) to a list."""
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return [value]


def _age_days(created_at):
    """Return whole days elapsed since ``created_at``, or None if unparseable.

    Accepts ISO-8601 strings and epoch-millisecond numbers.  A trailing 'Z'
    is accepted even though ``datetime.fromisoformat`` only understands it
    from Python 3.11.  Timezone-aware values are compared with an aware
    "now" in the same zone, because subtracting aware from naive raises
    TypeError.
    """
    try:
        if isinstance(created_at, (int, float)):
            created = datetime.fromtimestamp(created_at / 1000)
        elif isinstance(created_at, str):
            text = created_at[:-1] + '+00:00' if created_at.endswith('Z') else created_at
            created = datetime.fromisoformat(text)
        else:
            return None
    except (ValueError, OverflowError, OSError):
        return None
    now = datetime.now(created.tzinfo) if created.tzinfo else datetime.now()
    return (now - created).days


def personalize_ranking(user_id, recommendations):
    """Score each recommendation for ``user_id`` and sort best-first.

    ``personal_score = 0.4*content + 0.3*creator + 0.2*popularity + 0.1*recency``
    where:

    * content    = 0.2 per tag shared with the user's ``interests``
    * creator    = 1 if the user follows the video's creator, else 0
    * popularity = ``stats.likes / 1000``, capped at 5
    * recency    = 1 for a video created now, falling linearly to 0 at 30 days

    Each dict in ``recommendations`` is updated in place with a
    ``personal_score`` key.  Missing fields contribute 0 instead of raising.

    Args:
        user_id: ``_id`` of the user's document in ``user_profiles``.
        recommendations: Video ``_source`` dicts to score.

    Returns:
        A new list of the same dicts sorted by ``personal_score`` descending.
    """
    user = es.get(index="user_profiles", id=user_id)['_source']
    interests = set(_as_list(user.get('interests')))
    following = set(_as_list(user.get('following')))

    for rec in recommendations:
        # Content match: shared tags with the user's interests.
        common_tags = interests & set(_as_list(rec.get('tags')))
        content_score = len(common_tags) * 0.2
        # Followed creator.
        creator_score = 1 if rec.get('creator_id') in following else 0
        # Popularity.
        likes = (rec.get('stats') or {}).get('likes') or 0
        popularity_score = min(likes / 1000, 5)
        # Recency, clamped to [0, 1] so future-dated videos (clock skew)
        # cannot score above 1.
        age = _age_days(rec.get('created_at'))
        recency_score = 0 if age is None else min(1, max(0, 1 - age / 30))

        rec['personal_score'] = (
            0.4 * content_score +
            0.3 * creator_score +
            0.2 * popularity_score +
            0.1 * recency_score
        )

    return sorted(recommendations, key=lambda x: x['personal_score'], reverse=True)
def get_popular_videos(size=5, time_range="7d"):
    """Return the most-liked videos created within ``time_range``.

    Args:
        size: Maximum number of videos to return.
        time_range: Elasticsearch date-math duration such as ``"7d"`` or
            ``"24h"``.  The window starts at ``now-<time_range>``, rounded
            down to the start of that day.

    Returns:
        ``_source`` dicts sorted by ``stats.likes``, then ``stats.views``,
        both descending.
    """
    # The date window is a yes/no condition and the results are sorted by
    # stats fields, so run it in filter context: no relevance scoring, and
    # the clause is eligible for the query cache.
    response = es.search(
        index="videos",
        body={
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"created_at": {"gte": f"now-{time_range}/d"}}}
                    ]
                }
            },
            "sort": [
                {"stats.likes": {"order": "desc"}},
                {"stats.views": {"order": "desc"}},
            ],
            "size": size,
        },
    )
    return [hit['_source'] for hit in response['hits']['hits']]
五、实时反馈与模型更新
def log_user_interaction(user_id, video_id, interaction_type):
    """Record a user event on their profile and bump the video's counters.

    Profile update (``user_profiles`` index, via a Painless script):

    * ``'watch'``: refreshes ``watch_time``/``last_watched`` and increments
      ``watch_count`` on the matching ``watch_history`` entry.  The first
      time the video is watched, a new entry is appended instead.
    * any other type: appends ``{type, timestamp}`` to the entry's
      ``interaction`` list, the nested field declared in the user mapping.
      If the video has no history entry yet, the event is not recorded on
      the profile.

    Video update: ``'like'``, ``'share'`` and ``'comment'`` also increment
    ``stats.likes``, ``stats.shares`` and ``stats.comments`` on the video;
    a missing ``stats`` object or counter starts at 0.

    Args:
        user_id: ``_id`` of the user's document in ``user_profiles``.
        video_id: ``video_id`` / ``_id`` of the video.
        interaction_type: ``'watch'``, ``'like'``, ``'share'``,
            ``'comment'`` or another event name.
    """
    from datetime import timezone

    timestamp = datetime.now(timezone.utc).isoformat()

    # Painless map literals use [key: value] syntax; {...} is a block and
    # does not compile.  Counters that may be absent on older entries are
    # null-checked before incrementing.
    user_script = """
        if (ctx._source.watch_history == null) {
            ctx._source.watch_history = [];
        }
        def entry = null;
        for (def wh : ctx._source.watch_history) {
            if (wh.video_id == params.video_id) {
                entry = wh;
                break;
            }
        }
        if (entry != null) {
            entry.last_watched = params.timestamp;
            if (params.interaction_type == 'watch') {
                entry.watch_time = params.timestamp;
                entry.watch_count = (entry.watch_count == null ? 0 : entry.watch_count) + 1;
            } else {
                if (entry.interaction == null) {
                    entry.interaction = [];
                }
                entry.interaction.add(['type': params.interaction_type, 'timestamp': params.timestamp]);
            }
        } else if (params.interaction_type == 'watch') {
            ctx._source.watch_history.add([
                'video_id': params.video_id,
                'watch_time': params.timestamp,
                'first_watched': params.timestamp,
                'last_watched': params.timestamp,
                'watch_count': 1,
                'interaction': []
            ]);
        }
    """
    es.update(
        index="user_profiles",
        id=user_id,
        body={
            "script": {
                "source": user_script,
                "lang": "painless",
                "params": {
                    "video_id": video_id,
                    "interaction_type": interaction_type,
                    "timestamp": timestamp,
                },
            }
        },
    )

    # Map event types to stats counters.  The counter name is passed as a
    # param, so one script is compiled and reused rather than a new source
    # string per event type.
    stat_fields = {'like': 'likes', 'share': 'shares', 'comment': 'comments'}
    field = stat_fields.get(interaction_type)
    if field is not None:
        video_script = """
            if (ctx._source.stats == null) {
                ctx._source.stats = [:];
            }
            def current = ctx._source.stats[params.field];
            ctx._source.stats[params.field] = (current == null ? 0 : current) + 1;
        """
        es.update(
            index="videos",
            id=video_id,
            body={
                "script": {
                    "source": video_script,
                    "lang": "painless",
                    "params": {"field": field},
                }
            },
        )
六、性能优化与扩展
索引优化:
- 为常用查询字段设置合适的mapping类型
- 使用index sorting预排序
- 合理设置分片数和副本数
查询优化:
- 使用filter context缓存常用过滤条件
- 合理使用bool查询的must/should/filter组合
- 限制返回字段数量
缓存策略:
- 使用Redis缓存热门推荐结果
- 实现用户推荐结果的短期缓存
- 对向量相似度计算改用近似最近邻(ANN)搜索(如为 dense_vector 字段开启索引并使用 kNN 查询),避免 script_score 对全量文档逐一计算相似度
扩展性考虑:
- 实现AB测试框架评估不同推荐策略
- 设计插件式架构便于添加新的推荐算法
- 考虑使用Elasticsearch的机器学习功能进行异常检测
七、总结
本文详细介绍了基于Elasticsearch构建短视频平台个性化推荐系统的完整方案。通过结合内容推荐、协同过滤和语义向量相似度等多种技术,我们能够为用户提供精准的个性化内容推荐。Elasticsearch的强大搜索和分析能力使其成为构建推荐系统的理想选择。
实际应用中,还需要考虑以下方面:
- 冷启动问题的解决方案
- 推荐多样性与惊喜度的平衡
- 实时推荐与批量推荐的结合
- 推荐结果的解释性