@@ -17,7 +17,7 @@ use crate::optimizer::rule::implementation::ImplementationRuleImpl;
1717use crate :: optimizer:: rule:: normalization:: NormalizationRuleImpl ;
1818use crate :: parser:: parse_sql;
1919use crate :: planner:: LogicalPlan ;
20- use crate :: storage:: rocksdb:: RocksStorage ;
20+ use crate :: storage:: rocksdb:: { OptimisticRocksStorage , RocksStorage } ;
2121use crate :: storage:: { StatisticsMetaCache , Storage , TableCache , Transaction , ViewCache } ;
2222use crate :: types:: tuple:: { SchemaRef , Tuple } ;
2323use crate :: types:: value:: DataValue ;
@@ -86,6 +86,21 @@ impl DataBaseBuilder {
8686
8787 pub fn build ( self ) -> Result < Database < RocksStorage > , DatabaseError > {
8888 let storage = RocksStorage :: new ( self . path ) ?;
89+
90+ Self :: _build :: < RocksStorage > ( storage, self . scala_functions , self . table_functions )
91+ }
92+
93+ pub fn build_optimistic ( self ) -> Result < Database < OptimisticRocksStorage > , DatabaseError > {
94+ let storage = OptimisticRocksStorage :: new ( self . path ) ?;
95+
96+ Self :: _build :: < OptimisticRocksStorage > ( storage, self . scala_functions , self . table_functions )
97+ }
98+
99+ fn _build < T : Storage > (
100+ storage : T ,
101+ scala_functions : ScalaFunctions ,
102+ table_functions : TableFunctions ,
103+ ) -> Result < Database < T > , DatabaseError > {
89104 let meta_cache = SharedLruCache :: new ( 256 , 8 , RandomState :: new ( ) ) ?;
90105 let table_cache = SharedLruCache :: new ( 48 , 4 , RandomState :: new ( ) ) ?;
91106 let view_cache = SharedLruCache :: new ( 12 , 4 , RandomState :: new ( ) ) ?;
@@ -94,8 +109,8 @@ impl DataBaseBuilder {
94109 storage,
95110 mdl : Default :: default ( ) ,
96111 state : Arc :: new ( State {
97- scala_functions : self . scala_functions ,
98- table_functions : self . table_functions ,
112+ scala_functions,
113+ table_functions,
99114 meta_cache,
100115 table_cache,
101116 view_cache,
@@ -662,6 +677,53 @@ pub(crate) mod test {
662677 tx_1. run ( "insert into t1 values(0, 0)" ) ?. done ( ) ?;
663678 tx_1. run ( "insert into t1 values(1, 1)" ) ?. done ( ) ?;
664679
680+ assert ! ( tx_2. run( "insert into t1 values(0, 0)" ) ?. done( ) . is_err( ) ) ;
681+ tx_2. run ( "insert into t1 values(3, 3)" ) ?. done ( ) ?;
682+
683+ let mut iter_1 = tx_1. run ( "select * from t1" ) ?;
684+ let mut iter_2 = tx_2. run ( "select * from t1" ) ?;
685+
686+ assert_eq ! (
687+ iter_1. next( ) . unwrap( ) ?. values,
688+ vec![ DataValue :: Int32 ( 0 ) , DataValue :: Int32 ( 0 ) ]
689+ ) ;
690+ assert_eq ! (
691+ iter_1. next( ) . unwrap( ) ?. values,
692+ vec![ DataValue :: Int32 ( 1 ) , DataValue :: Int32 ( 1 ) ]
693+ ) ;
694+
695+ assert_eq ! (
696+ iter_2. next( ) . unwrap( ) ?. values,
697+ vec![ DataValue :: Int32 ( 3 ) , DataValue :: Int32 ( 3 ) ]
698+ ) ;
699+ drop ( iter_1) ;
700+ drop ( iter_2) ;
701+
702+ tx_1. commit ( ) ?;
703+ tx_2. commit ( ) ?;
704+
705+ let mut tx_3 = kite_sql. new_transaction ( ) ?;
706+ let res = tx_3. run ( "create table t2 (a int primary key, b int)" ) ;
707+ assert ! ( res. is_err( ) ) ;
708+
709+ Ok ( ( ) )
710+ }
711+
712+ #[ test]
713+ fn test_optimistic_transaction_sql ( ) -> Result < ( ) , DatabaseError > {
714+ let temp_dir = TempDir :: new ( ) . expect ( "unable to create temporary working directory" ) ;
715+ let kite_sql = DataBaseBuilder :: path ( temp_dir. path ( ) ) . build_optimistic ( ) ?;
716+
717+ kite_sql
718+ . run ( "create table t1 (a int primary key, b int)" ) ?
719+ . done ( ) ?;
720+
721+ let mut tx_1 = kite_sql. new_transaction ( ) ?;
722+ let mut tx_2 = kite_sql. new_transaction ( ) ?;
723+
724+ tx_1. run ( "insert into t1 values(0, 0)" ) ?. done ( ) ?;
725+ tx_1. run ( "insert into t1 values(1, 1)" ) ?. done ( ) ?;
726+
665727 tx_2. run ( "insert into t1 values(0, 0)" ) ?. done ( ) ?;
666728 tx_2. run ( "insert into t1 values(3, 3)" ) ?. done ( ) ?;
667729
0 commit comments