Skip to content

Commit

Permalink
opt: add limit on reusing buffer size (#664)
Browse files Browse the repository at this point in the history
  • Loading branch information
AsterDY authored Jul 9, 2024
1 parent 4adf417 commit 456a3b9
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 41 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,9 @@ For better performance, in previous case the `ast.Visitor` will be the better ch

But `ast.Visitor` is not a very handy API. You might need to write a lot of code to implement your visitor and carefully maintain the tree hierarchy during decoding. Please read the comments in [ast/visitor.go](https://github.com/bytedance/sonic/blob/main/ast/visitor.go) carefully if you decide to use this API.

### Buffer Size
Sonic use memory pool in many places like `encoder.Encode`, `ast.Node.MarshalJSON` to improve performace, which may produce more memory usage (in-use) when server's load is high. See [issue 614](https://github.com/bytedance/sonic/issues/614). Therefore, we introduce some options to let user control the behavior of memory pool. See [option](https://pkg.go.dev/github.com/bytedance/[email protected]/option#pkg-variables) package.

## Community

Sonic is a subproject of [CloudWeGo](https://www.cloudwego.io/). We are committed to building a cloud native ecosystem.
3 changes: 3 additions & 0 deletions README_ZH_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ go someFunc(user)

但是,`ast.Visitor` 并不是一个很易用的 API。你可能需要写大量的代码去实现自己的 `ast.Visitor`,并且需要在解析过程中仔细维护树的层级。如果你决定要使用这个 API,请先仔细阅读 [ast/visitor.go](https://github.com/bytedance/sonic/blob/main/ast/visitor.go) 中的注释。

### 缓冲区大小
Sonic在许多地方使用内存池,如`encoder.Encode`, `ast.Node.MarshalJSON`等来提高性能,这可能会在服务器负载高时产生更多的内存使用(in-use)。参见[issue 614](https://github.com/bytedance/sonic/issues/614)。因此,我们引入了一些选项来让用户配置内存池的行为。参见[option](https://pkg.go.dev/github.com/bytedance/[email protected]/option#pkg-variables)包。

## 社区

Sonic 是 [CloudWeGo](https://www.cloudwego.io/) 下的一个子项目。我们致力于构建云原生生态系统。
22 changes: 13 additions & 9 deletions ast/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ import (
"unicode/utf8"

"github.com/bytedance/sonic/internal/rt"
)

const (
_MaxBuffer = 1024 // 1KB buffer size
"github.com/bytedance/sonic/option"
)

func quoteString(e *[]byte, s string) {
Expand Down Expand Up @@ -100,23 +97,30 @@ func (self *Node) MarshalJSON() ([]byte, error) {
freeBuffer(buf)
return nil, err
}

ret := make([]byte, len(*buf))
copy(ret, *buf)
freeBuffer(buf)
var ret []byte
if !rt.CanSizeResue(cap(*buf)) {
ret = *buf
} else {
ret = make([]byte, len(*buf))
copy(ret, *buf)
freeBuffer(buf)
}
return ret, err
}

func newBuffer() *[]byte {
if ret := bytesPool.Get(); ret != nil {
return ret.(*[]byte)
} else {
buf := make([]byte, 0, _MaxBuffer)
buf := make([]byte, 0, option.DefaultAstBufferSize)
return &buf
}
}

func freeBuffer(buf *[]byte) {
if !rt.CanSizeResue(cap(*buf)) {
return
}
*buf = (*buf)[:0]
bytesPool.Put(buf)
}
Expand Down
10 changes: 8 additions & 2 deletions internal/decoder/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ var bufPool = sync.Pool{
},
}

func freeBytes(buf []byte) {
if rt.CanSizeResue(cap(buf)) {
bufPool.Put(buf[:0])
}
}

// NewStreamDecoder adapts to encoding/json.NewDecoder API.
//
// NewStreamDecoder returns a new decoder that reads from r.
Expand Down Expand Up @@ -105,7 +111,7 @@ func (self *StreamDecoder) Decode(val interface{}) (err error) {
// no remain valid bytes, thus we just recycle buffer
mem := self.buf
self.buf = nil
bufPool.Put(mem[:0])
freeBytes(mem)
} else {
// println("keep")
// remain undecoded bytes, move them onto head
Expand Down Expand Up @@ -178,7 +184,7 @@ func (self *StreamDecoder) setErr(err error) {
self.err = err
mem := self.buf[:0]
self.buf = nil
bufPool.Put(mem)
freeBytes(mem)
}

func (self *StreamDecoder) peek() (byte, error) {
Expand Down
20 changes: 19 additions & 1 deletion internal/encoder/encode_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ package encoder

import (
`encoding/json`

`github.com/bytedance/sonic/internal/rt`
)


func helpDetectDataRace(val interface{}) {
_, _ = json.Marshal(val)
var out []byte
defer func() {
if v := recover(); v != nil {
// NOTICE: help user to locate where panic occurs
println("panic when encoding on: ", truncate(out))
panic(v)
}
}()
out, _ = json.Marshal(val)
}

func encodeIntoCheckRace(buf *[]byte, val interface{}, opts Options) error {
Expand All @@ -34,3 +44,11 @@ func encodeIntoCheckRace(buf *[]byte, val interface{}, opts Options) error {
helpDetectDataRace(val)
return err
}

func truncate(json []byte) string {
if len(json) <= 256 {
return rt.Mem2Str(json)
} else {
return rt.Mem2Str(json[len(json)-256:])
}
}
30 changes: 19 additions & 11 deletions internal/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,14 @@ func Encode(val interface{}, opts Options) ([]byte, error) {
}

/* make a copy of the result */
ret = make([]byte, len(*buf))
copy(ret, *buf)

vars.FreeBytes(buf)
if rt.CanSizeResue(cap(*buf)) {
ret = make([]byte, len(*buf))
copy(ret, *buf)
vars.FreeBytes(buf)
} else {
ret = *buf
}

/* return the buffer into pool */
return ret, nil
}
Expand Down Expand Up @@ -269,21 +273,25 @@ func EncodeIndented(val interface{}, prefix string, indent string, opts Options)
/* indent the JSON */
buf = vars.NewBuffer()
err = json.Indent(buf, *out, prefix, indent)
vars.FreeBytes(out)

/* check for errors */
if err != nil {
vars.FreeBytes(out)
vars.FreeBuffer(buf)
return nil, err
}

/* copy to the result buffer */
ret := make([]byte, buf.Len())
copy(ret, buf.Bytes())

/* return the buffers into pool */
vars.FreeBytes(out)
vars.FreeBuffer(buf)
var ret []byte
if rt.CanSizeResue(cap(buf.Bytes())) {
ret = make([]byte, buf.Len())
copy(ret, buf.Bytes())
/* return the buffers into pool */
vars.FreeBuffer(buf)
} else {
ret = buf.Bytes()
}

return ret, nil
}

Expand Down
17 changes: 8 additions & 9 deletions internal/encoder/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,18 @@ func NewStreamEncoder(w io.Writer) *StreamEncoder {

// Encode encodes interface{} as JSON to io.Writer
func (enc *StreamEncoder) Encode(val interface{}) (err error) {
buf := vars.NewBytes()
out := *buf
out := vars.NewBytes()

/* encode into the buffer */
err = EncodeInto(&out, val, enc.Opts)
err = EncodeInto(out, val, enc.Opts)
if err != nil {
goto free_bytes
}

if enc.indent != "" || enc.prefix != "" {
/* indent the JSON */
buf := vars.NewBuffer()
err = json.Indent(buf, out, enc.prefix, enc.indent)
err = json.Indent(buf, *out, enc.prefix, enc.indent)
if err != nil {
vars.FreeBuffer(buf)
goto free_bytes
Expand All @@ -71,9 +70,10 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) {
} else {
/* copy into io.Writer */
var n int
for len(out) > 0 {
n, err = enc.w.Write(out)
out = out[n:]
buf := *out
for len(buf) > 0 {
n, err = enc.w.Write(buf)
buf = buf[n:]
if err != nil {
goto free_bytes
}
Expand All @@ -86,7 +86,6 @@ func (enc *StreamEncoder) Encode(val interface{}) (err error) {
}

free_bytes:
*buf = out
vars.FreeBytes(buf)
vars.FreeBytes(out)
return err
}
12 changes: 8 additions & 4 deletions internal/encoder/vars/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ func NewBuffer() *bytes.Buffer {
}

func FreeBytes(p *[]byte) {
(*p) = (*p)[:0]
bytesPool.Put(p)
if rt.CanSizeResue(cap(*p)) {
(*p) = (*p)[:0]
bytesPool.Put(p)
}
}

func FreeStack(p *Stack) {
Expand All @@ -129,8 +131,10 @@ func FreeStack(p *Stack) {
}

func FreeBuffer(p *bytes.Buffer) {
p.Reset()
bufferPool.Put(p)
if rt.CanSizeResue(cap(p.Bytes())) {
p.Reset()
bufferPool.Put(p)
}
}

var (
Expand Down
11 changes: 9 additions & 2 deletions internal/rt/fastmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package rt

import (
`unsafe`
`reflect`
"reflect"
"unsafe"

"github.com/bytedance/sonic/option"
)

//go:nosplit
Expand Down Expand Up @@ -146,3 +148,8 @@ func MoreStack(size uintptr)
func Add(ptr unsafe.Pointer, off uintptr) unsafe.Pointer {
return unsafe.Pointer(uintptr(ptr) + off)
}

// CanSizeResue
func CanSizeResue(cap int) bool {
return cap <= int(option.LimitBufferSize)
}
12 changes: 9 additions & 3 deletions option/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ package option

var (
// DefaultDecoderBufferSize is the initial buffer size of StreamDecoder
DefaultDecoderBufferSize uint = 128 * 1024
DefaultDecoderBufferSize uint = 4 * 1024

// DefaultEncoderBufferSize is the initial buffer size of Encoder
DefaultEncoderBufferSize uint = 128 * 1024
DefaultEncoderBufferSize uint = 4 * 1024

// DefaultAstBufferSize is the initial buffer size of ast.Node.MarshalJSON()
DefaultAstBufferSize uint = 4 * 1024

// LimitBufferSize indicates the max pool buffer size, in case of OOM.
// See issue https://github.com/bytedance/sonic/issues/614
LimitBufferSize uint = 1024 * 1024
)

// CompileOptions includes all options for encoder or decoder compiler.
Expand Down Expand Up @@ -83,4 +90,3 @@ func WithCompileMaxInlineDepth(depth int) CompileOption {
o.MaxInlineDepth = depth
}
}

0 comments on commit 456a3b9

Please sign in to comment.