Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] [Enhancement] List Partition For AMV(Part 1): Refactor MVTimelinessArbiter and MVPCTRefreshPartitioner to make it more extensible #46808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 68 additions & 44 deletions fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import com.starrocks.sql.analyzer.RelationId;
import com.starrocks.sql.analyzer.Scope;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.common.PartitionRange;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.optimizer.CachingMvPlanContextBuilder;
import com.starrocks.sql.optimizer.MvRewritePreprocessor;
import com.starrocks.sql.optimizer.Utils;
Expand Down Expand Up @@ -668,34 +668,61 @@ public void setQueryOutputIndices(List<Integer> queryOutputIndices) {
*/
public static SlotRef getRefBaseTablePartitionSlotRef(MaterializedView materializedView) {
List<SlotRef> slotRefs = Lists.newArrayList();
Expr partitionExpr = materializedView.getFirstPartitionRefTableExpr();
Expr partitionExpr = materializedView.getPartitionExpr();
partitionExpr.collect(SlotRef.class, slotRefs);
// if partitionExpr is FunctionCallExpr, get first SlotRef
Preconditions.checkState(slotRefs.size() == 1);
return slotRefs.get(0);
}

public Expr getFirstPartitionRefTableExpr() {
/**
* Return the partition column of the materialized view.
* NOTE: Only one column is supported for now, support more columns in the future.
* @return the partition column of the materialized view
*/
public Optional<Column> getPartitionColumn() {
List<Column> partitionCols = partitionInfo.getPartitionColumns(this.idToColumn);
if (partitionCols == null || partitionCols.isEmpty()) {
return Optional.empty();
}
return Optional.of(partitionCols.get(0));
}

/**
* Return the partition expr of the range partitioned materialized view.
* NOTE: only one partition expr is supported for now.
* @return the partition expr of the range partitioned materialized view
*/
public Expr getPartitionExpr() {
if (partitionRefTableExprs == null) {
return null;
}
if (partitionRefTableExprs.get(0).getType() == Type.INVALID) {
ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo;
partitionRefTableExprs.get(0).setType(
expressionRangePartitionInfo.getPartitionExprs(idToColumn).get(0).getType());

Expr partitionExpr = partitionRefTableExprs.get(0);
if (partitionExpr == null) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest to use Optional as return value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good advice. But we always check the expr's null or not for the usage.

}
return partitionRefTableExprs.get(0);
if (partitionExpr.getType() == Type.INVALID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does Type.INVALID appear here? because of analysis absence in save-and-restore operations ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

Optional<Column> partitionColOpt = getPartitionColumn();
if (partitionColOpt.isEmpty()) {
return null;
}
Type partitionColType = partitionColOpt.get().getType();
partitionExpr.setType(partitionColType);
}
return partitionExpr;
}

public static Expr getPartitionExpr(MaterializedView materializedView) {
if (!(materializedView.getPartitionInfo() instanceof ExpressionRangePartitionInfo)) {
// TODO: only range partition expr is supported now
if (!materializedView.getPartitionInfo().isExprRangePartitioned()) {
return null;
}
ExpressionRangePartitionInfo expressionRangePartitionInfo =
((ExpressionRangePartitionInfo) materializedView.getPartitionInfo());
// currently, mv only supports one expression
Preconditions.checkState(expressionRangePartitionInfo.getPartitionExprsSize() == 1);
return materializedView.getFirstPartitionRefTableExpr();
return materializedView.getPartitionExpr();
}

/**
Expand Down Expand Up @@ -1025,7 +1052,7 @@ private boolean onReloadImpl() {
private void analyzePartitionInfo() {
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);

if (partitionInfo instanceof SinglePartitionInfo) {
if (partitionInfo.isUnPartitioned()) {
return;
}
// analyze expression, because it converts to sql for serialize
Expand All @@ -1036,7 +1063,7 @@ private void analyzePartitionInfo() {
connectContext.setCurrentUserIdentity(UserIdentity.ROOT);
connectContext.setCurrentRoleIds(Sets.newHashSet(PrivilegeBuiltinConstants.ROOT_ROLE_ID));
// currently, mv only supports one expression
if (partitionInfo instanceof ExpressionRangePartitionInfo) {
if (partitionInfo.isExprRangePartitioned()) {
ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo;
Expr partitionExpr = expressionRangePartitionInfo.getPartitionExprs(idToColumn).get(0);
// for Partition slot ref, the SlotDescriptor is not serialized, so should recover it here.
Expand Down Expand Up @@ -1101,7 +1128,8 @@ public boolean isEnableRewrite() {
public boolean isEnableTransparentRewrite() {
TableProperty tableProperty = getTableProperty();
if (tableProperty == null) {
LiShuMing marked this conversation as resolved.
Show resolved Hide resolved
return true;
// default is false
return false;
}
return tableProperty.getMvTransparentRewriteMode().isEnable();
}
Expand Down Expand Up @@ -1191,7 +1219,7 @@ public String getMaterializedViewDdlStmt(boolean simple, boolean isReplay) {

// partition
PartitionInfo partitionInfo = this.getPartitionInfo();
if (!(partitionInfo instanceof SinglePartitionInfo)) {
if (!partitionInfo.isUnPartitioned()) {
sb.append("\n").append(partitionInfo.toSql(this, null));
}

Expand Down Expand Up @@ -1277,7 +1305,6 @@ public String getMaterializedViewDdlStmt(boolean simple, boolean isReplay) {
if (!hasStorageMedium && !isReplay) {
appendUniqueProperties(sb);
}

// bloom filter
Set<String> bfColumnNames = getBfColumnNames();
if (bfColumnNames != null) {
Expand Down Expand Up @@ -1452,14 +1479,14 @@ public boolean isCalcPotentialRefreshPartition(List<TableWithPartitions> baseCha
Map<Table, Map<String, Range<PartitionKey>>> refBaseTableRangePartitionMap,
Set<String> mvPartitions,
Map<String, Range<PartitionKey>> mvPartitionNameToRangeMap) {
List<PartitionRange> mvSortedPartitionRanges =
List<PRangeCell> mvSortedPartitionRanges =
TableWithPartitions.getSortedPartitionRanges(mvPartitionNameToRangeMap, mvPartitions);
for (TableWithPartitions baseTableWithPartition : baseChangedPartitionNames) {
Map<String, Range<PartitionKey>> baseRangePartitionMap =
refBaseTableRangePartitionMap.get(baseTableWithPartition.getTable());
List<PartitionRange> baseSortedPartitionRanges =
List<PRangeCell> baseSortedPartitionRanges =
baseTableWithPartition.getSortedPartitionRanges(baseRangePartitionMap);
for (PartitionRange basePartitionRange : baseSortedPartitionRanges) {
for (PRangeCell basePartitionRange : baseSortedPartitionRanges) {
if (isManyToManyPartitionRangeMapping(basePartitionRange, mvSortedPartitionRanges)) {
return true;
}
Expand All @@ -1471,8 +1498,8 @@ public boolean isCalcPotentialRefreshPartition(List<TableWithPartitions> baseCha
/**
* Whether srcRange is intersected with many dest ranges.
*/
private boolean isManyToManyPartitionRangeMapping(PartitionRange srcRange,
List<PartitionRange> dstRanges) {
private boolean isManyToManyPartitionRangeMapping(PRangeCell srcRange,
List<PRangeCell> dstRanges) {
int mid = Collections.binarySearch(dstRanges, srcRange);
if (mid < 0) {
return false;
Expand All @@ -1498,8 +1525,7 @@ private boolean isManyToManyPartitionRangeMapping(PartitionRange srcRange,
* @return : The materialized view's referred base table and its partition column.
*/
public Pair<Table, Column> getDirectTableAndPartitionColumn() {
if (partitionRefTableExprs == null ||
!(partitionInfo instanceof ExpressionRangePartitionInfo || partitionInfo instanceof ListPartitionInfo)) {
if (partitionRefTableExprs == null || !(partitionInfo.isRangePartition() || partitionInfo.isListPartition())) {
return null;
}
Expr partitionExpr = getPartitionRefTableExprs().get(0);
Expand All @@ -1519,11 +1545,17 @@ public Pair<Table, Column> getDirectTableAndPartitionColumn() {
String.format("can not find partition info for mv:%s on base tables:%s", name, baseTableNames));
}

/**
* Get the related partition table and column of the materialized view since one mv can contain multi ref base tables.
* @return The related partition table and its partition column referred by the materialized view.
*/
public Map<Table, Column> getRelatedPartitionTableAndColumn() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getRelatedPartitionTableAndColumn and getDirectTableAndPartitionColumn seem can be abstracted into one function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can unify the original single ref base table into multi ref tables policy later.
But it will be a great deal later.

Map<Table, Column> result = Maps.newHashMap();
if (partitionExprMaps == null || partitionExprMaps.isEmpty()) {
return result;
}

// find the partition column for each base table
Map<Table, SlotRef> tableToSlotMap = getTableToPartitionSlotMap();
for (BaseTableInfo baseTableInfo : baseTableInfos) {
Table table = MvUtils.getTableChecked(baseTableInfo);
Expand All @@ -1534,22 +1566,12 @@ public Map<Table, Column> getRelatedPartitionTableAndColumn() {
if (partitionColumns.isEmpty()) {
continue;
}
// if the partition column is the same with the slot ref, put it into the result.
SlotRef slotRef = tableToSlotMap.get(table);
if (table.isNativeTableOrMaterializedView()) {
if (partitionColumns.size() != 1) {
continue;
}
Column partitionColumn = partitionColumns.get(0);
if (com.starrocks.common.util.StringUtils.areColumnNamesEqual(slotRef.getColumnName(),
partitionColumn.getName())) {
for (Column partitionColumn : partitionColumns) {
if (areColumnNamesEqual(slotRef.getColumnName(), partitionColumn.getName())) {
result.put(table, partitionColumn);
}
} else {
for (Column partitionColumn : partitionColumns) {
if (com.starrocks.common.util.StringUtils.areColumnNamesEqual(slotRef.getColumnName(),
partitionColumn.getName())) {
result.put(table, partitionColumn);
}
break;
}
}
}
Expand All @@ -1559,7 +1581,11 @@ public Map<Table, Column> getRelatedPartitionTableAndColumn() {
String baseTableNames = baseTableInfos.stream()
.map(tableInfo -> MvUtils.getTableChecked(tableInfo).getName()).collect(Collectors.joining(","));
throw new RuntimeException(
String.format("can not find partition info for mv:%s on base tables:%s", name, baseTableNames));
String.format("Can not find partition info for mv:%s on base tables:%s", name, baseTableNames));
}

private boolean areColumnNamesEqual(String col1, String col2) {
return com.starrocks.common.util.StringUtils.areColumnNamesEqual(col1, col2);
}

public ExecPlan getMaintenancePlan() {
Expand Down Expand Up @@ -1653,14 +1679,12 @@ public void gsonPreProcess() throws IOException {
@Override
public void gsonPostProcess() throws IOException {
LiShuMing marked this conversation as resolved.
Show resolved Hide resolved
super.gsonPostProcess();
// only range partition need to recover partitionRefTableExprs
if (!(partitionInfo instanceof ExpressionRangePartitionInfo)) {
// only partition materialized view need to recover partitionRefTableExprs
Optional<Column> partitionColOpt = getPartitionColumn();
if (partitionColOpt.isEmpty()) {
return;
}
ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo;
// only one partition column is supported now.
Preconditions.checkState(expressionRangePartitionInfo.getPartitionColumnsSize() == 1);
Column partitionCol = expressionRangePartitionInfo.getPartitionColumns(this.idToColumn).get(0);
Column partitionCol = partitionColOpt.get();

// for single ref base table, recover from serializedPartitionRefTableExprs
partitionRefTableExprs = new ArrayList<>();
Expand Down Expand Up @@ -1835,7 +1859,7 @@ public Status doAfterRestore(MvRestoreContext mvRestoreContext) throws DdlExcept
this.baseTableInfos = newBaseTableInfos;

// change ExpressionRangePartitionInfo because mv's db may be changed.
if (partitionInfo instanceof ExpressionRangePartitionInfo) {
if (partitionInfo.isExprRangePartitioned()) {
ExpressionRangePartitionInfo expressionRangePartitionInfo = (ExpressionRangePartitionInfo) partitionInfo;
Preconditions.checkState(expressionRangePartitionInfo.getPartitionExprsSize() == 1);
expressionRangePartitionInfo.renameTableName(db.getFullName(), this.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@

package com.starrocks.catalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PRangeCell;

import java.util.Map;
import java.util.Set;
Expand All @@ -28,7 +32,7 @@ public class MvBaseTableUpdateInfo {
// The partition names of base table that have been updated
private final Set<String> toRefreshPartitionNames = Sets.newHashSet();
// The mapping of partition name to partition range
private final Map<String, Range<PartitionKey>> partitionNameWithRanges = Maps.newHashMap();
private final Map<String, PCell> nameToPartKeys = Maps.newHashMap();

public MvBaseTableUpdateInfo() {
}
Expand All @@ -37,15 +41,65 @@ public Set<String> getToRefreshPartitionNames() {
return toRefreshPartitionNames;
}

/**
* Add partition names that base table needs to be refreshed
* @param toRefreshPartitionNames the partition names that need to be refreshed
*/
public void addToRefreshPartitionNames(Set<String> toRefreshPartitionNames) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a chance that two partition columns from different tables are the same name? how to handle this scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For each mv update info, it should keep a MvBaseTableUpdateInfo for each base table.

public class MvUpdateInfo {
    // The type of mv refresh later
    private final MvToRefreshType mvToRefreshType;
    // The partition names of mv to refresh
    private final Set<String> mvToRefreshPartitionNames = Sets.newHashSet();
    // The update information of base table
    private final Map<Table, MvBaseTableUpdateInfo> baseTableUpdateInfos = Maps.newHashMap();

this.toRefreshPartitionNames.addAll(toRefreshPartitionNames);
}

/**
* Add partition name that needs to be refreshed and its associated range partition key
* @param partitionName mv partition name
* @param rangePartitionKey the associated range partition
*/
public void addRangePartitionKeys(String partitionName,
Range<PartitionKey> rangePartitionKey) {
nameToPartKeys.put(partitionName, new PRangeCell(rangePartitionKey));
}

/**
* Add partition name that needs to be refreshed and its associated list partition key
* @param partitionName base table partition name
* @param listPartitionKey the associated list partition
*/
public void addListPartitionKeys(String partitionName,
PListCell listPartitionKey) {
nameToPartKeys.put(partitionName, listPartitionKey);
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
return partitionNameWithRanges;
Map<String, Range<PartitionKey>> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
PRangeCell rangeCell = (PRangeCell) e.getValue();
result.put(e.getKey(), rangeCell.getRange());
}
return result;
}

/**
* Get the partition name with its associated list partition key when the mv is list partitioned.
*/
public Map<String, PListCell> getPartitionNameWithLists() {
Map<String, PListCell> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
PListCell listCell = (PListCell) e.getValue();
result.put(e.getKey(), listCell);
}
return result;
}

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this class be designed as a immutable data structure? use Builder to build a immuable object?

return "BaseTableRefreshInfo{" +
", toRefreshPartitionNames=" + toRefreshPartitionNames +
", partitionNameWithRanges=" + partitionNameWithRanges +
", nameToPartKeys=" + nameToPartKeys +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.TableName;
import com.starrocks.common.Pair;
import com.starrocks.common.util.StringUtils;
import com.starrocks.sql.analyzer.AnalyzerUtils;
import com.starrocks.sql.analyzer.PartitionExprAnalyzer;
import com.starrocks.sql.analyzer.SlotRefResolver;
Expand Down Expand Up @@ -152,7 +153,7 @@ public static Map<Expr, SlotRef> getPartitionJoinMap(Expr partitionExpr, QuerySt
Integer slotIndex = null;
for (int i = 0; i < relations.getOutputExpression().size(); i++) {
String column = relations.getRelations().get(0).getColumnOutputNames().get(i);
if (com.starrocks.common.util.StringUtils.areColumnNamesEqual(slot.getColumnName(), column)) {
if (StringUtils.areColumnNamesEqual(slot.getColumnName(), column)) {
slotIndex = i;
break;
}
Expand Down
Loading
Loading