// Spludlow Software // Copyright © Samuel P. Ludlow 2020 All Rights Reserved // Distributed under the terms of the GNU General Public License version 3 // Distributed WITHOUT ANY WARRANTY; without implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE // https://www.spludlow.co.uk/LICENCE.TXT // The Spludlow logo is a registered trademark of Samuel P. Ludlow and may not be used without permission // v1.14 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Messaging; using System.Data; namespace Spludlow { public class MessageQueues { public static string Path(string host, string queue) { return Path(host, queue, Spludlow.Config.Get("Spludlow.QueueNamespace")); } public static string Path(string host, string queue, string queueNamespace) { StringBuilder path = new StringBuilder(); path.Append(@"FORMATNAME:DIRECT=OS:"); path.Append(host); path.Append(@"\Private$\"); path.Append(queueNamespace); path.Append("."); path.Append(queue); return path.ToString(); } public static void Send(Spludlow.CallSet info, string host, string queue) { Spludlow.CallMethod call = info.Calls[info.CallIndex]; string path = Path(host, queue); string checkSum = ""; if (call.Flags.HasFlag(Spludlow.CallFlags.DontDuplicate) == true) { checkSum = Spludlow.Hashing.MD5HexObject(info); if (LabelExists(queue, checkSum) == true) { Spludlow.Log.Warning("MessageQueues; DontDuplicate: " + queue + ", " + checkSum); return; } } BinaryMessageFormatter formatter = new BinaryMessageFormatter(); using (MessageQueue messageQueue = new MessageQueue(path)) { using (Message message = new Message(info, formatter)) { message.Label = checkSum; message.Recoverable = true; messageQueue.Send(message); } } } public static CallSet Peek(string host, string queue, string messageId) { return (CallSet)Spludlow.Call.Now(host, "Spludlow", "Spludlow.MessageQueues", "Peek", new string[] { queue, messageId }); } public static CallSet Peek(string queue, string messageId) { string path = Path(Environment.MachineName, queue); using (MessageQueue messageQueue = new MessageQueue(path)) { BinaryMessageFormatter formatter = new BinaryMessageFormatter(); messageQueue.Formatter = formatter; using (Message message = messageQueue.PeekById(messageId)) { if (message == null) throw new ApplicationException("Peek Message By Id; Message not there:\t" + messageId); CallSet callSet = (CallSet)message.Body; return callSet; } } } public static CallSet PeekPath(string path) { using (MessageQueue messageQueue = new MessageQueue(path)) { BinaryMessageFormatter formatter = new BinaryMessageFormatter(); messageQueue.Formatter = formatter; using (Message message = messageQueue.Peek()) { if (message == null) throw new ApplicationException("Peek Message By Id; Message not there"); CallSet callSet = (CallSet)message.Body; return callSet; } } } public static CallSet ReceivePath(string path) { using (MessageQueue messageQueue = new MessageQueue(path)) { BinaryMessageFormatter formatter = new BinaryMessageFormatter(); messageQueue.Formatter = formatter; using (Message message = messageQueue.Receive()) { if (message == null) throw new ApplicationException("Peek Message By Id; Message not there"); CallSet callSet = (CallSet)message.Body; return callSet; } } } public static Message Peek(MessageQueue queue) { IAsyncResult aSyncResult = queue.BeginPeek(); aSyncResult.AsyncWaitHandle.WaitOne(); return queue.EndPeek(aSyncResult); } public static void Create(string queueNamespace, string[] queueNames, string receiveUser) { foreach (string queueName in queueNames) Create(queueNamespace, queueName, receiveUser); } public static bool Exists(string queueNamespace, string queueName) { string queuePath = @".\Private$\" + queueNamespace + "." + queueName; return MessageQueue.Exists(queuePath); } public static void Create(string queueNamespace, string queueName, string receiveUser) { string label = @"Private$\" + queueNamespace + "." + queueName; string queuePath = @".\" + label; Console.WriteLine("Creating:\t" + queuePath); using (MessageQueue messageQueue = MessageQueue.Create(queuePath)) { messageQueue.Label = label; messageQueue.SetPermissions(receiveUser, MessageQueueAccessRights.GenericWrite | MessageQueueAccessRights.GenericRead); foreach (string writeUser in new string[] { "Everyone", "ANONYMOUS LOGON" }) messageQueue.SetPermissions(writeUser, MessageQueueAccessRights.GenericWrite); } Console.WriteLine("Created:\t" + queuePath); } public static void AddReceiveUser(string queueNamespace, string queueName, string receiveUser) { string queuePath = @".\Private$\" + queueNamespace + "." + queueName; using (MessageQueue messageQueue = new MessageQueue(queuePath)) { messageQueue.SetPermissions(receiveUser, MessageQueueAccessRights.GenericWrite | MessageQueueAccessRights.GenericRead); } } public static void Delete(string queueName) { Purge(queueName); string queuePath = Path(".", queueName); MessageQueue.Delete(queuePath); } public static void DeleteQueue(string queueNamespace, string queueName) { string queuePath = Path(".", queueName, queueNamespace); MessageQueue.Delete(queuePath); } public static int MessageCount(string queueName) { int count = 0; string messageQueuePath = Path(".", queueName); using (MessageQueue queue = new MessageQueue(messageQueuePath)) { using (Cursor cursor = queue.CreateCursor()) { try { PeekAction peekAction; for (peekAction = PeekAction.Current; queue.Peek(TimeSpan.FromSeconds(0.25), cursor, peekAction) != null; peekAction = PeekAction.Next) // Dispose Message ?? ++count; } catch (MessageQueueException ee) { if (ee.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) throw ee; } } } return count; } public static bool LabelExists(string queueName, string label) { string messageQueuePath = Path(".", queueName); using (MessageQueue queue = new MessageQueue(messageQueuePath)) { using (Cursor cursor = queue.CreateCursor()) { try { Message message; PeekAction peekAction; for (peekAction = PeekAction.Current; (message = queue.Peek(TimeSpan.FromSeconds(0.25), cursor, peekAction)) != null; peekAction = PeekAction.Next) { if (message.Label == null || message.Label.Length == 0) continue; if (message.Label == label) return true; } } catch (MessageQueueException ee) { if (ee.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) throw ee; } return false; } } } public static DataSet Query(string host, string queueName) { return (DataSet)Spludlow.Call.Now(host, "Spludlow", "Spludlow.MessageQueues", "Query", new object[] { queueName }, CallFlags.LargeResult); } public static DataSet Query(string queueName) { DataTable table = Spludlow.Data.TextTable.ReadText(new string[] { "Id ArrivedTime Host Queue MethodCount MethodIndex Assembly Type Method Parameters Constructor", "String* DateTime String String Int32 Int32 String String String String String", }); string path = Path(".", queueName); using (MessageQueue queue = new MessageQueue(path)) { queue.MessageReadPropertyFilter.ArrivedTime = true; BinaryMessageFormatter formatter = new BinaryMessageFormatter(); queue.Formatter = formatter; foreach (Message message in queue.GetAllMessages()) // Perormance ????? { Spludlow.CallSet callSet; try { callSet = (Spludlow.CallSet)message.Body; } catch (Exception ee) { throw new ApplicationException("Casting Body, " + ee.Message, ee); } CallMethod callMethod = callSet.CurrentMethod; table.Rows.Add(new object[] { message.Id, message.ArrivedTime, Environment.MachineName, queueName, callSet.Calls.Length, callSet.CallIndex, callMethod.Assembly, callMethod.Type, callMethod.Method, Spludlow.Parameters.ShortText(callMethod.Parameters), Spludlow.Parameters.ShortText(callMethod.ConstructorArguments) }); message.Dispose(); // break on maximum!!!!!!!!! } } return Spludlow.Data.ADO.WireDataSet(table); } public static DataSet Query(string host, string queueName, int pageIndex, int pageSize) { return (DataSet)Spludlow.Call.Now(host, "Spludlow", "Spludlow.MessageQueues", "Query", new object[] { queueName, pageIndex, pageSize }, CallFlags.LargeResult); } public static DataSet Query(string queueName, int pageIndex, int pageSize) { DataTable table = Spludlow.Data.TextTable.ReadText(new string[] { "Id ArrivedTime Host Queue MethodCount MethodIndex Assembly Type Method Parameters Constructor", "String* DateTime String String Int32 Int32 String String String String String", }); string path = Path(".", queueName); int startIndex = pageIndex * pageSize; int endIndex = (startIndex + pageSize) - 1; using (MessageQueue queue = new MessageQueue(path)) { queue.MessageReadPropertyFilter.ArrivedTime = true; BinaryMessageFormatter formatter = new BinaryMessageFormatter(); queue.Formatter = formatter; int index = 0; foreach (Message message in queue.GetAllMessages()) { Spludlow.CallSet callSet; if (index >= startIndex && index <= endIndex) { try { callSet = (Spludlow.CallSet)message.Body; } catch (Exception ee) { throw new ApplicationException("Casting Body, " + ee.Message, ee); } CallMethod callMethod = callSet.CurrentMethod; table.Rows.Add(new object[] { message.Id, message.ArrivedTime, Environment.MachineName, queueName, callSet.Calls.Length, callSet.CallIndex, callMethod.Assembly, callMethod.Type, callMethod.Method, Spludlow.Parameters.ShortText(callMethod.Parameters), Spludlow.Parameters.ShortText(callMethod.ConstructorArguments) }); } message.Dispose(); ++index; } } return Spludlow.Data.ADO.WireDataSet(table); } public static void Delete(string host, string queueName, string[] ids) { Spludlow.Call.Now(host, "Spludlow", "Spludlow.MessageQueues", "Delete", new object[] { queueName, ids }, CallFlags.LargeParameters); } public static void Delete(string queueName, string[] ids) { string path = Path(".", queueName); using (MessageQueue queue = new MessageQueue(path)) { foreach (string id in ids) queue.ReceiveById(id, TimeSpan.FromSeconds(0.25)); } } public static void Purge(string host, string queueName) { Spludlow.Call.Now(host, "Spludlow", "Spludlow.MessageQueues", "Purge", new object[] { queueName }); } public static void Purge(string queueName) { string path = Path(".", queueName); using (MessageQueue queue = new MessageQueue(path)) { queue.Purge(); } } } }