@@ -543,6 +543,35 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
543543
544544 const auto & mvccSnapshot = record.HasMvccSnapshot () ? NOlap::TSnapshot{record.GetMvccSnapshot ().GetStep (), record.GetMvccSnapshot ().GetTxId ()} : NOlap::TSnapshot::Zero ();
545545
546+ if (mvccSnapshot != NOlap::TSnapshot::Zero ()) {
547+ auto snapshotSchema = TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetSchemaVerified (mvccSnapshot);
548+ if (snapshotSchema->GetVersion () != schema->GetVersion ()) {
549+ const TString errorMessage = TStringBuilder () << " schema version mismatch with snapshot: "
550+ << " tx_id=" << record.GetTxId ()
551+ << " , snapshot=" << mvccSnapshot
552+ << " , snapshot_schema_version=" << snapshotSchema->GetVersion ()
553+ << " , request_schema_version=" << schema->GetVersion ()
554+ << " , table_id=" << schemeShardLocalPathId
555+ << " , lock_id=" << lockId;
556+
557+ AFL_ERROR (NKikimrServices::TX_COLUMNSHARD_WRITE)(" event" , " schema_version_mismatch" )
558+ (" tx_id" , record.GetTxId ())
559+ (" snapshot" , TStringBuilder () << mvccSnapshot)
560+ (" snapshot_schema_version" , snapshotSchema->GetVersion ())
561+ (" request_schema_version" , schema->GetVersion ())
562+ (" table_id" , schemeShardLocalPathId)
563+ (" lock_id" , lockId)
564+ (" path_id" , pathId)
565+ (" source" , source.ToString ())
566+ (" cookie" , cookie);
567+
568+ LWPROBE (EvWrite, TabletID (), source.ToString (), cookie, record.GetTxId (), writeTimeout.value_or (TDuration::Max ()), 0 , " " , false ,
569+ operation.GetIsBulk (), ToString (NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST), errorMessage);
570+ sendError (errorMessage, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
571+ return ;
572+ }
573+ }
574+
546575 LWPROBE (EvWrite, TabletID (), source.ToString (), cookie, record.GetTxId (), writeTimeout.value_or (TDuration::Max ()), arrowData->GetSize (), " " , true , operation.GetIsBulk (), " " , " " );
547576
548577 WriteTasksQueue->Enqueue (TWriteTask (arrowData, schema, source, ev->Recipient , granuleShardingVersionId, pathId, cookie, mvccSnapshot, lockId, *mType , behaviour, writeTimeout, record.GetTxId (),
0 commit comments