1111)
1212from contextlib import (
1313 ExitStack ,
14+ closing ,
1415 contextmanager ,
1516)
1617from datetime import (
@@ -1651,10 +1652,18 @@ def run_transaction(self):
16511652
16521653 def execute (self , sql : str | Select | TextClause , params = None ):
16531654 """Simple passthrough to SQLAlchemy connectable"""
1655+ from sqlalchemy .exc import DBAPIError as SQLAlchemyDatabaseError
1656+
16541657 args = [] if params is None else [params ]
16551658 if isinstance (sql , str ):
1656- return self .con .exec_driver_sql (sql , * args )
1657- return self .con .execute (sql , * args )
1659+ execute_function = self .con .exec_driver_sql
1660+ else :
1661+ execute_function = self .con .execute
1662+
1663+ try :
1664+ return execute_function (sql , * args )
1665+ except SQLAlchemyDatabaseError as exc :
1666+ raise DatabaseError (f"Execution failed on sql '{ sql } ': { exc } " ) from exc
16581667
16591668 def read_table (
16601669 self ,
@@ -2108,17 +2117,19 @@ def run_transaction(self):
21082117 self .con .commit ()
21092118
21102119 def execute (self , sql : str | Select | TextClause , params = None ):
2120+ from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2121+
21112122 if not isinstance (sql , str ):
21122123 raise TypeError ("Query must be a string unless using sqlalchemy." )
21132124 args = [] if params is None else [params ]
21142125 cur = self .con .cursor ()
21152126 try :
21162127 cur .execute (sql , * args )
21172128 return cur
2118- except Exception as exc :
2129+ except ADBCDatabaseError as exc :
21192130 try :
21202131 self .con .rollback ()
2121- except Exception as inner_exc : # pragma: no cover
2132+ except ADBCDatabaseError as inner_exc : # pragma: no cover
21222133 ex = DatabaseError (
21232134 f"Execution failed on sql: { sql } \n { exc } \n unable to rollback"
21242135 )
@@ -2207,8 +2218,7 @@ def read_table(
22072218 else :
22082219 stmt = f"SELECT { select_list } FROM { table_name } "
22092220
2210- with self .con .cursor () as cur :
2211- cur .execute (stmt )
2221+ with closing (self .execute (stmt )) as cur :
22122222 pa_table = cur .fetch_arrow_table ()
22132223 df = arrow_table_to_pandas (pa_table , dtype_backend = dtype_backend )
22142224
@@ -2278,8 +2288,7 @@ def read_query(
22782288 if chunksize :
22792289 raise NotImplementedError ("'chunksize' is not implemented for ADBC drivers" )
22802290
2281- with self .con .cursor () as cur :
2282- cur .execute (sql )
2291+ with closing (self .execute (sql )) as cur :
22832292 pa_table = cur .fetch_arrow_table ()
22842293 df = arrow_table_to_pandas (pa_table , dtype_backend = dtype_backend )
22852294
@@ -2335,6 +2344,9 @@ def to_sql(
23352344 engine : {'auto', 'sqlalchemy'}, default 'auto'
23362345 Raises NotImplementedError if not set to 'auto'
23372346 """
2347+ from adbc_driver_manager import DatabaseError as ADBCDatabaseError
2348+ import pyarrow as pa
2349+
23382350 if index_label :
23392351 raise NotImplementedError (
23402352 "'index_label' is not implemented for ADBC drivers"
@@ -2364,22 +2376,25 @@ def to_sql(
23642376 if if_exists == "fail" :
23652377 raise ValueError (f"Table '{ table_name } ' already exists." )
23662378 elif if_exists == "replace" :
2367- with self . con . cursor () as cur :
2368- cur .execute (f"DROP TABLE { table_name } " )
2379+ sql_statement = f"DROP TABLE { table_name } "
2380+ self .execute (sql_statement ). close ( )
23692381 elif if_exists == "append" :
23702382 mode = "append"
23712383
2372- import pyarrow as pa
2373-
23742384 try :
23752385 tbl = pa .Table .from_pandas (frame , preserve_index = index )
23762386 except pa .ArrowNotImplementedError as exc :
23772387 raise ValueError ("datatypes not supported" ) from exc
23782388
23792389 with self .con .cursor () as cur :
2380- total_inserted = cur .adbc_ingest (
2381- table_name = name , data = tbl , mode = mode , db_schema_name = schema
2382- )
2390+ try :
2391+ total_inserted = cur .adbc_ingest (
2392+ table_name = name , data = tbl , mode = mode , db_schema_name = schema
2393+ )
2394+ except ADBCDatabaseError as exc :
2395+ raise DatabaseError (
2396+ f"Failed to insert records on table={ name } with { mode = } "
2397+ ) from exc
23832398
23842399 self .con .commit ()
23852400 return total_inserted
@@ -2496,9 +2511,9 @@ def sql_schema(self) -> str:
24962511 return str (";\n " .join (self .table ))
24972512
24982513 def _execute_create (self ) -> None :
2499- with self .pd_sql .run_transaction () as conn :
2514+ with self .pd_sql .run_transaction ():
25002515 for stmt in self .table :
2501- conn . execute (stmt )
2516+ self . pd_sql . execute (stmt ). close ( )
25022517
25032518 def insert_statement (self , * , num_rows : int ) -> str :
25042519 names = list (map (str , self .frame .columns ))
@@ -2520,8 +2535,13 @@ def insert_statement(self, *, num_rows: int) -> str:
25202535 return insert_statement
25212536
25222537 def _execute_insert (self , conn , keys , data_iter ) -> int :
2538+ from sqlite3 import DatabaseError as SQLiteDatabaseError
2539+
25232540 data_list = list (data_iter )
2524- conn .executemany (self .insert_statement (num_rows = 1 ), data_list )
2541+ try :
2542+ conn .executemany (self .insert_statement (num_rows = 1 ), data_list )
2543+ except SQLiteDatabaseError as exc :
2544+ raise DatabaseError ("Execution failed" ) from exc
25252545 return conn .rowcount
25262546
25272547 def _execute_insert_multi (self , conn , keys , data_iter ) -> int :
@@ -2643,17 +2663,19 @@ def run_transaction(self):
26432663 cur .close ()
26442664
26452665 def execute (self , sql : str | Select | TextClause , params = None ):
2666+ from sqlite3 import DatabaseError as SQLiteDatabaseError
2667+
26462668 if not isinstance (sql , str ):
26472669 raise TypeError ("Query must be a string unless using sqlalchemy." )
26482670 args = [] if params is None else [params ]
26492671 cur = self .con .cursor ()
26502672 try :
26512673 cur .execute (sql , * args )
26522674 return cur
2653- except Exception as exc :
2675+ except SQLiteDatabaseError as exc :
26542676 try :
26552677 self .con .rollback ()
2656- except Exception as inner_exc : # pragma: no cover
2678+ except SQLiteDatabaseError as inner_exc : # pragma: no cover
26572679 ex = DatabaseError (
26582680 f"Execution failed on sql: { sql } \n { exc } \n unable to rollback"
26592681 )
0 commit comments