Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: time travel when checkpointed and logs removed #2389

Merged
merged 10 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Delta Table read and write implementation

use std::cmp::Ordering;
use std::cmp::{min, Ordering};
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;

use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use object_store::{path::Path, ObjectStore};
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
Expand All @@ -18,7 +18,7 @@ use self::state::DeltaTableState;
use crate::kernel::{
Action, CommitInfo, DataCheck, DataType, LogicalFile, Metadata, Protocol, StructType,
};
use crate::logstore::{self, LogStoreConfig, LogStoreRef};
use crate::logstore::{self, extract_version_from_filename, LogStoreConfig, LogStoreRef};
use crate::partitions::PartitionFilter;
use crate::storage::{commit_uri_from_version, ObjectStoreRef};
use crate::{DeltaResult, DeltaTableError};
Expand Down Expand Up @@ -514,9 +514,29 @@ impl DeltaTable {
&mut self,
datetime: DateTime<Utc>,
) -> Result<(), DeltaTableError> {
let mut min_version = 0;
let mut min_version: i64 = -1;
let log_store = self.log_store();
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(min_version);
let object_store = log_store.object_store();
let mut files = object_store.list_with_offset(prefix, &offset_path);

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
if min_version == -1 {
min_version = log_version
} else {
min_version = min(min_version, log_version);
}
}
if min_version == 0 {
break;
}
}
let mut max_version = self.get_latest_version().await?;
let mut version = min_version;
let lowest_table_version = min_version;
let target_ts = datetime.timestamp_millis();

// binary search
Expand All @@ -538,8 +558,8 @@ impl DeltaTable {
}
}

if version < 0 {
version = 0;
if version < lowest_table_version {
version = lowest_table_version;
}

self.load_version(version).await
Expand Down
48 changes: 47 additions & 1 deletion python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from threading import Barrier, Thread
from types import SimpleNamespace
from typing import Any
from typing import Any, List, Tuple
from unittest.mock import Mock

from packaging import version
Expand Down Expand Up @@ -101,6 +101,52 @@ def test_load_as_version_datetime(date_value: str, expected_version):
assert dt.version() == expected_version


@pytest.mark.parametrize(
["date_value", "expected_version", "log_mtime_pairs"],
[
("2020-05-01T00:47:31-07:00", 1, [("00000000000000000000.json", 158839841.0)]),
(
"2020-05-02T22:47:31-07:00",
2,
[
("00000000000000000000.json", 158839841.0),
("00000000000000000001.json", 1588484851.0),
],
),
],
)
def test_load_as_version_datetime_with_logs_removed(
tmp_path,
sample_table,
date_value: str,
expected_version,
log_mtime_pairs: List[Tuple[str, int]],
):
log_path = tmp_path / "_delta_log"
for i in range(6):
write_deltalake(tmp_path, data=sample_table, mode="append")

for file_name, dt_epoch in log_mtime_pairs:
file_path = log_path / file_name
print(file_path)
os.utime(file_path, (dt_epoch, dt_epoch))

dt = DeltaTable(tmp_path, version=expected_version)
dt.create_checkpoint()
file = log_path / f"0000000000000000000{expected_version}.checkpoint.parquet"
assert file.exists()
dt.cleanup_metadata()

file = log_path / f"0000000000000000000{expected_version-1}.json"
assert not file.exists()
dt = DeltaTable(tmp_path)
dt.load_as_version(date_value)
assert dt.version() == expected_version
dt = DeltaTable(tmp_path)
dt.load_as_version(datetime.fromisoformat(date_value))
assert dt.version() == expected_version


def test_load_as_version_datetime_bad_format():
table_path = "../crates/test/tests/data/simple_table"
dt = DeltaTable(table_path)
Expand Down
Loading