基于贝叶斯的营销组合模型实战案例(PyMC实践)

发布于:2025-08-08 ⋅ 阅读:(14) ⋅ 点赞:(0)

文章出自:基于营销预算优化的媒体投入分配研究

本篇技术亮点在于结合了广告饱和度和累积效应,通过数学模型和数值优化方法,精确计算电视与数字媒体的最佳预算分配比例,实现增量销售最大化。该方法适合有多渠道广告投放需求、预算有限且追求投资回报率最大化的企业。
实际应用案例包括零售品牌在节假日促销期间,通过调整电视和数字广告预算比例,提高整体销售额;以及快消品企业根据不同渠道表现动态调整广告投放,实现更高效的市场覆盖和销售转化。



在这里插入图片描述

1 Bayes 定理是什么?

Bayes 定理是一种基于新证据更新我们对某件事信念的方法。

你有一个初始信念(先验),然后你观察到一些数据(证据)。Bayes 定理帮助你根据新数据调整你的信念。

  • P ( A ) P(A) P(A):先验 — 你对事件 A 的初始信念。
  • P ( B ∣ A ) P(B|A) P(BA):似然 — 如果 A 为真,观察到 B 的可能性。
  • P ( B ) P(B) P(B):边缘概率 — 观察到 B 的总概率。
  • P ( A ∣ B ) P(A|B) P(AB):后验 — 看到 B 后对 A 的更新信念。

假设:

  • 某人患病的概率是 1%(先验)。
  • 如果患病,检测准确率是 99%(似然)。
  • 但也存在假阳性。

Bayes 定理帮助我们重新计算该人在检测呈阳性后实际患病的概率——这不一定是 99%!

2 什么是营销组合模型(MMM)?

MMM 是一种统计分析,估计每个营销投入(电视、数字、促销等)对销售的贡献。

Bayes 定理为贝叶斯 MMM 提供基础,它将焦点从单点估计转向完整的概率分布。在贝叶斯 MMM 中,我们从对每个营销渠道影响销售的先验信念开始(先验),然后利用观察到的数据(媒体花费和销售)更新这些信念,形成后验分布——反映了历史理解和实际证据的结合。
这种方法不仅告诉我们渠道最可能的投资回报率,还告诉我们对该估计的不确定性。通过显式建模不确定性,贝叶斯 MMM 在数据噪声大、媒体效应非线性以及需要可信区间而非单点估计的实际场景中,支持更稳健的决策。

3 传统 MMM 与贝叶斯 MMM 的对比

3.1 传统 MMM(频率派):

  • 使用线性回归。
  • 提供点估计(例如“电视的系数是0.3”)。
  • 没有先验信念,只拟合观察数据的最佳模型。
  • 给出点估计及置信区间(但置信区间是次要的)。
  • 例如,电视的系数是 0.25,意味着电视花费 1 亿卢比带来 2500 万卢比的销售提升。

3.2 贝叶斯 MMM:

  • 从一个(先验)开始,例如“我们认为电视对销售贡献在10%到30%之间”。
  • 观察实际活动数据(证据)。
  • 使用 Bayes 定理更新信念,得到后验分布。
  • 不仅给出单一系数,而是给出一个系数的可能值分布,结合先验和数据。

3.3 贝叶斯 MMM 的主要优势:

  1. 融入先验知识(如过往活动、行业基准)。
  2. 给出分布,而非单点估计(建模不确定性)。
  3. 对稀疏或噪声数据更鲁棒。
  4. 可以建模层级结构,比如品牌层级和区域层级的 MMM。

4 什么是 Adstock?

Adstock 用于考虑媒体的滞后效应。例如:

“今天的电视广告可能影响未来几天或几周的销售,而不仅仅是当天。”


5 用 PyMC 演示贝叶斯 MMM建模案例

5.1 PyMC 是什么?

PyMC 是一个 Python 库,用于构建贝叶斯模型,使用概率分布定义模型,并通过 MCMC 等采样算法估计后验。

它允许你:

  • 定义先验(观察数据前的信念)
  • 设置似然(数据生成方式)
  • 计算后验(观察数据后的更新信念)
import pymc as pm  
import arviz as az  
import pandas as pd  
import numpy as np

data = {  
    "sales":   [100, 120, 130, 125, 150, 160],    
    "tv":      [10, 15, 20, 15, 25, 30],          
    "digital": [5, 7, 10, 8, 12, 14]              
}

df = pd.DataFrame(data)  

with pm.Model() as model:

    alpha = pm.Normal("intercept", mu=0, sigma=10)               
    beta_tv = pm.Normal("beta_tv", mu=0.2, sigma=0.1)             
    beta_digital = pm.Normal("beta_digital", mu=0.3, sigma=0.1)   
    sigma = pm.HalfNormal("sigma", sigma=10)                   

    mu = alpha + beta_tv * df["tv"] + beta_digital * df["digital"]

    y_obs = pm.Normal("sales", mu=mu, sigma=sigma, observed=df["sales"])

with model:  
    trace = pm.sample(draws=200,         
                      tune=100,          
                      target_accept=0.9,   
                      random_seed=42)   

az.plot_posterior(trace,  
                  var_names=["beta_tv", "beta_digital"],  
                  hdi_prob=0.9)  

后验分布示意图
MCMC

