Skip to content

Commit

Permalink
chore: add deprecation notices for commit logic on DeltaTable
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed May 1, 2023
1 parent eda7b04 commit 174b6a7
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 95 deletions.
64 changes: 40 additions & 24 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
use deltalake::{DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaTableMetaData, Invariant, Schema};
use deltalake::{
DeltaConfigKey, DeltaDataTypeLong, DeltaDataTypeTimestamp, DeltaOps, Invariant, Schema,
};
use pyo3::create_exception;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyValueError;
Expand All @@ -29,6 +31,8 @@ use pyo3::types::PyType;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryFrom;
use std::future::IntoFuture;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -740,33 +744,45 @@ fn write_new_deltalake(
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mut table = DeltaTableBuilder::from_uri(table_uri)
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
.build()
.map_err(PyDeltaTableError::from_raw)?;

let metadata = DeltaTableMetaData::new(
name,
description,
None, // Format
(&schema.0)
.try_into()
.map_err(PyDeltaTableError::from_arrow)?,
partition_by,
configuration.unwrap_or_default(),
);

let fut = table.create(
metadata,
action::Protocol {
min_reader_version: 1,
min_writer_version: 1, // TODO: Make sure we comply with protocol
},
None, // TODO
Some(add_actions.iter().map(|add| add.into()).collect()),
);

rt()?.block_on(fut).map_err(PyDeltaTableError::from_raw)?;
let schema: Schema = (&schema.0)
.try_into()
.map_err(PyDeltaTableError::from_arrow)?;

let configuration = configuration
.unwrap_or_default()
.into_iter()
.filter_map(|(key, value)| {
if let Ok(key) = DeltaConfigKey::from_str(&key) {
Some((key, value))
} else {
None
}
})
.collect();

let mut builder = DeltaOps(table)
.create()
.with_columns(schema.get_fields().clone())
.with_partition_columns(partition_by)
.with_configuration(configuration)
.with_actions(add_actions.iter().map(|add| Action::add(add.into())));

if let Some(name) = &name {
builder = builder.with_table_name(name);
};

if let Some(description) = &description {
builder = builder.with_comment(description);
};

rt()?
.block_on(builder.into_future())
.map_err(PyDeltaTableError::from_raw)?;

Ok(())
}
Expand Down
33 changes: 8 additions & 25 deletions rust/examples/recordbatch-writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/

use chrono::prelude::*;
use deltalake::action::*;
use deltalake::arrow::array::*;
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
Expand Down Expand Up @@ -75,8 +74,8 @@ struct WeatherRecord {
}

