Skip to content

Commit

Permalink
fix: compatible to write to local file systems that do not support ha…
Browse files Browse the repository at this point in the history
…rd link (#1868)

Make writes compatible with local file systems that do not support hard links.

# Description

When writing to the local file system, hard links are sometimes not
supported (for example on blobfuse, goofys, or s3fs mounts), so this
change handles that case for compatibility.

It is important to note that:
There is another problem with blobfuse: rename reports an error
because the file handle has not been released before the rename.
See here for details: #1765

Arrow-rs needs a corresponding modification, for example:
https://github.com/GlareDB/arrow-rs/pull/2/files
Because object_store has been upgraded to 0.8, which contains many
breaking changes, I have not made that change for the time being. It
will be fixed after upgrading to 0.8.
#1858

# Related Issue(s)

#1765
 
#1376 

# Documentation
  • Loading branch information
RobinLin666 authored Mar 15, 2024
1 parent 456c6ce commit 9812bec
Show file tree
Hide file tree
Showing 12 changed files with 762 additions and 2 deletions.
4 changes: 3 additions & 1 deletion crates/deltalake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ rust-version.workspace = true
[package.metadata.docs.rs]
# We cannot use all_features because TLS features are mutually exclusive.
# We cannot use hdfs feature because it requires Java to be installed.
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]
features = ["azure", "datafusion", "gcs", "hdfs", "json", "mount", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "0.17.1", path = "../core" }
deltalake-aws = { version = "0.1.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.1.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.1.0", path = "../gcp", optional = true }
deltalake-catalog-glue = { version = "0.1.0", path = "../catalog-glue", optional = true }
deltalake-mount = { version = "0.1.0", path = "../mount", optional = true }

[features]
# All of these features are just reflected into the core crate until that
Expand All @@ -34,6 +35,7 @@ gcs = ["deltalake-gcp"]
glue = ["deltalake-catalog-glue"]
hdfs = []
json = ["deltalake-core/json"]
mount = ["deltalake-mount"]
python = ["deltalake-core/python"]
s3-native-tls = ["deltalake-aws/native-tls"]
s3 = ["deltalake-aws/rustls"]
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ pub use deltalake_aws as aws;
pub use deltalake_azure as azure;
#[cfg(feature = "gcs")]
pub use deltalake_gcp as gcp;
#[cfg(feature = "mount")]
pub use deltalake_mount as mount;
43 changes: 43 additions & 0 deletions crates/mount/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
name = "deltalake-mount"
version = "0.1.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
edition.workspace = true
homepage.workspace = true
description.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.17.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
errno = "0.3"

# workspace dependencies
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
url = { workspace = true }

[dev-dependencies]
chrono = { workspace = true }
serial_test = "3"
deltalake-test = { path = "../test" }
pretty_env_logger = "0.5.0"
rand = "0.8"
serde_json = { workspace = true }
tempfile = "3"
fs_extra = "1.3.0"

[features]
integration_test = []
80 changes: 80 additions & 0 deletions crates/mount/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//! Auxiliary module for generating a valid Mount configuration.
use std::collections::{hash_map::Entry, HashMap};
use std::str::FromStr;

use crate::error::{Error, Result};

/// Typed property keys that can be defined on a mounted path
///
/// Each key converts to its canonical string via `AsRef<str>` and is parsed
/// from a string via `FromStr`; unrecognised strings are rejected there.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
#[non_exhaustive]
pub enum MountConfigKey {
/// If set to "true", allows creating commits without concurrent writer protection.
/// Only safe if there is one writer to a given table.
///
/// Supported keys:
/// - `mount_allow_unsafe_rename`
/// - `allow_unsafe_rename`
AllowUnsafeRename,
}

impl AsRef<str> for MountConfigKey {
fn as_ref(&self) -> &str {
match self {
Self::AllowUnsafeRename => "mount_allow_unsafe_rename",
}
}
}

impl FromStr for MountConfigKey {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"mount_allow_unsafe_rename" | "allow_unsafe_rename" => Ok(Self::AllowUnsafeRename),
_ => Err(Error::UnknownConfigKey(s.to_string())),
}
}
}

/// Helper struct to create full configuration from passed options and environment
///
/// Options passed explicitly take precedence; values from the environment are
/// only used by `build` for keys that were not passed explicitly.
pub(crate) struct MountConfigHelper {
/// Options passed explicitly to `try_new`.
config: HashMap<MountConfigKey, String>,
/// Options collected in `try_new` from environment variables whose name
/// starts with `MOUNT_`.
env_config: HashMap<MountConfigKey, String>,
}

impl MountConfigHelper {
/// Create a new [`ConfigHelper`]
pub fn try_new(
config: impl IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
) -> Result<Self> {
let mut env_config = HashMap::new();
for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if key.starts_with("MOUNT_") {
if let Ok(config_key) = MountConfigKey::from_str(&key.to_ascii_lowercase()) {
env_config.insert(config_key, value.to_string());
}
}
}
}

Ok(Self {
config: config
.into_iter()
.map(|(key, value)| Ok((MountConfigKey::from_str(key.as_ref())?, value.into())))
.collect::<Result<_, Error>>()?,
env_config,
})
}

/// Generate a cofiguration augmented with options from the environment
pub fn build(mut self) -> Result<HashMap<MountConfigKey, String>> {
// Add keys from the environment to the configuration, as e.g. client configuration options.
// NOTE We have to specifically configure omitting keys, since workload identity can
// work purely using defaults, but partial config may be present in the environment.
// Preference of conflicting configs (e.g. msi resource id vs. client id is handled in object store)
for key in self.env_config.keys() {
if let Entry::Vacant(e) = self.config.entry(*key) {
e.insert(self.env_config.get(key).unwrap().to_owned());
}
}

Ok(self.config)
}
}
33 changes: 33 additions & 0 deletions crates/mount/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use deltalake_core::errors::DeltaTableError;

pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;

/// Errors that can occur in this crate.
///
/// Converts into [`DeltaTableError`] through the `From<Error>` implementation.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Failed to parse config; the payload is appended to the error message.
// NOTE(review): no construction of this variant is visible in this file,
// which is presumably why `dead_code` is allowed here — confirm.
#[allow(dead_code)]
#[error("failed to parse config: {0}")]
Parse(String),

/// Unknown configuration key
#[error("Unknown configuration key: {0}")]
UnknownConfigKey(String),

/// The `allow_unsafe_rename` parameter was required but not specified.
#[error("The `allow_unsafe_rename` parameter must be specified")]
AllowUnsafeRenameNotSpecified,

/// An error from the underlying `object_store` crate, forwarded unchanged
/// (`transparent`) and convertible with `?` via the generated `From` impl.
#[error(transparent)]
ObjectStore(#[from] object_store::Error),
}

impl From<Error> for DeltaTableError {
fn from(e: Error) -> Self {
match e {
Error::Parse(msg) => DeltaTableError::Generic(msg),
Error::UnknownConfigKey(msg) => DeltaTableError::Generic(msg),
Error::AllowUnsafeRenameNotSpecified => DeltaTableError::Generic(
"The `allow_unsafe_rename` parameter must be specified".to_string(),
),
Error::ObjectStore(e) => DeltaTableError::ObjectStore { source: e },
}
}
}
Loading

0 comments on commit 9812bec

Please sign in to comment.