Sherry_z
发布于

代码教程 | 倾向得分匹配(PSM)Python 全流程:从理论到代码

做实证研究,最头疼的问题之一就是——处理组和对照组本来就不一样,直接比较「有失公允」。倾向得分匹配(PSM)正是解决这类问题的经典方法。今天我们用 Python,从原理到代码,把 PSM 讲透。



一、为什么需要 PSM?

想象一个场景:你想研究「参加培训」对员工工资的影响。

  • 处理组:参加培训的员工
  • 对照组:没参加培训的员工

问题是——参加培训的人本来可能就更上进、学历更高、更有能力。即使不参加培训,他们的工资也可能更高。直接比较得到的「培训效果」,其实是选择偏差在作祟。

PSM 的核心思想:为每个处理组个体,在对照组中找到一个「长得像」的匹配对象,使得两者在可观测特征上尽可能相似。这样比较的就是「反事实」——如果处理组个体没参加培训,工资会是多少?



二、PSM 三步走

  • Step 1 → 构建倾向得分:用 Logit 回归估计每个个体「被处理」的概率
  • Step 2 → 最近邻匹配:处理组找对照组中倾向得分最接近的个体
  • Step 3 → ATT 估计:比较匹配后处理组与对照组的结局差异

下面一步步来。



三、代码实现

Step 0|环境准备

import pandas as pd
import numpy as np
import statsmodels.api as sm
import statsmodels.formula.api as smf
import matplotlib.pyplot as plt
import warnings

warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False

Step 1|读取数据

# 修改这里:替换为你的数据文件路径
df = pd.read_csv('YOUR_DATA_PATH.csv')

# 数据结构说明(请替换为你的真实变量名):
# - TREAT:处理组虚拟变量(1=处理组,0=对照组)
# - Y:结局变量(如工资、绩效等)
# - X1, X2, X3:协变量(用于估计倾向得分的控制变量)

print(f"样本量:{len(df)}")
print(f"处理组:{df['TREAT'].sum()},对照组:{(df['TREAT']==0).sum()}")

小提示:协变量的选择遵循「既影响处理状态、又影响结局变量,但不中介处理-结局关系」的原则——即著名的 CIA 条件(条件独立假设)。

Step 2|倾向得分估计

用 Logit 回归估计每个个体进入处理组的概率:

# 修改这里:列出所有协变量
formula = 'TREAT ~ X1 + X2 + X3 + X4 + X5'

# 方法一:用 statsmodels 公式接口(推荐,输出更清晰)
logit_model = smf.logit(formula, data=df).fit(disp=False)

# 查看回归结果(检查协变量显著性)
print(logit_model.summary())

# 提取倾向得分(每个个体「被处理」的概率)
df['pscore'] = logit_model.predict(df)

print(f"\n倾向得分范围:[{df['pscore'].min():.4f}, {df['pscore'].max():.4f}]")
print(f"倾向得分均值:{df['pscore'].mean():.4f}")

重要检验:如果处理组和对照组的倾向得分分布没有重叠区域(overlap assumption 违反),则无法进行有效匹配。需要先检查这一步。

重叠假设检验

# 画出处理组 vs 对照组的倾向得分分布
treated_ps = df[df['TREAT'] == 1]['pscore']
control_ps = df[df['TREAT'] == 0]['pscore']

plt.figure(figsize=(8, 5))
plt.hist(treated_ps, bins=50, alpha=0.6, label='处理组', color='steelblue')
plt.hist(control_ps, bins=50, alpha=0.6, label='对照组', color='coral')

plt.xlabel('倾向得分', fontsize=12)
plt.ylabel('频数', fontsize=12)
plt.title('倾向得分分布对比(Overlap 检验)', fontsize=14)
plt.legend()
plt.tight_layout()
plt.savefig('pscore_distribution.png', dpi=150)
plt.show()

print("重叠假设检验图已保存")

合格标准:两条曲线有明显的重叠区域。如果处理组得分全在对照组右侧(或反之),则存在共同支撑域(common support)问题,需要删除不满足条件的样本。

Step 3|最近邻匹配

手写最近邻匹配(无依赖,逻辑透明)

