Skip to content

Commit

Permalink
Add analysis failure callback
Browse files Browse the repository at this point in the history
  • Loading branch information
jdesjean committed Jan 8, 2025
1 parent 36643f7 commit 11d42f2
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ object QueryPlanningTracker {
* Callbacks after planning phase completion.
*/
abstract class QueryPlanningTrackerCallback {
/**
 * Called when a query fails analysis.
 * Noop by default so that existing callback implementations remain
 * source- and binary-compatible.
 *
 * @param tracker tracker that triggered the callback.
 * @param parsedPlan The plan prior to analysis,
 *                   see [[org.apache.spark.sql.catalyst.analysis.Analyzer]].
 */
def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): Unit = {
// Noop by default for backward compatibility
}
/**
* Called when query has been analyzed.
*
Expand Down Expand Up @@ -147,6 +157,17 @@ class QueryPlanningTracker(
ret
}

/**
 * Set when the query fails analysis.
 * Can be called multiple times upon plan change.
 * Notifies the registered callback (if any) with the pre-analysis plan.
 *
 * @param parsedPlan The plan prior to analysis,
 *                   see [[org.apache.spark.sql.catalyst.analysis.Analyzer]].
 */
private[sql] def setAnalysisFailed(parsedPlan: LogicalPlan): Unit = {
trackerCallback.foreach(_.analysisFailed(this, parsedPlan))
}

/**
* Set when the query has been analyzed.
* Can be called multiple times upon plan change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ class QueryPlanningTrackerSuite extends SparkFunSuite {
val mockCallback = mock[QueryPlanningTrackerCallback]
val mockPlan1 = mock[LogicalPlan]
val mockPlan2 = mock[LogicalPlan]
val mockPlan3 = mock[LogicalPlan]
val mockPlan4 = mock[LogicalPlan]
val t = new QueryPlanningTracker(Some(mockCallback))
t.setAnalysisFailed(mockPlan3)
verify(mockCallback, times(1)).analysisFailed(t, mockPlan3)
t.setAnalysisFailed(mockPlan4)
verify(mockCallback, times(1)).analysisFailed(t, mockPlan4)
t.setAnalyzed(mockPlan1)
verify(mockCallback, times(1)).analyzed(t, mockPlan1)
t.setAnalyzed(mockPlan2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,19 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
*
* @param analyzedPlan
* The analyzed plan generated by the Connect request plan. None when the request does not
* generate a Spark plan or analysis fails.
* @param parsedPlan
* The parsed plan generated by the Connect request plan. None when the request does not
* generate a plan.
*/
def postAnalyzed(analyzedPlan: Option[LogicalPlan] = None): Unit = {
def postAnalyzed(
analyzedPlan: Option[LogicalPlan] = None,
parsedPlan: Option[LogicalPlan] = None): Unit = {
assertStatus(List(ExecuteStatus.Started, ExecuteStatus.Analyzed), ExecuteStatus.Analyzed)
val event =
SparkListenerConnectOperationAnalyzed(jobTag, operationId, clock.getTimeMillis())
event.analyzedPlan = analyzedPlan
event.parsedPlan = parsedPlan
listenerBus.post(event)
}

Expand Down Expand Up @@ -251,6 +257,10 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
postAnalyzed(Some(analyzedPlan))
}

override def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): Unit = {
  // On analysis failure, post the analyzed event carrying only the parsed plan.
  postAnalyzed(analyzedPlan = None, parsedPlan = Some(parsedPlan))
}

def readyForExecution(tracker: QueryPlanningTracker): Unit = postReadyForExecution()
}))
}
Expand Down Expand Up @@ -342,9 +352,15 @@ case class SparkListenerConnectOperationAnalyzed(
extends SparkListenerEvent {

/**
* Analyzed Spark plan generated by the Connect request. None when the Connect request does not
* Parsed Spark plan generated by the Connect request. None when the Connect request does not
* generate a Spark plan.
*/
@JsonIgnore var parsedPlan: Option[LogicalPlan] = None

/**
* Analyzed Spark plan generated by the Connect request. None when the Connect request does not
* generate a Spark plan or analysis fails.
*/
@JsonIgnore var analyzedPlan: Option[LogicalPlan] = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.{BufferedWriter, OutputStreamWriter}
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -99,11 +99,15 @@ class QueryExecution(
sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}
}
plan.toOption.getOrElse(logical).foreach(tracker.setAnalyzed)
plan match {
case Success(plan) =>
tracker.setAnalyzed(plan)
case Failure(_) =>
tracker.setAnalysisFailed(logical)
}
plan.get
}


def analyzed: LogicalPlan = lazyAnalyzed.get

private val lazyCommandExecuted = LazyTry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class QueryExecutionSuite extends SharedSparkSession {
}

test("SPARK-50600: Failed analysis should send analyzed event") {
val mockCallback = MockCallback(isFailed = true)
val mockCallback = MockCallback()

def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))

Expand Down Expand Up @@ -461,12 +461,20 @@ class QueryExecutionSuite extends SharedSparkSession {
}
case class MockCallback(
var trackerAnalyzed: QueryPlanningTracker = null,
var trackerReadyForExecution: QueryPlanningTracker = null,
val isFailed: Boolean = false)
var trackerReadyForExecution: QueryPlanningTracker = null)
extends QueryPlanningTrackerCallback {
override def analysisFailed(
trackerFromCallback: QueryPlanningTracker,
analyzedPlan: LogicalPlan): Unit = {
trackerAnalyzed = trackerFromCallback
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.OPTIMIZATION))
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.PLANNING))
assert(analyzedPlan != null)
}
def analyzed(trackerFromCallback: QueryPlanningTracker, plan: LogicalPlan): Unit = {
trackerAnalyzed = trackerFromCallback
assert(trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS) == !isFailed)
assert(trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.OPTIMIZATION))
assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.PLANNING))
assert(plan != null)
Expand Down

0 comments on commit 11d42f2

Please sign in to comment.