Skip to content

Commit

Permalink
Merge pull request #1455 from github/arthur/dont-lock-rows
Browse files Browse the repository at this point in the history
Avoid causing deadlocks when copying rows on busy tables
  • Loading branch information
arthurschreiber authored Oct 23, 2024
2 parents 9af3a07 + 5ddeb21 commit 30f28c2
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 33 deletions.
2 changes: 2 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,8 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(),
// TODO: Don't hardcode this
strings.HasPrefix(this.migrationContext.ApplierMySQLVersion, "8."),
)
if err != nil {
return chunkSize, rowsAffected, duration, err
Expand Down
187 changes: 167 additions & 20 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
gosql "database/sql"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -199,6 +200,30 @@ type ApplierTestSuite struct {
mysqlContainer testcontainers.Container
}

func (suite *ApplierTestSuite) getConnectionConfig(ctx context.Context) (*mysql.ConnectionConfig, error) {
host, err := suite.mysqlContainer.ContainerIP(ctx)
if err != nil {
return nil, err
}

config := mysql.NewConnectionConfig()
config.Key.Hostname = host
config.Key.Port = 3306
config.User = "root"
config.Password = "root-password"

return config, nil
}

func (suite *ApplierTestSuite) getDb(ctx context.Context) (*gosql.DB, error) {
host, err := suite.mysqlContainer.ContainerIP(ctx)
if err != nil {
return nil, err
}

return gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test")
}

func (suite *ApplierTestSuite) SetupSuite() {
ctx := context.Background()
req := testcontainers.ContainerRequest{
Expand Down Expand Up @@ -229,7 +254,7 @@ func (suite *ApplierTestSuite) SetupTest() {
suite.Require().NoError(err)
suite.Require().Equalf(0, rc, "failed to created database: expected exit code 0, got %d", rc)

rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT);"})
rc, _, err = suite.mysqlContainer.Exec(ctx, []string{"mysql", "-uroot", "-proot-password", "-e", "CREATE TABLE test.testing (id INT, item_id INT, PRIMARY KEY (id));"})
suite.Require().NoError(err)
suite.Require().Equalf(0, rc, "failed to created table: expected exit code 0, got %d", rc)
}
Expand All @@ -245,15 +270,11 @@ func (suite *ApplierTestSuite) TearDownTest() {
func (suite *ApplierTestSuite) TestInitDBConnections() {
ctx := context.Background()

host, err := suite.mysqlContainer.ContainerIP(ctx)
connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
migrationContext.ApplierConnectionConfig.Key.Hostname = host
migrationContext.ApplierConnectionConfig.Key.Port = 3306
migrationContext.ApplierConnectionConfig.User = "root"
migrationContext.ApplierConnectionConfig.Password = "root-password"
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")
Expand All @@ -274,15 +295,11 @@ func (suite *ApplierTestSuite) TestInitDBConnections() {
func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
ctx := context.Background()

host, err := suite.mysqlContainer.ContainerIP(ctx)
connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
migrationContext.ApplierConnectionConfig.Key.Hostname = host
migrationContext.ApplierConnectionConfig.Key.Port = 3306
migrationContext.ApplierConnectionConfig.User = "root"
migrationContext.ApplierConnectionConfig.Password = "root-password"
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")
Expand Down Expand Up @@ -313,7 +330,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
suite.Require().NoError(err)

// Check that the row was inserted
db, err := gosql.Open("mysql", "root:root-password@tcp("+host+":3306)/test")
db, err := suite.getDb(ctx)
suite.Require().NoError(err)
defer db.Close()

Expand All @@ -340,15 +357,11 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
ctx := context.Background()

host, err := suite.mysqlContainer.ContainerIP(ctx)
connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = mysql.NewConnectionConfig()
migrationContext.ApplierConnectionConfig.Key.Hostname = host
migrationContext.ApplierConnectionConfig.Key.Port = 3306
migrationContext.ApplierConnectionConfig.User = "root"
migrationContext.ApplierConnectionConfig.Password = "root-password"
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")
Expand All @@ -367,6 +380,140 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTables() {
suite.Require().NoError(err)
}

func (suite *ApplierTestSuite) TestApplyIterationInsertQuery() {
ctx := context.Background()

connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.ChunkSize = 10
migrationContext.SetConnectionConfig("innodb")

db, err := suite.getDb(ctx)
suite.Require().NoError(err)
defer db.Close()

_, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))")
suite.Require().NoError(err)

// Insert some test values
for i := 1; i <= 10; i++ {
_, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i)
suite.Require().NoError(err)
}

migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}

migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1})
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10})

applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
suite.Require().NoError(err)

suite.Require().Equal(migrationContext.ChunkSize, chunkSize)
suite.Require().Equal(int64(10), rowsAffected)
suite.Require().Greater(duration, time.Duration(0))

// Check that the rows were inserted
rows, err := db.Query("SELECT * FROM test._testing_gho")
suite.Require().NoError(err)
defer rows.Close()