def nearest_neighbor_matching(df, treatment_col, pscore_col, replace=True, caliper=None):
    """
    最近邻匹配
    
    参数:
        df            : DataFrame
        treatment_col : 处理组列名
        pscore_col    : 倾向得分列名
        replace       : 是否允许有放回匹配(True=1:1最近邻,False=不重复匹配)
        caliper       : 卡尺半径(如 0.05),超过此距离不匹配
    
    返回:
        匹配后的 DataFrame,包含 match_id 列
    """
    
    treated = df[df[treatment_col] == 1].copy()
    control = df[df[treatment_col] == 0].copy()
    
    matched_control_idx = []
    
    for idx, row in treated.iterrows():
        pscore_t = row[pscore_col]
        
        # 计算与所有对照组的得分距离
        distances = np.abs(control[pscore_col] - pscore_t)
        
        if caliper is not None:
            distances[distances > caliper] = np.inf
        
        # 找到距离最小的对照组
        min_idx = distances.idxmin()
        
        # 不重复匹配(可选)
        if not replace and min_idx in matched_control_idx:
            distances[min_idx] = np.inf
            min_idx = distances.idxmin()
        
        matched_control_idx.append(min_idx)
    
    # 构建匹配后的数据集
    treated_matched = treated.copy()
    treated_matched['match_id'] = matched_control_idx
    
    control_matched = control.loc[matched_control_idx].copy()
    control_matched['match_id'] = matched_control_idx
    control_matched.index.name = 'original_index'
    
    # 合并
    matched_df = pd.concat([treated_matched, control_matched], ignore_index=True)
    return matched_df


# 不设卡尺(默认1:1最近邻,有放回)
matched_df = nearest_neighbor_matching(
    df,
    treatment_col='TREAT',
    pscore_col='pscore',
    replace=True,
    caliper=None
)

# 或启用卡尺匹配(更严格,推荐)
matched_df = nearest_neighbor_matching(
    df,
    treatment_col='TREAT',
    pscore_col='pscore',
    replace=True,
    caliper=0.05
)

print(f"匹配后样本量:{len(matched_df)}")
print(f"处理组:{(matched_df['TREAT']==1).sum()},对照组:{(matched_df['TREAT']==0).sum()}")

卡尺匹配 vs 普通最近邻

  • 普通最近邻:处理组每个个体强制匹配得分最接近的对照,不问差距有多大
  • 卡尺匹配:加一个「容忍阈值」,差距超过阈值的对照组直接拒绝匹配,更保守、偏误更小
  • 建议优先用卡尺匹配,卡尺一般取 0.01~0.05

Step 4|平衡性检验

匹配质量好不好,关键看协变量在匹配后是否「平衡」了。

标准化均值差异(SMD)计算

def calc_std_mean_diff(df, var, treatment_col, after_match=True):
    """计算标准化均值差异(SMD)"""
    treated = df[df[treatment_col] == 1][var]
    control = df[df[treatment_col] == 0][var]
    
    diff = treated.mean() - control.mean()
    pooled_std = np.sqrt((treated.var() + control.var()) / 2)
    
    smd = diff / pooled_std if pooled_std > 0 else 0
    return smd


# 修改这里:列出你的所有协变量
covariates = ['X1', 'X2', 'X3', 'X4', 'X5']

# 匹配前 SMD
smd_before = {var: calc_std_mean_diff(df, var, 'TREAT', False) for var in covariates}

# 匹配后 SMD
smd_after = {var: calc_std_mean_diff(matched_df, var, 'TREAT', True) for var in covariates}

# 汇总表格
balance_df = pd.DataFrame({
    '变量': covariates,
    '匹配前 SMD': [smd_before[v] for v in covariates],
    '匹配后 SMD': [smd_after[v] for v in covariates],
})

balance_df['SMD 改善幅度(%)'] = (
    1 - balance_df['匹配后 SMD'].abs() / 
    balance_df['匹配前 SMD'].abs().replace(0, np.nan)) * 100

print("=" * 55)
print("平衡性检验结果")
print("=" * 55)
print(balance_df.to_string(index=False))
print("=" * 55)

# 判断标准:|SMD| < 0.1 为理想平衡,< 0.2 为可接受
balance_df['是否平衡(<0.1)'] = balance_df['匹配后 SMD'].abs().apply(
    lambda x: '通过' if x < 0.1 else ('警告' if x < 0.2 else '不通过'))

