deequ data-column profile code analysis
admin
2024-02-04 04:27:01

String type detection in data-column profile

The deequ profiler can detect the actual data type of String columns. To illustrate this feature with deequ's own example, the data is defined as:
case class RawData(productName: String, totalNumber: String, status: String, valuable: String)

Yet even though every field of RawData is declared as String, the output reports totalNumber as Fractional:
Column ‘productName’:
completeness: 1.0
approximate number of distinct values: 5
datatype: String

Column ‘totalNumber’:
completeness: 1.0
approximate number of distinct values: 6
datatype: Fractional

This detection is implemented by the class StatefulDataType.
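To make the idea concrete, here is a minimal pure-Scala sketch of regex-based type detection for string values. It is not deequ's StatefulDataType: the regexes, the hypothetical InferredType enumeration and the decision rules in determineType are simplified assumptions that follow the general approach (classify each value, count the categories seen in the column, then pick one type that covers all non-null values).

```scala
import scala.util.matching.Regex

// Hypothetical, simplified stand-in for deequ's DataTypeInstances
object InferredType extends Enumeration {
  val Unknown, Integral, Fractional, Bool, Text = Value
}

object StringTypeDetectionSketch {
  import InferredType._

  // Assumed patterns for illustration; deequ's real regexes may differ
  private val IntegralRe: Regex = """[-+]?\d+""".r
  private val FractionalRe: Regex = """[-+]?\d*\.\d+""".r
  private val BoolRe: Regex = """true|false""".r

  // Classify a single value; matching against a Regex extractor requires a full match
  def classify(value: String): InferredType.Value = value match {
    case null           => Unknown
    case IntegralRe()   => Integral
    case FractionalRe() => Fractional
    case BoolRe()       => Bool
    case _              => Text
  }

  // Count the categories seen in a column and decide on a single type for it
  def determineType(column: Seq[String]): InferredType.Value = {
    val counts = column.map(classify).groupBy(identity).map { case (t, vs) => t -> vs.size }
    def seen(t: InferredType.Value): Boolean = counts.getOrElse(t, 0) > 0

    if (counts.keySet.forall(_ == Unknown)) Unknown // empty column or only nulls
    else if (seen(Text) || (seen(Bool) && (seen(Integral) || seen(Fractional)))) Text
    else if (seen(Bool)) Bool
    else if (seen(Fractional)) Fractional // a single fractional value is enough
    else Integral
  }
}
```

With these rules a column such as Seq("13.0", "5", "1.0") comes out as Fractional, which is the kind of result shown for totalNumber above.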

Core code of object ColumnProfiler

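The core execution code is in the profile function of object ColumnProfiler. It profiles the data in three passes: generic statistics (completeness, approximate number of distinct values, and type detection for String columns), numeric statistics for numeric columns, and exact histograms for low-cardinality columns: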

/**
 * Profile a (potentially very large) dataset.
 *
 * @param data                             data dataset as dataframe
 * @param restrictToColumns                can contain a subset of columns to profile, otherwise
 *                                         all columns will be considered (defaults to all columns)
 * @param printStatusUpdates               whether to print progress messages
 * @param lowCardinalityHistogramThreshold the maximum (estimated) number of distinct values
 *                                         in a column until which we should compute exact
 *                                         histograms for it (defaults to 120); histograms are
 *                                         computed only for columns within this threshold
 * @param metricsRepository                the repo to store metrics
 * @param reuseExistingResultsUsingKey     key for reuse existing result, i.e. reuse the results
 *                                         already stored in the repository
 * @param failIfResultsForReusingMissing   true if we have results for reusing
 * @param saveInMetricsRepositoryUsingKey  key for saving in metrics repo
 * @param kllProfiling                     whether to compute KLL sketches (for background on KLL,
 *                                         see https://www.51cto.com/article/711445.html)
 * @param kllParameters                    parameters for KLL Sketches; whether KLL is used does
 *                                         not depend on this parameter, see doAnalysisRun
 * @param predefinedTypes                  predefined column types; a string column with a
 *                                         predefined type skips type detection, see
 *                                         getAnalyzersForGenericStats
 * @return the profile of columns
 */
// scalastyle:off argcount
private[deequ] def profile(
    data: DataFrame,
    restrictToColumns: Option[Seq[String]] = None,
    printStatusUpdates: Boolean = false,
    lowCardinalityHistogramThreshold: Int =
      ColumnProfiler.DEFAULT_CARDINALITY_THRESHOLD,
    metricsRepository: Option[MetricsRepository] = None,
    reuseExistingResultsUsingKey: Option[ResultKey] = None,
    failIfResultsForReusingMissing: Boolean = false,
    saveInMetricsRepositoryUsingKey: Option[ResultKey] = None,
    kllProfiling: Boolean = false,
    kllParameters: Option[KLLParameters] = None,
    predefinedTypes: Map[String, DataTypeInstances.Value] = Map.empty)
  : ColumnProfiles = {

  // Ensure that all desired columns exist
  restrictToColumns.foreach { restrictToColumns =>
    restrictToColumns.foreach { columnName =>
      require(data.schema.fieldNames.contains(columnName), s"Unable to find column $columnName")
    }
  }

  // Find columns we want to profile
  val relevantColumns = getRelevantColumns(data.schema, restrictToColumns)

  // First pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing generic column statistics in pass (1/3)...")
  }

  // We compute completeness, approximate number of distinct values
  // and type detection for string columns in the first pass
  val analyzersForGenericStats = getAnalyzersForGenericStats(
    data.schema,
    relevantColumns,
    predefinedTypes)

  // Runs through AnalysisRunner, but with addAnalyzers instead of addCheck;
  // checks are ultimately converted into analyzers as well.
  // The check-to-analyzer conversion is in private[deequ] def doVerificationRun(:
  //   val analyzers = requiredAnalyzers ++ checks.flatMap { _.requiredAnalyzers() }
  //   val analysisResults = AnalysisRunner.doAnalysisRun(
  //     data,
  //     analyzers.distinct,
  //
  //
  var analysisRunnerFirstPass = AnalysisRunner
    .onData(data)
    .addAnalyzers(analyzersForGenericStats)
    .addAnalyzer(Size())

  analysisRunnerFirstPass = setMetricsRepositoryConfigurationIfNecessary(
    analysisRunnerFirstPass,
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  // Run with AnalysisRunner, the same way constraints are run
  val firstPassResults = analysisRunnerFirstPass.run()

  val genericStatistics = extractGenericStatistics(
    relevantColumns,
    data.schema,
    firstPassResults,
    predefinedTypes)

  // Second pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing numeric column statistics in pass (2/3)...")
  }

  // We cast all string columns that were detected as numeric
  val castedDataForSecondPass = castNumericStringColumns(relevantColumns, data,
    genericStatistics)

  // We compute mean, stddev, min, max for all numeric columns
  val analyzersForSecondPass = getAnalyzersForSecondPass(relevantColumns,
    genericStatistics, kllProfiling, kllParameters)

  var analysisRunnerSecondPass = AnalysisRunner
    .onData(castedDataForSecondPass)
    .addAnalyzers(analyzersForSecondPass)

  analysisRunnerSecondPass = setMetricsRepositoryConfigurationIfNecessary(
    analysisRunnerSecondPass,
    metricsRepository,
    reuseExistingResultsUsingKey,
    failIfResultsForReusingMissing,
    saveInMetricsRepositoryUsingKey)

  // Run with AnalysisRunner, the same way constraints are run
  val secondPassResults = analysisRunnerSecondPass.run()

  val numericStatistics = extractNumericStatistics(secondPassResults)

  // Third pass
  if (printStatusUpdates) {
    println("### PROFILING: Computing histograms of low-cardinality columns in pass (3/3)...")
  }

  // We compute exact histograms for all low-cardinality string columns, find those here
  val targetColumnsForHistograms = findTargetColumnsForHistograms(data.schema, genericStatistics,
    lowCardinalityHistogramThreshold)

  // Find out, if we have values for those we can reuse
  val analyzerContextExistingValues = getAnalyzerContextWithHistogramResultsForReusingIfNecessary(
    metricsRepository,
    reuseExistingResultsUsingKey,
    targetColumnsForHistograms)

  // The columns we need to calculate the histograms for
  val nonExistingHistogramColumns = targetColumnsForHistograms
    .filter { column => analyzerContextExistingValues.metricMap.get(Histogram(column)).isEmpty }

  // Calculate and save/append results if necessary
  val histograms: Map[String, Distribution] = getHistogramsForThirdPass(
    data,
    nonExistingHistogramColumns,
    analyzerContextExistingValues,
    printStatusUpdates,
    failIfResultsForReusingMissing,
    metricsRepository,
    saveInMetricsRepositoryUsingKey)

  val thirdPassResults = CategoricalColumnStatistics(histograms)

  createProfiles(relevantColumns, genericStatistics, numericStatistics, thirdPassResults)
}
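The three passes can be mirrored on a small in-memory table to see how their results fit together. This is a simplified sketch, not deequ code, under these assumptions: columns are plain Map[String, Seq[String]] entries instead of a DataFrame, the distinct count is exact rather than approximate, a column counts as numeric when every non-null value parses as a Double, nulls are left out of histograms, and ColumnStats, ThreePassProfileSketch and profileSketch are hypothetical names.

```scala
import scala.util.Try

// Hypothetical result type; deequ's ColumnProfile carries more fields
final case class ColumnStats(
    completeness: Double,
    distinct: Int,
    numeric: Boolean,
    minMax: Option[(Double, Double)],
    histogram: Option[Map[String, Int]])

object ThreePassProfileSketch {

  def profileSketch(
      table: Map[String, Seq[String]],
      lowCardinalityThreshold: Int = 120): Map[String, ColumnStats] = {

    // Pass 1: completeness, distinct count and a crude "is numeric" detection
    val generic = table.map { case (name, values) =>
      val nonNull = values.filter(_ != null)
      val completeness = if (values.isEmpty) 0.0 else nonNull.size.toDouble / values.size
      val numeric = nonNull.nonEmpty && nonNull.forall(v => Try(v.toDouble).isSuccess)
      name -> (completeness, nonNull.distinct.size, numeric)
    }

    // Pass 2: cast the columns detected as numeric, then compute numeric statistics
    val numericStats = table.collect {
      case (name, values) if generic(name)._3 =>
        val doubles = values.filter(_ != null).map(_.toDouble)
        name -> (doubles.min, doubles.max)
    }

    // Pass 3: exact histograms only for low-cardinality columns (nulls skipped here)
    val histograms = table.collect {
      case (name, values) if generic(name)._2 <= lowCardinalityThreshold =>
        name -> values.filter(_ != null).groupBy(identity).map { case (v, vs) => v -> vs.size }
    }

    // Combine the partial results, roughly the role createProfiles plays in deequ
    generic.map { case (name, (completeness, distinct, numeric)) =>
      name -> ColumnStats(completeness, distinct, numeric,
        numericStats.get(name), histograms.get(name))
    }
  }
}
```

Each pass only depends on the output of pass 1, which is why deequ can cast the detected numeric string columns before the second pass and select histogram columns before the third.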

Restricting the analysis scope and avoiding String column detection

The set of columns to analyze can be restricted by passing a Seq of column names to the following builder method:

/**
 * Can be used to specify a subset of columns to look at
 * (restricts the list of columns to analyze)
 * @param restrictToColumns The columns to look at
 */
def restrictToColumns(restrictToColumns: Seq[String]): this.type = {
  this.restrictToColumns = Option(restrictToColumns)
  this
}
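The builder method only stores the column list; the existence check happens later, at the start of profile, via require. A minimal sketch of that pattern follows, with a hypothetical ProfilerBuilderSketch class (not deequ's ColumnProfilerRunBuilder) and a plain Seq of field names standing in for the DataFrame schema. Keeping the schema order in relevantColumns is an assumption about how the relevant columns are listed.

```scala
// Hypothetical builder illustrating the restrictToColumns pattern (not deequ's class)
class ProfilerBuilderSketch(schemaFields: Seq[String]) {
  // None means "profile every column"
  private var columnsToProfile: Option[Seq[String]] = None

  // Returning this.type keeps method chaining typed as the concrete builder
  def restrictToColumns(columns: Seq[String]): this.type = {
    this.columnsToProfile = Option(columns)
    this
  }

  // Mirrors the start of profile(): validate the requested columns, then keep schema order
  def relevantColumns(): Seq[String] = {
    columnsToProfile.foreach { columns =>
      columns.foreach { name =>
        require(schemaFields.contains(name), s"Unable to find column $name")
      }
    }
    schemaFields.filter(field => columnsToProfile.forall(_.contains(field)))
  }
}
```

In deequ itself the builder is typically reached through ColumnProfilerRunner().onData(df), with restrictToColumns chained before run().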

As mentioned above, predefinedTypes can be used to skip String column detection. The implementation logic is as follows:

// String type detection is performed by the DataType analyzer
private[this] def getAnalyzersForGenericStats(
    schema: StructType,
    relevantColumns: Seq[String],
    predefinedTypes: Map[String, DataTypeInstances.Value])
  : Seq[Analyzer[_, Metric[_]]] = {

  schema.fields
    .filter { field => relevantColumns.contains(field.name) }
    .flatMap { field =>
      val name = field.name
      // Only String columns without a predefined type get the DataType analyzer
      if (field.dataType == StringType && !predefinedTypes.contains(name)) {
        Seq(Completeness(name), ApproxCountDistinct(name), DataType(name))
      } else {
        Seq(Completeness(name), ApproxCountDistinct(name))
      }
    }
}
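The selection rule can be exercised without Spark. In the sketch below, Completeness, ApproxCountDistinct and DataType are hypothetical stand-in case classes that only carry a column name, the schema is a list of (name, isStringColumn) pairs, and predefined types are plain strings; only the branching mirrors getAnalyzersForGenericStats.

```scala
// Hypothetical stand-ins for deequ's analyzers: they only record the column name
sealed trait AnalyzerSketch
final case class Completeness(column: String) extends AnalyzerSketch
final case class ApproxCountDistinct(column: String) extends AnalyzerSketch
final case class DataType(column: String) extends AnalyzerSketch

object GenericStatsSelectionSketch {
  // schema: (columnName, isStringColumn); predefinedTypes: column name -> type name
  def analyzersFor(
      schema: Seq[(String, Boolean)],
      relevantColumns: Seq[String],
      predefinedTypes: Map[String, String]): Seq[AnalyzerSketch] =
    schema
      .filter { case (name, _) => relevantColumns.contains(name) }
      .flatMap { case (name, isString) =>
        val base: Seq[AnalyzerSketch] = Seq(Completeness(name), ApproxCountDistinct(name))
        // Type detection is added only for string columns without a predefined type
        if (isString && !predefinedTypes.contains(name)) base :+ DataType(name) else base
      }
}
```

Predefining a type for totalNumber therefore removes its DataType analyzer from the first pass while completeness and the distinct count are still computed.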