MCMC(马尔可夫链蒙特卡洛)是一种用于从复杂概率分布中生成样本的方法,当精确解难以获得时非常有用。
它通过构建一条猜测链(马尔可夫链),并利用随机性(蒙特卡洛方法)探索在模型下更可能的取值。
这是贝叶斯推断的核心——根据数据学习参数的可能取值。
在贝叶斯统计中,MCMC 帮助估计参数的后验分布——即给定数据后哪些参数值最有可能。
生成的一系列样本代表了未知参数的可能取值,帮助你做出概率性的结论。

解释

HDI(高密度区间)用于总结参数的后验分布。
它告诉你,在所选置信水平(如90%)下,真实参数值很可能落在这个区间内。

例如:

  • 有90%的概率电视广告的效应介于0.093到0.43之间。
  • 有90%的概率数字广告的效应介于0.16到0.49之间。

5.2 Geometric Adstock 函数示例


# 🧪 Adstock in PyMC – Step-by-Step

# Define a geometric adstock function in NumPy (for simulation or explanation)
def geometric_adstock(x, theta):
    """
    Geometric Adstock: Applies geometric decay to a time series.
    - θ (theta) is the decay rate: newer impressions have more impact.
    - Pros: Simple, interpretable, computationally efficient.
    """
    result = np.zeros_like(x)
    for t in range(len(x)):
        for i in range(t + 1):
            result[t] += x[t - i] * theta**i  # Decay previous values
    return result


# ----------------------------------------
# Step 1: Import Libraries and Prepare Data
# ----------------------------------------

import pymc as pm
import numpy as np
import pandas as pd
import pytensor.tensor as at
import arviz as az

# Simulated marketing dataset with weekly sales and media spend
df = pd.DataFrame({
    "sales":    [100, 120, 130, 125, 150, 160],   # target variable
    "tv":       [10, 15, 20, 15, 25, 30],         # TV spend
    "digital":  [5, 7, 10, 8, 12, 14]             # Digital spend
})


# ----------------------------------------
# Step 2: Define Adstock Transformation in PyTensor (Aesara-compatible)
# ----------------------------------------

def adstock_aesara(x, theta):
    """
    Geometric Adstock in PyTensor for use inside PyMC models.
    - Handles symbolic differentiation.
    - Applies decay to each time step using theta.
    """
    result = at.zeros_like(x)
    for i in range(x.shape[0]):
        # Reverse cumulative weighted sum with decay factor
        result = at.set_subtensor(result[i], at.sum(x[:i+1][::-1] * theta**at.arange(i+1)))
    return result


# ----------------------------------------
# Step 3: Define PyMC Bayesian Model Using Adstock
# ----------------------------------------

with pm.Model() as model:

    # Prior for adstock decay (between 0 and 1 using Beta distribution)
    theta_tv = pm.Beta("theta_tv", alpha=2, beta=2)           # Decay for TV
    theta_digital = pm.Beta("theta_digital", alpha=2, beta=2) # Decay for Digital

    # Apply adstock transformation on media channels
    tv_adstocked = adstock_aesara(df["tv"].values.astype("float32"), theta_tv)
    digital_adstocked = adstock_aesara(df["digital"].values.astype("float32"), theta_digital)

    # Priors for regression coefficients
    alpha = pm.Normal("intercept", mu=0, sigma=10)            # Baseline sales
    beta_tv = pm.Normal("beta_tv", mu=0.2, sigma=0.1)         # Effect of TV spend
    beta_digital = pm.Normal("beta_digital", mu=0.3, sigma=0.1)  # Effect of Digital spend
    sigma = pm.HalfNormal("sigma", sigma=10)                  # Noise term

    # Linear combination of adstocked media and intercept
    mu = alpha + beta_tv * tv_adstocked + beta_digital * digital_adstocked

    # Likelihood: how the observed sales are generated
    y_obs = pm.Normal("sales", mu=mu, sigma=sigma, observed=df["sales"].values)


# ----------------------------------------
# Step 4: Run MCMC Sampling
# ----------------------------------------

with model:
    trace = pm.sample(2000,      # Number of posterior samples
                      tune=1000, # Burn-in period
                      target_accept=0.9,  # Prevent divergent samples
                      random_seed=42)    # Reproducibility


# ----------------------------------------
# Step 5: Visualize Posterior Distributions
# ----------------------------------------

# Plot the estimated distributions of the model parameters
az.plot_posterior(trace,
                  var_names=["beta_tv", "beta_digital", "theta_tv", "theta_digital"],
                  hdi_prob=0.9)  # Show 90% HDI (credible intervals)

后验分布示意图
Adstock 考虑了媒体的滞后效应
今天的电视广告可能会在接下来的几天或几周内影响销售,而不仅仅是当天

  • beta_tv(0.15 – 0.48)— 每单位累积电视广告能提升约0.32的销售额
  • theta_tv(0.34 – 0.99)— 电视广告每周保留大约67%的影响力
  • beta_digital(0.19 - 0.52)— 数字广告稍强,能提升约0.67的销售额
  • theta_digital(0.25 – 0.96)— 数字广告衰减较快,θ约为59%;主要是短期影响
  • Adstock 中较小的 theta 值表示衰减更快/记忆时间更短

5.3 Adstock 加饱和效应函数

# ----------------------------------------
# 🧪 Step 1: Imports and Simulated Data
# ----------------------------------------

