diff --git a/.idea/.idea.TableDependency/.idea/.gitignore b/.idea/.idea.TableDependency/.idea/.gitignore
new file mode 100644
index 0000000..0d1c99a
--- /dev/null
+++ b/.idea/.idea.TableDependency/.idea/.gitignore
@@ -0,0 +1,13 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Rider ignored files
+/contentModel.xml
+/projectSettingsUpdater.xml
+/modules.xml
+/.idea.TableDependency.iml
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Editor-based HTTP Client requests
+/httpRequests/
diff --git a/.idea/.idea.TableDependency/.idea/.name b/.idea/.idea.TableDependency/.idea/.name
new file mode 100644
index 0000000..94423f5
--- /dev/null
+++ b/.idea/.idea.TableDependency/.idea/.name
@@ -0,0 +1 @@
+TableDependency
\ No newline at end of file
diff --git a/.idea/.idea.TableDependency/.idea/encodings.xml b/.idea/.idea.TableDependency/.idea/encodings.xml
new file mode 100644
index 0000000..df87cf9
--- /dev/null
+++ b/.idea/.idea.TableDependency/.idea/encodings.xml
@@ -0,0 +1,4 @@
+
+
+
+
\ No newline at end of file
diff --git a/.idea/.idea.TableDependency/.idea/indexLayout.xml b/.idea/.idea.TableDependency/.idea/indexLayout.xml
new file mode 100644
index 0000000..7b08163
--- /dev/null
+++ b/.idea/.idea.TableDependency/.idea/indexLayout.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/.idea.TableDependency/.idea/vcs.xml b/.idea/.idea.TableDependency/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/.idea.TableDependency/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/TableDependency.SqlClient/SqlTableDependency.cs b/TableDependency.SqlClient/SqlTableDependency.cs
index 1fa19cc..ab3f67a 100644
--- a/TableDependency.SqlClient/SqlTableDependency.cs
+++ b/TableDependency.SqlClient/SqlTableDependency.cs
@@ -1,4 +1,5 @@
#region License
+
// TableDependency, SqlTableDependency
// Copyright (c) 2015-2020 Christian Del Bianco. All rights reserved.
//
@@ -22,6 +23,7 @@
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
// OTHER DEALINGS IN THE SOFTWARE.
+
#endregion
using System;
@@ -33,7 +35,6 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
-
using TableDependency.SqlClient.Base;
using TableDependency.SqlClient.Base.Abstracts;
using TableDependency.SqlClient.Base.Delegates;
@@ -63,6 +64,8 @@ namespace TableDependency.SqlClient
protected Guid ConversationHandle;
protected const string StartMessageTemplate = "{0}/StartMessage/{1}";
protected const string EndMessageTemplate = "{0}/EndMessage";
+ protected readonly bool _saveQueue;
+ protected bool _greatDBObject;
#endregion
@@ -129,6 +132,7 @@ namespace TableDependency.SqlClient
/// The notify on Insert, Delete, Update operation.
/// if set to true [skip user permission check].
/// if set to true [include old values].
+ /// If true then queue d't delete, and the missed messages are saved and available the next time you run them
public SqlTableDependency(
string connectionString,
string tableName = null,
@@ -138,9 +142,12 @@ public SqlTableDependency(
ITableDependencyFilter filter = null,
DmlTriggerType notifyOn = DmlTriggerType.All,
bool executeUserPermissionCheck = true,
- bool includeOldValues = false) : base(connectionString, tableName, schemaName, mapper, updateOf, filter, notifyOn, executeUserPermissionCheck)
+ bool includeOldValues = false,
+ bool saveQueue = false) : base(connectionString, tableName, schemaName, mapper, updateOf, filter, notifyOn,
+ executeUserPermissionCheck)
{
this.IncludeOldValues = includeOldValues;
+ this._saveQueue = saveQueue;
}
#endregion
@@ -156,7 +163,8 @@ public SqlTableDependency(
public override void Start(int timeOut = 120, int watchDogTimeOut = 180)
{
if (timeOut < 60) throw new ArgumentException("timeOut must be greater or equal to 60 seconds");
- if (watchDogTimeOut < 60 || watchDogTimeOut < (timeOut + 60)) throw new WatchDogTimeOutException("watchDogTimeOut must be at least 60 seconds bigger then timeOut");
+ if (watchDogTimeOut < 60 || watchDogTimeOut < (timeOut + 60))
+ throw new WatchDogTimeOutException("watchDogTimeOut must be at least 60 seconds bigger then timeOut");
if (_task != null) return;
if (this.OnChanged == null) throw new NoSubscriberException();
@@ -168,20 +176,22 @@ public override void Start(int timeOut = 120, int watchDogTimeOut = 180)
this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.Starting);
_disposed = false;
+ _greatDBObject = !CheckIfDatabaseObjectExists();
_processableMessages = this.CreateDatabaseObjects(timeOut, watchDogTimeOut);
_cancellationTokenSource = new CancellationTokenSource();
-
+
_task = Task.Factory.StartNew(() =>
- WaitForNotifications(
- _cancellationTokenSource.Token,
- onChangedSubscribedList,
- onErrorSubscribedList,
- onStatusChangedSubscribedList,
- timeOut,
- watchDogTimeOut),
+ WaitForNotifications(
+ _cancellationTokenSource.Token,
+ onChangedSubscribedList,
+ onErrorSubscribedList,
+ onStatusChangedSubscribedList,
+ timeOut,
+ watchDogTimeOut),
_cancellationTokenSource.Token);
- this.WriteTraceMessage(TraceLevel.Info, $"Waiting for receiving {_tableName}'s records change notifications.");
+ this.WriteTraceMessage(TraceLevel.Info,
+ $"Waiting for receiving {_tableName}'s records change notifications.");
}
///
@@ -195,7 +205,7 @@ public override void Stop()
_task?.Wait();
}
- _task = null;
+ _task = null;
_disposed = true;
this.WriteTraceMessage(TraceLevel.Info, "Stopped waiting for notification.");
@@ -245,7 +255,9 @@ protected override string GetTableName(string tableName)
}
var tableNameFromDataAnnotation = this.GetTableNameFromDataAnnotation();
- return !string.IsNullOrWhiteSpace(tableNameFromDataAnnotation) ? tableNameFromDataAnnotation : typeof(T).Name;
+ return !string.IsNullOrWhiteSpace(tableNameFromDataAnnotation)
+ ? tableNameFromDataAnnotation
+ : typeof(T).Name;
}
protected override string GetSchemaName(string schemaName)
@@ -270,7 +282,7 @@ protected virtual SqlServerVersion GetSqlServerVersion()
var serverVersion = sqlConnection.ServerVersion;
if (string.IsNullOrWhiteSpace(serverVersion)) return SqlServerVersion.Unknown;
- var serverVersionDetails = serverVersion.Split(new[] { "." }, StringSplitOptions.None);
+ var serverVersionDetails = serverVersion.Split(new[] {"."}, StringSplitOptions.None);
var versionNumber = int.Parse(serverVersionDetails[0]);
if (versionNumber < 8) return SqlServerVersion.Unknown;
@@ -300,7 +312,8 @@ protected override IEnumerable GetTableColumnsList()
sqlConnection.Open();
using (var sqlCommand = sqlConnection.CreateCommand())
{
- sqlCommand.CommandText = string.Format(SqlScripts.InformationSchemaColumns, _schemaName, _tableName);
+ sqlCommand.CommandText =
+ string.Format(SqlScripts.InformationSchemaColumns, _schemaName, _tableName);
var reader = sqlCommand.ExecuteReader();
while (reader.Read())
{
@@ -328,8 +341,11 @@ protected virtual bool CheckIfDatabaseObjectExists()
using (var sqlConnection = new SqlConnection(_connectionString))
{
sqlConnection.Open();
- var sqlCommand = new SqlCommand($"SELECT COUNT(*) FROM sys.service_queues WITH (NOLOCK) WHERE name = N'{_dataBaseObjectsNamingConvention}';", sqlConnection);
- result = (int)sqlCommand.ExecuteScalar() > 0;
+ var sqlCommand =
+ new SqlCommand(
+ $"SELECT COUNT(*) FROM sys.service_queues WITH (NOLOCK) WHERE name LIKE N'{_dataBaseObjectsNamingConvention}%';",
+ sqlConnection);
+ result = (int) sqlCommand.ExecuteScalar() > 0;
sqlConnection.Close();
}
@@ -342,15 +358,13 @@ protected override IList CreateDatabaseObjects(int timeOut, int watchDog
var interestedColumns = _userInterestedColumns as TableColumnInfo[] ?? _userInterestedColumns.ToArray();
- if (this.CheckIfDatabaseObjectExists() == false)
- {
- var columnsForUpdateOf = _updateOf != null ? string.Join(" OR ", _updateOf.Where(c => !string.IsNullOrWhiteSpace(c)).Distinct(StringComparer.CurrentCultureIgnoreCase).Select(c => $"UPDATE([{c}])").ToList()) : null;
- processableMessages = this.CreateSqlServerDatabaseObjects(interestedColumns, columnsForUpdateOf, watchDogTimeOut);
- }
- else
- {
- throw new DbObjectsWithSameNameException(_dataBaseObjectsNamingConvention);
- }
+ var columnsForUpdateOf = _updateOf != null
+ ? string.Join(" OR ",
+ _updateOf.Where(c => !string.IsNullOrWhiteSpace(c))
+ .Distinct(StringComparer.CurrentCultureIgnoreCase).Select(c => $"UPDATE([{c}])").ToList())
+ : null;
+ processableMessages =
+ this.CreateSqlServerDatabaseObjects(interestedColumns, columnsForUpdateOf, watchDogTimeOut);
return processableMessages;
}
@@ -358,7 +372,7 @@ protected override IList CreateDatabaseObjects(int timeOut, int watchDog
protected override string GetBaseObjectsNamingConvention()
{
var name = $"{_schemaName}_{_tableName}";
- return $"{name}_{Guid.NewGuid()}";
+ return $"{name}_Replication";
}
protected override void DropDatabaseObjects()
@@ -372,14 +386,19 @@ protected override void DropDatabaseObjects()
{
using (var sqlCommand = sqlConnection.CreateCommand())
{
- var dropMessages = string.Join(Environment.NewLine, _processableMessages.Select((pm, index) =>
- {
+ var dropMessages = string.Join(Environment.NewLine, _processableMessages.Select((pm, index) =>
+ {
if (index > 0)
{
- return this.Spacer(8) + string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm);
+ return this.Spacer(8) +
+ string.Format(
+ "IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];",
+ pm);
}
- return string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm);
+ return string.Format(
+ "IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];",
+ pm);
}));
var dropAllScript = this.PrepareScriptDropAll(dropMessages);
@@ -402,7 +421,8 @@ protected override void CheckRdbmsDependentImplementation()
this.CheckIfServiceBrokerIsEnabled();
var sqlVersion = this.GetSqlServerVersion();
- if (sqlVersion < SqlServerVersion.SqlServer2008) throw new SqlServerVersionNotSupportedException(sqlVersion);
+ if (sqlVersion < SqlServerVersion.SqlServer2008)
+ throw new SqlServerVersionNotSupportedException(sqlVersion);
}
protected virtual string CreateWhereCondition(bool prependSpace = false)
@@ -418,12 +438,15 @@ protected virtual string CreateWhereCondition(bool prependSpace = false)
return where.Trim();
}
- protected virtual string PrepareInsertIntoTableVariableForUpdateChange(TableColumnInfo[] userInterestedColumns, string columnsForUpdateOf)
+ protected virtual string PrepareInsertIntoTableVariableForUpdateChange(TableColumnInfo[] userInterestedColumns,
+ string columnsForUpdateOf)
{
- var insertIntoExceptTableStatement = this.PrepareInsertIntoModifiedRecordsTableStatement(userInterestedColumns);
+ var insertIntoExceptTableStatement =
+ this.PrepareInsertIntoModifiedRecordsTableStatement(userInterestedColumns);
var scriptForInsertInTableVariable = !string.IsNullOrEmpty(columnsForUpdateOf)
- ? string.Format(SqlScripts.InsertInTableVariableConsideringUpdateOf, columnsForUpdateOf, ChangeType.Update, insertIntoExceptTableStatement)
+ ? string.Format(SqlScripts.InsertInTableVariableConsideringUpdateOf, columnsForUpdateOf,
+ ChangeType.Update, insertIntoExceptTableStatement)
: string.Format(SqlScripts.InsertInTableVariable, ChangeType.Update, insertIntoExceptTableStatement);
return scriptForInsertInTableVariable;
@@ -434,34 +457,52 @@ protected virtual IList CreateSqlServerDatabaseObjects(IEnumerable();
var tableColumns = userInterestedColumns as IList ?? userInterestedColumns.ToList();
- var columnsForModifiedRecordsTable = this.PrepareColumnListForTableVariable(tableColumns, this.IncludeOldValues);
+ var columnsForModifiedRecordsTable =
+ this.PrepareColumnListForTableVariable(tableColumns, this.IncludeOldValues);
var columnsForExceptTable = this.PrepareColumnListForTableVariable(tableColumns, false);
var columnsForDeletedTable = this.PrepareColumnListForTableVariable(tableColumns, false);
+ var greatDBObject = _saveQueue && _greatDBObject;
+
using (var sqlConnection = new SqlConnection(_connectionString))
{
sqlConnection.Open();
using (var transaction = sqlConnection.BeginTransaction())
{
- var sqlCommand = new SqlCommand { Connection = sqlConnection, Transaction = transaction };
+ var sqlCommand = new SqlCommand {Connection = sqlConnection, Transaction = transaction};
// Messages
- var startMessageInsert = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Insert);
- sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageInsert}] VALIDATION = NONE;";
- sqlCommand.ExecuteNonQuery();
+ var startMessageInsert = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention,
+ ChangeType.Insert);
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageInsert}] VALIDATION = NONE;";
+ sqlCommand.ExecuteNonQuery();
+ }
+
this.WriteTraceMessage(TraceLevel.Verbose, $"Message {startMessageInsert} created.");
processableMessages.Add(startMessageInsert);
- var startMessageUpdate = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Update);
- sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageUpdate}] VALIDATION = NONE;";
- sqlCommand.ExecuteNonQuery();
+ var startMessageUpdate = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention,
+ ChangeType.Update);
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageUpdate}] VALIDATION = NONE;";
+ sqlCommand.ExecuteNonQuery();
+ }
+
this.WriteTraceMessage(TraceLevel.Verbose, $"Message {startMessageUpdate} created.");
processableMessages.Add(startMessageUpdate);
- var startMessageDelete = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Delete);
- sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageDelete}] VALIDATION = NONE;";
- sqlCommand.ExecuteNonQuery();
+ var startMessageDelete = string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention,
+ ChangeType.Delete);
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = $"CREATE MESSAGE TYPE [{startMessageDelete}] VALIDATION = NONE;";
+ sqlCommand.ExecuteNonQuery();
+ }
+
this.WriteTraceMessage(TraceLevel.Verbose, $"Message {startMessageDelete} created.");
processableMessages.Add(startMessageDelete);
@@ -469,113 +510,172 @@ protected virtual IList CreateSqlServerDatabaseObjects(IEnumerable $"[{message}] SENT BY INITIATOR"));
- sqlCommand.CommandText = $"CREATE CONTRACT [{_dataBaseObjectsNamingConvention}] ({contractBody})";
- sqlCommand.ExecuteNonQuery();
+ var contractBody = string.Join("," + Environment.NewLine,
+ processableMessages.Select(message => $"[{message}] SENT BY INITIATOR"));
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = $"CREATE CONTRACT [{_dataBaseObjectsNamingConvention}] ({contractBody})";
+ sqlCommand.ExecuteNonQuery();
+ }
+
this.WriteTraceMessage(TraceLevel.Verbose, $"Contract {_dataBaseObjectsNamingConvention} created.");
// Queues
- sqlCommand.CommandText = $"CREATE QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] WITH STATUS = ON, RETENTION = OFF, POISON_MESSAGE_HANDLING (STATUS = OFF);";
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, $"Queue {_dataBaseObjectsNamingConvention}_Receiver created.");
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText =
+ $"CREATE QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] WITH STATUS = ON, RETENTION = OFF, POISON_MESSAGE_HANDLING (STATUS = OFF);";
+ sqlCommand.ExecuteNonQuery();
+ }
+
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Queue {_dataBaseObjectsNamingConvention}_Receiver created.");
+
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText =
+ $"CREATE QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender] WITH STATUS = ON, RETENTION = OFF, POISON_MESSAGE_HANDLING (STATUS = OFF);";
+ sqlCommand.ExecuteNonQuery();
+ }
- sqlCommand.CommandText = $"CREATE QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender] WITH STATUS = ON, RETENTION = OFF, POISON_MESSAGE_HANDLING (STATUS = OFF);";
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, $"Queue {_dataBaseObjectsNamingConvention}_Sender created.");
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Queue {_dataBaseObjectsNamingConvention}_Sender created.");
// Services
- sqlCommand.CommandText = string.IsNullOrWhiteSpace(this.ServiceAuthorization)
- ? $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Sender] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender];"
- : $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Sender] AUTHORIZATION [{this.ServiceAuthorization}] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender];";
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, $"Service broker {_dataBaseObjectsNamingConvention}_Sender created.");
-
- sqlCommand.CommandText = string.IsNullOrWhiteSpace(this.ServiceAuthorization)
- ? $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Receiver] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] ([{_dataBaseObjectsNamingConvention}]);"
- : $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Receiver] AUTHORIZATION [{this.ServiceAuthorization}] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] ([{_dataBaseObjectsNamingConvention}]);";
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, $"Service broker {_dataBaseObjectsNamingConvention}_Receiver created.");
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = string.IsNullOrWhiteSpace(this.ServiceAuthorization)
+ ? $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Sender] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender];"
+ : $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Sender] AUTHORIZATION [{this.ServiceAuthorization}] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender];";
+ sqlCommand.ExecuteNonQuery();
+ }
+
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Service broker {_dataBaseObjectsNamingConvention}_Sender created.");
+
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = string.IsNullOrWhiteSpace(this.ServiceAuthorization)
+ ? $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Receiver] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] ([{_dataBaseObjectsNamingConvention}]);"
+ : $"CREATE SERVICE [{_dataBaseObjectsNamingConvention}_Receiver] AUTHORIZATION [{this.ServiceAuthorization}] ON QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver] ([{_dataBaseObjectsNamingConvention}]);";
+ sqlCommand.ExecuteNonQuery();
+ }
+
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Service broker {_dataBaseObjectsNamingConvention}_Receiver created.");
// Activation Store Procedure
- var dropMessages = string.Join(Environment.NewLine, processableMessages.Select((pm, index) =>
- {
- if (index > 0) return this.Spacer(8) + string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm);
- return string.Format("IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];", pm);
+ var dropMessages = string.Join(Environment.NewLine, processableMessages.Select((pm, index) =>
+ {
+ if (index > 0)
+ return this.Spacer(8) +
+ string.Format(
+ "IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];",
+ pm);
+ return string.Format(
+ "IF EXISTS (SELECT * FROM sys.service_message_types WITH (NOLOCK) WHERE name = N'{0}') DROP MESSAGE TYPE [{0}];",
+ pm);
}));
var dropAllScript = this.PrepareScriptDropAll(dropMessages);
- sqlCommand.CommandText = this.PrepareScriptProcedureQueueActivation(dropAllScript);
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, $"Procedure {_dataBaseObjectsNamingConvention} created.");
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = this.PrepareScriptProcedureQueueActivation(dropAllScript);
+ sqlCommand.ExecuteNonQuery();
+ }
+
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Procedure {_dataBaseObjectsNamingConvention} created.");
// Begin conversation
this.ConversationHandle = this.BeginConversation(sqlCommand);
- this.WriteTraceMessage(TraceLevel.Verbose, $"Conversation with handler {this.ConversationHandle} started.");
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Conversation with handler {this.ConversationHandle} started.");
// Trigger
var declareVariableStatement = this.PrepareDeclareVariableStatement(interestedColumns);
var selectForSetVariablesStatement = this.PrepareSelectForSetVariables(interestedColumns);
- var sendInsertConversationStatements = this.PrepareSendConversation(ChangeType.Insert, interestedColumns);
- var sendUpdatedConversationStatements = this.PrepareSendConversation(ChangeType.Update, interestedColumns);
- var sendDeletedConversationStatements = this.PrepareSendConversation(ChangeType.Delete, interestedColumns);
-
- sqlCommand.CommandText = string.Format(
- SqlScripts.CreateTrigger,
- _dataBaseObjectsNamingConvention,
- $"[{_schemaName}].[{_tableName}]",
- columnsForModifiedRecordsTable,
- this.PrepareColumnListForSelectFromTableVariable(tableColumns),
- this.PrepareInsertIntoTableVariableForUpdateChange(interestedColumns, columnsForUpdateOf),
- declareVariableStatement,
- selectForSetVariablesStatement,
- sendInsertConversationStatements,
- sendUpdatedConversationStatements,
- sendDeletedConversationStatements,
- ChangeType.Insert,
- ChangeType.Update,
- ChangeType.Delete,
- string.Join(", ", this.GetDmlTriggerType(_dmlTriggerType)),
- this.CreateWhereCondition(),
- this.PrepareTriggerLogScript(),
- this.ActivateDatabaseLogging ? " WITH LOG" : string.Empty,
- columnsForExceptTable,
- columnsForDeletedTable,
- this.ConversationHandle,
- dropAllScript);
-
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, $"Trigger {_dataBaseObjectsNamingConvention} created.");
-
- // Associate Activation Store Procedure to sender queue
- sqlCommand.CommandText = $"ALTER QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender] WITH ACTIVATION (PROCEDURE_NAME = [{_schemaName}].[{_dataBaseObjectsNamingConvention}_QueueActivationSender], MAX_QUEUE_READERS = 1, EXECUTE AS {this.QueueExecuteAs.ToUpper()}, STATUS = ON);";
- sqlCommand.ExecuteNonQuery();
-
- // Run the watch-dog
- sqlCommand.CommandText = $"BEGIN CONVERSATION TIMER ('{this.ConversationHandle.ToString().ToUpper()}') TIMEOUT = " + watchDogTimeOut + ";";
- sqlCommand.ExecuteNonQuery();
- this.WriteTraceMessage(TraceLevel.Verbose, "Watch dog started.");
+ var sendInsertConversationStatements =
+ this.PrepareSendConversation(ChangeType.Insert, interestedColumns);
+ var sendUpdatedConversationStatements =
+ this.PrepareSendConversation(ChangeType.Update, interestedColumns);
+ var sendDeletedConversationStatements =
+ this.PrepareSendConversation(ChangeType.Delete, interestedColumns);
+
+ if (greatDBObject)
+ {
+ sqlCommand.CommandText = string.Format(
+ SqlScripts.CreateTrigger,
+ _dataBaseObjectsNamingConvention,
+ $"[{_schemaName}].[{_tableName}]",
+ columnsForModifiedRecordsTable,
+ this.PrepareColumnListForSelectFromTableVariable(tableColumns),
+ this.PrepareInsertIntoTableVariableForUpdateChange(interestedColumns, columnsForUpdateOf),
+ declareVariableStatement,
+ selectForSetVariablesStatement,
+ sendInsertConversationStatements,
+ sendUpdatedConversationStatements,
+ sendDeletedConversationStatements,
+ ChangeType.Insert,
+ ChangeType.Update,
+ ChangeType.Delete,
+ string.Join(", ", this.GetDmlTriggerType(_dmlTriggerType)),
+ this.CreateWhereCondition(),
+ this.PrepareTriggerLogScript(),
+ this.ActivateDatabaseLogging ? " WITH LOG" : string.Empty,
+ columnsForExceptTable,
+ columnsForDeletedTable,
+ this.ConversationHandle,
+ dropAllScript);
+
+ sqlCommand.ExecuteNonQuery();
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Trigger {_dataBaseObjectsNamingConvention} created.");
+ // Associate Activation Store Procedure to sender queue
+ sqlCommand.CommandText = $"ALTER QUEUE [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Sender] WITH ACTIVATION (PROCEDURE_NAME = [{_schemaName}].[{_dataBaseObjectsNamingConvention}_QueueActivationSender], MAX_QUEUE_READERS = 1, EXECUTE AS {this.QueueExecuteAs.ToUpper()}, STATUS = ON);";
+ sqlCommand.ExecuteNonQuery();
+
+ // Run the watch-dog
+ if (!_saveQueue)
+ {
+ sqlCommand.CommandText = $"BEGIN CONVERSATION TIMER ('{this.ConversationHandle.ToString().ToUpper()}') TIMEOUT = " + watchDogTimeOut + ";";
+ sqlCommand.ExecuteNonQuery();
+ this.WriteTraceMessage(TraceLevel.Verbose, "Watch dog started.");
+ }
+ }
// Persist all objects
transaction.Commit();
@@ -583,7 +683,8 @@ protected virtual IList CreateSqlServerDatabaseObjects(IEnumerable CreateSqlServerDatabaseObjects(IEnumerable interestedColumns)
+ protected virtual string PrepareInsertIntoModifiedRecordsTableStatement(
+ IReadOnlyCollection interestedColumns)
{
string insertIntoExceptStatement;
@@ -651,7 +758,9 @@ protected virtual string PrepareInsertIntoModifiedRecordsTableStatement(IReadOnl
this.Spacer(12) +
$"INSERT INTO @insertedTable SELECT {sBuilderColumns} FROM INSERTED" + Environment.NewLine;
- if (interestedColumns.Any(tableColumn => string.Equals(tableColumn.Type.ToLowerInvariant(), "timestamp", StringComparison.OrdinalIgnoreCase) || string.Equals(tableColumn.Type.ToLowerInvariant(), "rowversion", StringComparison.OrdinalIgnoreCase)))
+ if (interestedColumns.Any(tableColumn =>
+ string.Equals(tableColumn.Type.ToLowerInvariant(), "timestamp", StringComparison.OrdinalIgnoreCase) ||
+ string.Equals(tableColumn.Type.ToLowerInvariant(), "rowversion", StringComparison.OrdinalIgnoreCase)))
{
insertIntoExceptStatement =
insertedAndDeletedTableVariable +
@@ -673,11 +782,12 @@ protected virtual string PrepareInsertIntoModifiedRecordsTableStatement(IReadOnl
foreach (var column in interestedColumns)
{
sBuilderColumns.Append($"{comma.GetSeparator()}[{column.Name}]");
- sBuilderColumns.Append($"{comma.GetSeparator()}(SELECT d.[{column.Name}] FROM @deletedTable d WHERE d.[RowNumber] = e.[RowNumber])");
+ sBuilderColumns.Append(
+ $"{comma.GetSeparator()}(SELECT d.[{column.Name}] FROM @deletedTable d WHERE d.[RowNumber] = e.[RowNumber])");
}
}
- var insertIntoModifiedRecordsTable =
+ var insertIntoModifiedRecordsTable =
insertIntoExceptStatement + Environment.NewLine + Environment.NewLine +
this.Spacer(12) +
$"INSERT INTO @modifiedRecordsTable SELECT {sBuilderColumns} FROM @exceptTable e {whereCondition}";
@@ -696,9 +806,12 @@ protected virtual IEnumerable GetDmlTriggerType(DmlTriggerType dmlTrigge
}
else
{
- if (dmlTriggerType.HasFlag(DmlTriggerType.Insert)) afters.Add(DmlTriggerType.Insert.ToString().ToLowerInvariant());
- if (dmlTriggerType.HasFlag(DmlTriggerType.Delete)) afters.Add(DmlTriggerType.Delete.ToString().ToLowerInvariant());
- if (dmlTriggerType.HasFlag(DmlTriggerType.Update)) afters.Add(DmlTriggerType.Update.ToString().ToLowerInvariant());
+ if (dmlTriggerType.HasFlag(DmlTriggerType.Insert))
+ afters.Add(DmlTriggerType.Insert.ToString().ToLowerInvariant());
+ if (dmlTriggerType.HasFlag(DmlTriggerType.Delete))
+ afters.Add(DmlTriggerType.Delete.ToString().ToLowerInvariant());
+ if (dmlTriggerType.HasFlag(DmlTriggerType.Update))
+ afters.Add(DmlTriggerType.Update.ToString().ToLowerInvariant());
}
return afters;
@@ -708,7 +821,12 @@ protected virtual MessagesBag CreateMessagesBag(Encoding encoding, ICollection { string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Insert), string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Update), string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Delete) },
+ new List
+ {
+ string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Insert),
+ string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Update),
+ string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, ChangeType.Delete)
+ },
string.Format(EndMessageTemplate, _dataBaseObjectsNamingConvention),
processableMessages);
}
@@ -730,7 +848,8 @@ protected virtual string PrepareColumnListForSelectFromTableVariable(IEnumerable
return string.Join(", ", columns.ToList());
}
- protected virtual string PrepareColumnListForTableVariable(IEnumerable tableColumns, bool includeOldValues)
+ protected virtual string PrepareColumnListForTableVariable(IEnumerable tableColumns,
+ bool includeOldValues)
{
var columns = tableColumns.Select(tableColumn =>
{
@@ -741,17 +860,19 @@ protected virtual string PrepareColumnListForTableVariable(IEnumerable userInterestedColumns, TableColumnInfo userInterestedColumn, bool isOld = false)
+ protected virtual string ConvertValueByType(IReadOnlyCollection userInterestedColumns,
+ TableColumnInfo userInterestedColumn, bool isOld = false)
{
var oldNameExtension = isOld ? "_old" : string.Empty;
- if (string.Equals(userInterestedColumn.Type, "binary", StringComparison.OrdinalIgnoreCase) || string.Equals(userInterestedColumn.Type, "varbinary", StringComparison.OrdinalIgnoreCase) || string.Equals(userInterestedColumn.Type, "timestamp", StringComparison.OrdinalIgnoreCase))
+ if (string.Equals(userInterestedColumn.Type, "binary", StringComparison.OrdinalIgnoreCase) ||
+ string.Equals(userInterestedColumn.Type, "varbinary", StringComparison.OrdinalIgnoreCase) ||
+ string.Equals(userInterestedColumn.Type, "timestamp", StringComparison.OrdinalIgnoreCase))
{
return this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name) + oldNameExtension;
}
if (userInterestedColumn.Type.ToLower() == "float")
{
- return $"CONVERT(NVARCHAR(MAX), RTRIM(LTRIM(STR({this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name)}{oldNameExtension}{this.ConvertFormat(userInterestedColumn)}, 53, 16))))";
+ return
+ $"CONVERT(NVARCHAR(MAX), RTRIM(LTRIM(STR({this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name)}{oldNameExtension}{this.ConvertFormat(userInterestedColumn)}, 53, 16))))";
}
- return $"CONVERT(NVARCHAR(MAX), {this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name)}{oldNameExtension}{this.ConvertFormat(userInterestedColumn)})";
+ return
+ $"CONVERT(NVARCHAR(MAX), {this.SanitizeVariableName(userInterestedColumns, userInterestedColumn.Name)}{oldNameExtension}{this.ConvertFormat(userInterestedColumn)})";
}
- protected virtual string PrepareSendConversation(ChangeType dmlType, IReadOnlyCollection userInterestedColumns)
+ protected virtual string PrepareSendConversation(ChangeType dmlType,
+ IReadOnlyCollection userInterestedColumns)
{
var sendList = userInterestedColumns
.Select(interestedColumn =>
{
- var sendStatement = this.Spacer(16) + $"IF {this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)} IS NOT NULL BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}] ({this.ConvertValueByType(userInterestedColumns, interestedColumn)})" + Environment.NewLine + this.Spacer(16) + "END" + Environment.NewLine + this.Spacer(16) + "ELSE BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}] (0x)" + Environment.NewLine + this.Spacer(16) + "END";
+ var sendStatement = this.Spacer(16) +
+ $"IF {this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)} IS NOT NULL BEGIN" +
+ Environment.NewLine + this.Spacer(20) +
+ $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}] ({this.ConvertValueByType(userInterestedColumns, interestedColumn)})" +
+ Environment.NewLine + this.Spacer(16) + "END" + Environment.NewLine +
+ this.Spacer(16) + "ELSE BEGIN" + Environment.NewLine + this.Spacer(20) +
+ $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}] (0x)" +
+ Environment.NewLine + this.Spacer(16) + "END";
if (this.IncludeOldValues)
{
- sendStatement += Environment.NewLine + this.Spacer(16) + $"IF {this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)}_old IS NOT NULL BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}/old] ({this.ConvertValueByType(userInterestedColumns, interestedColumn, this.IncludeOldValues)})" + Environment.NewLine + this.Spacer(16) + "END" + Environment.NewLine + this.Spacer(16) + "ELSE BEGIN" + Environment.NewLine + this.Spacer(20) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}/old] (0x)" + Environment.NewLine + this.Spacer(16) + "END";
+ sendStatement += Environment.NewLine + this.Spacer(16) +
+ $"IF {this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)}_old IS NOT NULL BEGIN" +
+ Environment.NewLine + this.Spacer(20) +
+ $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}/old] ({this.ConvertValueByType(userInterestedColumns, interestedColumn, this.IncludeOldValues)})" +
+ Environment.NewLine + this.Spacer(16) + "END" + Environment.NewLine +
+ this.Spacer(16) + "ELSE BEGIN" + Environment.NewLine + this.Spacer(20) +
+ $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{_dataBaseObjectsNamingConvention}/{interestedColumn.Name}/old] (0x)" +
+ Environment.NewLine + this.Spacer(16) + "END";
}
return sendStatement;
})
.ToList();
- sendList.Insert(0, $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, dmlType)}] (CONVERT(NVARCHAR, @dmlType))" + Environment.NewLine);
- sendList.Add(Environment.NewLine + this.Spacer(16) + $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{string.Format(EndMessageTemplate, _dataBaseObjectsNamingConvention)}] (0x)");
+ sendList.Insert(0,
+ $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{string.Format(StartMessageTemplate, _dataBaseObjectsNamingConvention, dmlType)}] (CONVERT(NVARCHAR, @dmlType))" +
+ Environment.NewLine);
+ sendList.Add(Environment.NewLine + this.Spacer(16) +
+ $";SEND ON CONVERSATION '{this.ConversationHandle}' MESSAGE TYPE [{string.Format(EndMessageTemplate, _dataBaseObjectsNamingConvention)}] (0x)");
return string.Join(Environment.NewLine, sendList);
}
- protected virtual string PrepareSelectForSetVariables(IReadOnlyCollection userInterestedColumns)
+ protected virtual string PrepareSelectForSetVariables(
+ IReadOnlyCollection userInterestedColumns)
{
- var result = string.Join(", ", userInterestedColumns.Select(interestedColumn => $"{this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)} = [{interestedColumn.Name}]"));
- if (this.IncludeOldValues) result += ", " + string.Join(", ", userInterestedColumns.Select(interestedColumn => $"{this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)}_old = [{interestedColumn.Name}_old]"));
+ var result = string.Join(", ",
+ userInterestedColumns.Select(interestedColumn =>
+ $"{this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)} = [{interestedColumn.Name}]"));
+ if (this.IncludeOldValues)
+ result += ", " + string.Join(", ",
+ userInterestedColumns.Select(interestedColumn =>
+ $"{this.SanitizeVariableName(userInterestedColumns, interestedColumn.Name)}_old = [{interestedColumn.Name}_old]"));
return result;
}
@@ -869,15 +1030,17 @@ protected virtual string PrepareSelectForSetVariables(IReadOnlyCollection interestedColumns)
{
var columnsList = (from interestedColumn in interestedColumns
- let variableType = $"{interestedColumn.Type.ToLowerInvariant()}" + (string.IsNullOrWhiteSpace(interestedColumn.Size)
- ? string.Empty
- : $"({interestedColumn.Size})")
- select this.DeclareStatement(interestedColumns, interestedColumn, variableType)).ToList();
+ let variableType = $"{interestedColumn.Type.ToLowerInvariant()}" +
+ (string.IsNullOrWhiteSpace(interestedColumn.Size)
+ ? string.Empty
+ : $"({interestedColumn.Size})")
+ select this.DeclareStatement(interestedColumns, interestedColumn, variableType)).ToList();
return string.Join(Environment.NewLine + this.Spacer(4), columnsList);
}
- protected virtual string DeclareStatement(IReadOnlyCollection interestedColumns, TableColumnInfo interestedColumn, string variableType)
+ protected virtual string DeclareStatement(IReadOnlyCollection interestedColumns,
+ TableColumnInfo interestedColumn, string variableType)
{
var variableName = this.SanitizeVariableName(interestedColumns, interestedColumn.Name);
@@ -887,7 +1050,8 @@ protected virtual string DeclareStatement(IReadOnlyCollection i
return declare;
}
- protected virtual string SanitizeVariableName(IReadOnlyCollection userInterestedColumns, string tableColumnName)
+ protected virtual string SanitizeVariableName(IReadOnlyCollection userInterestedColumns,
+ string tableColumnName)
{
for (var i = 0; i < userInterestedColumns.Count; i++)
{
@@ -902,7 +1066,8 @@ protected virtual string SanitizeVariableName(IReadOnlyCollection !string.Equals(r.PermissionType, permissionToCheck, StringComparison.OrdinalIgnoreCase)))
+ var permissionToCheck =
+ EnumUtil.GetDescriptionFromEnumValue((SqlServerRequiredPermission) permission);
+ if (privilegesTable.Rows.All(r =>
+ !string.Equals(r.PermissionType, permissionToCheck, StringComparison.OrdinalIgnoreCase)))
{
throw new UserWithMissingPermissionException(permissionToCheck);
}
@@ -970,8 +1139,9 @@ protected virtual void CheckIfServiceBrokerIsEnabled()
sqlConnection.Open();
using (var sqlCommand = sqlConnection.CreateCommand())
{
- sqlCommand.CommandText = "SELECT is_broker_enabled FROM sys.databases WITH (NOLOCK) WHERE database_id = db_id();";
- if ((bool)sqlCommand.ExecuteScalar() == false) throw new ServiceBrokerNotEnabledException();
+ sqlCommand.CommandText =
+ "SELECT is_broker_enabled FROM sys.databases WITH (NOLOCK) WHERE database_id = db_id();";
+ if ((bool) sqlCommand.ExecuteScalar() == false) throw new ServiceBrokerNotEnabledException();
}
}
}
@@ -984,7 +1154,7 @@ protected override void CheckIfTableExists()
using (var sqlCommand = sqlConnection.CreateCommand())
{
sqlCommand.CommandText = string.Format(SqlScripts.InformationSchemaTables, _tableName, _schemaName);
- if ((int)sqlCommand.ExecuteScalar() == 0) throw new NotExistingTableException(_tableName);
+ if ((int) sqlCommand.ExecuteScalar() == 0) throw new NotExistingTableException(_tableName);
}
}
}
@@ -1001,10 +1171,20 @@ protected virtual async Task WaitForNotifications(
var messagesBag = this.CreateMessagesBag(this.Encoding, _processableMessages);
var messageNumber = _userInterestedColumns.Count() * (this.IncludeOldValues ? 2 : 1) + 2;
+ var doNotDeleteDBObjet = _saveQueue;
+ string waitForSqlScript;
- var waitForSqlScript =
- $"BEGIN CONVERSATION TIMER ('{this.ConversationHandle.ToString().ToUpper()}') TIMEOUT = " + timeOutWatchDog + ";" +
- $"WAITFOR (RECEIVE TOP({messageNumber}) [message_type_name], [message_body] FROM [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver]), TIMEOUT {timeOut * 1000};";
+ if (_saveQueue)
+ {
+ waitForSqlScript = $"RECEIVE TOP({messageNumber}) [message_type_name], [message_body] FROM [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver];";
+ }
+ else
+ {
+ waitForSqlScript =
+ $"BEGIN CONVERSATION TIMER ('{this.ConversationHandle.ToString().ToUpper()}') TIMEOUT = " +
+ timeOutWatchDog + ";" +
+ $"WAITFOR (RECEIVE TOP({messageNumber}) [message_type_name], [message_body] FROM [{_schemaName}].[{_dataBaseObjectsNamingConvention}_Receiver]), TIMEOUT {timeOut * 1000};";
+ }
this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.Started);
@@ -1014,34 +1194,38 @@ protected virtual async Task WaitForNotifications(
{
await sqlConnection.OpenAsync(cancellationToken);
this.WriteTraceMessage(TraceLevel.Verbose, "Connection opened.");
- this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.WaitingForNotification);
-
+ this.NotifyListenersAboutStatus(onStatusChangedSubscribedList,
+ TableDependencyStatus.WaitingForNotification);
+
while (true)
{
messagesBag.Reset();
-
+
using (var sqlCommand = new SqlCommand(waitForSqlScript, sqlConnection))
{
sqlCommand.CommandTimeout = 0;
this.WriteTraceMessage(TraceLevel.Verbose, "Executing WAITFOR command.");
-
+
using (var sqlDataReader = await sqlCommand.ExecuteReaderAsync(cancellationToken).WithCancellation(cancellationToken))
{
while (sqlDataReader.Read())
{
- var message = new Message(sqlDataReader.GetSqlString(0).Value, sqlDataReader.IsDBNull(1) ? null : sqlDataReader.GetSqlBytes(1).Value);
- if (message.MessageType == SqlMessageTypes.ErrorType) throw new QueueContainingErrorMessageException();
+ var message = new Message(sqlDataReader.GetSqlString(0).Value,
+ sqlDataReader.IsDBNull(1) ? null : sqlDataReader.GetSqlBytes(1).Value);
+ if (message.MessageType == SqlMessageTypes.ErrorType)
+ throw new QueueContainingErrorMessageException();
messagesBag.AddMessage(message);
- this.WriteTraceMessage(TraceLevel.Verbose, $"Received message type = {message.MessageType}.");
+ this.WriteTraceMessage(TraceLevel.Verbose,
+ $"Received message type = {message.MessageType}.");
}
}
}
-
+
if (messagesBag.Status == MessagesBagStatus.Collecting)
{
throw new MessageMisalignedException("Received a number of messages lower than expected.");
}
-
+
if (messagesBag.Status == MessagesBagStatus.Ready)
{
this.WriteTraceMessage(TraceLevel.Verbose, "Message ready to be notified.");
@@ -1053,30 +1237,38 @@ protected virtual async Task WaitForNotifications(
}
catch (OperationCanceledException)
{
- this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToCancellation);
+ this.NotifyListenersAboutStatus(onStatusChangedSubscribedList,
+ TableDependencyStatus.StopDueToCancellation);
this.WriteTraceMessage(TraceLevel.Info, "Operation canceled.");
}
catch (AggregateException aggregateException)
{
this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToError);
- if (cancellationToken.IsCancellationRequested == false) this.NotifyListenersAboutError(onErrorSubscribedList, aggregateException.InnerException);
- this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.", aggregateException.InnerException);
+ if (cancellationToken.IsCancellationRequested == false)
+ this.NotifyListenersAboutError(onErrorSubscribedList, aggregateException.InnerException);
+ this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.",
+ aggregateException.InnerException);
}
catch (SqlException sqlException)
{
this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToError);
- if (cancellationToken.IsCancellationRequested == false) this.NotifyListenersAboutError(onErrorSubscribedList, sqlException);
+ if (cancellationToken.IsCancellationRequested == false)
+ this.NotifyListenersAboutError(onErrorSubscribedList, sqlException);
this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.", sqlException);
}
catch (Exception exception)
{
this.NotifyListenersAboutStatus(onStatusChangedSubscribedList, TableDependencyStatus.StopDueToError);
- if (cancellationToken.IsCancellationRequested == false) this.NotifyListenersAboutError(onErrorSubscribedList, exception);
+ if (cancellationToken.IsCancellationRequested == false)
+ this.NotifyListenersAboutError(onErrorSubscribedList, exception);
this.WriteTraceMessage(TraceLevel.Error, "Exception in WaitForNotifications.", exception);
}
finally
{
- this.DropDatabaseObjects();
+ if (!doNotDeleteDBObjet)
+ {
+ this.DropDatabaseObjects();
+ }
}
}
}
diff --git a/TableDependency.SqlClient/TableDependency.SqlClient.csproj b/TableDependency.SqlClient/TableDependency.SqlClient.csproj
index 2c0194e..247d74f 100644
--- a/TableDependency.SqlClient/TableDependency.SqlClient.csproj
+++ b/TableDependency.SqlClient/TableDependency.SqlClient.csproj
@@ -19,6 +19,7 @@
GitHub
SQL Server dependency notifications record table change
+ 8.5.8.27