impl WeatherRecord {
fn schema() -> Schema {
Schema::new(vec![
fn columns() -> Vec<SchemaField> {
vec![
SchemaField::new(
"timestamp".to_string(),
SchemaDataType::primitive("timestamp".to_string()),
Expand All @@ -101,7 +100,7 @@ impl WeatherRecord {
true,
HashMap::new(),
),
])
]
}
}

Expand Down Expand Up @@ -189,27 +188,11 @@ fn convert_to_batch(table: &DeltaTable, records: &Vec<WeatherRecord>) -> RecordB
* Table in an existing directory that doesn't currently contain a Delta table
*/
async fn create_initialized_table(table_path: &Path) -> DeltaTable {
let mut table = DeltaTableBuilder::from_uri(table_path).build().unwrap();
let table_schema = WeatherRecord::schema();
let mut commit_info = serde_json::Map::<String, serde_json::Value>::new();
commit_info.insert(
"operation".to_string(),
serde_json::Value::String("CREATE TABLE".to_string()),
);
commit_info.insert(
"userName".to_string(),
serde_json::Value::String("test user".to_string()),
);

let protocol = Protocol {
min_reader_version: 1,
min_writer_version: 1,
};

let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new());

table
.create(metadata, protocol, Some(commit_info), None)
let table = DeltaOps::try_from_uri(table_path)
.await
.unwrap()
.create()
.with_columns(WeatherRecord::columns())
.await
.unwrap();

Expand Down
31 changes: 30 additions & 1 deletion rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,10 @@ impl DeltaTable {
}

/// Vacuum the delta table. See [`VacuumBuilder`] for more information.
#[deprecated(
since = "0.10.0",
note = "use DeltaOps from operations module to create a Vacuum operation."
)]
pub async fn vacuum(
&mut self,
retention_hours: Option<u64>,
Expand All @@ -1133,6 +1137,11 @@ impl DeltaTable {
/// Creates a new DeltaTransaction for the DeltaTable.
/// The transaction holds a mutable reference to the DeltaTable, preventing other references
/// until the transaction is dropped.
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
#[allow(deprecated)]
pub fn create_transaction(
&mut self,
options: Option<DeltaTransactionOptions>,
Expand All @@ -1145,6 +1154,11 @@ impl DeltaTable {
/// This is low-level transaction API. If user does not want to maintain the commit loop then
/// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction`
/// with retry logic.
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
#[allow(deprecated)]
pub async fn try_commit_transaction(
&mut self,
commit: &PreparedCommit,
Expand All @@ -1168,6 +1182,11 @@ impl DeltaTable {
}

/// Create a DeltaTable with version 0 given the provided MetaData, Protocol, and CommitInfo
#[deprecated(
since = "0.10.0",
note = "use DeltaOps from operations module to create a Create operation."
)]
#[allow(deprecated)]
pub async fn create(
&mut self,
metadata: DeltaTableMetaData,
Expand Down Expand Up @@ -1322,12 +1341,17 @@ impl Default for DeltaTransactionOptions {
/// Please note that in case of a non-retryable error the temporary commit file such as
/// `_delta_log/_commit_<uuid>.json` will be orphaned in storage.
#[derive(Debug)]
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
pub struct DeltaTransaction<'a> {
/// Table this transaction commits to. It is borrowed mutably for `'a`, so the table
/// cannot be referenced elsewhere until the transaction is dropped.
delta_table: &'a mut DeltaTable,
/// Actions queued on this transaction (e.g. via `add_action` / `add_actions`).
actions: Vec<Action>,
/// Transaction options; see `DeltaTransactionOptions`.
options: DeltaTransactionOptions,
}

#[allow(deprecated)]
impl<'a> DeltaTransaction<'a> {
/// Creates a new delta transaction.
/// Holds a mutable reference to the delta table to prevent outside mutation while a transaction commit is in progress.
Expand Down Expand Up @@ -1377,7 +1401,6 @@ impl<'a> DeltaTransaction<'a> {
// } else {
// IsolationLevel::Serializable
// };

let prepared_commit = self.prepare_commit(operation, app_metadata).await?;

// try to commit in a loop in case other writers write the next version first
Expand Down Expand Up @@ -1430,6 +1453,7 @@ impl<'a> DeltaTransaction<'a> {
Ok(PreparedCommit { uri: path })
}

#[allow(deprecated)]
async fn try_commit_loop(
&mut self,
commit: &PreparedCommit,
Expand Down Expand Up @@ -1473,6 +1497,10 @@ impl<'a> DeltaTransaction<'a> {
/// Holds the URI of the temporary commit file created with `DeltaTransaction.prepare_commit`.
/// Once created, the actual commit could be executed with `DeltaTransaction.try_commit`.
///
/// A reference to this value is also what `DeltaTable::try_commit_transaction` takes as its
/// `commit` argument.
#[derive(Debug)]
#[deprecated(
since = "0.10.0",
note = "use 'commit' function from operations module to commit to Delta table."
)]
pub struct PreparedCommit {
/// Path of the prepared (temporary) commit file.
uri: Path,
}
Expand Down Expand Up @@ -1624,6 +1652,7 @@ mod tests {
serde_json::Value::String("test user".to_string()),
);
// Action
#[allow(deprecated)]
dt.create(delta_md.clone(), protocol.clone(), Some(commit_info), None)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions rust/src/delta_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::DeltaTableError;
/// Typed property keys that can be defined on a delta table
/// <https://docs.delta.io/latest/table-properties.html#delta-table-properties-reference>
/// <https://learn.microsoft.com/en-us/azure/databricks/delta/table-properties>
#[derive(PartialEq, Eq, Hash)]
pub enum DeltaConfigKey {
/// true for this Delta table to be append-only. If append-only,
/// existing records cannot be deleted, and existing values cannot be updated.
Expand Down
1 change: 1 addition & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub mod writer;
pub use self::builder::*;
pub use self::data_catalog::{get_data_catalog, DataCatalog, DataCatalogError};
pub use self::delta::*;
pub use self::delta_config::*;
pub use self::partitions::*;
pub use self::schema::*;
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
Expand Down
7 changes: 5 additions & 2 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ impl CreateBuilder {
}

/// Specify columns to append to schema
pub fn with_columns(mut self, columns: impl IntoIterator<Item = SchemaField>) -> Self {
self.columns.extend(columns);
pub fn with_columns(
mut self,
columns: impl IntoIterator<Item = impl Into<SchemaField>>,
) -> Self {
self.columns.extend(columns.into_iter().map(|c| c.into()));
self
}

Expand Down
1 change: 1 addition & 0 deletions rust/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub trait DeltaWriter<T> {
table: &mut DeltaTable,
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
let mut adds = self.flush().await?;
#[allow(deprecated)]
let mut tx = table.create_transaction(None);
tx.add_actions(adds.drain(..).map(Action::add).collect());
let version = tx.commit(None, None).await?;
Expand Down
1 change: 1 addition & 0 deletions rust/src/writer/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(deprecated)]
//! Utilities for writing unit tests
use super::*;
use crate::{
Expand Down
54 changes: 14 additions & 40 deletions rust/tests/command_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,14 @@ use arrow::{
datatypes::{DataType, Field},
record_batch::RecordBatch,
};
use deltalake::action::{Action, Protocol, Remove};
use deltalake::builder::DeltaTableBuilder;
use deltalake::action::{Action, Remove};
use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics};
use deltalake::operations::DeltaOps;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use deltalake::{
DeltaTable, DeltaTableError, DeltaTableMetaData, PartitionFilter, Schema, SchemaDataType,
SchemaField,
};
use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, SchemaDataType, SchemaField};
use parquet::file::properties::WriterProperties;
use rand::prelude::*;
use serde_json::{json, Map, Value};
use serde_json::json;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::{collections::HashMap, error::Error, sync::Arc};
Expand All @@ -29,7 +25,7 @@ struct Context {
}

async fn setup_test(partitioned: bool) -> Result<Context, Box<dyn Error>> {
let schema = Schema::new(vec![
let columns = vec![
SchemaField::new(
"x".to_owned(),
SchemaDataType::primitive("integer".to_owned()),
Expand All @@ -48,45 +44,22 @@ async fn setup_test(partitioned: bool) -> Result<Context, Box<dyn Error>> {
false,
HashMap::new(),
),
]);
];

let p = if partitioned {
let partition_columns = if partitioned {
vec!["date".to_owned()]
} else {
vec![]
};

let table_meta = DeltaTableMetaData::new(
Some("opt_table".to_owned()),
Some("Table for optimize tests".to_owned()),
None,
schema.clone(),
p,
HashMap::new(),
);

let tmp_dir = tempdir::TempDir::new("opt_table").unwrap();
let p = tmp_dir.path().to_str().to_owned().unwrap();
let mut dt = DeltaTableBuilder::from_uri(p).build()?;

let mut commit_info = Map::<String, Value>::new();

let protocol = Protocol {
min_reader_version: 1,
min_writer_version: 2,
};

commit_info.insert(
"operation".to_string(),
serde_json::Value::String("CREATE TABLE".to_string()),
);
dt.create(
table_meta.clone(),
protocol.clone(),
Some(commit_info),
None,
)
.await?;
let table_uri = tmp_dir.path().to_str().to_owned().unwrap();
let dt = DeltaOps::try_from_uri(table_uri)
.await?
.create()
.with_columns(columns)
.with_partition_columns(partition_columns)
.await?;

Ok(Context { tmp_dir, table: dt })
}
Expand Down Expand Up @@ -314,6 +287,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
tags: Some(HashMap::new()),
};

#[allow(deprecated)]
let mut transaction = other_dt.create_transaction(None);
transaction.add_action(Action::remove(remove));
transaction.commit(None, None).await?;
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/commit_info_format.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[allow(dead_code)]
#![allow(dead_code, deprecated)]
mod fs_common;

use deltalake::action::{Action, DeltaOperation, SaveMode};
Expand Down
3 changes: 1 addition & 2 deletions rust/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(dead_code, unused_variables, deprecated)]

use bytes::Bytes;
use deltalake::action::{self, Add, Remove};
Expand Down
Loading

0 comments on commit 174b6a7

Please sign in to comment.