Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 355 additions & 0 deletions tests/integration/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,358 @@ def test_join_literal(c):
df_expected = pd.DataFrame({"user_id": [], "b": [], "user_id0": [], "c": []})

assert_frame_equal(df.reset_index(), df_expected.reset_index(), check_dtype=False)


def test_join_lricomplex(c):
# Checks left, right and inner joins whose ON clause contains range (<=)
# predicates, optionally combined with an equality predicate, by comparing
# each query result against a hand-built expected frame.
#
# `c` is the fixture exposing `.sql(query)`; the result is materialised with
# `.compute()`.
# NOTE(review): presumably a dask-sql Context with the user_table_* tables
# already registered -- confirm against the fixture definitions.
# ---------- Panel data (equality and inequality conditions)

# Correct answer
# Expected output of the left/right join. Each row lists the seven panel
# columns (ids .. pn_date) followed by the seven lookup columns
# (startdate .. lk_date); see `columns=` below. Rows whose lookup half is
# entirely missing (pd.NA / np.nan / pd.NaT) are the ones that the inner-join
# check further down removes via dropna(subset=["startdate"]).
dfcorrpn = pd.DataFrame(
[
[
0,
1,
pd.NA,
331,
"c1",
3.1,
pd.Timestamp("2003-01-01"),
0,
2,
pd.NA,
110,
"a1",
1.1,
pd.Timestamp("2001-01-01"),
],
[
0,
2,
pd.NA,
332,
"c2",
3.2,
pd.Timestamp("2003-02-01"),
0,
2,
pd.NA,
110,
"a1",
1.1,
pd.Timestamp("2001-01-01"),
],
[
0,
3,
pd.NA,
333,
"c3",
3.3,
pd.Timestamp("2003-03-01"),
pd.NA,
pd.NA,
pd.NA,
pd.NA,
np.nan,
np.nan,
pd.NaT,
],
[
1,
3,
pd.NA,
334,
"c4",
np.nan,
pd.Timestamp("2003-04-01"),
2,
5,
pd.NA,
112,
"a3",
np.nan,
pd.Timestamp("2001-03-01"),
],
[
1,
4,
35,
335,
"c5",
np.nan,
pd.Timestamp("2003-05-01"),
2,
5,
pd.NA,
112,
"a3",
np.nan,
pd.Timestamp("2001-03-01"),
],
[
1,
4,
35,
335,
"c5",
np.nan,
pd.Timestamp("2003-05-01"),
4,
6,
13,
113,
"a4",
np.nan,
pd.Timestamp("2001-04-01"),
],
[
2,
1,
36,
336,
"c6",
np.nan,
pd.Timestamp("2003-06-01"),
pd.NA,
pd.NA,
pd.NA,
pd.NA,
np.nan,
np.nan,
pd.NaT,
],
[
2,
3,
37,
337,
"c7",
np.nan,
pd.NaT,
pd.NA,
pd.NA,
pd.NA,
pd.NA,
np.nan,
np.nan,
pd.NaT,
],
[3, 2, 38, 338, "c8", 3.8, pd.NaT, 1, 2, 14, 114, "a5", np.nan, pd.NaT],
[3, 2, 39, 339, "c9", 3.9, pd.NaT, 1, 2, 14, 114, "a5", np.nan, pd.NaT],
[3, 2, 38, 338, "c8", 3.8, pd.NaT, 2, 3, 15, 115, "a6", 1.6, pd.NaT],
[3, 2, 39, 339, "c9", 3.9, pd.NaT, 2, 3, 15, 115, "a6", 1.6, pd.NaT],
],
columns=[
"ids",
"dates",
"pn_nullint",
"pn_int",
"pn_str",
"pn_float",
"pn_date",
"startdate",
"enddate",
"lk_nullint",
"lk_int",
"lk_str",
"lk_float",
"lk_date",
],
)
# Cast the listed columns to pandas extension dtypes (nullable integers and
# "string"). The assert_frame_equal calls on this frame do not pass
# check_dtype=False, so the expected dtypes have to match the query result
# exactly.
change_types = {
"pn_nullint": "Int32",
"lk_nullint": "Int32",
"startdate": "Int64",
"enddate": "Int64",
"lk_int": "Int64",
"pn_str": "string",
"lk_str": "string",
}
for k, v in change_types.items():
dfcorrpn[k] = dfcorrpn[k].astype(v)

