2015-11-25 update 1 continue fix bugs in fboxconnector

This commit is contained in:
parallelbgls@outlook.com
2015-11-25 09:54:54 +08:00
parent 6c5048a85c
commit a9fbe17654
2 changed files with 269 additions and 85 deletions

View File

@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@@ -42,4 +43,102 @@ namespace ModBus.Net
return task.ContinueWith(t => t.GetAwaiter().GetResult(), token);
}
}
/// <summary>AsyncLock locks across one or several await calls.
///
/// </summary>
public class AsyncLock
{
private readonly AsyncSemaphore _semaphore;
private readonly Task<Releaser> _releaser;
public AsyncLock()
{
_semaphore = new AsyncSemaphore(1);
_releaser = Task.FromResult(new Releaser(this));
}
public Task<Releaser> LockAsync()
{
var wait = _semaphore.WaitAsync();
return wait.IsCompleted
? _releaser
: wait.ContinueWith((_, state) => new Releaser((AsyncLock) state),
this, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
public struct Releaser : IDisposable
{
private readonly AsyncLock _toRelease;
internal Releaser(AsyncLock toRelease)
{
_toRelease = toRelease;
}
public void Dispose()
{
if (_toRelease != null)
{
_toRelease._semaphore.Release();
}
}
}
}
public class AsyncSemaphore
{
private readonly static Task _completed = Task.FromResult(true);
private readonly Queue<TaskCompletionSource<bool>> _waiters = new Queue<TaskCompletionSource<bool>>();
private int _currentCount;
public AsyncSemaphore(int initialCount)
{
if (initialCount < 0)
{
throw new ArgumentOutOfRangeException("initialCount");
}
_currentCount = initialCount;
}
public Task WaitAsync()
{
lock (_waiters)
{
if (_currentCount > 0)
{
_currentCount--;
return _completed;
}
else
{
var waiter = new TaskCompletionSource<bool>();
_waiters.Enqueue(waiter);
return waiter.Task;
}
}
}
public void Release()
{
TaskCompletionSource<bool> toRelease = null;
lock (_waiters)
{
if (_waiters.Count > 0)
{
toRelease = _waiters.Dequeue();
}
else
{
_currentCount++;
}
}
if (toRelease != null)
{
toRelease.SetResult(true);
}
}
}
}

View File

@@ -31,6 +31,7 @@ namespace ModBus.Net.FBox
private static Dictionary<string, int> _boxUidSessionId;
private static Dictionary<string, List<DMonGroup>> _boxUidDataGroups;
private static Dictionary<string, int> _connectionTokenState;
private static Dictionary<string, string> _groupNameUid;
private static Dictionary<string, string> _groupNameBoxUid;
private static Dictionary<string, Dictionary<string, double>> _machineData;
@@ -38,6 +39,8 @@ namespace ModBus.Net.FBox
public override string ConnectionToken { get; }
private static AsyncLock _lock = new AsyncLock();
private bool _connected;
public override bool IsConnected { get { return _connected; } }
@@ -66,6 +69,7 @@ namespace ModBus.Net.FBox
_machineData = new Dictionary<string, Dictionary<string, double>>();
_machineDataType = new Dictionary<string, Dictionary<string, Type>>();
_boxUidSessionId = new Dictionary<string, int>();
_connectionTokenState = new Dictionary<string, int>();
_groupNameUid = new Dictionary<string, string>();
_groupNameBoxUid = new Dictionary<string, string>();
_boxUidDataGroups = new Dictionary<string, List<DMonGroup>>();
@@ -98,18 +102,25 @@ namespace ModBus.Net.FBox
{
try
{
if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _groupNameUid.ContainsKey(ConnectionToken))
using (await _lock.LockAsync())
{
await _httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync("dmon/group/" + _groupNameUid[ConnectionToken] + "/start",
null);
_connected = true;
Console.WriteLine("SignalR Connected success");
return true;
}
else
{
Console.WriteLine("SignalR Connected failed");
return false;
if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) &&
_httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) &&
_groupNameUid.ContainsKey(ConnectionToken))
{
await
_httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync(
"dmon/group/" + _groupNameUid[ConnectionToken] + "/start",
null);
_connected = true;
Console.WriteLine("SignalR Connected success");
return true;
}
else
{
Console.WriteLine("SignalR Connected failed");
return false;
}
}
}
catch
@@ -167,6 +178,7 @@ namespace ModBus.Net.FBox
List<DMonGroup> dataGroups = JsonConvert.DeserializeObject<List<DMonGroup>>(response);
_boxUidDataGroups.Add(boxUid, dataGroups);
_boxUidSessionId.Add(boxUid, sessionId);
var hubConnection = new HubConnection(signalrUrl);
_hubConnections.Add(boxUid, hubConnection);
@@ -178,47 +190,55 @@ namespace ModBus.Net.FBox
dataHubProxy.On<int, List<GetValue>>("dMonUpdateValue",
(boxSessionId, values) =>
{
if (_boxUidSessionId.ContainsValue(boxSessionId))
lock (_boxUidSessionId)
{
var localBoxUid = _boxUidSessionId.FirstOrDefault(p => p.Value == boxSessionId).Key;
foreach (var value in values)
if (_boxUidSessionId.ContainsValue(boxSessionId))
{
lock (_machineData)
Console.WriteLine($"Box session {boxSessionId} return at {DateTime.Now}");
var localBoxUid = _boxUidSessionId.FirstOrDefault(p => p.Value == boxSessionId).Key;
foreach (var value in values)
{
if (_boxUidDataGroups.ContainsKey(localBoxUid))
if (value.Status != 0) return;
lock (_machineData)
{
foreach (var dataGroupInner in _boxUidDataGroups[localBoxUid])
if (_boxUidDataGroups.ContainsKey(localBoxUid))
{
if (dataGroupInner.DMonEntries.Any(p => p.Uid == value.Id))
foreach (var dataGroupInner in _boxUidDataGroups[localBoxUid])
{
if (!_machineData.ContainsKey(dataGroupInner.Name))
if (dataGroupInner.DMonEntries.Any(p => p.Uid == value.Id))
{
_machineData.Add(dataGroupInner.Name,
new Dictionary<string, double>());
}
if (_machineData[dataGroupInner.Name] == null)
{
_machineData[dataGroupInner.Name] =
new Dictionary<string, double>();
}
var dMonEntry =
dataGroupInner.DMonEntries.FirstOrDefault(p => p.Uid == value.Id);
if (value.Value.HasValue && dMonEntry != null)
{
if (_machineData[dataGroupInner.Name].ContainsKey(dMonEntry.Desc))
if (!_machineData.ContainsKey(dataGroupInner.Name))
{
_machineData[dataGroupInner.Name][dMonEntry.Desc] =
value.Value.Value;
_machineData.Add(dataGroupInner.Name,
new Dictionary<string, double>());
}
else
if (_machineData[dataGroupInner.Name] == null)
{
_machineData[dataGroupInner.Name].Add(dMonEntry.Desc,
value.Value.Value);
_machineData[dataGroupInner.Name] =
new Dictionary<string, double>();
}
var dMonEntry =
dataGroupInner.DMonEntries.FirstOrDefault(
p => p.Uid == value.Id);
if (value.Value.HasValue && dMonEntry != null)
{
if (
_machineData[dataGroupInner.Name].ContainsKey(
dMonEntry.Desc))
{
_machineData[dataGroupInner.Name][dMonEntry.Desc] =
value.Value.Value;
}
else
{
_machineData[dataGroupInner.Name].Add(dMonEntry.Desc,
value.Value.Value);
}
}
break;
}
break;
}
}
}
@@ -230,58 +250,107 @@ namespace ModBus.Net.FBox
dataHubProxy.On<int, string, int, int>("boxConnectionStateChanged",
async (newConnectionToken, getBoxUid, oldStatus, newStatus) =>
{
if (_httpClient2.ContainsKey(getBoxUid))
using (await _lock.LockAsync())
{
sessionId = newConnectionToken;
_boxUidSessionId[getBoxUid] = sessionId;
_httpClient2[getBoxUid].DefaultRequestHeaders.Remove("X-FBox-Session");
_httpClient2[getBoxUid].DefaultRequestHeaders.Add("X-FBox-Session", sessionId.ToString());
_hubConnections[getBoxUid].Headers.Remove("X-FBox-Session");
_hubConnections[getBoxUid].Headers.Add("X-FBox-Session", sessionId.ToString());
_hubConnections[getBoxUid].Stop();
await _hubConnections[getBoxUid].Start();
if (newStatus == 1 && IsConnected)
if (_httpClient2.ContainsKey(getBoxUid))
{
var localDataGroups = _boxUidDataGroups[getBoxUid];
foreach (var localDataGroup in localDataGroups)
Console.WriteLine(
$"Box uid {getBoxUid} change state at {DateTime.Now} new connectionToken {newConnectionToken} newStatus {newStatus}");
sessionId = newConnectionToken;
lock (_boxUidSessionId)
{
await
_httpClient2[getBoxUid].PostAsync(
"dmon/group/" + localDataGroup.Uid + "/stop", null);
await
_httpClient2[getBoxUid].PostAsync(
"dmon/group/" + localDataGroup.Uid + "/start", null);
_boxUidSessionId[getBoxUid] = sessionId;
}
}
else
{
var localDataGroups = _boxUidDataGroups[getBoxUid];
foreach (var localDataGroup in localDataGroups)
_httpClient2[getBoxUid].DefaultRequestHeaders.Remove("X-FBox-Session");
_httpClient2[getBoxUid].DefaultRequestHeaders.Add("X-FBox-Session",
sessionId.ToString());
_hubConnections[getBoxUid].Headers["X-FBox-Session"] = sessionId.ToString();
if (_hubConnections[getBoxUid].State == ConnectionState.Disconnected)
{
if (_machineData.ContainsKey(localDataGroup.Name))
await _hubConnections[getBoxUid].Start();
}
if (newStatus == 1 && IsConnected)
{
var localDataGroups = _boxUidDataGroups[getBoxUid];
foreach (var localDataGroup in localDataGroups)
{
_machineData.Remove(localDataGroup.Name);
await
_httpClient2[getBoxUid].PostAsync(
"dmon/group/" + localDataGroup.Uid + "/start", null);
}
}
else
{
var localDataGroups = _boxUidDataGroups[getBoxUid];
foreach (var localDataGroup in localDataGroups)
{
lock (_connectionTokenState)
{
if (!_connectionTokenState.ContainsKey(localDataGroup.Name))
{
_connectionTokenState.Add(localDataGroup.Name, newStatus);
}
else
{
_connectionTokenState[localDataGroup.Name] = newStatus;
}
}
//lock (_machineData)
//{
//if (_machineData.ContainsKey(localDataGroup.Name))
//{
//_machineData.Remove(localDataGroup.Name);
//}
//await
//_httpClient2[getBoxUid].PostAsync(
//"dmon/group/" + localDataGroup.Uid + "/stop", null);
//}
}
await
_httpClient2[getBoxUid].PostAsync(
"dmon/group/" + localDataGroup.Uid + "/stop", null);
}
}
}
}
);
});
hubConnection.Error += ex => Console.WriteLine(@"SignalR error: {0}", ex.Message);
hubConnection.Closed += async () =>
{
string getBoxUid;
lock (_boxUidSessionId)
{
getBoxUid =
_boxUidSessionId.FirstOrDefault(
p => p.Value == int.Parse(hubConnection.Headers["X-FBox-Session"])).Key;
}
if (hubConnection.State != ConnectionState.Connected)
{
await hubConnection.Start();
if (IsConnected)
{
var localDataGroups = _boxUidDataGroups[getBoxUid];
foreach (var localDataGroup in localDataGroups)
{
await
_httpClient2[getBoxUid].PostAsync(
"dmon/group/" + localDataGroup.Uid + "/start", null);
}
}
}
};
ServicePointManager.DefaultConnectionLimit = 10;
foreach (var dataGroup in dataGroups)
{
if (dataGroup == null) return;
_boxUidSessionId.Add(boxUid, sessionId);
var groupUid = dataGroup.Uid;
var groupName = dataGroup.Name;
_connectionTokenState.Add(groupName, 1);
if (groupName != "(Default)" && !_groupNameUid.ContainsKey(groupName))
{
_groupNameUid.Add(groupName, groupUid);
@@ -409,18 +478,25 @@ namespace ModBus.Net.FBox
{
try
{
if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) && _groupNameUid.ContainsKey(ConnectionToken))
using (await _lock.LockAsync())
{
await _httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync("dmon/group/" + _groupNameUid[ConnectionToken] + "/stop",
null);
_connected = false;
Console.WriteLine("SignalR Disconnect success");
return true;
}
else
{
Console.WriteLine("SignalR Disconnect failed");
return false;
if (_hubConnections.ContainsKey(_groupNameBoxUid[ConnectionToken]) &&
_httpClient2.ContainsKey(_groupNameBoxUid[ConnectionToken]) &&
_groupNameUid.ContainsKey(ConnectionToken))
{
await
_httpClient2[_groupNameBoxUid[ConnectionToken]].PostAsync(
"dmon/group/" + _groupNameUid[ConnectionToken] + "/stop",
null);
_connected = false;
Console.WriteLine("SignalR Disconnect success");
return true;
}
else
{
Console.WriteLine("SignalR Disconnect failed");
return false;
}
}
}
catch
@@ -447,11 +523,20 @@ namespace ModBus.Net.FBox
byte[] ans;
lock (_connectionTokenState)
{
if (_connectionTokenState.ContainsKey(ConnectionToken) && _connectionTokenState[ConnectionToken] != 1)
{
Console.WriteLine($"Return Value Rejected with connectionToken {ConnectionToken}");
return null;
}
}
lock (_machineData)
{
if (!_machineData.ContainsKey(ConnectionToken) || !_machineDataType.ContainsKey(ConnectionToken))
{
//Console.WriteLine("Return Value Rejected");
Console.WriteLine($"Return Value Rejected with connectionToken {ConnectionToken}");
return null;
}
var machineDataValue = _machineData[ConnectionToken];