Code analysis of deequ's data-column profile
admin
2024-02-04 04:27:01

String type detection in the data-column profile

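deequ's profile feature can detect the actual type of values stored in string columns. The example that ships with deequ illustrates this. The data is defined as follows, with every field declared as String: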
case class RawData(productName: String, totalNumber: String, status: String, valuable: String)

Yet the profile output reports a different type for totalNumber:
Column 'productName':
    completeness: 1.0
    approximate number of distinct values: 5
    datatype: String

Column 'totalNumber':
    completeness: 1.0
    approximate number of distinct values: 6
    datatype: Fractional

This detection is implemented by the class StatefulDataType.
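To make the idea concrete, here is a minimal sketch of this kind of detection in plain Scala. It is not deequ's code: the object name, the `Kind` hierarchy, the regular expressions, and the decision order are all simplified assumptions. The principle is to classify each value, count the classes, and pick the narrowest type that covers every non-null value.

```scala
object TypeInferenceSketch {
  // Hypothetical value categories, loosely mirroring the type names in the profile output.
  sealed trait Kind
  case object UnknownT extends Kind
  case object FractionalT extends Kind
  case object IntegralT extends Kind
  case object BooleanT extends Kind
  case object StringT extends Kind

  // Assumed patterns; a Regex extractor only succeeds when the whole string matches.
  private val FractionalPattern = """[-+]?\d*\.\d+""".r
  private val IntegralPattern = """[-+]?\d+""".r
  private val BooleanPattern = """true|false""".r

  // Classify a single (possibly missing) value.
  def classify(value: Option[String]): Kind = value match {
    case None => UnknownT
    case Some(FractionalPattern()) => FractionalT
    case Some(IntegralPattern()) => IntegralT
    case Some(BooleanPattern()) => BooleanT
    case Some(_) => StringT
  }

  // Count the categories, then pick the narrowest type that fits all values.
  def inferColumnType(values: Seq[Option[String]]): Kind = {
    val counts = values.map(classify)
      .groupBy(identity)
      .map { case (kind, hits) => kind -> hits.size }
      .withDefaultValue(0)
    val mixedBooleanAndNumeric =
      counts(BooleanT) > 0 && (counts(IntegralT) > 0 || counts(FractionalT) > 0)

    if (values.isEmpty || counts(UnknownT) == values.size) UnknownT
    else if (counts(StringT) > 0 || mixedBooleanAndNumeric) StringT
    else if (counts(BooleanT) > 0) BooleanT
    else if (counts(FractionalT) > 0) FractionalT
    else IntegralT
  }

  def main(args: Array[String]): Unit = {
    // A column mixing "13.0" and "5" is reported as fractional.
    println(inferColumnType(Seq(Some("13.0"), Some("5")))) // FractionalT
  }
}
```

Under these assumptions a single decimal value is enough to turn an otherwise integral column into a fractional one, which is consistent with totalNumber being reported as Fractional above.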

Core code of object ColumnProfiler

The core logic lives in the profile function of object ColumnProfiler:

/**
 * Profile a (potentially very large) dataset.
 *
 * @param data                             the dataset to profile, as a DataFrame
 * @param restrictToColumns                can contain a subset of columns to profile, otherwise
 *                                         all columns will be considered (default: all columns)
 * @param printStatusUpdates               whether to print progress messages
 * @param lowCardinalityHistogramThreshold the maximum (estimated) number of distinct values
 *                                         in a column until which we should compute exact
 *                                         histograms for it (defaults to 120); columns with more
 *                                         distinct values than this get no histogram
 * @param metricsRepository                the repo to store metrics
 * @param reuseExistingResultsUsingKey     key for reusing existing results, i.e. data already
 *                                         stored in the repository
 * @param failIfResultsForReusingMissing   true if we have results for reusing
 * @param saveInMetricsRepositoryUsingKey  key for saving in metrics repo
 * @param kllProfiling                     whether to compute KLL sketches; for background on KLL
 *                                         see https://www.51cto.com/article/711445.html
 * @param kllParameters                    parameters for KLL sketches; note that this parameter
 *                                         does not decide whether KLL is used, see doAnalysisRun
 * @param predefinedTypes                  predefined column types; a string column with a
 *                                         predefined type is not run through type detection,
 *                                         see getAnalyzersForGenericStats
 * @return the profile of columns
 */
// scalastyle:off argcount
private[deequ] def profile(
    data: DataFrame,
    restrictToColumns: Option[Seq[String]] = None,
    printStatusUpdates: Boolean = false,
    lowCardinalityHistogramThreshold: Int = ColumnProfiler.DEFAULT_CARDINALITY_THRESHOLD,
    metricsRepository: Option[MetricsRepository] = None,
    reuseExistingResultsUsingKey: Option[ResultKey] = None,
    failIfResultsForReusingMissing: Boolean = false,
    saveInMetricsRepositoryUsingKey: Option[ResultKey] = None,
    kllProfiling: Boolean = false,
    kllParameters: Option[KLLParameters] = None,
    predefinedTypes: Map[String, DataTypeInstances.Value] = Map.empty)
  : ColumnProfiles = {

  // Ensure that all desired columns exist
  restrictToColumns.foreach { restrictToColumns =>
    restrictToColumns.foreach { columnName =>
      require(data.schema.fieldNames.contains(columnName), s"Unable to find column $columnName")
    }
  }

  // Find columns we want to profile
  val relevantColumns = getRelevantColumns(data.schema, restrictToColumns)

  // First pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing generic column statistics in pass (1/3)...")
  }

  // We compute completeness, approximate number of distinct values
  // and type detection for string columns in the first pass
  val analyzersForGenericStats = getAnalyzersForGenericStats(
    data.schema,
    relevantColumns,
    predefinedTypes)

  // Runs through AnalysisRunner, but with addAnalyzers instead of addCheck;
  // checks are converted to analyzers in the end anyway.
  // The check-to-analyzer conversion is in private[deequ] def doVerificationRun(:
  //   val analyzers = requiredAnalyzers ++ checks.flatMap { _.requiredAnalyzers() }
  //   val analysisResults = AnalysisRunner.doAnalysisRun(
  //     data,
  //     analyzers.distinct,
  //
  var analysisRunnerFirstPass = AnalysisRunner
    .onData(data)
    .addAnalyzers(analyzersForGenericStats)
    .addAnalyzer(Size())

  analysisRunnerFirstPass = setMetricsRepositoryConfigurationIfNecessary(
    analysisRunnerFirstPass,
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  // Run with AnalysisRunner, the same way constraints are evaluated
  val firstPassResults = analysisRunnerFirstPass.run()

  val genericStatistics = extractGenericStatistics(
    relevantColumns,
    data.schema,
    firstPassResults,
    predefinedTypes)

  // Second pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing numeric column statistics in pass (2/3)...")
  }

  // We cast all string columns that were detected as numeric
  val castedDataForSecondPass = castNumericStringColumns(relevantColumns, data,
    genericStatistics)

  // We compute mean, stddev, min, max for all numeric columns
  val analyzersForSecondPass = getAnalyzersForSecondPass(relevantColumns,
    genericStatistics, kllProfiling, kllParameters)

  var analysisRunnerSecondPass = AnalysisRunner
    .onData(castedDataForSecondPass)
    .addAnalyzers(analyzersForSecondPass)

  analysisRunnerSecondPass = setMetricsRepositoryConfigurationIfNecessary(
    analysisRunnerSecondPass,
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  // Run with AnalysisRunner, the same way constraints are evaluated
  val secondPassResults = analysisRunnerSecondPass.run()

  val numericStatistics = extractNumericStatistics(secondPassResults)

  // Third pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing histograms of low-cardinality columns in pass (3/3)...")
  }

  // We compute exact histograms for all low-cardinality string columns, find those here
  val targetColumnsForHistograms = findTargetColumnsForHistograms(data.schema, genericStatistics,
    lowCardinalityHistogramThreshold)

  // Find out, if we have values for those we can reuse
  val analyzerContextExistingValues = getAnalyzerContextWithHistogramResultsForReusingIfNecessary(
    metricsRepository,
    reuseExistingResultsUsingKey,
    targetColumnsForHistograms)

  // The columns we need to calculate the histograms for
  val nonExistingHistogramColumns = targetColumnsForHistograms
    .filter { column => analyzerContextExistingValues.metricMap.get(Histogram(column)).isEmpty }

  // Calculate and save/append results if necessary
  val histograms: Map[String, Distribution] = getHistogramsForThirdPass(
    data,
    nonExistingHistogramColumns,
    analyzerContextExistingValues,
    printStatusUpdates,
    failIfResultsForReusingMissing,
    metricsRepository,
    saveInMetricsRepositoryUsingKey)

  val thirdPassResults = CategoricalColumnStatistics(histograms)

  createProfiles(relevantColumns, genericStatistics, numericStatistics, thirdPassResults)
}
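The function above chains three passes, and the result of the first pass decides what the other two do. The sketch below shows only that dependency, in plain Scala without Spark. `GenericStats`, `columnsToCast`, and `histogramTargets` are hypothetical stand-ins, types are modelled as plain strings, and the inclusive threshold comparison is an assumption rather than something confirmed by the code shown here.

```scala
object ProfilingPassesSketch {
  // Hypothetical stand-in for the first-pass result:
  // detected type and approximate distinct count per column.
  final case class GenericStats(
      detectedType: Map[String, String],
      approxDistinct: Map[String, Long])

  private val numericTypes = Set("Integral", "Fractional")

  // Input to pass 2: columns stored as strings but detected as numeric must be cast
  // before mean, stddev, min and max can be computed on them.
  def columnsToCast(stringColumns: Set[String], stats: GenericStats): Set[String] =
    stringColumns.filter(column => stats.detectedType.get(column).exists(numericTypes.contains))

  // Input to pass 3: only low-cardinality columns get an exact histogram
  // (assumed here: distinct count less than or equal to the threshold).
  def histogramTargets(stats: GenericStats, threshold: Long): Set[String] =
    stats.approxDistinct.collect { case (column, count) if count <= threshold => column }.toSet

  def main(args: Array[String]): Unit = {
    // Values taken from the profile output shown earlier in this article.
    val stats = GenericStats(
      detectedType = Map("productName" -> "String", "totalNumber" -> "Fractional"),
      approxDistinct = Map("productName" -> 5L, "totalNumber" -> 6L))

    println(columnsToCast(Set("productName", "totalNumber"), stats)) // Set(totalNumber)
    println(histogramTargets(stats, threshold = 120L))
  }
}
```

This also explains why type detection has to happen in the first pass: without it, neither the cast nor the numeric analyzers of the second pass could be planned.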

Restricting the analysis scope and skipping string column detection

The columns to analyze can be restricted by passing a Seq of column names, as the following code shows.

/**
 * Can be used to specify a subset of columns to look at,
 * i.e. it restricts the list of columns to analyze
 *
 * @param restrictToColumns The columns to look at
 */
def restrictToColumns(restrictToColumns: Seq[String]): this.type = {
  this.restrictToColumns = Option(restrictToColumns)
  this
}

As mentioned earlier, predefinedTypes can be used to skip string column detection. The logic is implemented as follows.

// String type detection is performed by the DataType analyzer
private[this] def getAnalyzersForGenericStats(
    schema: StructType,
    relevantColumns: Seq[String],
    predefinedTypes: Map[String, DataTypeInstances.Value])
  : Seq[Analyzer[_, Metric[_]]] = {

  schema.fields
    .filter { field => relevantColumns.contains(field.name) }
    .flatMap { field =>
      val name = field.name

      // Only string columns without a predefined type get the DataType analyzer
      if (field.dataType == StringType && !predefinedTypes.contains(name)) {
        Seq(Completeness(name), ApproxCountDistinct(name), DataType(name))
      } else {
        Seq(Completeness(name), ApproxCountDistinct(name))
      }
    }
}
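Both options are normally set through the profiler's builder. The following sketch assumes deequ and Spark are on the classpath and that the builder returned by `ColumnProfilerRunner().onData(...)` exposes `restrictToColumns` and `setPredefinedTypes`. The object name, the helper `profileWithHints`, and the sample rows are made up for illustration.

```scala
import com.amazon.deequ.analyzers.DataTypeInstances
import com.amazon.deequ.profiles.{ColumnProfilerRunner, ColumnProfiles}
import org.apache.spark.sql.{DataFrame, SparkSession}

object RestrictedProfilingExample {
  // Same shape as the example data at the top of this article.
  case class RawData(productName: String, totalNumber: String, status: String, valuable: String)

  // Profile only two columns and declare the type of totalNumber up front,
  // so that no DataType analyzer is scheduled for it in the first pass.
  def profileWithHints(data: DataFrame): ColumnProfiles =
    ColumnProfilerRunner()
      .onData(data)
      .restrictToColumns(Seq("productName", "totalNumber"))
      .setPredefinedTypes(Map("totalNumber" -> DataTypeInstances.Fractional))
      .run()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("profile-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows, for illustration only.
    val data = Seq(
      RawData("thingA", "13.0", "IN_TRANSIT", "true"),
      RawData("thingB", "5", "DELAYED", "false")
    ).toDF()

    profileWithHints(data).profiles.foreach { case (name, profile) =>
      println(s"Column '$name': completeness=${profile.completeness}, datatype=${profile.dataType}")
    }

    spark.stop()
  }
}
```

Declaring a type this way trades safety for speed: the detection work is skipped, but the declared type is taken on trust, so it should only be used for columns whose content is already known.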
