本文共 13168 字,大约阅读时间需要 43 分钟。
Python数据预处理过程:利用统计学对数据进行检验,对连续属性检验正态分布,针对正态分布属性继续使用t检验检验方差齐次性,针对非正态分布使用Mann-Whitney检验。针对分类变量进行卡方检验(涉及三种卡方的检验:Pearson卡方,校准卡方,精准卡方)等。
不懂卡方的原理可以参考:
卡方检验具体的使用准则# 四格表卡方检验用于进行两个率或两个构成比的比较。# 要求样本含量应大于40且每个格子中的理论频数不应小于5。# 当样本含量大于40但理论频数有小于5的情况时卡方值需要校正,当样本含量小于40时只能用确切概率法计算概率。# (1)所有的理论数T≥5并且总样本量n≥40,用Pearson卡方进行检验。# (2)如果理论数T<5但T≥1,并且总样本量n≥40,用连续性校正的卡方进行检验。# (3)如果有理论数T<1或n<40,则用Fisher’s检验。
具体的代码中注释很详细:
def output_statistics_info_self(data_df, category_feats, continue_feats, target,logger,nan_value=-1,info_more=True): ''' Function:输出最全的数据的描述信息 Parameters: data_df:DataFrame the source data category_feats:list continue_feats:list target:the classification target,as the y nan_value:default -1,represent the nan value need to be filled info_more:default True,output the whole info;False:output the part info for client and paper Return: DataFrame ''' sample_size = data_df.shape[0] # 判断target是二分类还是多分类(三分类及其以上) target_values=list(data_df[target].value_counts().index) logger.info('%s取值%s'%(target,target_values)) task_type=len(target_values) total_describe_list=[] # 警告:做单因素分析之前必须要异常值检查,排出非数字的异常值,否则报错 # data_df[continue_feats]=data_df[continue_feats].applymap(float) # data_df[category_feats]=data_df[category_feats].applymap(float) # 针对二分类任务 if task_type==2: # 针对连续属性 # 先检验连续属性是否是正态分布其次再检验是否方差齐次,才能使用独立t检验 for col in continue_feats: logger.info('------%s--------'%col) col_series=data_df[data_df[col]!=nan_value][col] col_count=col_series.count() vals=[col,'连续',col_count] # 检验连续属性是否符合正态分布 p_value=norm_distribution_test(sample_size,col_series) # 如果p_value>0.05 正态分布,使用独立t检验,检验连续属性在两组样本方差相同的情况下它们的均值是否相同 condition0=(data_df[target] == target_values[0]) & (data_df[col] != nan_value) condition1=(data_df[target] == target_values[1]) & (data_df[col] != nan_value) if p_value > 0.05: logger.info('%s符合正态分布'%col) # 使用levene检验方差齐次 stat,pval=levene(data_df[condition0][col].values,data_df[condition1][col].values) if pval>0.05: # p值大于0.05,认为两总体具有方差齐性。 t_stat, pvalue = ttest_ind(data_df[condition0][col].values,data_df[condition1][col].values, equal_var=True) else: # 两总体方差不齐 t_stat, pvalue = ttest_ind(data_df[condition0][col].values,data_df[condition1][col].values, equal_var=False) pvalue=round(pvalue,3) if pvalue==0: pvalue='<0.001' vals.extend(['是_%s'%p_value,'ttest',t_stat,pvalue,'']) #非正态分布的二分类使用Mann-Whitney U test检验 else: logger.info('%s不符合正态分布'%col) m_stat, pvalue = mannwhitneyu( data_df[condition0][col].values,data_df[condition1][col].values, use_continuity=False,alternative='two-sided' ) pvalue=round(pvalue,3) if pvalue==0: pvalue='<0.001' vals.extend(['否_%s'%p_value,'Mann',m_stat,pvalue,'']) # 对连续变量输出均值±标准差 # 并在括号中附上IQR值(75%分位点-25%分位点的值),查看连续属性中间部分是否集中或者分散 target0_col_iqr=round(iqr(x=data_df[condition0][col].values,nan_policy='omit'),3) target_0_mean_std="%.2f±%.2f (%s)" %(data_df[condition0][col].mean(), data_df[condition0][col].std(),target0_col_iqr) target1_col_iqr=round(iqr(x=data_df[condition1][col].values,nan_policy='omit'),3) target_1_mean_std="%.2f±%.2f (%s)" %(data_df[condition1][col].mean(), data_df[condition1][col].std(),target1_col_iqr) vals.extend([target_0_mean_std,target_1_mean_std]) total_describe_list.append(vals) # 针对分类变量使用"卡方检验" for col in category_feats: logger.info('#######%s######'%col) col_series=data_df[data_df[col]!=nan_value][col] col_count=col_series.count() col_count_ser=col_series.value_counts() vals=[col,'分类',col_count,'','卡方'] data_kf = data_df[data_df[col]!=nan_value][[col,target]] cross_table = data_kf.groupby([col, target])[target].count().unstack() cross_table.fillna(0,inplace=True) logger.info(cross_table) if len(col_count_ser)==2: stat,pvalue=foursquare_chi_test(cross_table,col_count) vals.extend([stat,pvalue,'']) else: stat,pvalue,iswarning=not_foursquare_chi_test(cross_table) vals.extend([stat,pvalue,iswarning]) vals.extend(['','']) total_describe_list.append(vals) # 针对分类变量输出各个类别的target比例 for col_kind in col_count_ser.index: logger.info('col_kind:%s'%col_kind) col_kind_percent=['%s_%s'%(col,col_kind),'','','','','','',''] for v in target_values: col_kind_percent.append("%d(%.1f%%)" % (data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0], data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0] / data_df[((data_df[col]!=nan_value)&(data_df[target] == v))].shape[0]*100)) total_describe_list.append(col_kind_percent) # 针对三分类或者多分类 elif task_type>=3: # "先判断是否方差齐次,才能使用独立t检验" for col in continue_feats: logger.info('----!!!--%s--------'%col) col_series=data_df[data_df[col]!=nan_value][col] col_count=col_series.count() vals=[col,'连续',col_count] p_value=norm_distribution_test(sample_size,col_series) if p_value > 0.05:#正态分布 # 1-way ANOVA:原假设:两个或多个group拥有相同的均值 # 使用的前提条件:1、样本独立,2、每个样本都来源于正态分布群体,3、每个group方差齐次(方差相同) # 以上条件不满足时:使用Kruskal-Wallis H-test df = data_df[[col, target]] # 排出填补的那些值 df = df[df[col] != nan_value] stat, pvalue = f_oneway( df[df[target] == target_values[0]][col].values, df[df[target] == target_values[1]][col].values, df[df[target] == target_values[2]][col].values ) pvalue=round(pvalue,3) if pvalue==0: pvalue='<0.001' vals.extend(['是_%s'%p_value,'anova',round(stat,3),pvalue]) else: # 非正态分布 # Compute the Kruskal-Wallis H-test for independent samples df = data_df[[col, target]] df = data_df[data_df[col] != nan_value] stat, pvalue = kruskalwallis(df[df[target] == target_values[0]][col].values, df[df[target] == target_values[1]][col].values, df[df[target] == target_values[2]][col].values) pvalue=round(pvalue,3) if pvalue==0: pvalue='<0.001' vals.extend(['否_%s'%p_value,'kruskal',round(stat,3),pvalue]) # 对连续变量输出均值±标准差,以及IQR值 condition0=(data_df[target] == target_values[0]) & (data_df[col] != nan_value) target0_col_iqr=round(iqr(x=data_df[condition0][col].values,nan_policy='omit'),3) target_0_mean_std="%.2f±%.2f (%s)" %(data_df[condition0][col].mean(), data_df[condition0][col].std(),target0_col_iqr) condition1=(data_df[target] == target_values[1]) & (data_df[col] != nan_value) target1_col_iqr=round(iqr(x=data_df[condition1][col].values,nan_policy='omit'),3) target_1_mean_std="%.2f±%.2f (%s)" %(data_df[condition1][col].mean(), data_df[condition1][col].std(),target1_col_iqr) condition2=(data_df[target] == target_values[2]) & (data_df[col] != nan_value) target2_col_iqr=round( iqr(x=data_df[condition2][col].values,nan_policy='omit'),3) target_2_mean_std="%.2f±%.2f (%s)" %(data_df[condition2][col].mean(), data_df[condition2][col].std(),target2_col_iqr) vals.extend([target_0_mean_std,target_1_mean_std,target_2_mean_std]) total_describe_list.append(vals) for col in category_feats: logger.info('#######%s######'%col) col_series=data_df[data_df[col]!=nan_value][col] col_count=col_series.count() vals=[col,'分类',col_count,'','卡方'] data_kf = data_df[data_df[col] != nan_value][[col, target]] cross_table = data_kf.groupby([col, target])[target].count().unstack() cross_table.fillna(0,inplace=True) logger.info(cross_table) stat,pvalue,iswarning=not_foursquare_chi_test(cross_table) vals.extend([stat,pvalue,iswarning]) vals.extend(['','','']) total_describe_list.append(vals) # 对类别属性输出各类别的比例 for col_kind in col_series.index: logger.info('col_kind:%s'%col_kind) if col_kind!=nan_value: col_kind_percent=['%s_%s'%(col,col_kind),'','','','','',''] for v in target_values: col_kind_percent.append("%d(%.2f)" % (data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0], data_df[((data_df[col] == col_kind) & (data_df[target] == v))].shape[0] / data_df[data_df[col] == col_kind].shape[0])) total_describe_list.append(col_kind_percent) columns = ['属性','属性类别','有效值','是否正态分布', '检验方法', '统计量', 'pvalue','卡方warning'] for v in target_values: columns.append("target_{0}".format(v)) total_describe_df = pd.DataFrame(total_describe_list, columns=columns) # 输出额外的更多详细信息 if info_more==True: # 新增缺失情况统计,缺失情况、最小值、最大值、均值、标准差 info_add_list=[] for col in continue_feats+category_feats: col_series=data_df[data_df[col]!=nan_value][col] miss_count=data_df[data_df[col]==nan_value][col].count() if miss_count==0: _miss='' else: miss_ratio=round(miss_count/sample_size*100,2) _miss='%s(%.1f%%)'%(miss_count,miss_ratio) vals_info=[col,_miss] if col in continue_feats: vals_info.extend([ round(col_series.min(), 2),round(col_series.max(), 2), round(col_series.mean(), 2),round(col_series.std(), 2), round(iqr(x=col_series.values,nan_policy='omit'),2) ] ) elif col in category_feats: vals_info.extend(['','','','','']) info_add_list.append(vals_info) add_columns=['属性','缺失情况','最小值','最大值','均值','标准差','IQR'] info_add_df= pd.DataFrame(info_add_list, columns=add_columns) total_describe_df=total_describe_df.merge(info_add_df,on='属性',how='outer') return total_describe_df
def norm_distribution_test(sample_size,_series): # 连续属性是否符合正态分布 # 样本大于5000:Kolmogorov-Smirnov test # 样本小于5000:shapiro-wilk if sample_size > 5000: ks_stat, p_value = kstest(_series ,'norm') else: s_stat, p_value = shapiro(_series) p_value=round(p_value,3) return p_value
def foursquare_chi_test(cross_table,col_count): # 四格表卡方检验用于进行两个率或两个构成比的比较。 # 要求样本含量应大于40且每个格子中的理论频数不应小于5。 # 当样本含量大于40但理论频数有小于5的情况时卡方值需要校正,当样本含量小于40时只能用确切概率法计算概率。 # (1)所有的理论数T≥5并且总样本量n≥40,用Pearson卡方进行检验。 # (2)如果理论数T<5但T≥1,并且总样本量n≥40,用连续性校正的卡方进行检验。 # (3)如果有理论数T<1或n<40,则用Fisher’s检验。 stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=False) if col_count>=40 and expected.min()>=5: # Pearson卡方进行检验 stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=False) elif col_count>=40 and expected.min()<5 and expected.min()>=1: # 连续性校正的卡方进行检验 stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=True) else: # 用Fisher’s检验 stat,pvalue=fisher_exact(cross_table) stat=round(stat,3) pvalue=round(pvalue,3) if pvalue==0: pvalue='<0.001' return stat,pvaluedef not_foursquare_chi_test(cross_table): # 针对非四方表格的卡方检验 # (1)如果rxc表格中最小的理论数<1,报警告 # (2)如果rxc表格中最小的理论数<5的个数占比超过>1/5,报警告 # (3)其他情况下,使用Pearson检验 iswarning='' stat, pvalue, dof, expected = chi2_contingency(cross_table,correction=False) if expected.min()<1 or len([v for v in expected.reshape(1,-1)[0] if v<5])/\ (expected.shape[0]*expected.shape[1])>0.2: iswarning='warning' stat=round(stat,3) pvalue=round(pvalue,3) if pvalue==0: pvalue='<0.001' return stat,pvalue,iswarning
转载地址:http://oilrb.baihongyu.com/