Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cpu-profile interactive command #1358

Merged
merged 10 commits into from
Jan 31, 2024
1 change: 1 addition & 0 deletions doc/interactive-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
- `help`: shows a brief list of available commands
- `status`: returns a detailed status summary of migration progress and configuration
- `sup`: returns a brief status summary of migration progress
- `cpu-profile`: returns a base64-encoded [`runtime/pprof`](https://pkg.go.dev/runtime/pprof) CPU profile using a duration, default: `30s`. Comma-separated options `gzip` and/or `block` (blocked profile) may follow the profile duration
- `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
- `applier`: returns the hostname of the applier
- `inspector`: returns the hostname of the inspector
Expand Down
69 changes: 69 additions & 0 deletions go/logic/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,30 @@ package logic

import (
"bufio"
"bytes"
"compress/gzip"
"encoding/base64"
"errors"
"fmt"
"io"
"net"
"os"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/github/gh-ost/go/base"
)

var (
ErrCPUProfilingBadOption = errors.New("unrecognized cpu profiling option")
ErrCPUProfilingInProgress = errors.New("cpu profiling already in progress")
defaultCPUProfileDuration = time.Second * 30
)

type printStatusFunc func(PrintStatusRule, io.Writer)

// Server listens for requests on a socket file or via TCP
Expand All @@ -27,6 +40,7 @@ type Server struct {
tcpListener net.Listener
hooksExecutor *HooksExecutor
printStatus printStatusFunc
isCPUProfiling int64
}

func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
Expand All @@ -37,6 +51,54 @@ func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExec
}
}

func (this *Server) runCPUProfile(args string) (io.Reader, error) {
duration := defaultCPUProfileDuration

var err error
var blockProfile, useGzip bool
if args != "" {
s := strings.Split(args, ",")
// a duration string must be the 1st field, if any
if duration, err = time.ParseDuration(s[0]); err != nil {
return nil, err
}
for _, arg := range s[1:] {
switch arg {
case "block", "blocked", "blocking":
blockProfile = true
case "gzip":
useGzip = true
default:
return nil, ErrCPUProfilingBadOption
}
}
}

if atomic.LoadInt64(&this.isCPUProfiling) > 0 {
return nil, ErrCPUProfilingInProgress
}
atomic.StoreInt64(&this.isCPUProfiling, 1)
defer atomic.StoreInt64(&this.isCPUProfiling, 0)

var buf bytes.Buffer
var writer io.Writer = &buf
if blockProfile {
runtime.SetBlockProfileRate(1)
defer runtime.SetBlockProfileRate(0)
}
if useGzip {
writer = gzip.NewWriter(writer)
}
if err = pprof.StartCPUProfile(writer); err != nil {
return nil, err
}

time.Sleep(duration)
pprof.StopCPUProfile()
this.migrationContext.Log.Infof("Captured %d byte runtime/pprof CPU profile (gzip=%v)", buf.Len(), useGzip)
return &buf, nil
}

func (this *Server) BindSocketFile() (err error) {
if this.migrationContext.ServeSocketFile == "" {
return nil
Expand Down Expand Up @@ -144,6 +206,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
fmt.Fprint(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
cpu-profile=<options> # Print a base64-encoded runtime/pprof CPU profile using a duration, default: 30s. Comma-separated options 'gzip' and/or 'block' (blocked profile) may follow the profile duration
coordinates # Print the currently inspected coordinates
applier # Print the hostname of the applier
inspector # Print the hostname of the inspector
Expand All @@ -169,6 +232,12 @@ help # This message
return ForcePrintStatusOnlyRule, nil
case "info", "status":
return ForcePrintStatusAndHintRule, nil
case "cpu-profile":
cpuProfile, err := this.runCPUProfile(arg)
if err == nil {
fmt.Fprint(base64.NewEncoder(base64.StdEncoding, writer), cpuProfile)
}
return NoPrintStatusRule, err
case "coordinates":
{
if argIsQuestion || arg == "" {
Expand Down
68 changes: 68 additions & 0 deletions go/logic/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package logic

import (
"testing"
"time"

"github.com/github/gh-ost/go/base"
"github.com/openark/golib/tests"
)

func TestServerRunCPUProfile(t *testing.T) {
t.Parallel()

t.Run("failed already running", func(t *testing.T) {
s := &Server{isCPUProfiling: 1}
profile, err := s.runCPUProfile("15ms")
tests.S(t).ExpectEquals(err, ErrCPUProfilingInProgress)
tests.S(t).ExpectEquals(profile, nil)
})

t.Run("failed bad duration", func(t *testing.T) {
s := &Server{isCPUProfiling: 0}
profile, err := s.runCPUProfile("should-fail")
tests.S(t).ExpectNotNil(err)
tests.S(t).ExpectEquals(profile, nil)
})

t.Run("failed bad option", func(t *testing.T) {
s := &Server{isCPUProfiling: 0}
profile, err := s.runCPUProfile("10ms,badoption")
tests.S(t).ExpectEquals(err, ErrCPUProfilingBadOption)
tests.S(t).ExpectEquals(profile, nil)
})

t.Run("success", func(t *testing.T) {
s := &Server{
isCPUProfiling: 0,
migrationContext: base.NewMigrationContext(),
}
defaultCPUProfileDuration = time.Millisecond * 10
profile, err := s.runCPUProfile("")
tests.S(t).ExpectNil(err)
tests.S(t).ExpectNotEquals(profile, nil)
tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0))
})

t.Run("success with block", func(t *testing.T) {
s := &Server{
isCPUProfiling: 0,
migrationContext: base.NewMigrationContext(),
}
profile, err := s.runCPUProfile("10ms,block")
tests.S(t).ExpectNil(err)
tests.S(t).ExpectNotEquals(profile, nil)
tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0))
})

t.Run("success with block and gzip", func(t *testing.T) {
s := &Server{
isCPUProfiling: 0,
migrationContext: base.NewMigrationContext(),
}
profile, err := s.runCPUProfile("10ms,block,gzip")
tests.S(t).ExpectNil(err)
tests.S(t).ExpectNotEquals(profile, nil)
tests.S(t).ExpectEquals(s.isCPUProfiling, int64(0))
})
}
Loading