// Spludlow Software
// Copyright © Samuel P. Ludlow 2020 All Rights Reserved
// Distributed under the terms of the GNU General Public License version 3
// Distributed WITHOUT ANY WARRANTY; without implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE
// https://www.spludlow.co.uk/LICENCE.TXT
// The Spludlow logo is a registered trademark of Samuel P. Ludlow and may not be used without permission
// v1.14

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.ServiceModel;
using System.Data;

namespace Spludlow
{
    /// <summary>
    /// Collects the results of a batch of call sets and, once every call in the
    /// batch has submitted its result, runs a final "result call" that receives
    /// the encoded array of all results.
    /// State is kept in memory: one table row per (batch key, call index) and one
    /// stored result call per batch key.
    /// </summary>
    public class ParallelResults : IParallelResults
    {
        // Guards ResultCalls and DataSet; both NewBatch and SubmitResult take it.
        private readonly object Lock = new object();

        // Batch key (mrKey) -> the call set to run once the batch is complete.
        private Dictionary<string, Spludlow.CallSet> ResultCalls;

        // Tables[0] holds one row per outstanding call: MrKey, CallIndex, StartTime, EndTime, Result.
        private DataSet DataSet;

        /// <summary>
        /// Creates the empty in-memory state (result-call map and tracking table).
        /// </summary>
        public ParallelResults()
        {
            this.ResultCalls = new Dictionary<string, Spludlow.CallSet>();

            // NOTE(review): the separator between column names/types appears as a single
            // space in this copy of the source; confirm it matches what
            // TextTable.ReadText expects (the literals may originally have used tabs).
            // The '*' suffix presumably marks the primary-key columns - SubmitResult
            // looks rows up with Rows.Find on (MrKey, CallIndex).
            DataTable table = Spludlow.Data.TextTable.ReadText(new string[] {
                "MrKey CallIndex StartTime EndTime Result",
                "String* Int32* DateTime DateTime String[]",
            });

            this.DataSet = Spludlow.Data.ADO.WireDataSet(table);

            Spludlow.Log.Info("ParallelResults; Created");
        }

        /// <summary>
        /// Opens a net.tcp channel (no transport security) to the ParallelResults
        /// service at the address configured under "ParallelResults".
        /// </summary>
        /// <returns>A client channel implementing <see cref="IParallelResults"/>.</returns>
        public static IParallelResults MakeClient()
        {
            string address = Spludlow.Config.RemotingAddress("ParallelResults");

            NetTcpBinding binding = new NetTcpBinding(SecurityMode.None);
            EndpointAddress endpointAddress = new EndpointAddress(address);

            // NOTE(review): the factory itself is never closed here; only the returned
            // channel is handed to the caller. Confirm ServiceModels.Close covers this
            // or consider reusing a single factory.
            ChannelFactory<Spludlow.IParallelResults> channelFactory =
                new ChannelFactory<Spludlow.IParallelResults>(binding, endpointAddress);

            Spludlow.IParallelResults channel = channelFactory.CreateChannel();

            return channel;
        }

        /// <summary>
        /// Top level entry point: starts a new batch by invoking <see cref="NewWork"/>
        /// on the host of the result call's first method.
        /// </summary>
        /// <param name="description">Free-text description of the batch.</param>
        /// <param name="calls">The call sets to run; each one reports its result back to the batch.</param>
        /// <param name="resultCall">The call set to run when all results are in.</param>
        /// <returns>The batch key returned by the remote <see cref="NewWork"/> call.</returns>
        /// <exception cref="ApplicationException">
        /// The first method of <paramref name="resultCall"/> has PrevResultParamIndex set to -1.
        /// </exception>
        public static string New(string description, Spludlow.CallSet[] calls, Spludlow.CallSet resultCall)
        {
            string resultHost = resultCall.Calls[0].Host;

            if (resultCall.Calls[0].PrevResultParamIndex == -1)
                throw new ApplicationException("The ResultCall's first method must have PrevResultParamIndex set to recieve the array of results.");

            return (string)Spludlow.Call.Now(resultHost, "Spludlow", "Spludlow.ParallelResults", "NewWork",
                new object[] { description, calls, resultCall });
        }

        /// <summary>
        /// Forwards a new batch to the ParallelResults service through a client channel,
        /// closing the channel afterwards.
        /// </summary>
        /// <param name="description">Free-text description of the batch.</param>
        /// <param name="calls">The call sets to run.</param>
        /// <param name="resultCall">The call set to run when all results are in.</param>
        /// <returns>The batch key produced by <see cref="NewBatch"/>.</returns>
        public static string NewWork(string description, Spludlow.CallSet[] calls, Spludlow.CallSet resultCall)
        {
            IParallelResults client = MakeClient();
            try
            {
                return client.NewBatch(description, calls, resultCall);
            }
            finally
            {
                ServiceModels.Close(client);
            }
        }

        /// <summary>
        /// Forwards a single call's result to the ParallelResults service through a
        /// client channel, closing the channel afterwards.
        /// Called from Call.cs, queued on the specified address, so already on the result host.
        /// </summary>
        /// <param name="result">The encoded result of the finished call.</param>
        /// <param name="mrKey">The batch key the call belongs to.</param>
        /// <param name="mrIndex">The index of the call within the batch.</param>
        public static void Result(string[] result, string mrKey, int mrIndex)
        {
            IParallelResults client = MakeClient();
            try
            {
                client.SubmitResult(result, mrKey, mrIndex);
            }
            finally
            {
                ServiceModels.Close(client);
            }
        }

