KAFKA-18073: Prevent dropped records from failed retriable exceptions #18146
Conversation
@chia7712 Could you take a look at this when you get the chance? I saw you on the past couple PRs for this file. This bug causes data loss by default for all Kafka Connect source connectors. Thanks for the help!
@@ -396,12 +396,25 @@ boolean sendRecords() {
for (final SourceRecord preTransformRecord : toSend) {
    ProcessingContext<SourceRecord> context = new ProcessingContext<>(preTransformRecord);
    final SourceRecord record = transformationChain.apply(context, preTransformRecord);
    // If the result of a transformation is null, then the record should be filtered/skipped & there was no error
    if (record == null) {
The "retriable exception causes data loss" bug also applies to the transformations. If the transformation chain gives up retrying, it will return null with context.failed(). I think you can let the null pass through convertTransformedRecord, and then have separate null and context.failed() checks there that cover all of the transformation and conversion steps.
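A minimal sketch of the suggested shape. This is hypothetical and heavily simplified: the class, the stand-in ProcessingContext, and the String records are illustrative only, and convertTransformedRecord here just mirrors the idea that one null result plus a context.failed() check can cover both the transformation and conversion steps.

```java
// Hypothetical sketch: a null result can mean either "filtered by a
// transformation" (no error) or "failed and tolerated"; the caller tells
// the two apart with a single context.failed() check after conversion.
public class ConvertSketch {
    // Simplified stand-in for Kafka Connect's ProcessingContext
    static class ProcessingContext {
        private boolean failed;
        void markFailed() { failed = true; }
        boolean failed() { return failed; }
    }

    // Null passes through unchanged, whether it came from filtering
    // or from a tolerated transformation failure
    static String convertTransformedRecord(ProcessingContext context, String record) {
        if (record == null) {
            return null;
        }
        return record.toUpperCase(); // stand-in for real conversion
    }

    // Caller-side checks covering all transformation and conversion steps
    static String processRecord(ProcessingContext context, String record) {
        String converted = convertTransformedRecord(context, record);
        if (converted == null) {
            if (context.failed()) {
                return "skipped-after-tolerated-error";
            }
            return "filtered";
        }
        return converted;
    }
}
```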
Nice catch, updated
I'm thinking now that this fix is in the "wrong place": it's in the WorkerSourceTask, when it's also a problem on the sink task side. I think the problem is really inside of the RetryWithToleranceOperator, and it shouldn't return null after a RetriableException when it's not within tolerance limits; it should throw an exception like other non-retriable exceptions.
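The suggested behavior could look roughly like this. A simplified sketch only: ToleranceType and the method name are illustrative (not the actual Kafka Connect API), and a plain RuntimeException stands in for ConnectException.

```java
// Hypothetical sketch of the suggested fix inside the tolerance operator:
// once retries for a RetriableException are exhausted, consult the
// configured tolerance instead of unconditionally returning null.
public class ToleranceSketch {
    enum ToleranceType { NONE, ALL } // mirrors errors.tolerance=none|all

    static Object handleRetriesExhausted(ToleranceType tolerance, RuntimeException cause) {
        if (tolerance == ToleranceType.ALL) {
            return null; // errors.tolerance=all: skip the record, as before
        }
        // errors.tolerance=none: fail loudly, like other non-retriable exceptions
        throw new RuntimeException("Exceeded retries for retriable exception", cause);
    }
}
```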
Good point, will update
…on failed retriable exceptions
Hi @gharris1727 Can you take another look when you get the chance? Many thanks
@gharris1727 gentle nudge on this, added tests to sink as well, all tests are passing. Please let me know if there's anything else I can do.
Hi @gharris1727 do you think you could take another look? Thank you again for the help. |
Thanks @twthorn for your patience, I was celebrating the winter holidays and then got sick afterwards.
context.error(e);
return null;
markAsFailed();
if (withinToleranceLimits()) {
    return null;
} else {
    throw new ConnectException("Exceeded deadline & tolerance for retriable exception", e);
}
This duplicates (and double-wraps) the exception, once with "Tolerance exceeded in error handler" and once with "Exceeded deadline & tolerance for retriable exception". I think we can reuse the existing handling for non-retriable exceptions in #execAndHandleError by just rethrowing e right after the trace message.
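Rethrowing as suggested keeps a single wrapping site. A hypothetical sketch of the two layers: the method names mirror execAndRetry/execAndHandleError from the discussion, but the signatures, the retry counting, and the RetriableException class here are all simplified stand-ins, not the real Kafka Connect code.

```java
// Hypothetical sketch: execAndRetry rethrows the original retriable
// exception once retries are exhausted, and only execAndHandleError
// wraps it, so the error is wrapped exactly once.
import java.util.concurrent.Callable;

public class RethrowSketch {
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    static <V> V execAndRetry(Callable<V> operation, int maxRetries) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return operation.call();
            } catch (RetriableException e) {
                if (attempt >= maxRetries) {
                    // trace message would go here; rethrow instead of returning null
                    throw e;
                }
            }
        }
    }

    static <V> V execAndHandleError(Callable<V> operation, int maxRetries, boolean tolerateErrors) {
        try {
            return execAndRetry(operation, maxRetries);
        } catch (Exception e) {
            if (tolerateErrors) {
                return null; // tolerated: skip the record
            }
            // single wrapping site for retriable and non-retriable failures alike
            throw new RuntimeException("Tolerance exceeded in error handler", e);
        }
    }
}
```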
@@ -155,4 +165,29 @@ public static void assertAssignment(boolean expectFailed,
    assertEquals(expectedDelay, assignment.delay(),
            "Wrong rebalance delay in " + assignment);
}

public static TransformationChain getTransformationChain(RetryWithToleranceOperator toleranceOperator, List<Object> results) {
Could you add generic arguments to TransformationChain and RetryWithToleranceOperator in this method and callers?
        return null;
    } else {
        throw new ConnectException("Exceeded deadline & tolerance for retriable exception", e);
    }
}
if (stopping) {
    log.trace("Shutdown has been scheduled. Marking operation as failed.");
This data loss scenario is making me think about this stopping flag, and whether it could cause data loss. Running out of retries and stopping retries due to a shutdown should probably behave similarly, and probably shouldn't skip the record. But stopping is only set by #triggerStop / WorkerTask#cancel, which is a hard-shutdown operation after no further data is expected from the task, and no offsets are being committed. I think we can probably leave this in-place, and if we ever change task shutdown we can address it then.
Agreed. For both sources and sinks, the worker doesn't commit offsets once the task has been cancelled, so data loss should not be possible because of that conditional check. Will leave as is.
LGTM, thanks so much for reporting and fixing this @twthorn!
…#18146) Reviewers: Greg Harris <[email protected]>
Ticket KAFKA-18073
If any operation on a record (e.g., record conversion fails due to a connectivity issue with Schema Registry, or a transformation fails) results in a retriable exception that exceeds its retry configuration (the default is no retries), then even with errors.tolerance set to none (the default) we still drop the record. This causes silent, unexpected data loss by default for any Kafka Connect source or sink connector. We should instead fail loudly and not drop records when a retriable exception exhausts its retries.
Changes
To fix this for both source and sink connectors, we change the logic in RetryWithToleranceOperator. Specifically, if a RetriableException can no longer be retried, we either return null (skip the record) or raise an exception, depending on the configured error tolerance.
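For context, the error-handling behavior involved is controlled by connector-level configuration; the values below are illustrative, not recommendations:

```properties
# Retry budget for RetriableException; the default errors.retry.timeout=0
# means no retries, so a single retriable failure exhausts retries immediately
errors.retry.timeout=300000
errors.retry.delay.max.ms=10000
# Default tolerance is none: with this fix, an exhausted retriable error
# fails the task instead of silently dropping the record
errors.tolerance=none
```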
Testing
In AbstractWorkerSourceTaskTest, add several new tests for the different logical branches depending on errors.tolerance and whether convert or transform succeeds or fails. We provide a way to avoid mocking the transformationChain so that the underlying retryWithToleranceOperator is used. Although this expands the scope of the logic tested, it is intentional and beneficial, as these mocks prevented this critical bug from being discovered earlier.
Also add similar tests in the worker sink task tests.
Also add tests in RetryWithToleranceOperatorTest for some additional cases of none vs. all. Refactor some unclear naming.