Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add post query cycle script execution hook #32

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

agrawalreetika
Copy link
Member

@agrawalreetika agrawalreetika commented Nov 14, 2024

This PR includes 2 changes -

  • Add post-query cycle script execution hook
  • Add pre stage script execution hook
  • Add cache cleanup scripts for Presto coordinator and workers (General Util functions to invalidate different types of worker and coordinator caches

@@ -44,6 +44,8 @@ type Stage struct {
PostStageShellScripts []string `json:"post_stage_scripts,omitempty"`
// Run shell scripts after executing each query.
PostQueryShellScripts []string `json:"post_query_scripts,omitempty"`
// Run shell scripts after finishing full query cycle runs each query.
PostQueryCycleShellScripts []string `json:"post_query_cycle_scripts,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steveburnett for doc

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A full cycle here means when we set cold_runs and warm_runs, each query in the benchmark will be run cold_runs + warm_runs times in total.

post_query_scripts will be called after each query execution, post_query_cycle_scripts will be called after all the cold_runs and warm_runs are done for a unique query.

sys.exit(-1)

file_path = sys.argv[1]
increment_file_value(file_path)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we touched this file in this script, then I guess the unit test assert result should be updated?

conn = create_connection(hostname, username, password, catalogName)
cur = conn.cursor()
cur.execute(query)
rows = cur.fetchall()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the error behavior?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be because of either the connection issue or query execution issue.
Let me wrap it under try-except

@agrawalreetika agrawalreetika force-pushed the cache-invalidation branch 2 times, most recently from 4ea4c28 to d90f64e Compare November 18, 2024 18:42
@agrawalreetika
Copy link
Member Author

@ethanyzhang Thanks for your review. I have addressed your earlier comments.

Also, I have added one more commit in this PR for adding one more hook to enable pre-stage script execution hook.
Please let me know if that needs to be in a separate PR or If It's Okay to have it in this same PR for review.

import paramiko
import argparse

def create_connection(host_name, user_name, user_password, db_name):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ethanyzhang how does pbench manage the utility functions like create_connection? It also appears in benchmarks/scripts/cache_cleaning_coordinator_post_query.py

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agrawalreetika I think it may be better to move create_connection to a different file then include it in both the worker script and the coordinator script.

Say this file is called presto_utils.py, then here you can do:

from presto_utils import create_connection

@@ -0,0 +1,11 @@
import sys

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is for the two .py file names. Why did you add my_ in front? I think it's better to remove it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this folder is for testing and demoing.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "my" stand for?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No meaning. it is for pbench test, not for production.

@@ -0,0 +1,80 @@
import prestodb

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache cleaning should be done before (not after) each query cycle (cold+warm runs). And it should only be applied to TPC Power Tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in the current implementation, I added 2 hooks -

  • post_query_scripts: run this script before this stage is started
  • pre_stage_scripts run this script after each query in this stage is complete
    Which will eventually trigger the clean-up script, once when the overall query execution starts as part pre_stage_scripts and then using post_query_scripts after each query cycle (cold+warm runs) the same clean-up script would be executed. So by using both the hooks, I was running clean up script.

But now with this script name, I think it looks a little confusing. I will add one more hook named pre_query_scripts just call that before each query cycle (cold+warm runs) for each query and rename these clean up script from post* to pre*

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @agrawalreetika I think the clean cache scripts should be called in every pre_query_cycle_scripts, not post_query_scripts and pre_stage_scripts. So we want this order suppose cold runs=1 and warm runs=1
clean cache, q1, q1, clean cache, q2, q2, clean cache, q3, q3, ...

Copy link
Collaborator

@ethanyzhang ethanyzhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @agrawalreetika, I developed some questions about error propagation. See my comments and I am happy to discuss further.


def clean_directory_list_cache(hostname, username, password, catalogName):
query = "CALL " + catalogName + ".system.invalidate_directory_list_cache()"
conn = create_connection(hostname, username, password, catalogName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at your implementation here it seems like this can just use execute_presto_select_query.
Maybe rename execute_presto_select_query to execute_presto_query.


def clean_metastore_cache(hostname, username, password, catalogName):
query = "CALL " + catalogName + ".system.invalidate_metastore_cache()"
conn = create_connection(hostname, username, password, catalogName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

stage/stage.go Outdated
// run post query cycle shell scripts
postQueryCycleErr := s.runShellScripts(ctx, s.PostQueryCycleShellScripts)
if retErr == nil {
retErr = postQueryCycleErr
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if the query was executed successfully, but something went wrong when you run the script to clean up the cache, what should we do? I guess we should not ignore the cache cleanup error because it will give us false benchmark signals?

If so, the way for the PostQueryCycleShellScripts errors to propagate is to return a non-zero return value from the script using sys.exit(1). Then this will cause postQueryCycleErr here to be set as an execution error.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ethanyzhang @agrawalreetika As we haven't tried it in real runs yet, this may or may not happen. But assume it happens because of some non-recoverable errors, the users won't be able to run any benchmark at all. Maybe we can consider issuing a warning but still continue the queries?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we see all benchmark runs will run this script? I think probably we need two modes: with and without this script.
For benchmarks run with this script, we need to make sure the script runs successfully

if rows[0][0] == True:
print("Directory list cache clean up is successfull for", catalogName)
else:
print("Directory list cache clean up is failed for", catalogName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments below at stage.go for adding error propagation here.

if rows[0][0] == True:
print("Metastore cache clean up is successfull for", catalogName)
else:
print("Metastore cache clean up is failed for", catalogName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for error propagation

import paramiko
import argparse

def create_connection(host_name, user_name, user_password, db_name):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agrawalreetika I think it may be better to move create_connection to a different file then include it in both the worker script and the coordinator script.

Say this file is called presto_utils.py, then here you can do:

from presto_utils import create_connection

cleanup_worker_disk_cache(worker_public_ips, native_cache_directory_worker, "centos", args.sshkey)

if is_worker_os_cache_cleanup_enabled:
cleanup_worker_os_cache(worker_public_ips, "centos", args.sshkey)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question about propagating errors that could happen here back to pbench. The benchmark should fail if we actually couldn't effectively clean up the cache.

@@ -0,0 +1,11 @@
import sys
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this folder is for testing and demoing.

@agrawalreetika
Copy link
Member Author

@ethanyzhang @yingsu00 Thanks for your review. I have made the changes based on the review comments. Please check.

def cleanup_worker_disk_cache(worker_public_ips, directory_to_cleanup, login_user, ssh_key_path):
for worker_ip in worker_public_ips:
try:
ssh = paramiko.SSHClient()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to extract the SSH connection to a utility function

# Directory list cache clean up
if is_list_cache_cleanup_enabled:
for catalogName in catalog_list:
print("Cleaning up directory list cache for", catalogName)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a space after "for"? Same for other occurrances

for catalogName in catalog_list:
print("Cleaning up directory list cache for", catalogName)
rows = clean_directory_list_cache(args.host, args.username, args.password, catalogName)
print("directory_list_cache_cleanup_query Query Result:", rows)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space after "Result:"?

is_worker_os_cache_cleanup_enabled = True

if is_worker_disk_cache_cleanup_enabled:
native_cache_directory_worker = "/home/centos/presto/async_data_cache"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work on Ubuntu?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currenlty pbench is getting called on Presto clusters, which runs on CentOs, where this is tested.

@@ -0,0 +1,114 @@
from mysql_utils import create_connection
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this being called?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agrawalreetika agrawalreetika force-pushed the cache-invalidation branch 4 times, most recently from 339f67e to 4c9dc6f Compare December 30, 2024 07:39
is_metadata_cache_cleanup_enabled = False

# Directory list cache clean up
if is_list_cache_cleanup_enabled:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agrawalreetika Is it possible to attach the output of these? What would the output look like if output rows are multiple lines?


def clean_directory_list_cache(hostname, username, password, catalogName):
query = "CALL " + catalogName + ".system.invalidate_directory_list_cache()"
conn = create_connection(hostname, username, password, catalogName)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we extract line 7-11 to a util function?


def clean_metastore_cache(hostname, username, password, catalogName):
query = "CALL " + catalogName + ".system.invalidate_metastore_cache()"
conn = create_connection(hostname, username, password, catalogName)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as the previous

db_name=database
)

clusterName = args.clustername

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use Camel notation or snake_case notation? we need to unify them

args = parser.parse_args()

with open(args.mysql, 'r') as file:
mysqlDetails = json.load(file)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Camal?


# Directory list cache clean up
if is_list_cache_cleanup_enabled:
for cluster_name in catalog_list:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why catalog_list elements became cluster name?

stage/stage.go Outdated
// run post query cycle shell scripts
postQueryCycleErr := s.runShellScripts(ctx, s.PostQueryCycleShellScripts)
if retErr == nil {
retErr = postQueryCycleErr
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we see all benchmark runs will run this script? I think probably we need two modes: with and without this script.
For benchmarks run with this script, we need to make sure the script runs successfully

@@ -0,0 +1,11 @@
import sys
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No meaning. it is for pbench test, not for production.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants