Skip to content

Commit

Permalink
[DI] Batch outgoing http requests (#5007)
Browse files Browse the repository at this point in the history
  • Loading branch information
watson authored Jan 7, 2025
1 parent 1e76223 commit b4f99e8
Show file tree
Hide file tree
Showing 13 changed files with 483 additions and 185 deletions.
282 changes: 150 additions & 132 deletions integration-tests/debugger/basic.spec.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration-tests/debugger/snapshot-pruning.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () {
beforeEach(t.triggerBreakpoint)

it('should prune snapshot if payload is too large', function (done) {
t.agent.on('debugger-input', ({ payload }) => {
t.agent.on('debugger-input', ({ payload: [payload] }) => {
assert.isBelow(Buffer.byteLength(JSON.stringify(payload)), 1024 * 1024) // 1MB
assert.deepEqual(payload['debugger.snapshot'].captures, {
lines: {
Expand Down
10 changes: 5 additions & 5 deletions integration-tests/debugger/snapshot.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ describe('Dynamic Instrumentation', function () {
beforeEach(t.triggerBreakpoint)

it('should capture a snapshot', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
assert.deepEqual(Object.keys(captures), ['lines'])
assert.deepEqual(Object.keys(captures.lines), [String(t.breakpoint.line)])

Expand Down Expand Up @@ -114,7 +114,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxReferenceDepth', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]
delete locals.request
delete locals.fastify
Expand Down Expand Up @@ -150,7 +150,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxLength', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(locals.lstr, {
Expand All @@ -167,7 +167,7 @@ describe('Dynamic Instrumentation', function () {
})

it('should respect maxCollectionSize', function (done) {
t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(locals.arr, {
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('Dynamic Instrumentation', function () {
}
}

t.agent.on('debugger-input', ({ payload: { 'debugger.snapshot': { captures } } }) => {
t.agent.on('debugger-input', ({ payload: [{ 'debugger.snapshot': { captures } }] }) => {
const { locals } = captures.lines[t.breakpoint.line]

assert.deepEqual(Object.keys(locals), [
Expand Down
8 changes: 5 additions & 3 deletions integration-tests/debugger/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ function setup ({ env, testApp } = {}) {
function triggerBreakpoint (url) {
// Trigger the breakpoint once probe is successfully installed
t.agent.on('debugger-diagnostics', ({ payload }) => {
if (payload.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(url)
}
payload.forEach((event) => {
if (event.debugger.diagnostics.status === 'INSTALLED') {
t.axios.get(url)
}
})
})
}

Expand Down
3 changes: 2 additions & 1 deletion packages/dd-trace/src/debugger/devtools_client/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const config = module.exports = {
service: parentConfig.service,
commitSHA: parentConfig.commitSHA,
repositoryUrl: parentConfig.repositoryUrl,
parentThreadId
parentThreadId,
maxTotalPayloadSize: 5 * 1024 * 1024 // 5MB
}

updateUrl(parentConfig)
Expand Down
5 changes: 2 additions & 3 deletions packages/dd-trace/src/debugger/devtools_client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,8 @@ session.on('Debugger.paused', async ({ params }) => {
}

// TODO: Process template (DEBUG-2628)
send(probe.template, logger, dd, snapshot, (err) => {
if (err) log.error('Debugger error', err)
else ackEmitting(probe)
send(probe.template, logger, dd, snapshot, () => {
ackEmitting(probe)
})
}
})
Expand Down
36 changes: 36 additions & 0 deletions packages/dd-trace/src/debugger/devtools_client/json-buffer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict'

/**
 * Accumulates JSON-encoded items into a single JSON array payload.
 *
 * Buffered items are flushed as one serialized JSON array string via
 * `onFlush`, either when `timeout` milliseconds have passed since the first
 * buffered item, or just before appending an item would push the payload
 * beyond `size` bytes.
 */
class JSONBuffer {
  /**
   * @param {Object} opts
   * @param {number} opts.size - Max payload size in bytes (incl. brackets and commas)
   * @param {number} opts.timeout - Max time (ms) an item may sit in the buffer
   * @param {Function} opts.onFlush - Called with the serialized JSON array string
   */
  constructor ({ size, timeout, onFlush }) {
    this._maxSize = size
    this._timeout = timeout
    this._onFlush = onFlush
    this._reset()
  }

  // Cancel any pending timed flush and empty the buffer.
  _reset () {
    clearTimeout(this._timer)
    this._timer = null
    this._partialJson = null
    this._partialJsonSize = 0 // byte length of `_partialJson`, tracked incrementally
  }

  _flush () {
    const json = `${this._partialJson}]` // close the JSON array
    this._reset()
    this._onFlush(json)
  }

  /**
   * Append a JSON-encoded item to the buffer.
   *
   * @param {string} str - A single JSON-encoded item
   * @param {number} [size] - Byte length of `str` (computed when not supplied)
   */
  write (str, size = Buffer.byteLength(str)) {
    if (this._timer === null) {
      // First item: open the array and schedule a time-based flush.
      this._partialJson = `[${str}`
      this._partialJsonSize = size + 1 // +1 for the leading `[`
      this._timer = setTimeout(() => this._flush(), this._timeout)
    } else if (this._partialJsonSize + size + 2 > this._maxSize) {
      // +2 accounts for the separating comma plus the closing `]`.
      // The item doesn't fit: flush what we have, then re-buffer the item.
      this._flush()
      this.write(str, size)
    } else {
      this._partialJson += `,${str}`
      this._partialJsonSize += size + 1 // +1 for the separating comma
    }
  }
}

module.exports = JSONBuffer
38 changes: 28 additions & 10 deletions packages/dd-trace/src/debugger/devtools_client/send.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ const { hostname: getHostname } = require('os')
const { stringify } = require('querystring')

const config = require('./config')
const JSONBuffer = require('./json-buffer')
const request = require('../../exporters/common/request')
const { GIT_COMMIT_SHA, GIT_REPOSITORY_URL } = require('../../plugins/util/tags')
const log = require('../../log')
const { version } = require('../../../../../package.json')

module.exports = send

const MAX_PAYLOAD_SIZE = 1024 * 1024 // 1MB
const MAX_LOG_PAYLOAD_SIZE = 1024 * 1024 // 1MB

const ddsource = 'dd_debugger'
const hostname = getHostname()
Expand All @@ -27,14 +29,10 @@ const ddtags = [

const path = `/debugger/v1/input?${stringify({ ddtags })}`

function send (message, logger, dd, snapshot, cb) {
const opts = {
method: 'POST',
url: config.url,
path,
headers: { 'Content-Type': 'application/json; charset=utf-8' }
}
let callbacks = []
const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })

function send (message, logger, dd, snapshot, cb) {
const payload = {
ddsource,
hostname,
Expand All @@ -46,16 +44,36 @@ function send (message, logger, dd, snapshot, cb) {
}

let json = JSON.stringify(payload)
let size = Buffer.byteLength(json)

if (Buffer.byteLength(json) > MAX_PAYLOAD_SIZE) {
if (size > MAX_LOG_PAYLOAD_SIZE) {
// TODO: This is a very crude way to handle large payloads. Proper pruning will be implemented later (DEBUG-2624)
const line = Object.values(payload['debugger.snapshot'].captures.lines)[0]
line.locals = {
notCapturedReason: 'Snapshot was too large',
size: Object.keys(line.locals).length
}
json = JSON.stringify(payload)
size = Buffer.byteLength(json)
}

jsonBuffer.write(json, size)
callbacks.push(cb)
}

// Flush callback for the shared JSONBuffer: POSTs the batched JSON array
// payload to the debugger input endpoint, then invokes the per-message
// callbacks that were queued by `send` while this batch was accumulating.
function onFlush (payload) {
  const opts = {
    method: 'POST',
    url: config.url,
    path,
    headers: { 'Content-Type': 'application/json; charset=utf-8' }
  }

// NOTE(review): the next line appears to be residue of the *removed* code
// from this diff view (the old non-batched `request(json, opts, cb)` call)
// rendered without its deletion marker — confirm against the actual file.
request(json, opts, cb)
  // Swap out the callback list before the async request so callbacks queued
  // for the *next* batch are not invoked when this batch completes.
  const _callbacks = callbacks
  callbacks = []

  request(payload, opts, (err) => {
    if (err) log.error('Could not send debugger payload', err)
    else _callbacks.forEach(cb => cb())
  })
}
11 changes: 9 additions & 2 deletions packages/dd-trace/src/debugger/devtools_client/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const LRUCache = require('lru-cache')
const config = require('./config')
const JSONBuffer = require('./json-buffer')
const request = require('../../exporters/common/request')
const FormData = require('../../exporters/common/form-data')
const log = require('../../log')
Expand All @@ -25,6 +26,8 @@ const cache = new LRUCache({
ttlAutopurge: true
})

const jsonBuffer = new JSONBuffer({ size: config.maxTotalPayloadSize, timeout: 1000, onFlush })

const STATUSES = {
RECEIVED: 'RECEIVED',
INSTALLED: 'INSTALLED',
Expand Down Expand Up @@ -71,11 +74,15 @@ function ackError (err, { id: probeId, version }) {
}

// Queue a probe-status payload for delivery. The module-level JSONBuffer
// batches serialized events and hands the combined JSON array to `onFlush`
// once the size limit or the flush timeout is reached.
function send (payload) {
  jsonBuffer.write(JSON.stringify(payload))
}

function onFlush (payload) {
const form = new FormData()

form.append(
'event',
JSON.stringify(payload),
payload,
{ filename: 'event.json', contentType: 'application/json; charset=utf-8' }
)

Expand All @@ -87,7 +94,7 @@ function send (payload) {
}

request(form, options, (err) => {
if (err) log.error('[debugger:devtools_client] Error sending debugger payload', err)
if (err) log.error('[debugger:devtools_client] Error sending probe payload', err)
})
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict'

require('../../setup/mocha')

const JSONBuffer = require('../../../src/debugger/devtools_client/json-buffer')

const MAX_SAFE_SIGNED_INTEGER = 2 ** 31 - 1

describe('JSONBuffer', () => {
  it('should call onFlush with the expected payload when the timeout is reached', function (done) {
    const onFlush = (json) => {
      const diff = Date.now() - start
      expect(json).to.equal('[{"message":1},{"message":2},{"message":3}]')
      // The lower bound guards against premature flushes. The upper bound is
      // deliberately loose: a 100ms timer can fire tens of milliseconds late
      // on loaded CI machines, and a tight bound (e.g. 110) makes this flaky.
      expect(diff).to.be.within(95, 200)
      done()
    }

    // Size is unlimited here so only the timeout can trigger the flush.
    const jsonBuffer = new JSONBuffer({ size: Infinity, timeout: 100, onFlush })

    const start = Date.now()
    jsonBuffer.write(JSON.stringify({ message: 1 }))
    jsonBuffer.write(JSON.stringify({ message: 2 }))
    jsonBuffer.write(JSON.stringify({ message: 3 }))
  })

  it('should call onFlush with the expected payload when the size is reached', function (done) {
    const expectedPayloads = [
      '[{"message":1},{"message":2}]',
      '[{"message":3},{"message":4}]'
    ]

    const onFlush = (json) => {
      expect(json).to.equal(expectedPayloads.shift())
      if (expectedPayloads.length === 0) done()
    }

    // Timeout is effectively infinite so only the size limit can trigger
    // flushes. NOTE(review): message 5 stays buffered with a pending
    // multi-week timer after the test completes — presumably the test runner
    // exits anyway; confirm mocha runs with `--exit` or equivalent.
    const jsonBuffer = new JSONBuffer({ size: 30, timeout: MAX_SAFE_SIGNED_INTEGER, onFlush })

    jsonBuffer.write(JSON.stringify({ message: 1 })) // size: 15
    jsonBuffer.write(JSON.stringify({ message: 2 })) // size: 29
    jsonBuffer.write(JSON.stringify({ message: 3 })) // size: 15 (flushed, and re-added)
    jsonBuffer.write(JSON.stringify({ message: 4 })) // size: 29
    jsonBuffer.write(JSON.stringify({ message: 5 })) // size: 15 (flushed, and re-added)
  })
})
Loading

0 comments on commit b4f99e8

Please sign in to comment.