print(balance_df[['变量', '匹配后 SMD', '是否平衡(<0.1)']].to_string(index=False))

可视化:匹配前后 SMD 对比

vars_order = balance_df['变量'].tolist()
x_pos = np.arange(len(vars_order))

plt.figure(figsize=(9, 5))
plt.bar(x_pos - 0.2, balance_df['匹配前 SMD'].abs(), 0.4,
        label='匹配前', color='coral', alpha=0.8)
plt.bar(x_pos + 0.2, balance_df['匹配后 SMD'].abs(), 0.4,
        label='匹配后', color='steelblue', alpha=0.8)
plt.axhline(y=0.1, color='red', linestyle='--', linewidth=1.5,
           label='理想阈值(|SMD|=0.1)')

plt.xticks(x_pos, vars_order, fontsize=11)
plt.ylabel('|SMD|', fontsize=12)
plt.title('匹配前后标准化均值差异(SMD)对比', fontsize=14)
plt.legend()
plt.tight_layout()
plt.savefig('smd_comparison.png', dpi=150)
plt.show()

print("平衡性检验图已保存")

合格标准:匹配后所有协变量的 |SMD| < 0.1(推荐)或 < 0.2(勉强接受)。如果某变量在匹配后 SMD 仍然很大,说明该变量的分布仍不平衡,需要重新审视协变量选择或匹配方法。

Step 5|ATT 估计

ATT(Average Treatment effect on the Treated)= 处理组的平均处理效应

# 匹配后各组的结局变量均值
att_treated = matched_df[matched_df['TREAT'] == 1]['Y'].mean()
att_control = matched_df[matched_df['TREAT'] == 0]['Y'].mean()

ATT = att_treated - att_control

print("=" * 45)
print("ATT 估计结果")
print("=" * 45)
print(f"处理组均值(匹配后):{att_treated:.4f}")
print(f"对照组均值(匹配后):{att_control:.4f}")
print(f"ATT(平均处理效应):{ATT:.4f}")
print("=" * 45)

# 配对 t 检验(检验 ATT 是否显著不同于零)
treated_Y = matched_df[matched_df['TREAT'] == 1][['Y', 'match_id']].set_index('match_id')
control_Y = matched_df[matched_df['TREAT'] == 0][['Y']].rename(columns={'Y': 'Y_control'})
control_Y.index.name = 'match_id'

paired = treated_Y.join(control_Y).dropna()
diff = paired['Y'] - paired['Y_control']
t_stat = diff.mean() / (diff.std() / np.sqrt(len(diff)))
p_value = 2 * (1 - stats.t.cdf(abs(t_stat), df=len(diff)-1))

from scipy import stats

print(f"\n配对 t 检验:t = {t_stat:.4f}, p = {p_value:.4f}")

if p_value < 0.01:
    print("结论:ATT 在 1% 水平上显著")
elif p_value < 0.05:
    print("结论:ATT 在 5% 水平上显著")
elif p_value < 0.1:
    print("结论:ATT 在 10% 水平上显著")
else:
    print("结论:ATT 不显著")


四、完整流程封装

把以上步骤封装成一个通用函数,方便以后直接调用:

def run_psm_analysis(df, treatment_col, outcome_col, covariate_list,
                     caliper=0.05, replace=True, verbose=True):
    """
    PSM 完整分析流程
    
    参数:
        df              : 原始数据 DataFrame
        treatment_col   : 处理组列名(如 'TREAT')
        outcome_col     : 结局变量列名(如 'Y')
        covariate_list  : 协变量列表(如 ['X1', 'X2', 'X3'])
        caliper         : 卡尺半径(None = 无卡尺)
        replace         : 是否允许有放回匹配
        verbose         : 是否打印详细结果
    
    返回:
        matched_df, ATT, balance_df
    """
    
    import statsmodels.formula.api as smf
    from scipy import stats
    import numpy as np
    
    # 1. 倾向得分估计
    formula = f"{treatment_col} ~ {' + '.join(covariate_list)}"
    logit = smf.logit(formula, data=df).fit(disp=False)
    df = df.copy()
    df['pscore'] = logit.predict(df)
    
    # 2. 最近邻匹配
    matched_df = nearest_neighbor_matching(
        df, treatment_col, 'pscore',
        replace=replace, caliper=caliper
    )
    
    # 3. ATT 估计
    treated_mean = matched_df[matched_df[treatment_col]==1][outcome_col].mean()
    control_mean = matched_df[matched_df[treatment_col]==0][outcome_col].mean()
    ATT = treated_mean - control_mean
    
    # 4. 平衡性检验
    smd_before = {}
    smd_after = {}
    for var in covariate_list:
        t = df[df[treatment_col]==1][var]
        c = df[df[treatment_col]==0][var]
        tm = matched_df[matched_df[treatment_col]==1][var]
        cm = matched_df[matched_df[treatment_col]==0][var]
        pooled = np.sqrt((t.var() + c.var()) / 2)
        smd_before[var] = (t.mean() - c.mean()) / pooled
        smd_after[var] = (tm.mean() - cm.mean()) / pooled
    
    balance_df = pd.DataFrame({
        '变量': covariate_list,
        '匹配前 SMD': [smd_before[v] for v in covariate_list],
        '匹配后 SMD': [smd_after[v] for v in covariate_list],
    })
    
    if verbose:
        print(f"\n{'='*45}")
        print(f"ATT 估计结果:{ATT:.4f}")
        print(f"处理组均值:{treated_mean:.4f}")
        print(f"对照组均值:{control_mean:.4f}")
        print(f"{'='*45}")
        print(balance_df.round(4).to_string(index=False))
    
    return matched_df, ATT, balance_df


# 使用示例(替换变量名即可):
# matched, att, balance = run_psm_analysis(
#     df=df,
#     treatment_col='TREAT',
#     outcome_col='Y',
#     covariate_list=['X1', 'X2', 'X3', 'X4', 'X5'],
#     caliper=0.05
# )


五、进阶:倾向得分加权(PSW)

除了匹配,还可以用**逆倾向得分加权(IPW)**来估计 ATT,思路是给每个样本赋予权重:

df = df.copy()
df['weight'] = np.where(
    df['TREAT'] == 1,
    1 / df['pscore'],              # 处理组:1 / pscore
    1 / (1 - df['pscore'])          # 对照组:1 / (1 - pscore)
)

# 加权 ATT
ATT_psw = (
    (df[df['TREAT']==1]['Y'] * df[df['TREAT']==1]['weight']).sum() /
    df[df['TREAT']==1]['weight'].sum()
    -
    (df[df['TREAT']==0]['Y'] * df[df['TREAT']==0]['weight']).sum() /
    df[df['TREAT']==0]['weight'].sum()
)

print(f"PSW 加权 ATT 估计值:{ATT_psw:.4f}")

PSW 的优势在于不损失样本量,但对极端倾向得分(接近0或1)非常敏感,需要做截断(trimming)或用稳健标准误。



六、常见「坑」与避坑指南

说明避坑方法
匹配后样本量骤降卡尺过严或重叠区太小调整卡尺,或在匹配前检查重叠区,必要时删去极端值
协变量选择错误遗漏了影响处理分配的变量,或加入了中介变量只选「影响处理分配但不中介处理-结局」的变量
匹配后 SMD 仍大某些变量匹配效果不好考虑增加协变量,或换用更严格的匹配方法(如半径匹配、核匹配)
倾向得分模型误设用线性概率模型(LPM)而非 Logit优先用 Logit,检查预测概率分布是否合理
未报告稳健标准误直接用样本均值差异做 t 检验,偏误较大用 Bootstrap 或聚类稳健标准误


七、结语

PSM 是因果推断中最常用也最易理解的「准实验」方法之一。它的本质是用倾向得分这把尺子,在对照组中找到处理组的「影子」,让比较变得公平。

今天这套代码覆盖了:

  • 倾向得分估计(statsmodels Logit)
  • 最近邻匹配(手写,无第三方依赖)
  • 卡尺匹配(更保守的匹配策略)
  • 平衡性检验(SMD 可视化)
  • ATT 估计与统计检验
  • 进阶:逆倾向得分加权(PSW)
浏览 (41)
点赞
收藏
删除
评论