import pymc as pm
import numpy as np
import pandas as pd
import pytensor.tensor as at
import arviz as az

# Small mock dataset to illustrate MMM with media and sales
df = pd.DataFrame({
    "sales":    [100, 120, 130, 125, 150, 160],  # weekly sales (target variable)
    "tv":       [10, 15, 20, 15, 25, 30],        # TV spend per week
    "digital":  [5, 7, 10, 8, 12, 14]            # Digital spend per week
})


# ----------------------------------------
# 🚀 Step 2: Adstock + Saturation Function (Aesara compatible)
# ----------------------------------------

def adstock_saturation(x, theta, alpha):
    """
    Applies geometric adstock and log-saturation to input spend.

    - Adstock: captures delayed/lagged effect of media (e.g. recall over time).
    - Saturation: diminishing returns as spend increases (logarithmic effect).
    - `theta`: decay factor (0 to 1), controls memory decay.
    - `alpha`: saturation multiplier, scales the response curve.
    """
    result = at.zeros_like(x)
    for i in range(x.shape[0]):
        # Weighted cumulative sum with decay applied in reverse
        result = at.set_subtensor(
            result[i],
            at.sum(x[:i+1][::-1] * theta**at.arange(i+1))
        )
    return at.log1p(alpha * result)  # log(1 + α × adstocked)


# ----------------------------------------
# 📦 Step 3: Define PyMC Model with Adstock + Saturation
# ----------------------------------------

with pm.Model() as model:

    # ---- Adstock + Saturation Priors ----
    theta_tv = pm.Beta("theta_tv", alpha=2, beta=2)              # TV adstock decay
    alpha_tv = pm.HalfNormal("alpha_tv", sigma=1)                # TV saturation effect

    theta_digital = pm.Beta("theta_digital", alpha=2, beta=2)    # Digital adstock decay
    alpha_digital = pm.HalfNormal("alpha_digital", sigma=1)      # Digital saturation effect

    # ---- Apply transformation ----
    tv_transformed = adstock_saturation(df["tv"].values.astype("float32"), theta_tv, alpha_tv)
    digital_transformed = adstock_saturation(df["digital"].values.astype("float32"), theta_digital, alpha_digital)

    # ---- Regression Coefficients ----
    intercept = pm.Normal("intercept", mu=0, sigma=10)             # Baseline sales
    beta_tv = pm.Normal("beta_tv", mu=0.3, sigma=0.1)              # TV effectiveness
    beta_digital = pm.Normal("beta_digital", mu=0.3, sigma=0.1)    # Digital effectiveness
    sigma = pm.HalfNormal("sigma", sigma=10)                       # Residual noise

    # ---- Expected sales (Linear model) ----
    mu = intercept + beta_tv * tv_transformed + beta_digital * digital_transformed

    # ---- Likelihood ----
    y_obs = pm.Normal("sales", mu=mu, sigma=sigma, observed=df["sales"].values)


# ----------------------------------------
# 🔁 Step 4: Run MCMC Sampling
# ----------------------------------------

with model:
    trace = pm.sample(400,         # Posterior samples
                      tune=200,    # Warm-up
                      target_accept=0.9,  # Higher value = fewer divergences
                      random_seed=42)     # Reproducibility


# ----------------------------------------
# 📈 Step 5: Visualize Posterior Distributions
# ----------------------------------------

az.plot_posterior(
    trace,
    var_names=[
        "beta_tv", "beta_digital",         # Channel effectiveness
        "theta_tv", "theta_digital",       # Adstock memory
        "alpha_tv", "alpha_digital"        # Saturation impact
    ],
    hdi_prob=0.9  # 90% highest density interval (credible range)
)

后验分布示意图
解释

  • alpha_tv(0.00056 到 1.7)这是一个非常宽的区间,表明 alpha_tv 的具体数值存在显著不确定性
  • alpha_digital(0.00054 到 1.7)同样表明 alpha_digital 的具体数值存在不确定性

Beta
给定媒体的效应大小(经过饱和和累积效应后的结果)

theta
衰减率(电视广告效果持续的时间长短)

Alpha
alpha 通常代表饱和度或边际效应递减参数
较高的 alpha 表明该渠道可能非常有效
宽广且正偏的分布意味着存在较大的不确定性


5.4 模拟数据与完整贝叶斯 MMM

# Importing necessary libraries
import pymc as pm                      # PyMC for Bayesian modeling
import numpy as np                    # NumPy for numerical computations
import pandas as pd                   # Pandas for data handling
import pytensor.tensor as at          # PyTensor (Aesara) for symbolic tensor operations
import arviz as az                    # ArviZ for posterior analysis and diagnostics
import matplotlib.pyplot as plt       # For plotting

# ========================
# 1. Simulate media and sales data
# ========================

np.random.seed(42)  # Set seed for reproducibility
n = 20              # Number of observations (weeks)

# Generate random spend data for TV and Digital
tv = np.random.uniform(10, 100, n)
digital = np.random.uniform(5, 50, n)

# Adstock + Saturation function (NumPy version for simulation)
def adstock_saturation_np(x, theta, alpha):
    """
    Simulates adstock effect using geometric decay and saturation.
    - theta: decay rate (memory of past ad spends)
    - alpha: controls saturation (diminishing returns)
    """
    result = np.zeros_like(x)
    for t in range(len(x)):
        for i in range(t + 1):
            result[t] += x[t - i] * theta ** i
    return np.log1p(alpha * result)  # Saturation applied after adstock

