@@ -271,6 +271,75 @@ def test_write_row_event(self):
271271 self .assertEqual (event .rows [0 ]["values" ]["data" ], "Hello World" )
272272 self .assertEqual (event .columns [1 ].name , "data" )
273273
274+ def test_fetch_column_names_from_schema (self ):
275+ # This test is for MySQL 5.7+
276+ if not self .isMySQL57AndMore ():
277+ self .skipTest ("Test for MySQL 5.7+ where binlog_row_metadata can be MINIMAL" )
278+
279+ self .execute ("SET SESSION binlog_row_metadata = 'MINIMAL'" )
280+ query = "CREATE TABLE test_column_cache (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
281+ self .execute (query )
282+ self .execute ("INSERT INTO test_column_cache (data) VALUES('Hello')" )
283+ self .execute ("COMMIT" )
284+
285+ # Test with use_column_name_cache = True
286+ self .stream .close ()
287+ self .stream = BinLogStreamReader (
288+ self .database ,
289+ server_id = 1024 ,
290+ use_column_name_cache = True ,
291+ only_events = [WriteRowsEvent ],
292+ )
293+
294+ event = self .stream .fetchone ()
295+ self .assertIsInstance (event , WriteRowsEvent )
296+ self .assertEqual (event .table , "test_column_cache" )
297+ self .assertIn ("id" , event .rows [0 ]["values" ])
298+ self .assertIn ("data" , event .rows [0 ]["values" ])
299+ self .assertEqual (event .rows [0 ]["values" ]["id" ], 1 )
300+ self .assertEqual (event .rows [0 ]["values" ]["data" ], "Hello" )
301+
302+ # Test with use_column_name_cache = False
303+ self .stream .close ()
304+
305+ # Clear cache before next run
306+ from pymysqlreplication import row_event
307+ row_event ._COLUMN_NAME_CACHE .clear ()
308+
309+ self .stream = BinLogStreamReader (
310+ self .database ,
311+ server_id = 1025 , # different server_id to avoid caching issues
312+ use_column_name_cache = False ,
313+ only_events = [WriteRowsEvent ],
314+ )
315+
316+ # Reset and replay events
317+ self .resetBinLog ()
318+ self .execute ("SET SESSION binlog_row_metadata = 'MINIMAL'" )
319+ self .execute ("INSERT INTO test_column_cache (data) VALUES('World')" )
320+ self .execute ("COMMIT" )
321+
322+ # Skip RotateEvent and FormatDescriptionEvent
323+ self .stream .fetchone ()
324+ self .stream .fetchone ()
325+ # Skip QueryEvent for BEGIN
326+ if not self .isMariaDB ():
327+ self .stream .fetchone ()
328+ # Skip TableMapEvent
329+ self .stream .fetchone ()
330+
331+ event = self .stream .fetchone ()
332+ self .assertIsInstance (event , WriteRowsEvent )
333+ self .assertEqual (event .table , "test_column_cache" )
334+ # With cache disabled, we should not have column names
335+ self .assertNotIn ("id" , event .rows [0 ]["values" ])
336+ self .assertNotIn ("data" , event .rows [0 ]["values" ])
337+
338+ # cleanup
339+ self .execute ("SET SESSION binlog_row_metadata = 'FULL'" )
340+ row_event ._COLUMN_NAME_CACHE .clear ()
341+
342+
274343 def test_delete_row_event (self ):
275344 query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
276345 self .execute (query )
0 commit comments