代码教程 | 倾向得分匹配(PSM)Python 全流程:从理论到代码
做实证研究,最头疼的问题之一就是——处理组和对照组本来就不一样,直接比较「有失公允」。倾向得分匹配(PSM)正是解决这类问题的经典方法。今天我们用 Python,从原理到代码,把 PSM 讲透。
一、为什么需要 PSM?
想象一个场景:你想研究「参加培训」对员工工资的影响。
- 处理组:参加培训的员工
- 对照组:没参加培训的员工
问题是——参加培训的人本来可能就更上进、学历更高、更有能力。即使不参加培训,他们的工资也可能更高。直接比较得到的「培训效果」,其实是选择偏差在作祟。
PSM 的核心思想:为每个处理组个体,在对照组中找到一个「长得像」的匹配对象,使得两者在可观测特征上尽可能相似。这样比较的就是「反事实」——如果处理组个体没参加培训,工资会是多少?
二、PSM 三步走
- Step 1 → 构建倾向得分:用 Logit 回归估计每个个体「被处理」的概率
- Step 2 → 最近邻匹配:处理组找对照组中倾向得分最接近的个体
- Step 3 → ATT 估计:比较匹配后处理组与对照组的结局差异
下面一步步来。
三、代码实现
Step 0|环境准备
import pandas as pd
import numpy as np
import statsmodels.api as sm
import statsmodels.formula.api as smf
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
Step 1|读取数据
# 修改这里:替换为你的数据文件路径
df = pd.read_csv('YOUR_DATA_PATH.csv')
# 数据结构说明(请替换为你的真实变量名):
# - TREAT:处理组虚拟变量(1=处理组,0=对照组)
# - Y:结局变量(如工资、绩效等)
# - X1, X2, X3:协变量(用于估计倾向得分的控制变量)
print(f"样本量:{len(df)}")
print(f"处理组:{df['TREAT'].sum()},对照组:{(df['TREAT']==0).sum()}")
小提示:协变量的选择遵循「既影响处理状态、又影响结局变量,但不中介处理-结局关系」的原则——即著名的 CIA 条件(条件独立假设)。
Step 2|倾向得分估计
用 Logit 回归估计每个个体进入处理组的概率:
# 修改这里:列出所有协变量
formula = 'TREAT ~ X1 + X2 + X3 + X4 + X5'
# 方法一:用 statsmodels 公式接口(推荐,输出更清晰)
logit_model = smf.logit(formula, data=df).fit(disp=False)
# 查看回归结果(检查协变量显著性)
print(logit_model.summary())
# 提取倾向得分(每个个体「被处理」的概率)
df['pscore'] = logit_model.predict(df)
print(f"\n倾向得分范围:[{df['pscore'].min():.4f}, {df['pscore'].max():.4f}]")
print(f"倾向得分均值:{df['pscore'].mean():.4f}")
重要检验:如果处理组和对照组的倾向得分分布没有重叠区域(overlap assumption 违反),则无法进行有效匹配。需要先检查这一步。
重叠假设检验:
# 画出处理组 vs 对照组的倾向得分分布
treated_ps = df[df['TREAT'] == 1]['pscore']
control_ps = df[df['TREAT'] == 0]['pscore']
plt.figure(figsize=(8, 5))
plt.hist(treated_ps, bins=50, alpha=0.6, label='处理组', color='steelblue')
plt.hist(control_ps, bins=50, alpha=0.6, label='对照组', color='coral')
plt.xlabel('倾向得分', fontsize=12)
plt.ylabel('频数', fontsize=12)
plt.title('倾向得分分布对比(Overlap 检验)', fontsize=14)
plt.legend()
plt.tight_layout()
plt.savefig('pscore_distribution.png', dpi=150)
plt.show()
print("重叠假设检验图已保存")
合格标准:两条曲线有明显的重叠区域。如果处理组得分全在对照组右侧(或反之),则存在共同支撑域(common support)问题,需要删除不满足条件的样本。
Step 3|最近邻匹配
手写最近邻匹配(无依赖,逻辑透明)
def nearest_neighbor_matching(df, treatment_col, pscore_col, replace=True, caliper=None):
"""
最近邻匹配
参数:
df : DataFrame
treatment_col : 处理组列名
pscore_col : 倾向得分列名
replace : 是否允许有放回匹配(True=1:1最近邻,False=不重复匹配)
caliper : 卡尺半径(如 0.05),超过此距离不匹配
返回:
匹配后的 DataFrame,包含 match_id 列
"""
treated = df[df[treatment_col] == 1].copy()
control = df[df[treatment_col] == 0].copy()
matched_control_idx = []
for idx, row in treated.iterrows():
pscore_t = row[pscore_col]
# 计算与所有对照组的得分距离
distances = np.abs(control[pscore_col] - pscore_t)
if caliper is not None:
distances[distances > caliper] = np.inf
# 找到距离最小的对照组
min_idx = distances.idxmin()
# 不重复匹配(可选)
if not replace and min_idx in matched_control_idx:
distances[min_idx] = np.inf
min_idx = distances.idxmin()
matched_control_idx.append(min_idx)
# 构建匹配后的数据集
treated_matched = treated.copy()
treated_matched['match_id'] = matched_control_idx
control_matched = control.loc[matched_control_idx].copy()
control_matched['match_id'] = matched_control_idx
control_matched.index.name = 'original_index'
# 合并
matched_df = pd.concat([treated_matched, control_matched], ignore_index=True)
return matched_df
# 不设卡尺(默认1:1最近邻,有放回)
matched_df = nearest_neighbor_matching(
df,
treatment_col='TREAT',
pscore_col='pscore',
replace=True,
caliper=None
)
# 或启用卡尺匹配(更严格,推荐)
matched_df = nearest_neighbor_matching(
df,
treatment_col='TREAT',
pscore_col='pscore',
replace=True,
caliper=0.05
)
print(f"匹配后样本量:{len(matched_df)}")
print(f"处理组:{(matched_df['TREAT']==1).sum()},对照组:{(matched_df['TREAT']==0).sum()}")
卡尺匹配 vs 普通最近邻:
- 普通最近邻:处理组每个个体强制匹配得分最接近的对照,不问差距有多大
- 卡尺匹配:加一个「容忍阈值」,差距超过阈值的对照组直接拒绝匹配,更保守、偏误更小
- 建议优先用卡尺匹配,卡尺一般取 0.01~0.05
Step 4|平衡性检验
匹配质量好不好,关键看协变量在匹配后是否「平衡」了。
标准化均值差异(SMD)计算:
def calc_std_mean_diff(df, var, treatment_col, after_match=True):
"""计算标准化均值差异(SMD)"""
treated = df[df[treatment_col] == 1][var]
control = df[df[treatment_col] == 0][var]
diff = treated.mean() - control.mean()
pooled_std = np.sqrt((treated.var() + control.var()) / 2)
smd = diff / pooled_std if pooled_std > 0 else 0
return smd
# 修改这里:列出你的所有协变量
covariates = ['X1', 'X2', 'X3', 'X4', 'X5']
# 匹配前 SMD
smd_before = {var: calc_std_mean_diff(df, var, 'TREAT', False) for var in covariates}
# 匹配后 SMD
smd_after = {var: calc_std_mean_diff(matched_df, var, 'TREAT', True) for var in covariates}
# 汇总表格
balance_df = pd.DataFrame({
'变量': covariates,
'匹配前 SMD': [smd_before[v] for v in covariates],
'匹配后 SMD': [smd_after[v] for v in covariates],
})
balance_df['SMD 改善幅度(%)'] = (
1 - balance_df['匹配后 SMD'].abs() /
balance_df['匹配前 SMD'].abs().replace(0, np.nan)) * 100
print("=" * 55)
print("平衡性检验结果")
print("=" * 55)
print(balance_df.to_string(index=False))
print("=" * 55)
# 判断标准:|SMD| < 0.1 为理想平衡,< 0.2 为可接受
balance_df['是否平衡(<0.1)'] = balance_df['匹配后 SMD'].abs().apply(
lambda x: '通过' if x < 0.1 else ('警告' if x < 0.2 else '不通过'))
print(balance_df[['变量', '匹配后 SMD', '是否平衡(<0.1)']].to_string(index=False))
可视化:匹配前后 SMD 对比:
vars_order = balance_df['变量'].tolist()
x_pos = np.arange(len(vars_order))
plt.figure(figsize=(9, 5))
plt.bar(x_pos - 0.2, balance_df['匹配前 SMD'].abs(), 0.4,
label='匹配前', color='coral', alpha=0.8)
plt.bar(x_pos + 0.2, balance_df['匹配后 SMD'].abs(), 0.4,
label='匹配后', color='steelblue', alpha=0.8)
plt.axhline(y=0.1, color='red', linestyle='--', linewidth=1.5,
label='理想阈值(|SMD|=0.1)')
plt.xticks(x_pos, vars_order, fontsize=11)
plt.ylabel('|SMD|', fontsize=12)
plt.title('匹配前后标准化均值差异(SMD)对比', fontsize=14)
plt.legend()
plt.tight_layout()
plt.savefig('smd_comparison.png', dpi=150)
plt.show()
print("平衡性检验图已保存")
合格标准:匹配后所有协变量的 |SMD| < 0.1(推荐)或 < 0.2(勉强接受)。如果某变量在匹配后 SMD 仍然很大,说明该变量的分布仍不平衡,需要重新审视协变量选择或匹配方法。
Step 5|ATT 估计
ATT(Average Treatment effect on the Treated)= 处理组的平均处理效应
# 匹配后各组的结局变量均值
att_treated = matched_df[matched_df['TREAT'] == 1]['Y'].mean()
att_control = matched_df[matched_df['TREAT'] == 0]['Y'].mean()
ATT = att_treated - att_control
print("=" * 45)
print("ATT 估计结果")
print("=" * 45)
print(f"处理组均值(匹配后):{att_treated:.4f}")
print(f"对照组均值(匹配后):{att_control:.4f}")
print(f"ATT(平均处理效应):{ATT:.4f}")
print("=" * 45)
# 配对 t 检验(检验 ATT 是否显著不同于零)
treated_Y = matched_df[matched_df['TREAT'] == 1][['Y', 'match_id']].set_index('match_id')
control_Y = matched_df[matched_df['TREAT'] == 0][['Y']].rename(columns={'Y': 'Y_control'})
control_Y.index.name = 'match_id'
paired = treated_Y.join(control_Y).dropna()
diff = paired['Y'] - paired['Y_control']
t_stat = diff.mean() / (diff.std() / np.sqrt(len(diff)))
p_value = 2 * (1 - stats.t.cdf(abs(t_stat), df=len(diff)-1))
from scipy import stats
print(f"\n配对 t 检验:t = {t_stat:.4f}, p = {p_value:.4f}")
if p_value < 0.01:
print("结论:ATT 在 1% 水平上显著")
elif p_value < 0.05:
print("结论:ATT 在 5% 水平上显著")
elif p_value < 0.1:
print("结论:ATT 在 10% 水平上显著")
else:
print("结论:ATT 不显著")
四、完整流程封装
把以上步骤封装成一个通用函数,方便以后直接调用:
def run_psm_analysis(df, treatment_col, outcome_col, covariate_list,
caliper=0.05, replace=True, verbose=True):
"""
PSM 完整分析流程
参数:
df : 原始数据 DataFrame
treatment_col : 处理组列名(如 'TREAT')
outcome_col : 结局变量列名(如 'Y')
covariate_list : 协变量列表(如 ['X1', 'X2', 'X3'])
caliper : 卡尺半径(None = 无卡尺)
replace : 是否允许有放回匹配
verbose : 是否打印详细结果
返回:
matched_df, ATT, balance_df
"""
import statsmodels.formula.api as smf
from scipy import stats
import numpy as np
# 1. 倾向得分估计
formula = f"{treatment_col} ~ {' + '.join(covariate_list)}"
logit = smf.logit(formula, data=df).fit(disp=False)
df = df.copy()
df['pscore'] = logit.predict(df)
# 2. 最近邻匹配
matched_df = nearest_neighbor_matching(
df, treatment_col, 'pscore',
replace=replace, caliper=caliper
)
# 3. ATT 估计
treated_mean = matched_df[matched_df[treatment_col]==1][outcome_col].mean()
control_mean = matched_df[matched_df[treatment_col]==0][outcome_col].mean()
ATT = treated_mean - control_mean
# 4. 平衡性检验
smd_before = {}
smd_after = {}
for var in covariate_list:
t = df[df[treatment_col]==1][var]
c = df[df[treatment_col]==0][var]
tm = matched_df[matched_df[treatment_col]==1][var]
cm = matched_df[matched_df[treatment_col]==0][var]
pooled = np.sqrt((t.var() + c.var()) / 2)
smd_before[var] = (t.mean() - c.mean()) / pooled
smd_after[var] = (tm.mean() - cm.mean()) / pooled
balance_df = pd.DataFrame({
'变量': covariate_list,
'匹配前 SMD': [smd_before[v] for v in covariate_list],
'匹配后 SMD': [smd_after[v] for v in covariate_list],
})
if verbose:
print(f"\n{'='*45}")
print(f"ATT 估计结果:{ATT:.4f}")
print(f"处理组均值:{treated_mean:.4f}")
print(f"对照组均值:{control_mean:.4f}")
print(f"{'='*45}")
print(balance_df.round(4).to_string(index=False))
return matched_df, ATT, balance_df
# 使用示例(替换变量名即可):
# matched, att, balance = run_psm_analysis(
# df=df,
# treatment_col='TREAT',
# outcome_col='Y',
# covariate_list=['X1', 'X2', 'X3', 'X4', 'X5'],
# caliper=0.05
# )
五、进阶:倾向得分加权(PSW)
除了匹配,还可以用**逆倾向得分加权(IPW)**来估计 ATT,思路是给每个样本赋予权重:
df = df.copy()
df['weight'] = np.where(
df['TREAT'] == 1,
1 / df['pscore'], # 处理组:1 / pscore
1 / (1 - df['pscore']) # 对照组:1 / (1 - pscore)
)
# 加权 ATT
ATT_psw = (
(df[df['TREAT']==1]['Y'] * df[df['TREAT']==1]['weight']).sum() /
df[df['TREAT']==1]['weight'].sum()
-
(df[df['TREAT']==0]['Y'] * df[df['TREAT']==0]['weight']).sum() /
df[df['TREAT']==0]['weight'].sum()
)
print(f"PSW 加权 ATT 估计值:{ATT_psw:.4f}")
PSW 的优势在于不损失样本量,但对极端倾向得分(接近0或1)非常敏感,需要做截断(trimming)或用稳健标准误。
六、常见「坑」与避坑指南
| 坑 | 说明 | 避坑方法 |
|---|---|---|
| 匹配后样本量骤降 | 卡尺过严或重叠区太小 | 调整卡尺,或在匹配前检查重叠区,必要时删去极端值 |
| 协变量选择错误 | 遗漏了影响处理分配的变量,或加入了中介变量 | 只选「影响处理分配但不中介处理-结局」的变量 |
| 匹配后 SMD 仍大 | 某些变量匹配效果不好 | 考虑增加协变量,或换用更严格的匹配方法(如半径匹配、核匹配) |
| 倾向得分模型误设 | 用线性概率模型(LPM)而非 Logit | 优先用 Logit,检查预测概率分布是否合理 |
| 未报告稳健标准误 | 直接用样本均值差异做 t 检验,偏误较大 | 用 Bootstrap 或聚类稳健标准误 |
七、结语
PSM 是因果推断中最常用也最易理解的「准实验」方法之一。它的本质是用倾向得分这把尺子,在对照组中找到处理组的「影子」,让比较变得公平。
今天这套代码覆盖了:
- 倾向得分估计(statsmodels Logit)
- 最近邻匹配(手写,无第三方依赖)
- 卡尺匹配(更保守的匹配策略)
- 平衡性检验(SMD 可视化)
- ATT 估计与统计检验
- 进阶:逆倾向得分加权(PSW)
