From a9fbe1765469a27280ea093f7dcdd54898d08314 Mon Sep 17 00:00:00 2001 From: "parallelbgls@outlook.com" Date: Wed, 25 Nov 2015 09:54:54 +0800 Subject: [PATCH] 2015-11-25 update 1 continue fix bugs in fboxconnector --- Modbus.Net/ModBus.Net/AsyncHelper.cs | 99 +++++++ .../ModBus.Net/FBox/SignalRConnector.cs | 255 ++++++++++++------ 2 files changed, 269 insertions(+), 85 deletions(-) diff --git a/Modbus.Net/ModBus.Net/AsyncHelper.cs b/Modbus.Net/ModBus.Net/AsyncHelper.cs index 4235428..e08cb07 100644 --- a/Modbus.Net/ModBus.Net/AsyncHelper.cs +++ b/Modbus.Net/ModBus.Net/AsyncHelper.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -42,4 +43,102 @@ namespace ModBus.Net return task.ContinueWith(t => t.GetAwaiter().GetResult(), token); } } + + /// AsyncLock locks across one or several await calls. + /// + /// + public class AsyncLock + { + private readonly AsyncSemaphore _semaphore; + private readonly Task _releaser; + + public AsyncLock() + { + _semaphore = new AsyncSemaphore(1); + _releaser = Task.FromResult(new Releaser(this)); + } + + public Task LockAsync() + { + var wait = _semaphore.WaitAsync(); + return wait.IsCompleted + ? _releaser + : wait.ContinueWith((_, state) => new Releaser((AsyncLock) state), + this, CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + } + + + public struct Releaser : IDisposable + { + private readonly AsyncLock _toRelease; + + internal Releaser(AsyncLock toRelease) + { + _toRelease = toRelease; + } + + public void Dispose() + { + if (_toRelease != null) + { + _toRelease._semaphore.Release(); + } + } + } + } + + public class AsyncSemaphore + { + private readonly static Task _completed = Task.FromResult(true); + private readonly Queue> _waiters = new Queue>(); + private int _currentCount; + + public AsyncSemaphore(int initialCount) + { + if (initialCount < 0) + { + throw new ArgumentOutOfRangeException("initialCount"); + } + _currentCount = initialCount; + } + + public Task WaitAsync() + { + lock (_waiters) + { + if (_currentCount > 0) + { + _currentCount--; + return _completed; + } + else + { + var waiter = new TaskCompletionSource(); + _waiters.Enqueue(waiter); + return waiter.Task; + } + } + } + + public void Release() + { + TaskCompletionSource toRelease = null; + lock (_waiters) + { + if (_waiters.Count > 0) + { + toRelease = _waiters.Dequeue(); + } + else + { + _currentCount++; + } + } + if (toRelease != null) + { + toRelease.SetResult(true); + } + } + } } diff --git a/Modbus.Net/ModBus.Net/FBox/SignalRConnector.cs b/Modbus.Net/ModBus.Net/FBox/SignalRConnector.cs index 13aa9d5..d5b9ee7 100644 --- a/Modbus.Net/ModBus.Net/FBox/SignalRConnector.cs +++ b/Modbus.Net/ModBus.Net/FBox/SignalRConnector.cs @@ -30,7 +30,8 @@ namespace ModBus.Net.FBox private static Dictionary _hubConnections; private static Dictionary _boxUidSessionId; - private static Dictionary> _boxUidDataGroups; + private static Dictionary> _boxUidDataGroups; + private static Dictionary _connectionTokenState; private static Dictionary _groupNameUid; private static Dictionary _groupNameBoxUid; private static Dictionary> _machineData; @@ -38,6 +39,8 @@ namespace ModBus.Net.FBox public override string ConnectionToken { get; } + private static AsyncLock _lock = new AsyncLock(); + private bool _connected; public override bool IsConnected { get { return _connected; } } @@ -66,6 +69,7 @@ namespace ModBus.Net.FBox _machineData = new Dictionary>(); _machineDataType = new Dictionary>(); _boxUidSessionId = new Dictionary(); + _connectionTokenState = new Dictionary(); _groupNameUid = new Dictionary(); _groupNameBoxUid = new Dictionary(); _boxUidDataGroups = new Dictionary>(); @@ -98,18 +102,25 @@ namespace ModBus.Net.FBox { try { - if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _groupNameUid.ContainsKey(ConnectionToken)) + using (await _lock.LockAsync()) { - await _httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync("dmon/group/" + _groupNameUid[ConnectionToken] + "/start", - null); - _connected = true; - Console.WriteLine("SignalR Connected success"); - return true; - } - else - { - Console.WriteLine("SignalR Connected failed"); - return false; + if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) && + _httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) && + _groupNameUid.ContainsKey(ConnectionToken)) + { + await + _httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync( + "dmon/group/" + _groupNameUid[ConnectionToken] + "/start", + null); + _connected = true; + Console.WriteLine("SignalR Connected success"); + return true; + } + else + { + Console.WriteLine("SignalR Connected failed"); + return false; + } } } catch @@ -167,6 +178,7 @@ namespace ModBus.Net.FBox List dataGroups = JsonConvert.DeserializeObject>(response); _boxUidDataGroups.Add(boxUid, dataGroups); + _boxUidSessionId.Add(boxUid, sessionId); var hubConnection = new HubConnection(signalrUrl); _hubConnections.Add(boxUid, hubConnection); @@ -178,47 +190,55 @@ namespace ModBus.Net.FBox dataHubProxy.On>("dMonUpdateValue", (boxSessionId, values) => { - if (_boxUidSessionId.ContainsValue(boxSessionId)) + lock (_boxUidSessionId) { - var localBoxUid = _boxUidSessionId.FirstOrDefault(p => p.Value == boxSessionId).Key; - foreach (var value in values) + if (_boxUidSessionId.ContainsValue(boxSessionId)) { - lock (_machineData) + Console.WriteLine($"Box session {boxSessionId} return at {DateTime.Now}"); + var localBoxUid = _boxUidSessionId.FirstOrDefault(p => p.Value == boxSessionId).Key; + foreach (var value in values) { - if (_boxUidDataGroups.ContainsKey(localBoxUid)) + if (value.Status != 0) return; + lock (_machineData) { - foreach (var dataGroupInner in _boxUidDataGroups[localBoxUid]) + if (_boxUidDataGroups.ContainsKey(localBoxUid)) { - if (dataGroupInner.DMonEntries.Any(p => p.Uid == value.Id)) + foreach (var dataGroupInner in _boxUidDataGroups[localBoxUid]) { - if (!_machineData.ContainsKey(dataGroupInner.Name)) + if (dataGroupInner.DMonEntries.Any(p => p.Uid == value.Id)) { - _machineData.Add(dataGroupInner.Name, - new Dictionary()); - } - if (_machineData[dataGroupInner.Name] == null) - { - _machineData[dataGroupInner.Name] = - new Dictionary(); - } - - var dMonEntry = - dataGroupInner.DMonEntries.FirstOrDefault(p => p.Uid == value.Id); - - if (value.Value.HasValue && dMonEntry != null) - { - if (_machineData[dataGroupInner.Name].ContainsKey(dMonEntry.Desc)) + if (!_machineData.ContainsKey(dataGroupInner.Name)) { - _machineData[dataGroupInner.Name][dMonEntry.Desc] = - value.Value.Value; + _machineData.Add(dataGroupInner.Name, + new Dictionary()); } - else + if (_machineData[dataGroupInner.Name] == null) { - _machineData[dataGroupInner.Name].Add(dMonEntry.Desc, - value.Value.Value); + _machineData[dataGroupInner.Name] = + new Dictionary(); } + + var dMonEntry = + dataGroupInner.DMonEntries.FirstOrDefault( + p => p.Uid == value.Id); + + if (value.Value.HasValue && dMonEntry != null) + { + if ( + _machineData[dataGroupInner.Name].ContainsKey( + dMonEntry.Desc)) + { + _machineData[dataGroupInner.Name][dMonEntry.Desc] = + value.Value.Value; + } + else + { + _machineData[dataGroupInner.Name].Add(dMonEntry.Desc, + value.Value.Value); + } + } + break; } - break; } } } @@ -230,58 +250,107 @@ namespace ModBus.Net.FBox dataHubProxy.On("boxConnectionStateChanged", async (newConnectionToken, getBoxUid, oldStatus, newStatus) => { - if (_httpClient2.ContainsKey(getBoxUid)) + using (await _lock.LockAsync()) { - sessionId = newConnectionToken; - _boxUidSessionId[getBoxUid] = sessionId; - _httpClient2[getBoxUid].DefaultRequestHeaders.Remove("X-FBox-Session"); - _httpClient2[getBoxUid].DefaultRequestHeaders.Add("X-FBox-Session", sessionId.ToString()); - _hubConnections[getBoxUid].Headers.Remove("X-FBox-Session"); - _hubConnections[getBoxUid].Headers.Add("X-FBox-Session", sessionId.ToString()); - _hubConnections[getBoxUid].Stop(); - await _hubConnections[getBoxUid].Start(); - - if (newStatus == 1 && IsConnected) + if (_httpClient2.ContainsKey(getBoxUid)) { - var localDataGroups = _boxUidDataGroups[getBoxUid]; - foreach (var localDataGroup in localDataGroups) + Console.WriteLine( + $"Box uid {getBoxUid} change state at {DateTime.Now} new connectionToken {newConnectionToken} newStatus {newStatus}"); + sessionId = newConnectionToken; + lock (_boxUidSessionId) { - await - _httpClient2[getBoxUid].PostAsync( - "dmon/group/" + localDataGroup.Uid + "/stop", null); - await - _httpClient2[getBoxUid].PostAsync( - "dmon/group/" + localDataGroup.Uid + "/start", null); + _boxUidSessionId[getBoxUid] = sessionId; } - } - else - { - var localDataGroups = _boxUidDataGroups[getBoxUid]; - foreach (var localDataGroup in localDataGroups) + + _httpClient2[getBoxUid].DefaultRequestHeaders.Remove("X-FBox-Session"); + _httpClient2[getBoxUid].DefaultRequestHeaders.Add("X-FBox-Session", + sessionId.ToString()); + _hubConnections[getBoxUid].Headers["X-FBox-Session"] = sessionId.ToString(); + + if (_hubConnections[getBoxUid].State == ConnectionState.Disconnected) { - if (_machineData.ContainsKey(localDataGroup.Name)) + await _hubConnections[getBoxUid].Start(); + } + + if (newStatus == 1 && IsConnected) + { + var localDataGroups = _boxUidDataGroups[getBoxUid]; + foreach (var localDataGroup in localDataGroups) { - _machineData.Remove(localDataGroup.Name); + await + _httpClient2[getBoxUid].PostAsync( + "dmon/group/" + localDataGroup.Uid + "/start", null); + } + } + else + { + var localDataGroups = _boxUidDataGroups[getBoxUid]; + foreach (var localDataGroup in localDataGroups) + { + lock (_connectionTokenState) + { + if (!_connectionTokenState.ContainsKey(localDataGroup.Name)) + { + _connectionTokenState.Add(localDataGroup.Name, newStatus); + } + else + { + _connectionTokenState[localDataGroup.Name] = newStatus; + } + } + //lock (_machineData) + //{ + //if (_machineData.ContainsKey(localDataGroup.Name)) + //{ + //_machineData.Remove(localDataGroup.Name); + //} + //await + //_httpClient2[getBoxUid].PostAsync( + //"dmon/group/" + localDataGroup.Uid + "/stop", null); + //} } - await - _httpClient2[getBoxUid].PostAsync( - "dmon/group/" + localDataGroup.Uid + "/stop", null); } } } - } - ); + }); hubConnection.Error += ex => Console.WriteLine(@"SignalR error: {0}", ex.Message); + + hubConnection.Closed += async () => + { + string getBoxUid; + lock (_boxUidSessionId) + { + getBoxUid = + _boxUidSessionId.FirstOrDefault( + p => p.Value == int.Parse(hubConnection.Headers["X-FBox-Session"])).Key; + } + if (hubConnection.State != ConnectionState.Connected) + { + await hubConnection.Start(); + if (IsConnected) + { + var localDataGroups = _boxUidDataGroups[getBoxUid]; + foreach (var localDataGroup in localDataGroups) + { + await + _httpClient2[getBoxUid].PostAsync( + "dmon/group/" + localDataGroup.Uid + "/start", null); + } + } + } + }; + ServicePointManager.DefaultConnectionLimit = 10; foreach (var dataGroup in dataGroups) { if (dataGroup == null) return; - _boxUidSessionId.Add(boxUid, sessionId); + var groupUid = dataGroup.Uid; var groupName = dataGroup.Name; + _connectionTokenState.Add(groupName, 1); if (groupName != "(Default)" && !_groupNameUid.ContainsKey(groupName)) { _groupNameUid.Add(groupName, groupUid); @@ -409,18 +478,25 @@ namespace ModBus.Net.FBox { try { - if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _groupNameUid.ContainsKey(ConnectionToken)) + using (await _lock.LockAsync()) { - await _httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync("dmon/group/" + _groupNameUid[ConnectionToken] + "/stop", - null); - _connected = false; - Console.WriteLine("SignalR Disconnect success"); - return true; - } - else - { - Console.WriteLine("SignalR Disconnect failed"); - return false; + if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) && + _httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) && + _groupNameUid.ContainsKey(ConnectionToken)) + { + await + _httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync( + "dmon/group/" + _groupNameUid[ConnectionToken] + "/stop", + null); + _connected = false; + Console.WriteLine("SignalR Disconnect success"); + return true; + } + else + { + Console.WriteLine("SignalR Disconnect failed"); + return false; + } } } catch @@ -447,11 +523,20 @@ namespace ModBus.Net.FBox byte[] ans; + lock (_connectionTokenState) + { + if (_connectionTokenState.ContainsKey(ConnectionToken) && _connectionTokenState[ConnectionToken] != 1) + { + Console.WriteLine($"Return Value Rejected with connectionToken {ConnectionToken}"); + return null; + } + } + lock (_machineData) { if (!_machineData.ContainsKey(ConnectionToken) || !_machineDataType.ContainsKey(ConnectionToken)) { - //Console.WriteLine("Return Value Rejected"); + Console.WriteLine($"Return Value Rejected with connectionToken {ConnectionToken}"); return null; } var machineDataValue = _machineData[ConnectionToken];