Skip to content

Commit

Permalink
fix: Use limited parallelism to prevent thread starvation
Browse files Browse the repository at this point in the history
`Dispatchers.IO` has a default thread limit of 64. This can lead to
thread starvation resulting in deadlocks, if more than 64 coroutines
doing blocking calls are running in parallel. Exceeding this limit can
also negatively impact performance if no blocking calls are involved.

To fix this, use `limitedParallelism` [1] to limit the amount of
coroutines running at the same time, and to make use of the elasticity
feature of `Dispatchers.IO` [2].

This commit applies a conservative limit of 20 to places where
`Dispatchers.IO` is used and many parallel coroutines are started. This
value can later be adjusted if it has negative impact on performance.

[1]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/limited-parallelism.html
[2]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html

Signed-off-by: Martin Nonnenmacher <[email protected]>
  • Loading branch information
mnonnenmacher committed Dec 12, 2024
1 parent 1d0805f commit 23c9bb0
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion analyzer/src/main/kotlin/Analyzer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private class PackageManagerRunner(
private suspend fun run() {
logger.info { "Starting ${manager.managerName} analysis." }

withContext(Dispatchers.IO) {
withContext(Dispatchers.IO.limitedParallelism(20)) {
val result = manager.resolveDependencies(definitionFiles, labels)

logger.info { "Finished ${manager.managerName} analysis." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ suspend fun ClearlyDefinedService.getDefinitionsChunked(
chunkSize: Int = ClearlyDefinedService.MAX_REQUEST_CHUNK_SIZE
): Map<Coordinates, ClearlyDefinedService.Defined> =
buildMap {
withContext(Dispatchers.IO) {
withContext(Dispatchers.IO.limitedParallelism(20)) {
coordinates.chunked(chunkSize).map { chunk ->
async { call { getDefinitions(chunk) } }
}.awaitAll()
Expand All @@ -306,7 +306,7 @@ suspend fun ClearlyDefinedService.getCurationsChunked(
chunkSize: Int = ClearlyDefinedService.MAX_REQUEST_CHUNK_SIZE
): Map<Coordinates, Curation> =
buildMap {
withContext(Dispatchers.IO) {
withContext(Dispatchers.IO.limitedParallelism(20)) {
coordinates.chunked(chunkSize).map { chunk ->
async { call { getCurations(chunk).values } }
}.awaitAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class Yarn2(

val chunks = moduleIds.chunked(YARN_NPM_INFO_CHUNK_SIZE)

return runBlocking(Dispatchers.IO) {
return runBlocking(Dispatchers.IO.limitedParallelism(20)) {
chunks.mapIndexed { index, chunk ->
async {
logger.info { "Fetching packages details chunk #$index." }
Expand Down
2 changes: 1 addition & 1 deletion plugins/reporters/fossid/src/main/kotlin/FossIdReporter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class FossIdReporter(
val reportType = ReportType.valueOf(config.reportType)
val selectionType = SelectionType.valueOf(config.selectionType)

return runBlocking(Dispatchers.IO) {
return runBlocking(Dispatchers.IO.limitedParallelism(20)) {
val service = FossIdRestService.create(config.serverUrl)
val scanResults = input.ortResult.getScanResults().values.flatten()
val scanCodes = scanResults.flatMapTo(mutableSetOf()) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/scanners/fossid/src/main/kotlin/FossId.kt
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ class FossId internal constructor(
val licenseFindingsByPath = licenseFindings.groupBy { it.location.path }
val result = mutableListOf<String>()

runBlocking(Dispatchers.IO) {
runBlocking(Dispatchers.IO.limitedParallelism(20)) {
val candidatePathsToMark = snippetChoices.groupBy({ it.given.sourceLocation.path }) {
it.choice.reason
}
Expand Down
6 changes: 3 additions & 3 deletions scanner/src/main/kotlin/Scanner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class Scanner(
logger.info { "Resolving provenance for ${controller.packages.size} package(s)." }

val duration = measureTime {
withContext(Dispatchers.IO) {
withContext(Dispatchers.IO.limitedParallelism(20)) {
controller.packages.map { pkg ->
async {
pkg to runCatching {
Expand All @@ -250,7 +250,7 @@ class Scanner(
logger.info { "Resolving nested provenances for ${controller.packages.size} package(s)." }

val duration = measureTime {
withContext(Dispatchers.IO) {
withContext(Dispatchers.IO.limitedParallelism(20)) {
controller.getPackageProvenancesWithoutVcsPath().map { provenance ->
async {
provenance to runCatching {
Expand Down Expand Up @@ -649,7 +649,7 @@ class Scanner(
logger.info { "Creating file lists for ${provenances.size} provenances." }

val duration = measureTime {
withContext(Dispatchers.IO) {
withContext(Dispatchers.IO.limitedParallelism(20)) {
provenances.mapIndexed { index, provenance ->
async {
logger.info {
Expand Down

0 comments on commit 23c9bb0

Please sign in to comment.