        /// <summary>
        /// Registers a new batch: generates its key, stores the result call, tags the
        /// last method of every call set with the batch key and its index, records one
        /// tracking row per call, then runs all the call sets.
        /// </summary>
        /// <param name="description">Free-text description of the batch (used for logging).</param>
        /// <param name="calls">The call sets to run.</param>
        /// <param name="resultCall">The call set to run when all results are in.</param>
        /// <returns>The newly generated batch key.</returns>
        public string NewBatch(string description, Spludlow.CallSet[] calls, Spludlow.CallSet resultCall)
        {
            lock (this.Lock)
            {
                string mrKey = Guid.NewGuid().ToString();
                DateTime startTime = DateTime.Now;

                this.ResultCalls.Add(mrKey, resultCall);

                DataTable table = this.DataSet.Tables[0];

                for (int index = 0; index < calls.Length; ++index)
                {
                    Spludlow.CallSet callSet = calls[index];

                    // Tell the last method of each call set where and under which
                    // key/index its result is to be reported.
                    callSet.LastMethod.ManagedResultsAddress = resultCall.LastMethod.Address;
                    callSet.LastMethod.ManagedResultsKey = mrKey;
                    callSet.LastMethod.ManagedResultsIndex = index;

                    // EndTime and Result stay null until SubmitResult fills them in.
                    table.Rows.Add(new object[] { mrKey, index, startTime, DBNull.Value, DBNull.Value });
                }

                // Log the description; concatenating the array itself only printed its type name.
                Spludlow.Log.Info("ParallelResults; New Batch:\t" + description + "\t" + mrKey);

                foreach (Spludlow.CallSet callSet in calls)
                    Spludlow.Call.Run(callSet);

                Spludlow.Log.Info("ParallelResults; Submitted:\t" + mrKey);

                return mrKey;
            }
        }

        /// <summary>
        /// Records the result of one call of a batch. When no row of the batch is left
        /// without an end time, the result call is run with all results and the
        /// batch's rows and stored result call are removed.
        /// </summary>
        /// <param name="result">The encoded result of the finished call.</param>
        /// <param name="mrKey">The batch key the call belongs to.</param>
        /// <param name="mrIndex">The index of the call within the batch.</param>
        /// <exception cref="ApplicationException">No tracking row exists for the key and index.</exception>
        public void SubmitResult(string[] result, string mrKey, int mrIndex)
        {
            lock (this.Lock)
            {
                DataTable table = this.DataSet.Tables[0];

                DataRow row = table.Rows.Find(new object[] { mrKey, mrIndex });

                if (row == null)
                    throw new ApplicationException("ParallelResults; row not found:\t" + mrKey + "\t" + mrIndex);

                row["EndTime"] = DateTime.Now;
                row["Result"] = result;

                Spludlow.Log.Info("ParallelResults; SubmitResult:\t" + mrKey + "\t" + mrIndex);

                // Any row of this batch still without an end time means the batch is incomplete.
                DataRow[] rows = table.Select("(MrKey = '" + mrKey + "') AND (EndTime IS NULL)");

                if (rows.Length == 0)
                {
                    rows = table.Select("(MrKey = '" + mrKey + "')");

                    // TODO: make this call outside the lock? delete from the state objects first? error handling?
                    this.MakeResultsCall(rows, mrKey);

                    for (int index = 0; index < rows.Length; ++index)
                        rows[index].Delete();

                    table.AcceptChanges();

                    this.ResultCalls.Remove(mrKey);

                    Spludlow.Log.Info("ParallelResults; Removed:\t" + mrKey);
                }
            }
        }

        /// <summary>
        /// Encodes the results held in the given rows into a single array, places it in
        /// the result call's current method (as a parameter, or as a constructor
        /// argument when the method has the ConstructorIndex flag) and runs the result call.
        /// </summary>
        /// <param name="rows">All tracking rows of the batch; each must hold a Result.</param>
        /// <param name="mrKey">The batch key, used to look up the stored result call.</param>
        private void MakeResultsCall(DataRow[] rows, string mrKey)
        {
            List<string[]> encodedResultPairs = new List<string[]>();

            foreach (DataRow row in rows)
                encodedResultPairs.Add((string[])row["Result"]);

            string[] encodedResults = Spludlow.Parameters.EncodeResultsArray(encodedResultPairs.ToArray(), false, false);

            Spludlow.CallSet resultCall = this.ResultCalls[mrKey];

            Spludlow.CallMethod method = resultCall.CurrentMethod;

            // PrevResultParamIndex addresses either the method parameters or the
            // constructor arguments, depending on the ConstructorIndex flag.
            if (method.Flags.HasFlag(CallFlags.ConstructorIndex) == false)
                method.Parameters[method.PrevResultParamIndex] = encodedResults;
            else
                method.ConstructorArguments[method.PrevResultParamIndex] = encodedResults;

            Spludlow.Log.Info("ParallelResults; Result:\t" + mrKey);

            Spludlow.Call.Run(resultCall);
        }
    }
}