var count, id, item_id int
for rows.Next() {
err = rows.Scan(&id, &item_id)
suite.Require().NoError(err)
count += 1
}
suite.Require().NoError(rows.Err())

suite.Require().Equal(10, count)
}

func (suite *ApplierTestSuite) TestApplyIterationInsertQueryFailsFastWhenSelectingLockedRows() {
ctx := context.Background()

connectionConfig, err := suite.getConnectionConfig(ctx)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "testing"
migrationContext.ChunkSize = 10
migrationContext.TableEngine = "innodb"
migrationContext.SetConnectionConfig("innodb")

db, err := suite.getDb(ctx)
suite.Require().NoError(err)
defer db.Close()

_, err = db.Exec("CREATE TABLE test._testing_gho (id INT, item_id INT, PRIMARY KEY (id))")
suite.Require().NoError(err)

// Insert some test values
for i := 1; i <= 10; i++ {
_, err = db.Exec("INSERT INTO test.testing (id, item_id) VALUES (?, ?)", i, i)
suite.Require().NoError(err)
}

migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}

migrationContext.MigrationIterationRangeMinValues = sql.ToColumnValues([]interface{}{1})
migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{10})

applier := NewApplier(migrationContext)
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

// Lock one of the rows
tx, err := db.Begin()
suite.Require().NoError(err)
defer func() {
suite.Require().NoError(tx.Rollback())
}()

_, err = tx.Exec("SELECT * FROM test.testing WHERE id = 5 FOR UPDATE")
suite.Require().NoError(err)

chunkSize, rowsAffected, duration, err := applier.ApplyIterationInsertQuery()
suite.Require().Error(err)
suite.Require().EqualError(err, "Error 3572 (HY000): Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.")

suite.Require().Equal(migrationContext.ChunkSize, chunkSize)
suite.Require().Equal(int64(0), rowsAffected)
suite.Require().Equal(time.Duration(0), duration)

// Check that the no rows were inserted
var count int
err = db.QueryRow("SELECT COUNT(*) FROM test._testing_gho").Scan(&count)
suite.Require().NoError(err)

suite.Require().Equal(0, count)
}

func TestApplier(t *testing.T) {
suite.Run(t, new(ApplierTestSuite))
}
18 changes: 11 additions & 7 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa
return BuildRangeComparison(columns.Names(), values, args, comparisonSign)
}

func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) {
if len(sharedColumns) == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
}
Expand Down Expand Up @@ -212,15 +212,19 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
transactionalClause := ""
if transactionalTable {
if noWait {
transactionalClause = "for share nowait"
} else {
transactionalClause = "lock in share mode"
}
}
rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
transactionalClause := ""
if transactionalTable {
transactionalClause = "lock in share mode"
}
result = fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore
into
Expand All @@ -241,10 +245,10 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
return result, explodedArgs, nil
}

func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool, noWait bool) (result string, explodedArgs []interface{}, err error) {
rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns)
rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns)
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable, noWait)
}

func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
Expand Down
16 changes: 10 additions & 6 deletions go/sql/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
rangeStartArgs := []interface{}{3}
rangeEndArgs := []interface{}{103}

query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true)
require.NoError(t, err)
expected := `
insert /* gh-ost mydb.tbl */ ignore
Expand All @@ -186,6 +186,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
where
(((id > @v1s) or ((id = @v1s)))
and ((id < @v1e) or ((id = @v1e))))
for share nowait
)`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs)
Expand All @@ -198,7 +199,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}

query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true)
require.NoError(t, err)
expected := `
insert /* gh-ost mydb.tbl */ ignore
Expand All @@ -219,6 +220,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
or (((name = @v1e))
AND (position < @v2e))
or ((name = @v1e) and (position = @v2e))))
for share nowait
)`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
Expand All @@ -239,7 +241,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
rangeStartArgs := []interface{}{3}
rangeEndArgs := []interface{}{103}

query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true)
require.NoError(t, err)
expected := `
insert /* gh-ost mydb.tbl */ ignore
Expand All @@ -255,6 +257,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
(((id > @v1s) or ((id = @v1s)))
and
((id < @v1e) or ((id = @v1e))))
for share nowait
)`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 103, 103}, explodedArgs)
Expand All @@ -267,7 +270,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}

query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, true, true)
require.NoError(t, err)
expected := `
insert /* gh-ost mydb.tbl */ ignore
Expand All @@ -284,6 +287,7 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
AND (position > @v2s)) or ((name = @v1s) and (position = @v2s)))
and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e))
or ((name = @v1e) and (position = @v2e))))
for share nowait
)`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
Expand All @@ -301,7 +305,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}

query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true)
query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true, true)
require.NoError(t, err)
expected := `
insert /* gh-ost mydb.tbl */ ignore
Expand All @@ -314,7 +318,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
mydb.tbl
force index (name_position_uidx)
where (((name > ?) or (((name = ?)) AND (position > ?)) or ((name = ?) and (position = ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?))))
lock in share mode
for share nowait
)`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
Expand Down

0 comments on commit 30f28c2

Please sign in to comment.