From ed5a79058e951147b40691c92615b3548273bb64 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Wed, 8 Jan 2025 18:15:33 +0100
Subject: [PATCH] chore: lakefs rust integration test

Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
---
 .github/workflows/build.yml            | 37 ++++++++++++++++++++
 .github/workflows/codecov.yml          |  2 +-
 crates/core/src/operations/optimize.rs |  2 +-
 crates/lakefs/Cargo.toml               |  1 +
 crates/lakefs/tests/context.rs         | 47 +++++++++++++++++++++-----
 crates/lakefs/tests/integration.rs     | 16 +++++++++
 6 files changed, 94 insertions(+), 11 deletions(-)
 create mode 100644 crates/lakefs/tests/integration.rs

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index e206ab82e1..34be99d214 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -139,3 +139,40 @@ jobs:
       - name: Run tests with native-tls
         run: |
           cargo test --no-default-features --features integration_test,s3-native-tls,datafusion
+
+  integration_test_lakefs:
+    name: Integration Tests (LakeFS v1.48)
+    runs-on: ubuntu-latest
+    env:
+      CARGO_INCREMENTAL: 0
+      # Disable full debug symbol generation to speed up CI build and keep memory down
+      #
+      RUSTFLAGS: "-C debuginfo=line-tables-only"
+      # https://github.com/rust-lang/cargo/issues/10280
+      CARGO_NET_GIT_FETCH_WITH_CLI: "true"
+      RUST_BACKTRACE: "1"
+      RUST_LOG: debug
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Install minimal stable with clippy and rustfmt
+        uses: actions-rs/toolchain@v1
+        with:
+          profile: default
+          toolchain: '1.81'
+          override: true
+
+      - name: Download Lakectl
+        run: |
+          wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.0/lakeFS_1.48.0_Linux_x86_64.tar.gz
+          tar -xf lakeFS_1.48.0_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE
+          echo "$GITHUB_WORKSPACE/lakectl" >> $GITHUB_PATH
+
+      - name: Start emulated services
+        run: docker compose -f docker-compose-lakefs.yml up -d
+
+      - name: Run tests with rustls (default)
+        run: |
+          cargo test --features integration_test,lakefs,datafusion
+
diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml
index f0e68536fc..69212c55b0 100644
--- a/.github/workflows/codecov.yml
+++ b/.github/workflows/codecov.yml
@@ -26,7 +26,7 @@ jobs:
         uses: taiki-e/install-action@cargo-llvm-cov
       - uses: Swatinem/rust-cache@v2
       - name: Generate code coverage
-        run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
+        run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs --skip test_read_tables_lakefs
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v4
         with:
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 4180bc1e46..a7d936e963 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -748,7 +748,7 @@ impl MergePlan {
             #[cfg(not(feature = "datafusion"))]
             let exec_context = Arc::new(zorder::ZOrderExecContext::new(
                 zorder_columns,
-                log_store.object_store(),
+                log_store.object_store(Some(operation_id)),
                 // If there aren't enough bins to use all threads, then instead
                 // use threads within the bins. This is important for the case where
                 // the table is un-partitioned, in which case the entire table is just
diff --git a/crates/lakefs/Cargo.toml b/crates/lakefs/Cargo.toml
index 374d856bb8..e731844c6d 100644
--- a/crates/lakefs/Cargo.toml
+++ b/crates/lakefs/Cargo.toml
@@ -39,6 +39,7 @@ serial_test = "3"
 deltalake-test = { path = "../test" }
 pretty_env_logger = "0.5.0"
 rand = "0.8"
+which = "7"
 
 [features]
diff --git a/crates/lakefs/tests/context.rs b/crates/lakefs/tests/context.rs
index af73b2dc7d..ea3f52ca7f 100644
--- a/crates/lakefs/tests/context.rs
+++ b/crates/lakefs/tests/context.rs
@@ -1,10 +1,7 @@
-// #![cfg(feature = "integration_test")]
+#![cfg(feature = "integration_test")]
 use deltalake_lakefs::register_handlers;
 use deltalake_test::utils::*;
-use std::{
-    collections::HashSet,
-    process::{Command, ExitStatus},
-};
+use std::process::{Command, ExitStatus};
 
 use which::which;
 
@@ -25,21 +22,53 @@ impl StorageIntegration for LakeFSIntegration {
         set_env_if_not_set("access_key_id", "LAKEFSID");
         set_env_if_not_set("secret_access_key", "LAKEFSKEY");
         set_env_if_not_set("allow_http", "true");
+
+        set_env_if_not_set("LAKECTL_CREDENTIALS_ACCESS_KEY_ID", "LAKEFSID");
+        set_env_if_not_set("LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY", "LAKEFSKEY");
+        set_env_if_not_set("LAKECTL_SERVER_ENDPOINT_URL", "http://127.0.0.1:8000");
     }
 
     fn create_bucket(&self) -> std::io::Result<ExitStatus> {
-        Ok(())
+        // Bucket is already created in docker-compose
+        Ok(ExitStatus::default())
     }
 
     fn bucket_name(&self) -> String {
-        "bronze"
+        "bronze".to_string()
     }
 
     fn root_uri(&self) -> String {
+        // Default branch is always main
         format!("lakefs://{}/main", self.bucket_name())
     }
 
     fn copy_directory(&self, source: &str, destination: &str) -> std::io::Result<ExitStatus> {
-
+        println!(
+            "Copy directory called with {} {}",
+            source,
+            &format!("{}/{}", self.root_uri(), destination)
+        );
+        let lakectl = which("lakectl").expect("Failed to find lakectl executable");
+
+        // Upload files to branch
+        Command::new(lakectl.clone())
+            .args([
+                "fs",
+                "upload",
+                "-r",
+                "--source",
+                &format!("{}/", source),
+                &format!("{}/{}/", self.root_uri(), destination),
+            ])
+            .status()?;
+
+        // Commit changes
+        Command::new(lakectl)
+            .args([
+                "commit",
+                &format!("{}/", self.root_uri()),
+                "--allow-empty-message",
+            ])
+            .status()
     }
-}
\ No newline at end of file
+}
diff --git a/crates/lakefs/tests/integration.rs b/crates/lakefs/tests/integration.rs
new file mode 100644
index 0000000000..0fdea5572f
--- /dev/null
+++ b/crates/lakefs/tests/integration.rs
@@ -0,0 +1,16 @@
+#![cfg(feature = "integration_test")]
+use deltalake_test::{test_read_tables, IntegrationContext, TestResult};
+use serial_test::serial;
+
+mod context;
+use context::*;
+//
+#[tokio::test]
+#[serial]
+async fn test_read_tables_lakefs() -> TestResult {
+    let context = IntegrationContext::new(Box::<LakeFSIntegration>::default())?;
+
+    test_read_tables(&context).await?;
+
+    Ok(())
+}
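
For local verification, the new CI job boils down to roughly the following commands. This is a sketch, assuming a Linux x86_64 host with Docker, a docker-compose-lakefs.yml at the repository root as the workflow expects, and that the lakectl binary extracts into the current directory; the export line stands in for the workflow's $GITHUB_PATH step.

    # Fetch lakectl v1.48.0 and put the extracted binary on PATH
    wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.0/lakeFS_1.48.0_Linux_x86_64.tar.gz
    tar -xf lakeFS_1.48.0_Linux_x86_64.tar.gz
    export PATH="$PWD:$PATH"

    # Start the emulated LakeFS services, then run the serial integration test added by this patch
    docker compose -f docker-compose-lakefs.yml up -d
    cargo test --features integration_test,lakefs,datafusion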