# True parameter values used only for data generation
true_params = {
    "theta_tv": 0.6, "alpha_tv": 0.04, "beta_tv": 1.8,
    "theta_digital": 0.4, "alpha_digital": 0.07, "beta_digital": 2.4,
    "intercept": 50
}

# Generate media effects
tv_effect = adstock_saturation_np(tv, true_params["theta_tv"], true_params["alpha_tv"])
digital_effect = adstock_saturation_np(digital, true_params["theta_digital"], true_params["alpha_digital"])

# Simulate weekly sales as a combination of TV + Digital effects + noise
sales = (
    true_params["intercept"]
    + true_params["beta_tv"] * tv_effect
    + true_params["beta_digital"] * digital_effect
    + np.random.normal(0, 5, n)  # Add noise
)

# Create DataFrame
df = pd.DataFrame({"sales": sales, "tv": tv, "digital": digital})

# ========================
# 2. PyMC-compatible Adstock + Saturation function
# ========================

def adstock_saturation(x, theta, alpha):
    """
    PyTensor-compatible adstock with saturation function.
    Used inside PyMC probabilistic model.
    """
    result = at.zeros_like(x)
    for i in range(x.shape[0]):
        result = at.set_subtensor(result[i], at.sum(x[:i+1][::-1] * theta ** at.arange(i+1)))
    return at.log1p(alpha * result)

# ========================
# 3. Bayesian Model Definition
# ========================

with pm.Model() as model:
    # Priors for decay (theta) and saturation (alpha) parameters
    theta_tv = pm.Beta("theta_tv", alpha=2, beta=2)
    alpha_tv = pm.HalfNormal("alpha_tv", sigma=1)
    theta_digital = pm.Beta("theta_digital", alpha=2, beta=2)
    alpha_digital = pm.HalfNormal("alpha_digital", sigma=1)

    # Apply adstock + saturation transformations
    tv_trans = adstock_saturation(df["tv"].values.astype("float32"), theta_tv, alpha_tv)
    digital_trans = adstock_saturation(df["digital"].values.astype("float32"), theta_digital, alpha_digital)

    # Linear regression coefficients
    intercept = pm.Normal("intercept", mu=0, sigma=50)
    beta_tv = pm.Normal("beta_tv", mu=0, sigma=5)
    beta_digital = pm.Normal("beta_digital", mu=0, sigma=5)
    sigma = pm.HalfNormal("sigma", sigma=5)

    # Linear model for expected sales
    mu = intercept + beta_tv * tv_trans + beta_digital * digital_trans

    # Likelihood: observed sales
    y_obs = pm.Normal("sales", mu=mu, sigma=sigma, observed=df["sales"].values)

    # Sample from posterior
    trace = pm.sample(400, tune=200, target_accept=0.9, random_seed=42)

# ========================
# 4. Posterior Summary and Visualization
# ========================

# Summarize key posterior estimates
az.summary(trace, var_names=["theta_tv", "alpha_tv", "beta_tv", "theta_digital", "alpha_digital", "beta_digital"])

# Plot posterior distributions
az.plot_posterior(trace, var_names=["theta_tv", "alpha_tv", "beta_tv", "theta_digital", "alpha_digital", "beta_digital"])
plt.show()

# ========================
# 5. ROI Estimation
# ========================

# Compute posterior means of key parameters
theta_tv_mean = trace.posterior["theta_tv"].mean().values
alpha_tv_mean = trace.posterior["alpha_tv"].mean().values
beta_tv_mean = trace.posterior["beta_tv"].mean().values

theta_digital_mean = trace.posterior["theta_digital"].mean().values
alpha_digital_mean = trace.posterior["alpha_digital"].mean().values
beta_digital_mean = trace.posterior["beta_digital"].mean().values

# Reconstruct adstocked values using posterior means
tv_adstock = adstock_saturation_np(df["tv"].values, theta_tv_mean, alpha_tv_mean)
digital_adstock = adstock_saturation_np(df["digital"].values, theta_digital_mean, alpha_digital_mean)

# Calculate ROI = (total incremental sales) / (total spend)
roi_tv = (beta_tv_mean * tv_adstock.sum()) / df["tv"].sum()
roi_digital = (beta_digital_mean * digital_adstock.sum()) / df["digital"].sum()

print(f"ROI (TV): {roi_tv:.2f}")
print(f"ROI (Digital): {roi_digital:.2f}")

# ========================
# 6. Response Curve Visualization
# ========================

# Create smooth spend range to visualize media response curves
tv_range = np.linspace(0, 100, 100)
tv_effect_curve = adstock_saturation_np(tv_range, theta_tv_mean, alpha_tv_mean) * beta_tv_mean

digital_range = np.linspace(0, 60, 100)
digital_effect_curve = adstock_saturation_np(digital_range, theta_digital_mean, alpha_digital_mean) * beta_digital_mean

# Plot response curves
plt.figure(figsize=(10, 5))
plt.plot(tv_range, tv_effect_curve, label="TV Response")
plt.plot(digital_range, digital_effect_curve, label="Digital Response")
plt.xlabel("Media Spend")
plt.ylabel("Predicted Incremental Sales")
plt.title("Media Response Curves")
plt.legend()
plt.grid(True)
plt.show()

响应曲线示意图
在这里插入图片描述
数字媒体的早期优势:相比电视,数字媒体在初期投入时带来更高的增量销售。

数字回报递减:数字媒体的响应更快趋于平缓,表明更快达到饱和。

电视的稳定增长:电视广告随着投入增加,销售额呈现更缓慢但持续的增长。

需要战略性组合:实现最佳销售效果可能需要电视和数字投入的合理组合。


5.5 预算优化示例

from scipy.optimize import minimize

# This function calculates the expected incremental sales from given spends on TV and Digital
# It applies the Adstock + Saturation transformation to capture lagged and diminishing returns
def incremental_sales_response(tv, digital, theta_tv, alpha_tv, beta_tv, theta_digital, alpha_digital, beta_digital):

    # Adstock + Saturation using numpy (no PyMC or Theano involved)
    # Adstock simulates how past media spend impacts current sales (carryover effect)
    # Saturation simulates diminishing returns — each additional unit of spend has less effect
    def adstock_saturation_np(x, theta, alpha):
        result = np.zeros_like(x)
        # Apply geometric adstock recursively: past spends decay over time with rate θ
        for t in range(len(x)):
            for i in range(t+1):
                result[t] += x[t - i] * theta**i
        # Saturation using log1p for stability: α scales response
        return np.log1p(alpha * result)

    # Since this function is designed for optimization (scalar inputs), we wrap the inputs in arrays
    tv_arr = np.array([tv])
    digital_arr = np.array([digital])

    # Apply adstock + saturation transformation and multiply by β (regression coefficient)
    tv_response = adstock_saturation_np(tv_arr, theta_tv, alpha_tv)[0] * beta_tv
    digital_response = adstock_saturation_np(digital_arr, theta_digital, alpha_digital)[0] * beta_digital

    # Total predicted incremental sales from both channels
    return tv_response + digital_response


# This function finds the optimal spend allocation between TV and Digital
# to maximize incremental sales under a fixed total budget using scipy.optimize
def optimize_budget(trace, total_budget):
    # Extract posterior mean estimates for response function parameters
    theta_tv = trace.posterior["theta_tv"].mean().values
    alpha_tv = trace.posterior["alpha_tv"].mean().values
    beta_tv = trace.posterior["beta_tv"].mean().values

    theta_digital = trace.posterior["theta_digital"].mean().values
    alpha_digital = trace.posterior["alpha_digital"].mean().values
    beta_digital = trace.posterior["beta_digital"].mean().values

    # Define the objective function for optimization
    # It returns the negative of incremental sales (because we minimize in scipy)
    def objective(x):
        tv_spend, digital_spend = x
        if tv_spend < 0 or digital_spend < 0:
            return 1e6  # Large penalty for infeasible solutions
        return -incremental_sales_response(tv_spend, digital_spend,
                                           theta_tv, alpha_tv, beta_tv,
                                           theta_digital, alpha_digital, beta_digital)

    # Constraint: total spend should not exceed the budget
    constraints = ({
        'type': 'ineq',
        'fun': lambda x: total_budget - (x[0] + x[1])
    })

    # Spend on both channels should be non-negative and not exceed total budget
    bounds = [(0, total_budget), (0, total_budget)]

    # Start with equal allocation as an initial guess
    x0 = [total_budget / 2, total_budget / 2]

    # Run the optimization using Sequential Least Squares Programming (SLSQP)
    result = minimize(objective, x0, bounds=bounds, constraints=constraints)

    # If optimization is successful, return optimal spends and sales
    if result.success:
        tv_opt, digital_opt = result.x
        max_incremental_sales = -result.fun  # flip the sign back to get actual sales
        return {
            "tv_spend": tv_opt,
            "digital_spend": digital_opt,
            "expected_incremental_sales": max_incremental_sales
        }
    else:
        # Raise an error if optimizer fails (e.g., due to poor parameter initialization)
        raise RuntimeError("Optimization failed: " + result.message)


if __name__ == "__main__":
    # Simulate sample media + sales data
    df = simulate_data()

    # Fit Bayesian MMM model on the data
    trace = run_bayesian_mmm(df)

    # Show posterior estimates for parameters of interest
    print(az.summary(trace, var_names=[
        "theta_tv", "alpha_tv", "beta_tv",
        "theta_digital", "alpha_digital", "beta_digital",
        "beta_sin", "beta_cos"
    ]))

    # Plot posterior distributions to see parameter uncertainty
    az.plot_posterior(trace, var_names=[
        "theta_tv", "alpha_tv", "beta_tv",
        "theta_digital", "alpha_digital", "beta_digital"
    ])
    plt.show()

    # Plot how each media channel contributes to response across a spend range
    plot_response_curves(trace)

    # Define a fixed total budget (e.g., ₹100 units)
    total_budget = 100

    # Run optimization to find best split of budget between TV and Digital
    opt_results = optimize_budget(trace, total_budget)

    # Display the results of the optimization
    print("\nOptimal Budget Allocation:")
    print(f"TV Spend: {opt_results['tv_spend']:.2f}")
    print(f"Digital Spend: {opt_results['digital_spend']:.2f}")
    print(f"Expected Incremental Sales: {opt_results['expected_incremental_sales']:.2f}")