# Left Join
querypnl = """
select a.*, b.startdate, b.enddate, b.lk_nullint, b.lk_int, b.lk_str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested this query using PostgreSQL, and the result mostly matches the expected dataset specified here, apart from some dtype mismatches. I have one suggestion: could you please add this test to test_postgres.py as well? There you can use assert_query_gives_same_result to compare your result directly with the Postgres result (similar to the one you specified in the issue). Let me know if you need any help.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right. Let me check. To run the Postgres tests, I need to install Docker, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes :). If not, a quick hack for testing purposes is to hardcode the IP address of any available PostgreSQL instance in the engine fixture of test_postgres.py. Whichever is easier.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I've added the tests in test_postgres.py. Also, I don't know why, but I cannot connect to the Postgres container using the original test_postgres.py. So I exposed port 5432 in client.containers.run and changed the address to "localhost". After that, the tests work fine.

b.lk_float, b.lk_date
from user_table_pn a left join user_table_lk b
on a.ids=b.id and b.startdate<=a.dates and a.dates<=b.enddate
"""
dftestpnl = (
c.sql(querypnl).compute().sort_values(["ids", "dates", "startdate", "enddate"])
)
assert_frame_equal(
dftestpnl.reset_index(drop=True), dfcorrpn.reset_index(drop=True)
)

# Right Join
# Same join as the left-join query with the table roles swapped (the lookup
# table is on the left and the panel table on the right), so the result is
# compared against the same expected frame, dfcorrpn.
querypnr = """
select b.*, a.startdate, a.enddate, a.lk_nullint, a.lk_int, a.lk_str,
a.lk_float, a.lk_date
from user_table_lk a right join user_table_pn b
on b.ids=a.id and a.startdate<=b.dates and b.dates<=a.enddate
"""
# Sort on the key columns and reset the index below so the comparison does
# not depend on the row order or index the query returns.
dftestpnr = (
c.sql(querypnr).compute().sort_values(["ids", "dates", "startdate", "enddate"])
)
assert_frame_equal(
dftestpnr.reset_index(drop=True), dfcorrpn.reset_index(drop=True)
)

# Inner Join
querypni = """
select a.*, b.startdate, b.enddate, b.lk_nullint, b.lk_int, b.lk_str,
b.lk_float, b.lk_date
from user_table_pn a inner join user_table_lk b
on a.ids=b.id and b.startdate<=a.dates and a.dates<=b.enddate
"""
dftestpni = (
c.sql(querypni).compute().sort_values(["ids", "dates", "startdate", "enddate"])
)
# Expected inner-join result: the outer-join answer without the rows whose
# startdate is missing. With those rows gone, startdate/enddate/lk_int are
# cast from nullable Int64 to plain int64 for the dtype-strict comparison.
assert_frame_equal(
dftestpni.reset_index(drop=True),
dfcorrpn.dropna(subset=["startdate"])
.assign(
startdate=lambda x: x["startdate"].astype("int64"),
enddate=lambda x: x["enddate"].astype("int64"),
lk_int=lambda x: x["lk_int"].astype("int64"),
)
.reset_index(drop=True),
)

# ---------- Time-series data (inequality condition only)

