From 503440285ece75f364934a7d6d9466cd31a6597f Mon Sep 17 00:00:00 2001
From: zhangbowen-coder <2439796518@qq.com>
Date: Sun, 19 Oct 2025 06:29:01 +0000
Subject: [PATCH 1/2] TST: Replace tm.ensure_clean with temp_file/tmp_path
 fixtures in test_common.py and test_feather.py

---
 pandas/tests/io/test_common.py  | 203 ++++++++++++++++----------
 pandas/tests/io/test_feather.py |  85 ++++++-------
 2 files changed, 146 insertions(+), 142 deletions(-)

diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py
index ab27fda8dcdf5..20b3103b74b9b 100644
--- a/pandas/tests/io/test_common.py
+++ b/pandas/tests/io/test_common.py
@@ -86,12 +86,11 @@ def test_stringify_path_fspath(self):
         result = icom.stringify_path(p)
         assert result == "foo/bar.csv"
 
-    def test_stringify_file_and_path_like(self):
+    def test_stringify_file_and_path_like(self, temp_file):
         # GH 38125: do not stringify file objects that are also path-like
         fsspec = pytest.importorskip("fsspec")
-        with tm.ensure_clean() as path:
-            with fsspec.open(f"file://{path}", mode="wb") as fsspec_obj:
-                assert fsspec_obj == icom.stringify_path(fsspec_obj)
+        with fsspec.open(f"file://{temp_file}", mode="wb") as fsspec_obj:
+            assert fsspec_obj == icom.stringify_path(fsspec_obj)
 
     @pytest.mark.parametrize("path_type", [str, CustomFSPath, Path])
     def test_infer_compression_from_path(self, compression_format, path_type):
@@ -338,49 +337,53 @@ def test_read_fspath_all(self, reader, module, path, datapath):
             ("to_stata", {"time_stamp": pd.to_datetime("2019-01-01 00:00")}, "os"),
         ],
     )
-    def test_write_fspath_all(self, writer_name, writer_kwargs, module):
+    def test_write_fspath_all(self, writer_name, writer_kwargs, module, tmp_path):
         if writer_name in ["to_latex"]:  # uses Styler implementation
             pytest.importorskip("jinja2")
-        p1 = tm.ensure_clean("string")
-        p2 = tm.ensure_clean("fspath")
+        p1 = tmp_path / "string"
+        p2 = tmp_path / "fspath"
         df = pd.DataFrame({"A": [1, 2]})
 
-        with p1 as string, p2 as fspath:
-            pytest.importorskip(module)
-            mypath = CustomFSPath(fspath)
-            writer = getattr(df, writer_name)
-
-            writer(string, **writer_kwargs)
-            writer(mypath, **writer_kwargs)
-            with open(string, "rb") as f_str, open(fspath, "rb") as f_path:
-                if writer_name == "to_excel":
-                    # binary representation of excel contains time creation
-                    # data that causes flaky CI failures
-                    result = pd.read_excel(f_str, **writer_kwargs)
-                    expected = pd.read_excel(f_path, **writer_kwargs)
-                    tm.assert_frame_equal(result, expected)
-                else:
-                    result = f_str.read()
-                    expected = f_path.read()
-                    assert result == expected
-
-    def test_write_fspath_hdf5(self):
+        string = str(p1)
+        fspath = str(p2)
+
+        pytest.importorskip(module)
+        mypath = CustomFSPath(fspath)
+        writer = getattr(df, writer_name)
+
+        writer(string, **writer_kwargs)
+        writer(mypath, **writer_kwargs)
+        with open(string, "rb") as f_str, open(fspath, "rb") as f_path:
+            if writer_name == "to_excel":
+                # binary representation of excel contains time creation
+                # data that causes flaky CI failures
+                result = pd.read_excel(f_str, **writer_kwargs)
+                expected = pd.read_excel(f_path, **writer_kwargs)
+                tm.assert_frame_equal(result, expected)
+            else:
+                result = f_str.read()
+                expected = f_path.read()
+                assert result == expected
+
+    def test_write_fspath_hdf5(self, tmp_path):
         # Same test as write_fspath_all, except HDF5 files aren't
         # necessarily byte-for-byte identical for a given dataframe, so we'll
         # have to read and compare equality
         pytest.importorskip("tables")
 
         df = pd.DataFrame({"A": [1, 2]})
