Skip to content

Commit

Permalink
refactor: Optimize When Querying the tables_history Table with the Co…
Browse files Browse the repository at this point in the history
…ndition table_id = n (#17166)

* optimize: Optimize When Querying the tables_history Table with the Condition table_id = n

* use btreeset replace vec

* ignore check contain and add some comment
  • Loading branch information
TCeason authored Jan 5, 2025
1 parent acd8e61 commit 8f6713d
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub trait SchemaApi: Send + Sync {
async fn mget_table_names_by_ids(
&self,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>, KVAppError>;

async fn mget_database_names_by_ids(
Expand Down
3 changes: 2 additions & 1 deletion src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn mget_table_names_by_ids(
&self,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>, KVAppError> {
debug!(req :? =(&table_ids); "SchemaApi: {}", func_name!());

Expand All @@ -1654,7 +1655,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let seq_metas = self.get_pb_values_vec(id_idents).await?;
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
if seq_meta.data.drop_on.is_some() {
if seq_meta.data.drop_on.is_some() && !get_dropped_table {
table_names[i] = None;
}
} else {
Expand Down
1 change: 1 addition & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>>;

// Get the db name by meta id.
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,15 @@ impl Catalog for DatabaseCatalog {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
let sys_table_names = self
.immutable_catalog
.mget_table_names_by_ids(tenant, table_ids)
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await?;
let mut_table_names = self
.mutable_catalog
.mget_table_names_by_ids(tenant, table_ids)
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await?;

let mut table_names = Vec::with_capacity(table_ids.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ impl Catalog for ImmutableCatalog {
&self,
_tenant: &Tenant,
table_ids: &[MetaId],
_get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
let mut table_name = Vec::with_capacity(table_ids.len());
for id in table_ids {
Expand Down
7 changes: 6 additions & 1 deletion src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,13 @@ impl Catalog for MutableCatalog {
&self,
_tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
let res = self.ctx.meta.mget_table_names_by_ids(table_ids).await?;
let res = self
.ctx
.meta
.mget_table_names_by_ids(table_ids, get_dropped_table)
.await?;
Ok(res)
}

Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/catalogs/default/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,11 @@ impl Catalog for SessionCatalog {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> databend_common_exception::Result<Vec<Option<String>>> {
self.inner.mget_table_names_by_ids(tenant, table_ids).await
self.inner
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await
}

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ async fn show_account_grants(
.collect::<HashSet<u64>>();
let mut table_ids = table_id_set.into_iter().collect::<Vec<u64>>();
table_ids.sort();
let table_names = catalog.mget_table_names_by_ids(&tenant, &table_ids).await?;
let table_names = catalog
.mget_table_names_by_ids(&tenant, &table_ids, false)
.await?;
let table_map = table_ids
.into_iter()
.zip(table_names.into_iter())
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,11 @@ impl Catalog for FakedCatalog {
&self,
tenant: &Tenant,
table_ids: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
self.cat.mget_table_names_by_ids(tenant, table_ids).await
self.cat
.mget_table_names_by_ids(tenant, table_ids, get_dropped_table)
.await
}

async fn get_db_name_by_id(&self, db_id: MetaId) -> Result<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,11 @@ impl Catalog for FakedCatalog {
&self,
tenant: &Tenant,
table_id: &[MetaId],
get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
self.cat.mget_table_names_by_ids(tenant, table_id).await
self.cat
.mget_table_names_by_ids(tenant, table_id, get_dropped_table)
.await
}

async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<Option<String>> {
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/hive/hive/src/hive_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl Catalog for HiveCatalog {
&self,
_tenant: &Tenant,
_table_ids: &[MetaId],
_get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
Err(ErrorCode::Unimplemented(
"Cannot get tables name by ids in HIVE catalog",
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl Catalog for IcebergCatalog {
&self,
_tenant: &Tenant,
_table_ids: &[MetaId],
_get_dropped_table: bool,
) -> Result<Vec<Option<String>>> {
Err(ErrorCode::Unimplemented(
"Cannot get tables name by ids in HIVE catalog",
Expand Down
4 changes: 3 additions & 1 deletion src/query/storages/system/src/streams_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {

let mut source_tb_ids = source_tb_id_set.into_iter().collect::<Vec<u64>>();
source_tb_ids.sort();
let source_tb_names = ctl.mget_table_names_by_ids(&tenant, &source_tb_ids).await?;
let source_tb_names = ctl
.mget_table_names_by_ids(&tenant, &source_tb_ids, false)
.await?;
let source_tb_map = source_tb_ids
.into_iter()
.zip(source_tb_names.into_iter())
Expand Down
55 changes: 27 additions & 28 deletions src/query/storages/system/src/tables_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
Expand Down Expand Up @@ -252,7 +253,7 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
let user_api = UserApiProvider::instance();

let mut dbs = Vec::new();
let mut tables_names: Vec<String> = Vec::new();
let mut tables_names: BTreeSet<String> = BTreeSet::new();
let mut invalid_tables_ids = false;
let mut tables_ids: Vec<u64> = Vec::new();
let mut db_name: Vec<String> = Vec::new();
Expand Down Expand Up @@ -317,9 +318,7 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
}
} else if col_name == "name" {
if let Scalar::String(t_name) = scalar {
if !tables_names.contains(t_name) {
tables_names.push(t_name.clone());
}
tables_names.insert(t_name.clone());
}
}
Ok(())
Expand Down Expand Up @@ -348,17 +347,19 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
}
}
}
if let Err(err) = ctl.mget_table_names_by_ids(&tenant, &tables_ids).await {
warn!("Failed to get tables: {}, {}", ctl.name(), err);
} else {
let new_tables_names = ctl
.mget_table_names_by_ids(&tenant, &tables_ids)
.await?
.into_iter()
.flatten()
.filter(|table| !tables_names.contains(table))
.collect::<Vec<_>>();
tables_names.extend(new_tables_names);
match ctl
.mget_table_names_by_ids(&tenant, &tables_ids, false)
.await
{
Ok(new_tables) => {
let new_table_names: BTreeSet<_> =
new_tables.into_iter().flatten().collect();
tables_names.extend(new_table_names);
}
Err(err) => {
// swallow the errors related with mget tables
warn!("Failed to get tables: {}, {}", ctl.name(), err);
}
}

for table_name in &tables_names {
Expand Down Expand Up @@ -430,21 +431,19 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
}
}

if !WITH_HISTORY {
match ctl.mget_table_names_by_ids(&tenant, &tables_ids).await {
Ok(tables) => {
for table in tables.into_iter().flatten() {
if !tables_names.contains(&table) {
tables_names.push(table.clone());
}
}
}
Err(err) => {
let msg =
format!("Failed to get tables: {}, {}", ctl.name(), err);
warn!("{}", msg);
match ctl
.mget_table_names_by_ids(&tenant, &tables_ids, WITH_HISTORY)
.await
{
Ok(tables) => {
for table in tables.into_iter().flatten() {
tables_names.insert(table.clone());
}
}
Err(err) => {
let msg = format!("Failed to get tables: {}, {}", ctl.name(), err);
warn!("{}", msg);
}
}
}
}
Expand Down

0 comments on commit 8f6713d

Please sign in to comment.