Deequ's profiler comes with type detection for String-typed fields. Using Deequ's own example to illustrate this feature, the data is defined as:
case class RawData(productName: String, totalNumber: String, status: String, valuable: String)
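For reference, a profile like the one shown below can be produced with the public ColumnProfilerRunner API. This is only a minimal sketch: it assumes an existing SparkSession named spark, and the row values are illustrative, not taken from the source.

import com.amazon.deequ.profiles.ColumnProfilerRunner

// Illustrative rows only; any DataFrame with these string columns would do
val rawData = spark.createDataFrame(Seq(
  RawData("thingA", "13.0", "IN_TRANSIT", "true"),
  RawData("thingB", "5", "DELAYED", "false"),
  RawData("thingC", "7.5", "DELAYED", "null")
))

val result = ColumnProfilerRunner()
  .onData(rawData)
  .run()

// Print what the profiler inferred for each column
result.profiles.foreach { case (name, profile) =>
  println(s"Column '$name': completeness ${profile.completeness}, " +
    s"approx distinct ${profile.approximateNumDistinctValues}, datatype ${profile.dataType}")
}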
Although every field is declared as String, the output looks like this:
Column 'productName':
completeness: 1.0
approximate number of distinct values: 5
datatype: String
Column 'totalNumber':
completeness: 1.0
approximate number of distinct values: 6
datatype: Fractional
This detection is implemented by the class StatefulDataType.
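StatefulDataType itself is internal to Deequ, so the snippet below is only a simplified, stand-alone sketch of the idea: classify each string value (integral, fractional, boolean, or plain string), aggregate over the column, and pick the most specific type that covers all non-null values. The object name and regexes here are hypothetical, not Deequ code.

import com.amazon.deequ.analyzers.DataTypeInstances

// Hypothetical, simplified illustration of per-value type detection
object NaiveTypeDetection {

  private def isIntegral(v: String)   = v.matches("""-?\d+""")
  private def isFractional(v: String) = v.matches("""-?\d+\.\d+""")
  private def isBoolean(v: String)    = v.matches("""(?i)true|false""")

  def detect(values: Seq[String]): DataTypeInstances.Value = {
    val nonNull = values.filter(v => v != null && v.nonEmpty)
    if (nonNull.isEmpty) DataTypeInstances.Unknown
    else if (nonNull.forall(isIntegral)) DataTypeInstances.Integral
    else if (nonNull.forall(v => isIntegral(v) || isFractional(v))) DataTypeInstances.Fractional
    else if (nonNull.forall(isBoolean)) DataTypeInstances.Boolean
    else DataTypeInstances.String
  }
}

// NaiveTypeDetection.detect(Seq("13.0", "5", "7.5"))  // Fractional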
The core execution code lives in the profile function of object ColumnProfiler:
/**
 * Profile a (potentially very large) dataset.
 *
 * @param data dataset as dataframe
 * @param restrictToColumns can contain a subset of columns to profile, otherwise
 *                          all columns will be considered (restricts the columns, defaults to all)
 * @param printStatusUpdates whether to print progress updates
 * @param lowCardinalityHistogramThreshold the maximum (estimated) number of distinct values
 *                                         in a column until which we should compute exact
 *                                         histograms for it (defaults to 120); columns below
 *                                         the threshold get an exact histogram
 * @param metricsRepository the repository used to store metrics
 * @param reuseExistingResultsUsingKey key for reusing existing results, i.e. data already in the repository
 * @param failIfResultsForReusingMissing fail if results expected for reuse are missing
 * @param saveInMetricsRepositoryUsingKey key under which results are saved in the metrics repository
 * @param kllProfiling whether to enable KLL profiling; for an introduction to KLL sketches see
 *                     https://www.51cto.com/article/711445.html
 * @param kllParameters parameters for KLL sketches; note that whether KLL is used does not
 *                      depend on this parameter, see doAnalysisRun
 * @param predefinedTypes predefined column types; if a string column already has a type defined
 *                        here, no detection is run for it, see getAnalyzersForGenericStats
 * @return the profile of columns
 */
// scalastyle:off argcount
private[deequ] def profile(
    data: DataFrame,
    restrictToColumns: Option[Seq[String]] = None,
    printStatusUpdates: Boolean = false,
    lowCardinalityHistogramThreshold: Int = ColumnProfiler.DEFAULT_CARDINALITY_THRESHOLD,
    metricsRepository: Option[MetricsRepository] = None,
    reuseExistingResultsUsingKey: Option[ResultKey] = None,
    failIfResultsForReusingMissing: Boolean = false,
    saveInMetricsRepositoryUsingKey: Option[ResultKey] = None,
    kllProfiling: Boolean = false,
    kllParameters: Option[KLLParameters] = None,
    predefinedTypes: Map[String, DataTypeInstances.Value] = Map.empty)
  : ColumnProfiles = {

  // Ensure that all desired columns exist
  restrictToColumns.foreach { restrictToColumns =>
    restrictToColumns.foreach { columnName =>
      require(data.schema.fieldNames.contains(columnName), s"Unable to find column $columnName")
    }
  }

  // Find columns we want to profile
  val relevantColumns = getRelevantColumns(data.schema, restrictToColumns)

  // First pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing generic column statistics in pass (1/3)...")
  }

  // We compute completeness, approximate number of distinct values
  // and type detection for string columns in the first pass
  val analyzersForGenericStats = getAnalyzersForGenericStats(data.schema,
    relevantColumns, predefinedTypes)

  // Executed via AnalysisRunner, but using addAnalyzers instead of addCheck;
  // checks are converted to analyzers anyway. The conversion happens in
  // private[deequ] def doVerificationRun(...):
  //   val analyzers = requiredAnalyzers ++ checks.flatMap { _.requiredAnalyzers() }
  //   val analysisResults = AnalysisRunner.doAnalysisRun(
  //     data,
  //     analyzers.distinct,
  //     ...
  var analysisRunnerFirstPass = AnalysisRunner
    .onData(data)
    .addAnalyzers(analyzersForGenericStats)
    .addAnalyzer(Size())

  analysisRunnerFirstPass = setMetricsRepositoryConfigurationIfNecessary(
    analysisRunnerFirstPass,
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  // Executed via AnalysisRunner, the same way constraints are
  val firstPassResults = analysisRunnerFirstPass.run()

  val genericStatistics = extractGenericStatistics(
    relevantColumns,
    data.schema,
    firstPassResults,
    predefinedTypes)

  // Second pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing numeric column statistics in pass (2/3)...")
  }

  // We cast all string columns that were detected as numeric
  val castedDataForSecondPass = castNumericStringColumns(relevantColumns, data,
    genericStatistics)

  // We compute mean, stddev, min, max for all numeric columns
  val analyzersForSecondPass = getAnalyzersForSecondPass(relevantColumns,
    genericStatistics, kllProfiling, kllParameters)

  var analysisRunnerSecondPass = AnalysisRunner
    .onData(castedDataForSecondPass)
    .addAnalyzers(analyzersForSecondPass)

  analysisRunnerSecondPass = setMetricsRepositoryConfigurationIfNecessary(
    analysisRunnerSecondPass,
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  // Executed via AnalysisRunner, the same way constraints are
  val secondPassResults = analysisRunnerSecondPass.run()

  val numericStatistics = extractNumericStatistics(secondPassResults)

  // Third pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing histograms of low-cardinality columns in pass (3/3)...")
  }

  // We compute exact histograms for all low-cardinality string columns, find those here
  val targetColumnsForHistograms = findTargetColumnsForHistograms(data.schema, genericStatistics,
    lowCardinalityHistogramThreshold)

  // Find out, if we have values for those we can reuse
  val analyzerContextExistingValues = getAnalyzerContextWithHistogramResultsForReusingIfNecessary(
    metricsRepository,
    reuseExistingResultsUsingKey,
    targetColumnsForHistograms)

  // The columns we need to calculate the histograms for
  val nonExistingHistogramColumns = targetColumnsForHistograms
    .filter { column => analyzerContextExistingValues.metricMap.get(Histogram(column)).isEmpty }

  // Calculate and save/append results if necessary
  val histograms: Map[String, Distribution] = getHistogramsForThirdPass(
    data,
    nonExistingHistogramColumns,
    analyzerContextExistingValues,
    printStatusUpdates,
    failIfResultsForReusingMissing,
    metricsRepository,
    saveInMetricsRepositoryUsingKey)

  val thirdPassResults = CategoricalColumnStatistics(histograms)

  createProfiles(relevantColumns, genericStatistics, numericStatistics, thirdPassResults)
}
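The cast performed by castNumericStringColumns is ordinary Spark SQL casting. As a rough illustration (a sketch, not Deequ's actual implementation), a string column detected as Integral or Fractional in pass 1 could be converted like this before the pass-2 analyzers run:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, LongType}
import com.amazon.deequ.analyzers.DataTypeInstances

// Simplified sketch: cast string columns to the numeric type detected in pass 1,
// so that numeric analyzers (mean, stddev, min, max, ...) can run on them in pass 2.
def castDetectedNumericColumns(
    data: DataFrame,
    detectedTypes: Map[String, DataTypeInstances.Value]): DataFrame = {

  detectedTypes.foldLeft(data) { case (df, (column, detectedType)) =>
    detectedType match {
      case DataTypeInstances.Fractional => df.withColumn(column, col(column).cast(DoubleType))
      case DataTypeInstances.Integral   => df.withColumn(column, col(column).cast(LongType))
      case _                            => df
    }
  }
}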
The set of columns to analyze can be restricted by passing in a Seq of column names, as the following builder method shows.
/**
 * Can be used to specify a subset of columns to look at
 * (restricts the list of columns that get profiled)
 *
 * @param restrictToColumns The columns to look at
 */
def restrictToColumns(restrictToColumns: Seq[String]): this.type = {
  this.restrictToColumns = Option(restrictToColumns)
  this
}
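For example, the restriction can be set on the builder returned by onData (a minimal sketch, reusing the rawData DataFrame assumed above):

import com.amazon.deequ.profiles.ColumnProfilerRunner

// Profile only two of the four columns; everything else is skipped in all three passes
val restrictedResult = ColumnProfilerRunner()
  .onData(rawData)
  .restrictToColumns(Seq("productName", "totalNumber"))
  .run()

println(restrictedResult.profiles.keys.mkString(", "))  // productName, totalNumber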
As mentioned above, predefinedTypes can be used to skip String column detection; the implementation logic is shown below.
// String type detection is performed via the DataType analyzer
private[this] def getAnalyzersForGenericStats(
    schema: StructType,
    relevantColumns: Seq[String],
    predefinedTypes: Map[String, DataTypeInstances.Value]): Seq[Analyzer[_, Metric[_]]] = {

  schema.fields
    .filter { field => relevantColumns.contains(field.name) }
    .flatMap { field =>
      val name = field.name
      if (field.dataType == StringType && !predefinedTypes.contains(name)) {
        Seq(Completeness(name), ApproxCountDistinct(name), DataType(name))
      } else {
        Seq(Completeness(name), ApproxCountDistinct(name))
      }
    }
}
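In practice the map is handed in through the runner builder. To the best of my knowledge recent Deequ versions expose a setPredefinedTypes setter for this; treat the method name as an assumption and check it against your version. A sketch:

import com.amazon.deequ.analyzers.DataTypeInstances
import com.amazon.deequ.profiles.ColumnProfilerRunner

// Declare totalNumber as Fractional up front, so the first pass adds no DataType
// analyzer for it (see the filter in getAnalyzersForGenericStats above).
val predefined = Map("totalNumber" -> DataTypeInstances.Fractional)

// NOTE: setPredefinedTypes is assumed here; verify the builder method in your Deequ version.
val profiledWithPredefinedTypes = ColumnProfilerRunner()
  .onData(rawData)
  .setPredefinedTypes(predefined)
  .run()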