优化结果示意图
在这里插入图片描述
Optimal Budget Allocation:

  • TV Spend: 53.37
  • Digital Spend: 46.63
  • Expected Incremental Sales: 10.72

最有效的预算分配方式是电视占总预算的54.54%,数字媒体占总预算的45.46%
这种具体分配预计能带来10.89单位的增量销售

过程 :

  1. 使用 incremental_sales_response 函数对媒体投入与增量销售之间的关系进行建模(包括累积效应和饱和效应)
  2. 利用贝叶斯后验均值作为模型参数,获得这些关系的“最佳估计”
  3. 采用 scipy.optimize.minimize 进行数值搜索,在总预算固定的情况下,寻找最大化预测增量销售的电视和数字投入组合
    minimize 函数在给定的边界和约束条件下,迭代调整电视投入和数字投入,在每一步评估函数,直到收敛到一个解。

5.6 多渠道模拟与约束优化

import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import arviz as az

# ----------------------------------------
# 1. Simulate synthetic marketing + sales data
# ----------------------------------------
def simulate_data(n_days=100):
    np.random.seed(42)  # For reproducibility

    # Random media spends for 3 channels
    tv = np.random.uniform(0, 60, size=n_days)
    digital = np.random.uniform(0, 40, size=n_days)
    radio = np.random.uniform(0, 30, size=n_days)

    # Define adstock transformation (carryover effect of media)
    def adstock(x, theta):
        result = np.zeros_like(x)
        for t in range(len(x)):
            for i in range(t + 1):
                result[t] += x[t - i] * theta**i  # Geometric decay
        return result

    # Define channel-specific transformation parameters
    theta_tv, theta_digital, theta_radio = 0.5, 0.6, 0.3   # Decay rate
    alpha_tv, alpha_digital, alpha_radio = 0.8, 1.0, 0.6   # Saturation scale
    beta_tv, beta_digital, beta_radio = 4.0, 6.0, 2.0      # Effect size

    # Simulate sales with media contributions + Gaussian noise
    sales = (
        beta_tv * np.log1p(alpha_tv * adstock(tv, theta_tv)) +
        beta_digital * np.log1p(alpha_digital * adstock(digital, theta_digital)) +
        beta_radio * np.log1p(alpha_radio * adstock(radio, theta_radio)) +
        np.random.normal(0, 5, n_days)  # Random noise
    )

    # Return a tidy dataframe
    df = pd.DataFrame({
        "tv": tv,
        "digital": digital,
        "radio": radio,
        "sales": sales
    })
    return df

# ----------------------------------------
# 2. Bayesian MMM model with PyMC
# ----------------------------------------
def run_bayesian_mmm(df):
    with pm.Model() as model:
        # Intercept term capturing baseline sales
        intercept = pm.Normal("intercept", mu=100, sigma=50)

        # Adstock decay parameters — modeled as Beta to stay within [0, 1]
        theta_tv = pm.Beta("theta_tv", alpha=2, beta=2)
        theta_digital = pm.Beta("theta_digital", alpha=2, beta=2)
        theta_radio = pm.Beta("theta_radio", alpha=2, beta=2)

        # Saturation scale parameters — HalfNormal to enforce positive scaling
        alpha_tv = pm.HalfNormal("alpha_tv", sigma=1)
        alpha_digital = pm.HalfNormal("alpha_digital", sigma=1)
        alpha_radio = pm.HalfNormal("alpha_radio", sigma=1)

        # Beta coefficients — representing channel effectiveness
        beta_tv = pm.Normal("beta_tv", mu=5, sigma=2)
        beta_digital = pm.Normal("beta_digital", mu=5, sigma=2)
        beta_radio = pm.Normal("beta_radio", mu=2, sigma=1)

        # Adstock function using PyTensor
        # Advantage: models how current sales depend on lagged media spend
        def adstock(x, theta):
            return pt.stack([
                pt.sum(x[:t+1][::-1] * theta**pt.arange(t+1))
                for t in range(x.shape[0])
            ])

        # Apply adstock + saturation + effectiveness transformation
        tv_effect = beta_tv * pt.log1p(alpha_tv * adstock(df["tv"].values, theta_tv))
        digital_effect = beta_digital * pt.log1p(alpha_digital * adstock(df["digital"].values, theta_digital))
        radio_effect = beta_radio * pt.log1p(alpha_radio * adstock(df["radio"].values, theta_radio))

        # Predicted mean of sales
        mu = intercept + tv_effect + digital_effect + radio_effect

        # Noise term (standard deviation of sales error)
        sigma = pm.HalfNormal("sigma", sigma=5)

        # Likelihood: observed sales follow Normal distribution with mean = mu
        y_obs = pm.Normal("sales", mu=mu, sigma=sigma, observed=df["sales"].values)

        # Run MCMC sampling to estimate posterior distributions
        trace = pm.sample(
            draws=400,
            tune=200,
            cores=2,
            target_accept=0.9,
            return_inferencedata=True
        )

    return trace