# Correct answer
# Expected output of the left/right join for the time-series case. Each row
# lists the six time-series columns (dates .. ts_date) followed by the seven
# lookup columns (startdate .. lk_date); see `columns=` below. The last row
# has an entirely missing lookup half and is the one removed by
# dropna(subset=["startdate"]) in the inner-join check.
dfcorrts = pd.DataFrame(
[
[
3,
pd.NA,
221,
"b1",
2.1,
pd.Timestamp("2002-01-01"),
2,
5,
pd.NA,
112,
"a3",
np.nan,
pd.Timestamp("2001-03-01"),
],
[
4,
22,
222,
"b2",
np.nan,
pd.Timestamp("2002-02-01"),
2,
5,
pd.NA,
112,
"a3",
np.nan,
pd.Timestamp("2001-03-01"),
],
[
4,
22,
222,
"b2",
np.nan,
pd.Timestamp("2002-02-01"),
4,
6,
13,
113,
"a4",
np.nan,
pd.Timestamp("2001-04-01"),
],
[
7,
23,
223,
"b3",
2.3,
pd.NaT,
pd.NA,
pd.NA,
pd.NA,
pd.NA,
np.nan,
np.nan,
pd.NaT,
],
],
columns=[
"dates",
"ts_nullint",
"ts_int",
"ts_str",
"ts_float",
"ts_date",
"startdate",
"enddate",
"lk_nullint",
"lk_int",
"lk_str",
"lk_float",
"lk_date",
],
)
# Cast the listed columns to pandas extension dtypes (nullable integers and
# "string"); the comparisons on this frame are dtype-strict.
# Note: this rebinds `change_types`, replacing the mapping used for the
# panel-data frame.
change_types = {
"ts_nullint": "Int32",
"lk_nullint": "Int32",
"startdate": "Int64",
"enddate": "Int64",
"lk_int": "Int64",
"lk_str": "string",
"ts_str": "string",
}
for k, v in change_types.items():
dfcorrts[k] = dfcorrts[k].astype(v)

# Left Join
# The ON clause here has no equality predicate: rows are paired purely on
# startdate <= dates <= enddate.
querytsl = """
select a.*, b.startdate, b.enddate, b.lk_nullint, b.lk_int, b.lk_str,
b.lk_float, b.lk_date
from user_table_ts a left join user_table_lk2 b
on b.startdate<=a.dates and a.dates<=b.enddate
"""
# Sort and reset the index so the comparison ignores the returned row order.
dftesttsl = c.sql(querytsl).compute().sort_values(["dates", "startdate", "enddate"])
assert_frame_equal(
dftesttsl.reset_index(drop=True), dfcorrts.reset_index(drop=True)
)

# Right Join
# Table roles swapped relative to the left join above; compared against the
# same expected frame, dfcorrts.
querytsr = """
select b.*, a.startdate, a.enddate, a.lk_nullint, a.lk_int, a.lk_str,
a.lk_float, a.lk_date
from user_table_lk2 a right join user_table_ts b
on a.startdate<=b.dates and b.dates<=a.enddate
"""
dftesttsr = c.sql(querytsr).compute().sort_values(["dates", "startdate", "enddate"])
assert_frame_equal(
dftesttsr.reset_index(drop=True), dfcorrts.reset_index(drop=True)
)

# Inner Join
querytsi = """
select a.*, b.startdate, b.enddate, b.lk_nullint, b.lk_int, b.lk_str,
b.lk_float, b.lk_date
from user_table_ts a inner join user_table_lk2 b
on b.startdate<=a.dates and a.dates<=b.enddate
"""
dftesttsi = c.sql(querytsi).compute().sort_values(["dates", "startdate", "enddate"])
# Expected inner-join result: the outer-join answer without the rows whose
# startdate is missing, with startdate/enddate/lk_int cast from nullable
# Int64 to plain int64 for the dtype-strict comparison.
assert_frame_equal(
dftesttsi.reset_index(drop=True),
dfcorrts.dropna(subset=["startdate"])
.assign(
startdate=lambda x: x["startdate"].astype("int64"),
enddate=lambda x: x["enddate"].astype("int64"),
lk_int=lambda x: x["lk_int"].astype("int64"),
)
.reset_index(drop=True),
)