-        p1 = tm.ensure_clean("string")
-        p2 = tm.ensure_clean("fspath")
+        p1 = tmp_path / "string"
+        p2 = tmp_path / "fspath"
+
+        string = str(p1)
+        fspath = str(p2)
 
-        with p1 as string, p2 as fspath:
-            mypath = CustomFSPath(fspath)
-            df.to_hdf(mypath, key="bar")
-            df.to_hdf(string, key="bar")
+        mypath = CustomFSPath(fspath)
+        df.to_hdf(mypath, key="bar")
+        df.to_hdf(string, key="bar")
 
-            result = pd.read_hdf(fspath, key="bar")
-            expected = pd.read_hdf(string, key="bar")
+        result = pd.read_hdf(fspath, key="bar")
+        expected = pd.read_hdf(string, key="bar")
 
         tm.assert_frame_equal(result, expected)
 
@@ -432,35 +435,35 @@ def test_next(self, mmap_file):
         with pytest.raises(StopIteration, match=r"^$"):
             next(wrapper)
 
-    def test_unknown_engine(self):
-        with tm.ensure_clean() as path:
-            df = pd.DataFrame(
-                1.1 * np.arange(120).reshape((30, 4)),
-                columns=pd.Index(list("ABCD")),
-                index=pd.Index([f"i-{i}" for i in range(30)]),
-            )
-            df.to_csv(path)
-            with pytest.raises(ValueError, match="Unknown engine"):
-                pd.read_csv(path, engine="pyt")
-
-    def test_binary_mode(self):
+    def test_unknown_engine(self, temp_file):
+        path = temp_file
+        df = pd.DataFrame(
+            1.1 * np.arange(120).reshape((30, 4)),
+            columns=pd.Index(list("ABCD")),
+            index=pd.Index([f"i-{i}" for i in range(30)]),
+        )
+        df.to_csv(path)
+        with pytest.raises(ValueError, match="Unknown engine"):
+            pd.read_csv(path, engine="pyt")
+
+    def test_binary_mode(self, temp_file):
         """
         'encoding' shouldn't be passed to 'open' in binary mode.
 
         GH 35058
         """
-        with tm.ensure_clean() as path:
-            df = pd.DataFrame(
-                1.1 * np.arange(120).reshape((30, 4)),
-                columns=pd.Index(list("ABCD")),
-                index=pd.Index([f"i-{i}" for i in range(30)]),
-            )
-            df.to_csv(path, mode="w+b")
-            tm.assert_frame_equal(df, pd.read_csv(path, index_col=0))
+        path = temp_file
+        df = pd.DataFrame(
+            1.1 * np.arange(120).reshape((30, 4)),
+            columns=pd.Index(list("ABCD")),
+            index=pd.Index([f"i-{i}" for i in range(30)]),
+        )
+        df.to_csv(path, mode="w+b")
+        tm.assert_frame_equal(df, pd.read_csv(path, index_col=0))
 
     @pytest.mark.parametrize("encoding", ["utf-16", "utf-32"])
     @pytest.mark.parametrize("compression_", ["bz2", "xz"])
