Skip to content

Commit

Permalink
chore(backport): improve histogram cardinality estimation (#17200) (#…
Browse files Browse the repository at this point in the history
…17210)

chore(planner): improve histogram cardinality estimation (#17200)

* chore(planner): improve histogram cardinality estimation

* chore(test): update sqllogictest

* chore(test): add sqllogictest

* chore(test): update sqllogictest

* chore(test): add fuzz test

* chore(test): add physical plan info

* chore(test): update fuzz test

* chore(test): refine test

Co-authored-by: everpcpc <[email protected]>
  • Loading branch information
Dousir9 and everpcpc authored Jan 8, 2025
1 parent 8faad84 commit 5cd121f
Show file tree
Hide file tree
Showing 19 changed files with 2,135 additions and 88 deletions.
8 changes: 8 additions & 0 deletions src/common/storage/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ impl Datum {
matches!(self, Datum::Bytes(_))
}

/// Widen integer datums to their floating-point representation.
///
/// `Int` and `UInt` values are converted to `Float` (magnitudes beyond
/// 2^53 may lose precision in the `as f64` cast — presumably acceptable
/// for statistics estimation); every other variant is returned as-is.
pub fn to_float(self) -> Self {
    if let Datum::Int(i) = self {
        Datum::Float(F64::from(i as f64))
    } else if let Datum::UInt(u) = self {
        Datum::Float(F64::from(u as f64))
    } else {
        self
    }
}

pub fn to_double(&self) -> Result<f64> {
match self {
Datum::Bool(v) => Ok(*v as u8 as f64),
Expand Down
8 changes: 7 additions & 1 deletion src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl Interpreter for ExplainInterpreter {
self.ctx.clone(),
*s_expr.clone(),
schema.clone(),
metadata.clone(),
)?;
let plan = interpreter.build_physical_plan(&mutation, None).await?;
self.explain_physical_plan(&plan, metadata, &None).await?
Expand Down Expand Up @@ -529,7 +530,12 @@ impl ExplainInterpreter {
schema: DataSchemaRef,
) -> Result<Vec<DataBlock>> {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let interpreter = MutationInterpreter::try_create(self.ctx.clone(), s_expr, schema)?;
let interpreter = MutationInterpreter::try_create(
self.ctx.clone(),
s_expr,
schema,
mutation.metadata.clone(),
)?;
let plan = interpreter.build_physical_plan(&mutation, None).await?;
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;

Expand Down
13 changes: 10 additions & 3 deletions src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_catalog::lock::LockTableOption;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::plans::Mutation;
use log::error;

use super::interpreter_catalog_create::CreateCatalogInterpreter;
Expand Down Expand Up @@ -384,9 +385,15 @@ impl InterpreterFactory {
Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()),

Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()),
Plan::DataMutation { s_expr, schema, .. } => Ok(Arc::new(
MutationInterpreter::try_create(ctx, *s_expr.clone(), schema.clone())?,
)),
Plan::DataMutation { s_expr, schema, .. } => {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
Ok(Arc::new(MutationInterpreter::try_create(
ctx,
*s_expr.clone(),
schema.clone(),
mutation.metadata.clone(),
)?))
}

// Roles
Plan::CreateRole(create_role) => Ok(Arc::new(CreateRoleInterpreter::try_create(
Expand Down
11 changes: 11 additions & 0 deletions src/query/service/src/interpreters/interpreter_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use databend_common_sql::executor::MutationBuildInfo;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::optimizer::SExpr;
use databend_common_sql::planner::MetadataRef;
use databend_common_sql::plans;
use databend_common_sql::plans::Mutation;
use databend_common_storage::MutationStatus;
Expand All @@ -44,6 +45,7 @@ use databend_common_storages_fuse::operations::TruncateMode;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_storages_common_table_meta::meta::TableSnapshot;
use log::info;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::dml_build_update_stream_req;
Expand All @@ -58,18 +60,21 @@ pub struct MutationInterpreter {
ctx: Arc<QueryContext>,
s_expr: SExpr,
schema: DataSchemaRef,
metadata: MetadataRef,
}

impl MutationInterpreter {
    /// Build a `MutationInterpreter` from an optimized mutation
    /// expression tree.
    ///
    /// # Arguments
    /// * `ctx`      - query session context the mutation runs under
    /// * `s_expr`   - optimized `SExpr` describing the mutation plan
    /// * `schema`   - output schema of the mutation
    /// * `metadata` - planner metadata, used for plan formatting/logging
    pub fn try_create(
        ctx: Arc<QueryContext>,
        s_expr: SExpr,
        schema: DataSchemaRef,
        metadata: MetadataRef,
    ) -> Result<MutationInterpreter> {
        // Infallible today; kept as `Result` to match the interpreter
        // factory convention used by sibling `try_create` constructors.
        Ok(Self {
            ctx,
            s_expr,
            schema,
            metadata,
        })
    }
}
Expand Down Expand Up @@ -129,6 +134,12 @@ impl Interpreter for MutationInterpreter {
.build_physical_plan(&mutation, Some(mutation_build_info))
.await?;

let query_plan = physical_plan
.format(self.metadata.clone(), Default::default())?
.format_pretty()?;

info!("Query physical plan: \n{}", query_plan);

// Build pipeline.
let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_meta_types::MatchSeq;
use databend_common_sql::field_default_value;
use databend_common_sql::plans::AddColumnOption;
use databend_common_sql::plans::AddTableColumnPlan;
use databend_common_sql::plans::Mutation;
use databend_common_sql::plans::Plan;
use databend_common_sql::Planner;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -135,8 +136,13 @@ impl Interpreter for AddTableColumnInterpreter {
let mut planner = Planner::new(self.ctx.clone());
let (plan, _) = planner.plan_sql(&query).await?;
if let Plan::DataMutation { s_expr, schema, .. } = plan {
let interpreter =
MutationInterpreter::try_create(self.ctx.clone(), *s_expr, schema)?;
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let interpreter = MutationInterpreter::try_create(
self.ctx.clone(),
*s_expr,
schema,
mutation.metadata.clone(),
)?;
let _ = interpreter.execute(self.ctx.clone()).await?;
return Ok(PipelineBuildResult::create());
}
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_expression::DataSchemaRef;
use databend_common_expression::ScalarRef;
use databend_common_expression::SendableDataBlockStream;
use databend_common_sql::optimizer::SExpr;
use databend_common_sql::plans::Mutation;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::io::MetaWriter;
use databend_common_storages_fuse::statistics::gen_columns_statistics;
Expand Down Expand Up @@ -286,7 +287,9 @@ pub async fn do_mutation(
s_expr: SExpr,
schema: DataSchemaRef,
) -> Result<()> {
let interpreter = MutationInterpreter::try_create(ctx.clone(), s_expr, schema)?;
let mutation: Mutation = s_expr.plan().clone().try_into()?;
let interpreter =
MutationInterpreter::try_create(ctx.clone(), s_expr, schema, mutation.metadata.clone())?;
let _ = interpreter.execute(ctx).await?;
Ok(())
}
Expand Down
7 changes: 5 additions & 2 deletions src/query/sql/src/planner/optimizer/property/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

use std::cmp::Ordering;

use databend_common_base::base::OrderedFloat;
use databend_common_exception::Result;
use databend_common_expression::arithmetics_type::ResultTypeOfUnary;
use databend_common_storage::Datum;
use databend_common_storage::Histogram;
use databend_common_storage::HistogramBucket;

pub type F64 = OrderedFloat<f64>;

/// Construct a histogram from NDV and total number of rows.
///
/// # Arguments
Expand Down Expand Up @@ -59,7 +62,7 @@ pub fn histogram_from_ndv(
}

let (min, max) = match bound {
Some((min, max)) => (min, max),
Some((min, max)) => (min.to_float(), max.to_float()),
None => {
return Err(format!(
"Must have min and max value when NDV is greater than 0, got NDV: {}",
Expand Down Expand Up @@ -182,7 +185,7 @@ impl SampleSet for UniformSampleSet {

(Datum::Float(min), Datum::Float(max)) => {
let min = *min;
let max = *max;
let max = (*max).checked_add(F64::from(1.0)).ok_or("overflowed")?;
// TODO(xudong): better histogram computation.
let bucket_range = max.checked_sub(min).ok_or("overflowed")? / num_buckets as f64;
let upper_bound = min + bucket_range * bucket_index as f64;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,22 @@ Memo
โ”‚ โ””โ”€โ”€ #0 Join [#0, #3]
โ”œโ”€โ”€ Group #5
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4419.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #1, cost: 7569.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4420.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #1, cost: 7920.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ #0 EvalScalar [#4]
โ”‚ โ””โ”€โ”€ #1 Exchange: (Merge) [#5]
โ”œโ”€โ”€ Group #6
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #0, cost: 7614.000, children: [{ dist: Serial }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #0, cost: 7970.000, children: [{ dist: Serial }]
โ”‚ โ”œโ”€โ”€ #0 Aggregate [#5]
โ”‚ โ””โ”€โ”€ #1 Exchange: (Merge) [#6]
โ”œโ”€โ”€ Group #7
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 7619.000, children: [{ dist: Serial }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 7975.000, children: [{ dist: Serial }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#6]
โ””โ”€โ”€ Group #8
โ”œโ”€โ”€ Best properties
โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #0, cost: 7620.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #0, cost: 7976.000, children: [{ dist: Any }]
โ””โ”€โ”€ #0 EvalScalar [#7]

query T
Expand Down Expand Up @@ -126,22 +126,22 @@ Memo
โ”‚ โ””โ”€โ”€ #0 Join [#0, #3]
โ”œโ”€โ”€ Group #5
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4419.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 4878.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4420.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 4930.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ #0 EvalScalar [#4]
โ”‚ โ””โ”€โ”€ #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#5]
โ”œโ”€โ”€ Group #6
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 4923.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 4980.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#5]
โ”œโ”€โ”€ Group #7
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 4968.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 5030.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#6]
โ”œโ”€โ”€ Group #8
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4977.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #4, cost: 8127.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 5040.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #4, cost: 8540.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ #0 EvalScalar [#7]
โ”‚ โ”œโ”€โ”€ #1 EvalScalar [#14]
โ”‚ โ”œโ”€โ”€ #2 EvalScalar [#20]
Expand All @@ -166,16 +166,16 @@ Memo
โ”œโ”€โ”€ Group #12
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 66410.000, children: [{ dist: Any }, { dist: Broadcast }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 66869.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 66920.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ #0 Join [#11, #3]
โ”‚ โ””โ”€โ”€ #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#12]
โ”œโ”€โ”€ Group #13
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 66914.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 66970.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#12]
โ”œโ”€โ”€ Group #14
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 66959.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 67020.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#13]
โ”œโ”€โ”€ Group #15
โ”‚ โ”œโ”€โ”€ Best properties
Expand All @@ -197,35 +197,35 @@ Memo
โ”‚ โ””โ”€โ”€ #0 Join [#0, #16]
โ”œโ”€โ”€ Group #18
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 5029.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 5488.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 5030.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 5540.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ #0 EvalScalar [#17]
โ”‚ โ””โ”€โ”€ #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#18]
โ”œโ”€โ”€ Group #19
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 5533.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 5590.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#18]
โ”œโ”€โ”€ Group #20
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 5578.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 5640.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#19]
โ”œโ”€โ”€ Group #21
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 67020.000, children: [{ dist: Any }, { dist: Broadcast }]
โ”‚ โ””โ”€โ”€ #0 Join [#11, #16]
โ”œโ”€โ”€ Group #22
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 67029.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 67488.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 67030.000, children: [{ dist: Any }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Hash(t_10.a (#0)::Int32 NULL) }: expr: #1, cost: 67540.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ #0 EvalScalar [#21]
โ”‚ โ””โ”€โ”€ #1 Exchange: (Hash(t_10.a (#0)::Int32 NULL)) [#22]
โ”œโ”€โ”€ Group #23
โ”‚ โ”œโ”€โ”€ Best properties
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 67533.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 67590.000, children: [{ dist: Hash(t_10.a (#0)::Int32 NULL) }]
โ”‚ โ””โ”€โ”€ #0 Aggregate [#22]
โ””โ”€โ”€ Group #24
โ”œโ”€โ”€ Best properties
โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 67578.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ { dist: Any }: expr: #0, cost: 67640.000, children: [{ dist: Any }]
โ””โ”€โ”€ #0 Aggregate [#23]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ Memo
โ”‚ โ””โ”€โ”€ #0 Join [#0, #3]
โ””โ”€โ”€ Group #5
โ”œโ”€โ”€ Best properties
โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4419.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #1, cost: 7569.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 4420.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #1, cost: 7920.000, children: [{ dist: Any }]
โ”œโ”€โ”€ #0 EvalScalar [#4]
โ””โ”€โ”€ #1 Exchange: (Merge) [#5]

Expand Down Expand Up @@ -192,8 +192,8 @@ Memo
โ”‚ โ””โ”€โ”€ #0 Join [#2, #3]
โ””โ”€โ”€ Group #5
โ”œโ”€โ”€ Best properties
โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 112911.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #1, cost: 494761.000, children: [{ dist: Any }]
โ”‚ โ”œโ”€โ”€ { dist: Any }: expr: #0, cost: 112910.000, children: [{ dist: Any }]
โ”‚ โ””โ”€โ”€ { dist: Serial }: expr: #1, cost: 494410.000, children: [{ dist: Any }]
โ”œโ”€โ”€ #0 EvalScalar [#4]
โ””โ”€โ”€ #1 Exchange: (Merge) [#5]

Expand Down
Loading

0 comments on commit 5cd121f

Please sign in to comment.