# =============================================================================
# === Core Optimizer ===
# =============================================================================
def optimize_budget_with_constraints(trace, total_budget, channel_constraints):
    """
    Optimizes media budget allocation across channels with constraints
    using posterior means from Bayesian MMM trace.

    Inputs:
        trace: PyMC InferenceData containing posterior samples
        total_budget: total budget to be allocated
        channel_constraints: dict specifying min/max spend and min ROI per channel

    Returns:
        dict with optimal allocation and expected incremental sales
    """
    channel_names = list(channel_constraints.keys())

    # Extract mean posterior parameters (theta, alpha, beta) for each channel
    params = []
    for ch in channel_names:
        params.append({
            "theta": trace.posterior[f"theta_{ch}"].mean().values,
            "alpha": trace.posterior[f"alpha_{ch}"].mean().values,
            "beta": trace.posterior[f"beta_{ch}"].mean().values
        })

    # Function to apply adstock + saturation transformation in NumPy
    # This mirrors the transformation used in model training
    def adstock_saturation_np(x, theta, alpha):
        result = np.zeros_like(x)
        for t in range(len(x)):
            for i in range(t+1):
                result[t] += x[t - i] * theta**i  # Geometric decay
        return np.log1p(alpha * result)  # Saturation via log

    # Channel response = transformed spend × beta
    def channel_response(spend, p):
        return adstock_saturation_np(np.array([spend]), p['theta'], p['alpha'])[0] * p['beta']

    # Objective function to minimize (negative total incremental sales)
    def objective(spends):
        return -sum(channel_response(spend, p) for spend, p in zip(spends, params))

    # Constraint 1: total spend should not exceed the total budget
    constraints = [{'type': 'ineq', 'fun': lambda x: total_budget - np.sum(x)}]

    # Constraint 2: enforce minimum ROI per channel if specified
    for i, ch in enumerate(channel_names):
        min_roi = channel_constraints[ch].get("min_roi")
        if min_roi is not None:
            def roi_fun(x, i=i):
                spend = x[i]
                if spend <= 0:
                    return -1e6  # heavy penalty for zero/negative spend
                response = channel_response(spend, params[i])
                return (response / spend) - min_roi
            constraints.append({'type': 'ineq', 'fun': roi_fun})

    # Bounds: each channel’s spend must be within its min/max spend constraints
    bounds = [
        (channel_constraints[ch].get("min_spend", 0),
         channel_constraints[ch].get("max_spend", total_budget))
        for ch in channel_names
    ]

    # Initial guess: midpoint of each channel’s min and max bounds
    x0 = [(b[0] + b[1]) / 2 for b in bounds]

    # Run optimizer (SLSQP)
    result = minimize(objective, x0, bounds=bounds, constraints=constraints)

    # Output optimal spends and expected sales if optimization is successful
    if result.success:
        opt_spends = result.x
        expected_sales = -result.fun  # negate back from minimization
        return {
            "optimal_allocation": dict(zip(channel_names, opt_spends)),
            "expected_incremental_sales": expected_sales
        }
    else:
        raise RuntimeError("Optimization failed: " + result.message)

# =============================================================================
# === Visualization 1: Budget vs. Expected Sales
# =============================================================================
def plot_budget_vs_sales(trace, channel_constraints, budget_range, step=5):
    """
    Plots expected incremental sales against different total budgets.
    Helps visualize sales lift as a function of investment size.
    """
    budgets = list(range(budget_range[0], budget_range[1] + 1, step))
    sales = []

    for budget in budgets:
        try:
            result = optimize_budget_with_constraints(trace, budget, channel_constraints)
            sales.append(result["expected_incremental_sales"])
        except RuntimeError:
            sales.append(None)  # skip failed optimization

    # Remove failed entries for plotting
    clean_budgets = [b for b, s in zip(budgets, sales) if s is not None]
    clean_sales = [s for s in sales if s is not None]

    plt.figure(figsize=(10, 6))
    plt.plot(clean_budgets, clean_sales, marker='o', color='blue')
    plt.xlabel("Total Media Budget (₹)")
    plt.ylabel("Expected Incremental Sales (₹)")
    plt.title("Budget vs Expected Incremental Sales")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# =============================================================================
# === Visualization 2: Budget vs Sales and ROI (Dual-axis)
# =============================================================================
def plot_budget_vs_sales_and_roi(trace, channel_constraints, budget_range, step=5):
    """
    Plots budget vs expected sales and ROI (Sales/Budget) together.
    This helps in identifying the inflection point where ROI starts to drop.
    """
    budgets = list(range(budget_range[0], budget_range[1] + 1, step))
    sales, rois = [], []

    for budget in budgets:
        try:
            result = optimize_budget_with_constraints(trace, budget, channel_constraints)
            s = result["expected_incremental_sales"]
            sales.append(s)
            rois.append(s / budget)
        except RuntimeError:
            sales.append(None)
            rois.append(None)

    # Clean out any invalid entries
    clean_budgets = [b for b, s in zip(budgets, sales) if s is not None]
    clean_sales = [s for s in sales if s is not None]
    clean_rois = [r for r in rois if r is not None]

    fig, ax1 = plt.subplots(figsize=(10, 6))

    # Primary axis: Incremental Sales
    ax1.set_xlabel("Total Media Budget (₹)")
    ax1.set_ylabel("Expected Incremental Sales (₹)", color="blue")
    ax1.plot(clean_budgets, clean_sales, color="blue", marker='o')
    ax1.tick_params(axis='y', labelcolor="blue")

    # Secondary axis: ROI
    ax2 = ax1.twinx()
    ax2.set_ylabel("ROI (Sales per ₹)", color="green")
    ax2.plot(clean_budgets, clean_rois, color="green", marker='x', linestyle='--')
    ax2.tick_params(axis='y', labelcolor="green")

    plt.title("Trade-Off: Budget vs Sales & ROI")
    fig.tight_layout()
    plt.grid(True)
    plt.show()