-    def test_warning_missing_utf_bom(self, encoding, compression_):
+    def test_warning_missing_utf_bom(self, encoding, compression_, temp_file):
         """
         bz2 and xz do not write the byte order mark (BOM) for utf-16/32.
@@ -473,17 +476,17 @@ def test_warning_missing_utf_bom(self, encoding, compression_):
             columns=pd.Index(list("ABCD")),
             index=pd.Index([f"i-{i}" for i in range(30)]),
         )
-        with tm.ensure_clean() as path:
-            with tm.assert_produces_warning(UnicodeWarning, match="byte order mark"):
-                df.to_csv(path, compression=compression_, encoding=encoding)
-
-            # reading should fail (otherwise we wouldn't need the warning)
-            msg = (
-                r"UTF-\d+ stream does not start with BOM|"
-                r"'utf-\d+' codec can't decode byte"
-            )
-            with pytest.raises(UnicodeError, match=msg):
-                pd.read_csv(path, compression=compression_, encoding=encoding)
+        path = temp_file
+        with tm.assert_produces_warning(UnicodeWarning, match="byte order mark"):
+            df.to_csv(path, compression=compression_, encoding=encoding)
+
+        # reading should fail (otherwise we wouldn't need the warning)
+        msg = (
+            r"UTF-\d+ stream does not start with BOM|"
+            r"'utf-\d+' codec can't decode byte"
+        )
+        with pytest.raises(UnicodeError, match=msg):
+            pd.read_csv(path, compression=compression_, encoding=encoding)
 
 
 def test_is_fsspec_url():
@@ -514,38 +517,39 @@ def test_is_fsspec_url_chained():
 
 
 @pytest.mark.parametrize("format", ["csv", "json"])
-def test_codecs_encoding(format):
+def test_codecs_encoding(format, temp_file):
     # GH39247
     expected = pd.DataFrame(
         1.1 * np.arange(120).reshape((30, 4)),
         columns=pd.Index(list("ABCD")),
         index=pd.Index([f"i-{i}" for i in range(30)]),
     )
-    with tm.ensure_clean() as path:
-        with open(path, mode="w", encoding="utf-8") as handle:
-            getattr(expected, f"to_{format}")(handle)
-        with open(path, encoding="utf-8") as handle:
-            if format == "csv":
-                df = pd.read_csv(handle, index_col=0)
-            else:
-                df = pd.read_json(handle)
+
+    path = temp_file
+    with open(path, mode="w", encoding="utf-8") as handle:
+        getattr(expected, f"to_{format}")(handle)
+    with open(path, encoding="utf-8") as handle:
+        if format == "csv":
+            df = pd.read_csv(handle, index_col=0)
+        else:
+            df = pd.read_json(handle)
     tm.assert_frame_equal(expected, df)
 
 
-def test_codecs_get_writer_reader():
+def test_codecs_get_writer_reader(temp_file):
     # GH39247
     expected = pd.DataFrame(
         1.1 * np.arange(120).reshape((30, 4)),
         columns=pd.Index(list("ABCD")),
         index=pd.Index([f"i-{i}" for i in range(30)]),
     )
-    with tm.ensure_clean() as path:
-        with open(path, "wb") as handle:
-            with codecs.getwriter("utf-8")(handle) as encoded:
-                expected.to_csv(encoded)
-        with open(path, "rb") as handle:
-            with codecs.getreader("utf-8")(handle) as encoded:
-                df = pd.read_csv(encoded, index_col=0)
+    path = temp_file
+    with open(path, "wb") as handle:
+        with codecs.getwriter("utf-8")(handle) as encoded:
+            expected.to_csv(encoded)
+    with open(path, "rb") as handle:
+        with codecs.getreader("utf-8")(handle) as encoded:
+            df = pd.read_csv(encoded, index_col=0)
     tm.assert_frame_equal(expected, df)
 
 
@@ -572,7 +576,7 @@ def test_explicit_encoding(io_class, mode, msg):
 
 @pytest.mark.parametrize("encoding_errors", ["strict", "replace"])
 @pytest.mark.parametrize("format", ["csv", "json"])
-def test_encoding_errors(encoding_errors, format):
+def test_encoding_errors(encoding_errors, format, temp_file):
     # GH39450
     msg = "'utf-8' codec can't decode byte"
     bad_encoding = b"\xe4"
@@ -591,18 +595,18 @@ def test_encoding_errors(encoding_errors, format):
         + b'"}}'
     )
     reader = partial(pd.read_json, orient="index")
-    with tm.ensure_clean() as path:
-        file = Path(path)
-        file.write_bytes(content)
+    path = temp_file
+    file = Path(path)
+    file.write_bytes(content)
 
-        if encoding_errors != "replace":
-            with pytest.raises(UnicodeDecodeError, match=msg):
-                reader(path, encoding_errors=encoding_errors)
-        else:
-            df = reader(path, encoding_errors=encoding_errors)
-            decoded = bad_encoding.decode(errors=encoding_errors)
-            expected = pd.DataFrame({decoded: [decoded]}, index=[decoded * 2])
-            tm.assert_frame_equal(df, expected)
+    if encoding_errors != "replace":
+        with pytest.raises(UnicodeDecodeError, match=msg):
+            reader(path, encoding_errors=encoding_errors)
+    else:
+        df = reader(path, encoding_errors=encoding_errors)
+        decoded = bad_encoding.decode(errors=encoding_errors)
+        expected = pd.DataFrame({decoded: [decoded]}, index=[decoded * 2])
+        tm.assert_frame_equal(df, expected)
 
 
 @pytest.mark.parametrize("encoding_errors", [0, None])
@@ -616,11 +620,10 @@ def test_encoding_errors_badtype(encoding_errors):
         reader(content)
 
 
-def test_bad_encdoing_errors():
+def test_bad_encdoing_errors(temp_file):
     # GH 39777
-    with tm.ensure_clean() as path:
-        with pytest.raises(LookupError, match="unknown error handler name"):
-            icom.get_handle(path, "w", errors="bad")
+    with pytest.raises(LookupError, match="unknown error handler name"):
+        icom.get_handle(temp_file, "w", errors="bad")
 
 
 @pytest.mark.skipif(WASM, reason="limited file system access on WASM")
diff --git a/pandas/tests/io/test_feather.py b/pandas/tests/io/test_feather.py
index 904c3a047bab2..540d676391d32 100644
--- a/pandas/tests/io/test_feather.py
+++ b/pandas/tests/io/test_feather.py
@@ -26,36 +26,36 @@
 
 @pytest.mark.single_cpu
 class TestFeather:
-    def check_error_on_write(self, df, exc, err_msg):
+    def check_error_on_write(self, df, exc, err_msg, temp_file):
         # check that we are raising the exception
         # on writing
 
         with pytest.raises(exc, match=err_msg):
-            with tm.ensure_clean() as path:
-                to_feather(df, path)
+            to_feather(df, temp_file)
 
-    def check_external_error_on_write(self, df):
+    def check_external_error_on_write(self, df, temp_file):
         # check that we are raising the exception
         # on writing
 
         with tm.external_error_raised(Exception):
-            with tm.ensure_clean() as path:
-                to_feather(df, path)
+            to_feather(df, temp_file)
 
-    def check_round_trip(self, df, expected=None, write_kwargs=None, **read_kwargs):
+    def check_round_trip(
+        self, df, temp_file, expected=None, write_kwargs=None, **read_kwargs
+    ):
         if write_kwargs is None:
             write_kwargs = {}
         if expected is None:
             expected = df.copy()
 
-        with tm.ensure_clean() as path:
-            to_feather(df, path, **write_kwargs)
+        path = temp_file
+        to_feather(df, path, **write_kwargs)
 
-            result = read_feather(path, **read_kwargs)
+        result = read_feather(path, **read_kwargs)
 
-            tm.assert_frame_equal(result, expected)
+        tm.assert_frame_equal(result, expected)
 
-    def test_error(self):
+    def test_error(self, temp_file):
         msg = "feather only support IO with DataFrames"
         for obj in [
             pd.Series([1, 2, 3]),
@@ -64,9 +64,9 @@ def test_error(self):
             pd.Timestamp("20130101"),
             np.array([1, 2, 3]),
         ]:
-            self.check_error_on_write(obj, ValueError, msg)
+            self.check_error_on_write(obj, ValueError, msg, temp_file)
 
-    def test_basic(self):
+    def test_basic(self, temp_file):
         tz = zoneinfo.ZoneInfo("US/Eastern")
         df = pd.DataFrame(
             {
@@ -103,15 +103,15 @@ def test_basic(self):
 
         expected = df.copy()
         expected.loc[1, "bool_with_null"] = None
-        self.check_round_trip(df, expected=expected)
+        self.check_round_trip(df, temp_file, expected=expected)
 
-    def test_duplicate_columns(self):
+    def test_duplicate_columns(self, temp_file):
         # https://github.com/wesm/feather/issues/53
        # not currently able to handle duplicate columns
         df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=list("aaa")).copy()
-        self.check_external_error_on_write(df)
+        self.check_external_error_on_write(df, temp_file)
 
-    def test_read_columns(self):
+    def test_read_columns(self, temp_file):
         # GH 24025
         df = pd.DataFrame(
             {
@@ -122,23 +122,23 @@ def test_read_columns(self):
             }
         )
         columns = ["col1", "col3"]
-        self.check_round_trip(df, expected=df[columns], columns=columns)
+        self.check_round_trip(df, temp_file, expected=df[columns], columns=columns)
 
-    def test_read_columns_different_order(self):
+    def test_read_columns_different_order(self, temp_file):
         # GH 33878
         df = pd.DataFrame({"A": [1, 2], "B": ["x", "y"], "C": [True, False]})
         expected = df[["B", "A"]]
-        self.check_round_trip(df, expected, columns=["B", "A"])
+        self.check_round_trip(df, temp_file, expected, columns=["B", "A"])
 
-    def test_unsupported_other(self):
+    def test_unsupported_other(self, temp_file):
         # mixed python objects
         df = pd.DataFrame({"a": ["a", 1, 2.0]})
-        self.check_external_error_on_write(df)
+        self.check_external_error_on_write(df, temp_file)
 
-    def test_rw_use_threads(self):
+    def test_rw_use_threads(self, temp_file):
         df = pd.DataFrame({"A": np.arange(100000)})
-        self.check_round_trip(df, use_threads=True)
-        self.check_round_trip(df, use_threads=False)
+        self.check_round_trip(df, temp_file, use_threads=True)
+        self.check_round_trip(df, temp_file, use_threads=False)
 
     def test_path_pathlib(self):
         df = pd.DataFrame(
@@ -149,13 +149,13 @@ def test_path_pathlib(self):
         result = tm.round_trip_pathlib(df.to_feather, read_feather)
         tm.assert_frame_equal(df, result)
 
-    def test_passthrough_keywords(self):
+    def test_passthrough_keywords(self, temp_file):
         df = pd.DataFrame(
             1.1 * np.arange(120).reshape((30, 4)),
             columns=pd.Index(list("ABCD")),
             index=pd.Index([f"i-{i}" for i in range(30)]),
         ).reset_index()
-        self.check_round_trip(df, write_kwargs={"version": 1})
+        self.check_round_trip(df, temp_file, write_kwargs={"version": 1})
 
     @pytest.mark.network
     @pytest.mark.single_cpu
@@ -168,7 +168,7 @@ def test_http_path(self, feather_file, httpserver):
         tm.assert_frame_equal(expected, res)
 
     def test_read_feather_dtype_backend(
-        self, string_storage, dtype_backend, using_infer_string
+        self, string_storage, dtype_backend, using_infer_string, temp_file
     ):
         # GH#50765
         df = pd.DataFrame(
@@ -184,10 +184,10 @@ def test_read_feather_dtype_backend(
             }
         )
 
-        with tm.ensure_clean() as path:
-            to_feather(df, path)
-            with pd.option_context("mode.string_storage", string_storage):
-                result = read_feather(path, dtype_backend=dtype_backend)
+        path = temp_file
+        to_feather(df, path)
+        with pd.option_context("mode.string_storage", string_storage):
+            result = read_feather(path, dtype_backend=dtype_backend)
 
         if dtype_backend == "pyarrow":
             pa = pytest.importorskip("pyarrow")
@@ -227,20 +227,21 @@ def test_read_feather_dtype_backend(
         )
         tm.assert_frame_equal(result, expected)
 
-    def test_int_columns_and_index(self):
+    def test_int_columns_and_index(self, temp_file):
         df = pd.DataFrame({"a": [1, 2, 3]}, index=pd.Index([3, 4, 5], name="test"))
-        self.check_round_trip(df)
+        self.check_round_trip(df, temp_file)
 
-    def test_invalid_dtype_backend(self):
+    def test_invalid_dtype_backend(self, tmp_path):
         msg = (
             "dtype_backend numpy is invalid, only 'numpy_nullable' and "
             "'pyarrow' are allowed."
         )
         df = pd.DataFrame({"int": list(range(1, 4))})
-        with tm.ensure_clean("tmp.feather") as path:
-            df.to_feather(path)
-            with pytest.raises(ValueError, match=msg):
-                read_feather(path, dtype_backend="numpy")
+
+        path = tmp_path / "tmp.feather"
+        df.to_feather(path)
+        with pytest.raises(ValueError, match=msg):
+            read_feather(path, dtype_backend="numpy")
 
     def test_string_inference(self, tmp_path, using_infer_string):
         # GH#54431
@@ -283,7 +284,7 @@ def test_string_inference_string_view_type(self, tmp_path):
         )
         tm.assert_frame_equal(result, expected)
 
-    def test_out_of_bounds_datetime_to_feather(self):
+    def test_out_of_bounds_datetime_to_feather(self, temp_file):
         # GH#47832
         df = pd.DataFrame(
             {
@@ -293,4 +294,4 @@ def test_out_of_bounds_datetime_to_feather(self):
                 ],
             }
         )
-        self.check_round_trip(df)
+        self.check_round_trip(df, temp_file)

From 35fc186d462dd580809686e1462a0547b415d1ad Mon Sep 17 00:00:00 2001
From: zhangbowen-coder <2439796518@qq.com>
Date: Tue, 21 Oct 2025 12:10:32 +0000
Subject: [PATCH 2/2] TST: Replace all tm.ensure_clean usages in
 pandas/tests/io/pytables/test_store.py and
 pandas/tests/io/parser/common/test_file_buffer_url.py with pytest fixtures

---
 .../io/parser/common/test_file_buffer_url.py | 62 +++++++--------
 pandas/tests/io/pytables/test_store.py       | 79 +++++++++----------
 2 files changed, 68 insertions(+), 73 deletions(-)

diff --git a/pandas/tests/io/parser/common/test_file_buffer_url.py b/pandas/tests/io/parser/common/test_file_buffer_url.py
index cef57318195ec..2777714dec8b2 100644
--- a/pandas/tests/io/parser/common/test_file_buffer_url.py
+++ b/pandas/tests/io/parser/common/test_file_buffer_url.py
@@ -97,25 +97,25 @@ def test_nonexistent_path(all_parsers):
 
 @pytest.mark.skipif(WASM, reason="limited file system access on WASM")
 @td.skip_if_windows  # os.chmod does not work in windows
-def test_no_permission(all_parsers):
+def test_no_permission(all_parsers, temp_file):
     # GH 23784
     parser = all_parsers
 
     msg = r"\[Errno 13\]"
 
-    with tm.ensure_clean() as path:
-        os.chmod(path, 0)  # make file unreadable
+    path = temp_file
+    os.chmod(path, 0)  # make file unreadable
 
-        # verify that this process cannot open the file (not running as sudo)
-        try:
-            with open(path, encoding="utf-8"):
-                pass
-            pytest.skip("Running as sudo.")
-        except PermissionError:
+    # verify that this process cannot open the file (not running as sudo)
+    try:
+        with open(path, encoding="utf-8"):
             pass
+        pytest.skip("Running as sudo.")
+    except PermissionError:
+        pass
 
-        with pytest.raises(PermissionError, match=msg) as e:
-            parser.read_csv(path)
-        assert path == e.value.filename
+    with pytest.raises(PermissionError, match=msg) as e:
+        parser.read_csv(path)
+    assert path == e.value.filename
 
 
 @pytest.mark.parametrize(
@@ -269,19 +269,18 @@ def test_internal_eof_byte(all_parsers):
     tm.assert_frame_equal(result, expected)
 
 
-def test_internal_eof_byte_to_file(all_parsers):
+def test_internal_eof_byte_to_file(all_parsers, tmp_path):
     # see gh-16559
     parser = all_parsers
    data = b'c1,c2\r\n"test \x1a test", test\r\n'
     expected = DataFrame([["test \x1a test", " test"]], columns=["c1", "c2"])
-    path = f"__{uuid.uuid4()}__.csv"
+    path = tmp_path / f"__{uuid.uuid4()}__.csv"
 
-    with tm.ensure_clean(path) as path:
-        with open(path, "wb") as f:
-            f.write(data)
+    with open(path, "wb") as f:
+        f.write(data)
 
-        result = parser.read_csv(path)
-        tm.assert_frame_equal(result, expected)
+    result = parser.read_csv(path)
+    tm.assert_frame_equal(result, expected)
 
 
 def test_file_handle_string_io(all_parsers):
@@ -372,7 +371,7 @@ def test_read_csv_file_handle(all_parsers, io_class, encoding):
     assert not handle.closed
 
 
-def test_memory_map_compression(all_parsers, compression):
+def test_memory_map_compression(all_parsers, compression, temp_file):
     """
     Support memory map for compressed files.
 
     GH 37621
     """
     parser = all_parsers
     expected = DataFrame({"a": [1], "b": [2]})
 
-    with tm.ensure_clean() as path:
-        expected.to_csv(path, index=False, compression=compression)
+    path = temp_file
+    expected.to_csv(path, index=False, compression=compression)
 
-        if parser.engine == "pyarrow":
-            msg = "The 'memory_map' option is not supported with the 'pyarrow' engine"
-            with pytest.raises(ValueError, match=msg):
-                parser.read_csv(path, memory_map=True, compression=compression)
-            return
+    if parser.engine == "pyarrow":
+        msg = "The 'memory_map' option is not supported with the 'pyarrow' engine"
+        with pytest.raises(ValueError, match=msg):
+            parser.read_csv(path, memory_map=True, compression=compression)
+        return
 
-        result = parser.read_csv(path, memory_map=True, compression=compression)
+    result = parser.read_csv(path, memory_map=True, compression=compression)
 
     tm.assert_frame_equal(
         result,
@@ -442,12 +441,11 @@ def test_context_manageri_user_provided(all_parsers, datapath):
 
 
 @skip_pyarrow  # ParserError: Empty CSV file
-def test_file_descriptor_leak(all_parsers):
+def test_file_descriptor_leak(all_parsers, temp_file):
     # GH 31488
     parser = all_parsers
-    with tm.ensure_clean() as path:
-        with pytest.raises(EmptyDataError, match="No columns to parse from file"):
-            parser.read_csv(path)
+    with pytest.raises(EmptyDataError, match="No columns to parse from file"):
+        parser.read_csv(temp_file)
 
 
 def test_memory_map(all_parsers, csv_dir_path):
diff --git a/pandas/tests/io/pytables/test_store.py b/pandas/tests/io/pytables/test_store.py
index 5cfefeb469e8a..8565c255ac067 100644
--- a/pandas/tests/io/pytables/test_store.py
+++ b/pandas/tests/io/pytables/test_store.py
@@ -41,22 +41,20 @@
 tables = pytest.importorskip("tables")
 
 
-def test_context(setup_path):
-    with tm.ensure_clean(setup_path) as path:
-        try:
-            with HDFStore(path) as tbl:
-                raise ValueError("blah")
-        except ValueError:
-            pass
-    with tm.ensure_clean(setup_path) as path:
-        with HDFStore(path) as tbl:
-            tbl["a"] = DataFrame(
-                1.1 * np.arange(120).reshape((30, 4)),
-                columns=Index(list("ABCD"), dtype=object),
-                index=Index([f"i-{i}" for i in range(30)], dtype=object),
-            )
-            assert len(tbl) == 1
-            assert type(tbl["a"]) == DataFrame
+def test_context(temp_file):
+    try:
+        with HDFStore(temp_file) as tbl:
+            raise ValueError("blah")
+    except ValueError:
+        pass
+    with HDFStore(temp_file) as tbl:
+        tbl["a"] = DataFrame(
+            1.1 * np.arange(120).reshape((30, 4)),
+            columns=Index(list("ABCD"), dtype=object),
+            index=Index([f"i-{i}" for i in range(30)], dtype=object),
+        )
+        assert len(tbl) == 1
+        assert type(tbl["a"]) == DataFrame
 
 
 def test_no_track_times(tmp_path, setup_path):
@@ -971,37 +969,36 @@ def test_pickle_path_localpath():
 
 
 @pytest.mark.parametrize("propindexes", [True, False])
-def test_copy(propindexes):
+def test_copy(propindexes, temp_file):
     df = DataFrame(
         1.1 * np.arange(120).reshape((30, 4)),
         columns=Index(list("ABCD")),
         index=Index([f"i-{i}" for i in range(30)]),
     )
-    with tm.ensure_clean() as path:
-        with HDFStore(path) as st:
-            st.append("df", df, data_columns=["A"])
-        with tempfile.NamedTemporaryFile() as new_f:
-            with HDFStore(path) as store:
-                with contextlib.closing(
-                    store.copy(new_f.name, keys=None, propindexes=propindexes)
-                ) as tstore:
-                    # check keys
-                    keys = store.keys()
-                    assert set(keys) == set(tstore.keys())
-                    # check indices & nrows
-                    for k in tstore.keys():
-                        if tstore.get_storer(k).is_table:
-                            new_t = tstore.get_storer(k)
-                            orig_t = store.get_storer(k)
-
-                            assert orig_t.nrows == new_t.nrows
-
-                            # check propindixes
-                            if propindexes:
-                                for a in orig_t.axes:
-                                    if a.is_indexed:
-                                        assert new_t[a.name].is_indexed
+    with HDFStore(temp_file) as st:
+        st.append("df", df, data_columns=["A"])
+    with tempfile.NamedTemporaryFile() as new_f:
+        with HDFStore(temp_file) as store:
+            with contextlib.closing(
+                store.copy(new_f.name, keys=None, propindexes=propindexes)
+            ) as tstore:
+                # check keys
+                keys = store.keys()
+                assert set(keys) == set(tstore.keys())
+                # check indices & nrows
+                for k in tstore.keys():
+                    if tstore.get_storer(k).is_table:
+                        new_t = tstore.get_storer(k)
+                        orig_t = store.get_storer(k)
+
+                        assert orig_t.nrows == new_t.nrows
+
+                        # check propindexes
+                        if propindexes:
+                            for a in orig_t.axes:
+                                if a.is_indexed:
+                                    assert new_t[a.name].is_indexed
 
 
 def test_duplicate_column_name(tmp_path, setup_path):