# =============================================================================
# === Visualization 3: Stacked Channel Mix by Budget
# =============================================================================
def plot_channel_mix_by_budget(trace, channel_constraints, budget_range, step=5):
    """
    Visualizes how the allocation mix across media channels shifts
    as the total budget increases.
    """
    budgets = list(range(budget_range[0], budget_range[1] + 1, step))
    mix_data = []

    for budget in budgets:
        try:
            result = optimize_budget_with_constraints(trace, budget, channel_constraints)
            mix_data.append(result["optimal_allocation"])
        except RuntimeError:
            continue

    if not mix_data:
        print("No valid optimizations found.")
        return

    # Create a dataframe where rows = budgets, columns = channels
    df_mix = pd.DataFrame(mix_data, index=[b for b in budgets if len(mix_data) > 0][:len(mix_data)])

    # Convert absolute spends to percentages
    df_mix_perc = df_mix.divide(df_mix.sum(axis=1), axis=0) * 100

    # Plot stacked bar chart
    ax = df_mix_perc.plot(
        kind="bar",
        stacked=True,
        figsize=(12, 6),
        colormap="tab20",
        width=0.8
    )
    plt.ylabel("Channel Mix (%)")
    plt.xlabel("Total Media Budget (₹)")
    plt.title("Optimal Channel Mix by Budget Level")
    plt.legend(title="Channel", bbox_to_anchor=(1.05, 1), loc="upper left")
    plt.tight_layout()
    plt.grid(True, axis='y', linestyle='--', alpha=0.6)
    plt.show()

if __name__ == "__main__":
    # Simulate input data for media spends and target variable (e.g., sales)
    df = simulate_data()

    # Fit a Bayesian Marketing Mix Model (MMM) using the simulated data
    trace = run_bayesian_mmm(df)

    # Define constraints for each media channel
    # min_spend and max_spend define bounds for allocation
    # min_roi ensures that allocation to a channel only happens if its ROI is above this threshold
    channel_constraints = {
        "tv": {"min_spend": 10, "max_spend": 60, "min_roi": 0.05},
        "digital": {"min_spend": 5, "max_spend": 40, "min_roi": 0.07},
        "radio": {"min_spend": 0, "max_spend": 30, "min_roi": 0.03}
    }

    # Run the optimizer to allocate a fixed total budget (₹100) across channels
    # while adhering to the defined constraints and maximizing incremental sales
    result = optimize_budget_with_constraints(trace, total_budget=100, channel_constraints=channel_constraints)

    print("\nOptimal Allocation:")
    # Display the optimal spend allocation per channel
    # The optimizer returns a dictionary where keys are channels and values are optimal spends
    for ch, v in result["optimal_allocation"].items():
        print(f"{ch.capitalize()}: ₹{v:.2f}")

    # Display the expected incremental sales from this optimal allocation
    print(f"Expected Incremental Sales: ₹{result['expected_incremental_sales']:.2f}")

    # Plot 1: Budget vs. Expected Incremental Sales across a budget range
    # Helps visualize how incremental sales increase with higher budget
    plot_budget_vs_sales(trace, channel_constraints, budget_range=(60, 150), step=5)

    # Plot 2: Budget vs. Sales and ROI together for richer decision-making
    # Shows ROI variation alongside sales to identify diminishing returns
    plot_budget_vs_sales_and_roi(trace, channel_constraints, budget_range=(60, 150), step=5)

    # Plot 3: Stacked view of how budget is distributed across media channels at each budget level
    # Useful for understanding how mix shifts with available budget
    plot_channel_mix_by_budget(trace, channel_constraints, budget_range=(60, 150), step=5)

Optimal Allocation 优化分配 :

  • Tv: ₹42.36
  • Digital: ₹40.00
  • Radio: ₹17.64
  • Expected Incremental Sales: ₹42.78
    渠道分配示意图1

渠道分配示意图2

渠道分配示意图3
最优预算分配(电视:₹42.27,数字媒体:₹40.00,广播:₹17.73),预计带来₹42.52的增量销售

基本权衡:随着总营销预算的增加,预期增量销售上升,但增长速度递减,导致整体营销投资回报率下降。
使用模拟数据生成不同渠道的销售和媒体投入
渠道约束对于使优化结果更现实且符合业务策略至关重要,因此被纳入考虑。
运行带约束的预算优化器时,它将构建一个目标函数以最大化总增量销售,并显示各渠道的优化投入
通过纳入最低/最高渠道投入和最低投资回报率目标等约束,实现销售最大化。


6 结论

本教程演示了 Bayes 定理如何驱动现代、具备不确定性意识的营销组合建模方法。通过贝叶斯方法,我们不仅估计各渠道的影响,还量化了我们的置信度,从而支持更智能、更灵活的媒体规划。优化器和可视化工具帮助将洞察转化为可执行、基于预算的决策。虽然此代码为教学简化版本,非生产级,但为想超越黑箱 MMM 工具、深入理解媒体、数学与营销策略如何结合的人提供了坚实基础。随着更多真实数据、模型调优和跨职能输入,该框架可发展为营销团队强大的决策引擎。


网站公告

今日签到

点亮在社区的每一天
去签到