diff --git a/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj b/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj
index 489b6c8..2ffb238 100644
--- a/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj
+++ b/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj
@@ -5,7 +5,7 @@
Modbus.Net.Modbus.NA200H
Modbus.Net.Modbus.NA200H
Modbus.Net.Modbus.NA200H
- 1.4.0-beta04
+ 1.4.1-beta05
Chris L.(Luo Sheng)
Hangzhou Delian Science Technology Co.,Ltd.
Modbus.Net.Modbus
diff --git a/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj b/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj
index e81085e..8f7111d 100644
--- a/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj
+++ b/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj
@@ -5,7 +5,7 @@
Modbus.Net.Modbus
Modbus.Net.Modbus
Modbus.Net.Modbus
- 1.4.0-beta04
+ 1.4.1-beta05
Chris L.(Luo Sheng)
Hangzhou Delian Science Technology Co.,Ltd.
Modbus.Net.Modbus
diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs
index 2b4083c..ad60e7f 100644
--- a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs
@@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus
public ModbusAsciiInTcpProtocolLinker(string ip, int port)
: base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs
index 63a336b..f8217ba 100644
--- a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs
@@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus
public ModbusAsciiInUdpProtocolLinker(string ip, int port)
: base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs
index 68e3709..76685d9 100644
--- a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs
@@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus
public ModbusRtuInTcpProtocolLinker(string ip, int port)
: base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs
index 58bba98..3694360 100644
--- a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs
@@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus
public ModbusRtuInUdpProtocolLinker(string ip, int port)
: base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs
index 72c408e..bae4855 100644
--- a/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs
@@ -23,7 +23,7 @@ namespace Modbus.Net.Modbus
/// 端口
public ModbusTcpProtocolLinker(string ip, int port) : base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs
index 8d19711..7e4901a 100644
--- a/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs
@@ -12,7 +12,7 @@ namespace Modbus.Net.Modbus
///
/// IP地址
public ModbusUdpProtocolLinker(string ip)
- : this(ip, int.Parse(ConfigurationReader.GetValue("UDP:" + ip, "ModbusPort")))
+ : this(ip, int.Parse(ConfigurationReader.GetValueDirect("UDP:" + ip, "ModbusPort") ?? ConfigurationReader.GetValueDirect("UDP:Modbus", "ModbusPort")))
{
}
@@ -23,7 +23,7 @@ namespace Modbus.Net.Modbus
/// 端口
public ModbusUdpProtocolLinker(string ip, int port) : base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj b/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj
index b83affa..b901e52 100644
--- a/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj
+++ b/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj
@@ -5,7 +5,7 @@
Modbus.Net.OPC
Modbus.Net.OPC
Modbus.Net.OPC
- 1.4.0-beta04
+ 1.4.1-beta05
Chris L.(Luo Sheng)
Hangzhou Delian Science Technology Co.,Ltd.
Modbus.Net.OPC
diff --git a/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj b/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj
index 7d8721f..9e7d830 100644
--- a/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj
+++ b/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj
@@ -5,7 +5,7 @@
Modbus.Net.Siemens
Modbus.Net.Siemens
Modbus.Net.Siemens
- 1.4.0-beta04
+ 1.4.1-beta05
Chris L.(Luo Sheng)
Hangzhou Delian Science Technology Co.,Ltd.
Modbus.Net Siemens Profinet Implementation
diff --git a/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs
index ca2997f..6efe9a2 100644
--- a/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs
+++ b/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs
@@ -25,7 +25,7 @@ namespace Modbus.Net.Siemens
public SiemensTcpProtocolLinker(string ip, int port)
: base(ip, port)
{
- ((BaseConnector)BaseConnector).AddController(new MatchDirectlySendController(new ICollection<(int, int)>[] { new List<(int, int)> { (11, 11), (12, 12) } }, DuplicateWithCount.GetDuplcateFunc(new List { 2, 3 }, 0), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
+ ((EventHandlerConnector)BaseConnector).AddController(new MatchDirectlySendController(new ICollection<(int, int)>[] { new List<(int, int)> { (11, 11), (12, 12) } }, DuplicateWithCount.GetDuplcateFunc(new List { 2, 3 }, 0), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null));
}
///
diff --git a/Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs b/Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs
new file mode 100644
index 0000000..cdafd1e
--- /dev/null
+++ b/Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs
@@ -0,0 +1,172 @@
+using DotNetty.Transport.Channels;
+using Microsoft.Extensions.Logging;
+using Nito.AsyncEx;
+using System;
+using System.Threading.Tasks;
+
+namespace Modbus.Net
+{
+ ///
+ public abstract class EventHandlerConnector : EventHandlerConnector
+ {
+ ///
+ public override bool IsSharable => true;
+
+ private static readonly ILogger logger = LogProvider.CreateLogger();
+
+ ///
+ /// 发送锁
+ ///
+ protected abstract AsyncLock Lock { get; }
+
+ ///
+ /// 是否为全双工
+ ///
+ public bool IsFullDuplex { get; }
+
+ ///
+ /// 发送超时时间
+ ///
+ protected abstract int TimeoutTime { get; set; }
+
+ ///
+ /// 构造器
+ ///
+ /// 发送超时时间
+ /// 是否为全双工
+ protected EventHandlerConnector(int timeoutTime = 10000, bool isFullDuplex = true)
+ {
+ IsFullDuplex = isFullDuplex;
+ if (timeoutTime < -1) timeoutTime = -1;
+ TimeoutTime = timeoutTime;
+ }
+
+ ///
+ public override async Task SendMsgAsync(byte[] message)
+ {
+ var ans = await SendMsgInner(message);
+ if (ans == null) return new byte[0];
+ return ans.ReceiveMessage;
+ }
+
+ ///
+ /// 发送内部
+ ///
+ /// 发送的信息
+ /// 发送信息的定义
+ protected async Task SendMsgInner(byte[] message)
+ {
+ IDisposable asyncLock = null;
+ try
+ {
+ var messageSendingdef = Controller.AddMessage(message);
+ if (messageSendingdef != null)
+ {
+ if (!IsFullDuplex)
+ {
+ asyncLock = await Lock.LockAsync();
+ }
+ var success = messageSendingdef.SendMutex.WaitOne(TimeoutTime);
+ if (success)
+ {
+ await SendMsgWithoutConfirm(message);
+ success = messageSendingdef.ReceiveMutex.WaitOne(TimeoutTime);
+ if (success)
+ {
+ return messageSendingdef;
+ }
+ }
+ Controller.ForceRemoveWaitingMessage(messageSendingdef);
+ }
+ logger.LogInformation("Message is waiting in {0}. Cancel!", ConnectionToken);
+ return null;
+ }
+ catch (Exception e)
+ {
+ logger.LogError(e, "Connector {0} Send Error.", ConnectionToken);
+ return null;
+ }
+ finally
+ {
+ asyncLock?.Dispose();
+ }
+ }
+
+ ///
+ public override void ChannelReadComplete(IChannelHandlerContext ctx)
+ {
+ ctx.Flush();
+ }
+
+ ///
+ public override void ExceptionCaught(IChannelHandlerContext ctx, Exception e)
+ {
+ logger.LogError(e, e.ToString());
+ ctx.CloseAsync();
+ }
+ }
+
+ ///
+ /// 基础的协议连接类
+ ///
+ public abstract class EventHandlerConnector : ChannelHandlerAdapter, IConnector where TParamIn : class
+ {
+ ///
+ /// 数据返回代理参数
+ ///
+ ///
+ ///
+ ///
+ public delegate MessageReturnCallbackArgs MessageReturnDelegate(object sender, MessageReturnArgs args);
+
+ ///
+ /// 数据返回代理
+ ///
+ public event MessageReturnDelegate MessageReturn;
+
+ ///
+ /// 增加传输控制器
+ ///
+ /// 传输控制器
+ public void AddController(IController controller)
+ {
+ Controller = controller;
+ }
+
+ ///
+ /// 传输控制器
+ ///
+ protected virtual IController Controller { get; set; }
+
+ ///
+ public abstract string ConnectionToken { get; }
+
+ ///
+ public abstract bool IsConnected { get; }
+
+ ///
+ public abstract Task ConnectAsync();
+
+ ///
+ public abstract bool Disconnect();
+
+ ///
+ public abstract Task SendMsgAsync(TParamIn message);
+
+ ///
+ /// 发送数据,不确认
+ ///
+ /// 需要发送的数据
+ protected abstract Task SendMsgWithoutConfirm(TParamIn message);
+
+ ///
+ /// 数据返回代理函数
+ ///
+ ///
+ ///
+ protected TParamIn InvokeReturnMessage(TParamOut receiveMessage)
+ {
+ return MessageReturn?.Invoke(this, new MessageReturnArgs { ReturnMessage = receiveMessage })?.SendMessage;
+ }
+ }
+}
\ No newline at end of file
diff --git a/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs b/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs
index 081a6ad..11fcd91 100644
--- a/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs
+++ b/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs
@@ -1,9 +1,13 @@
-using Microsoft.Extensions.Logging;
+using DotNetty.Buffers;
+using DotNetty.Common.Utilities;
+using DotNetty.Transport.Bootstrapping;
+using DotNetty.Transport.Channels;
+using DotNetty.Transport.Channels.Sockets;
+using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using System;
using System.Linq;
-using System.Net.Sockets;
-using System.Threading;
+using System.Net;
using System.Threading.Tasks;
namespace Modbus.Net
@@ -12,29 +16,19 @@ namespace Modbus.Net
/// Socket收发类
/// 作者:本类来源于CSDN,并由罗圣(Chris L.)根据实际需要修改
///
- public class TcpConnector : BaseConnector, IDisposable
+ public class TcpConnector : EventHandlerConnector, IDisposable
{
private static readonly ILogger logger = LogProvider.CreateLogger();
private readonly string _host;
private readonly int _port;
- ///
- /// 1MB 的接收缓冲区
- ///
- private readonly byte[] _receiveBuffer = new byte[1024];
-
private int _errorCount;
private int _receiveCount;
private int _sendCount;
- private TcpClient _socketClient;
-
- private int _timeoutTime;
-
- private Task _receiveThread;
- private bool _taskCancel = false;
+ private IChannel Channel { get; set; }
///
/// 构造器
@@ -53,20 +47,10 @@ namespace Modbus.Net
public override string ConnectionToken => _host;
///
- protected override int TimeoutTime
- {
- get =>
- _timeoutTime;
- set
- {
- _timeoutTime = value;
- if (_socketClient != null)
- _socketClient.ReceiveTimeout = _timeoutTime;
- }
- }
+ protected override int TimeoutTime { get; set; }
///
- public override bool IsConnected => _socketClient?.Client != null && _socketClient.Connected;
+ public override bool IsConnected => Channel?.Open == true;
///
protected override AsyncLock Lock { get; } = new AsyncLock();
@@ -92,10 +76,10 @@ namespace Modbus.Net
// Release managed resources
}
// Release unmanaged resources
- if (_socketClient != null)
+ if (Channel != null)
{
- CloseClientSocket();
- _socketClient = null;
+ CloseClientSocket().Wait();
+ Channel = null;
logger.LogDebug("Tcp client {ConnectionToken} Disposed", ConnectionToken);
}
}
@@ -114,28 +98,31 @@ namespace Modbus.Net
{
using (await Lock.LockAsync())
{
- if (_socketClient != null)
+ if (Channel != null)
{
- if (_socketClient.Connected)
+ if (Channel.Open)
return true;
}
try
{
- _socketClient = new TcpClient
- {
- SendTimeout = TimeoutTime,
- ReceiveTimeout = TimeoutTime
- };
+ var bootstrap = new Bootstrap();
+ bootstrap
+ .Group(new MultithreadEventLoopGroup())
+ .Channel()
+ .Option(ChannelOption.TcpNodelay, true)
+ .Option(ChannelOption.ConnectTimeout, TimeSpan.FromMilliseconds(TimeoutTime))
+ .Handler(new ActionChannelInitializer(channel =>
+ {
+ IChannelPipeline pipeline = channel.Pipeline;
- var cts = new CancellationTokenSource();
- cts.CancelAfter(TimeoutTime);
- await _socketClient.ConnectAsync(_host, _port).WithCancellation(cts.Token);
+ pipeline.AddLast("handler", this);
+ }));
- if (_socketClient.Connected)
+ Channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(_host), _port));
+
+ if (Channel.Open)
{
- _taskCancel = false;
Controller.SendStart();
- ReceiveMsgThreadStart();
logger.LogInformation("Tcp client {ConnectionToken} connected", ConnectionToken);
return true;
}
@@ -146,6 +133,9 @@ namespace Modbus.Net
catch (Exception err)
{
logger.LogError(err, "Tcp client {ConnectionToken} connect exception", ConnectionToken);
+
+ RefreshErrorCount();
+
Dispose();
return false;
}
@@ -155,7 +145,7 @@ namespace Modbus.Net
///
public override bool Disconnect()
{
- if (_socketClient == null)
+ if (Channel.Open)
return true;
try
@@ -167,11 +157,14 @@ namespace Modbus.Net
catch (Exception err)
{
logger.LogError(err, "Tcp client {ConnectionToken} disconnected exception", ConnectionToken);
+
+ RefreshErrorCount();
+
return false;
}
finally
{
- _socketClient = null;
+ Channel = null;
}
}
@@ -185,75 +178,54 @@ namespace Modbus.Net
if (!IsConnected)
await ConnectAsync();
- var stream = _socketClient.GetStream();
-
RefreshSendCount();
logger.LogDebug("Tcp client {ConnectionToken} send text len = {Length}", ConnectionToken, datagram.Length);
- logger.LogDebug($"Tcp client {ConnectionToken} send: {String.Concat(datagram.Select(p => " " + p.ToString("X2")))}");
- await stream.WriteAsync(datagram, 0, datagram.Length);
+ logger.LogDebug($"Tcp client {ConnectionToken} send: {string.Concat(datagram.Select(p => " " + p.ToString("X2")))}");
+ IByteBuffer buffer = Unpooled.Buffer();
+ buffer.WriteBytes(datagram);
+ await Channel.WriteAndFlushAsync(buffer);
}
catch (Exception err)
{
logger.LogError(err, "Tcp client {ConnectionToken} send exception", ConnectionToken);
+
+ RefreshErrorCount();
+
Dispose();
}
}
- ///
- protected override void ReceiveMsgThreadStart()
- {
- _receiveThread = Task.Run(ReceiveMessage);
- }
-
- ///
- protected override void ReceiveMsgThreadStop()
- {
- _taskCancel = true;
- }
-
- ///
- /// 接收返回消息
- ///
- /// 返回的消息
- protected async Task ReceiveMessage()
+ ///
+ public override async void ChannelRead(IChannelHandlerContext context, object message)
{
try
{
- while (!_taskCancel)
+ if (message is IByteBuffer buffer)
{
- if (_socketClient == null) break;
- NetworkStream stream = _socketClient.GetStream();
- var len = await stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
- stream.Flush();
-
- // 异步接收回答
- if (len > 0)
+ byte[] msg = buffer.Array.Slice(buffer.ArrayOffset, buffer.ReadableBytes);
+ logger.LogDebug("Tcp client {ConnectionToken} receive text len = {Length}", ConnectionToken,
+ msg.Length);
+ logger.LogDebug(
+ $"Tcp client {ConnectionToken} receive: {string.Concat(msg.Select(p => " " + p.ToString("X2")))}");
+ var isMessageConfirmed = Controller.ConfirmMessage(msg);
+ if (isMessageConfirmed != null)
{
- byte[] receiveBytes = CheckReplyDatagram(len);
- logger.LogDebug("Tcp client {ConnectionToken} receive text len = {Length}", ConnectionToken,
- receiveBytes.Length);
- logger.LogDebug(
- $"Tcp client {ConnectionToken} receive: {String.Concat(receiveBytes.Select(p => " " + p.ToString("X2")))}");
- var isMessageConfirmed = Controller.ConfirmMessage(receiveBytes);
- if (isMessageConfirmed != null)
+ foreach (var confirmed in isMessageConfirmed)
{
- foreach (var confirmed in isMessageConfirmed)
+ if (confirmed.Item2 == false)
{
- if (confirmed.Item2 == false)
+ var sendMessage = InvokeReturnMessage(confirmed.Item1);
+ //主动传输事件
+ if (sendMessage != null)
{
- var sendMessage = InvokeReturnMessage(confirmed.Item1);
- //主动传输事件
- if (sendMessage != null)
- {
- await SendMsgWithoutConfirm(sendMessage);
- }
+ await SendMsgWithoutConfirm(sendMessage);
}
}
}
-
- RefreshReceiveCount();
}
+
+ RefreshReceiveCount();
}
}
catch (ObjectDisposedException)
@@ -263,23 +235,11 @@ namespace Modbus.Net
catch (Exception err)
{
logger.LogError(err, "Tcp client {ConnectionToken} receive exception", ConnectionToken);
- //CloseClientSocket();
- }
- }
- ///
- /// 接收消息,并转换成字符串
- ///
- /// 消息的长度
- private byte[] CheckReplyDatagram(int len)
- {
- var replyMessage = new byte[len];
- Array.Copy(_receiveBuffer, replyMessage, len);
-
- if (len <= 0)
RefreshErrorCount();
- return replyMessage;
+ await CloseClientSocket();
+ }
}
private void RefreshSendCount()
@@ -300,24 +260,25 @@ namespace Modbus.Net
logger.LogDebug("Tcp client {ConnectionToken} error count: {ErrorCount}", ConnectionToken, _errorCount);
}
- private void CloseClientSocket()
+ private async Task CloseClientSocket()
{
try
{
Controller.SendStop();
Controller.Clear();
- ReceiveMsgThreadStop();
- if (_socketClient != null)
+ if (Channel != null)
{
- if (_socketClient.Connected)
+ if (Channel.Open)
{
- _socketClient.Close();
+ await Channel.CloseAsync();
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "Tcp client {ConnectionToken} client close exception", ConnectionToken);
+
+ RefreshErrorCount();
}
}
}
diff --git a/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs b/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs
index c1775a8..8ca5927 100644
--- a/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs
+++ b/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs
@@ -1,9 +1,14 @@
-using Microsoft.Extensions.Logging;
+using DotNetty.Buffers;
+using DotNetty.Common.Utilities;
+using DotNetty.Transport.Bootstrapping;
+using DotNetty.Transport.Channels;
+using DotNetty.Transport.Channels.Sockets;
+using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using System;
using System.Linq;
+using System.Net;
using System.Net.Sockets;
-using System.Threading;
using System.Threading.Tasks;
namespace Modbus.Net
@@ -11,27 +16,19 @@ namespace Modbus.Net
///
/// Udp收发类
///
- public class UdpConnector : BaseConnector, IDisposable
+ public class UdpConnector : EventHandlerConnector, IDisposable
{
private static readonly ILogger logger = LogProvider.CreateLogger();
private readonly string _host;
private readonly int _port;
- ///
- /// 1MB 的接收缓冲区
- ///
- private readonly byte[] _receiveBuffer = new byte[1024];
-
private int _errorCount;
private int _receiveCount;
private int _sendCount;
- private UdpClient _socketClient;
-
- private Task _receiveThread;
- private bool _taskCancel = false;
+ private IChannel Channel { get; set; }
///
/// 构造器
@@ -53,7 +50,7 @@ namespace Modbus.Net
protected override int TimeoutTime { get; set; }
///
- public override bool IsConnected => _socketClient?.Client != null && _socketClient.Client.Connected;
+ public override bool IsConnected => Channel != null && Channel.Active;
///
protected override AsyncLock Lock { get; } = new AsyncLock();
@@ -79,10 +76,10 @@ namespace Modbus.Net
// Release managed resources
}
// Release unmanaged resources
- if (_socketClient != null)
+ if (Channel != null)
{
- CloseClientSocket();
- _socketClient = null;
+ CloseClientSocket().Wait();
+ Channel = null;
logger.LogDebug("Udp client {ConnectionToken} Disposed", ConnectionToken);
}
}
@@ -101,23 +98,30 @@ namespace Modbus.Net
{
using (await Lock.LockAsync())
{
- if (_socketClient != null)
+ if (Channel != null)
{
return true;
}
try
{
- _socketClient = new UdpClient();
+ var bootstrap = new Bootstrap();
+ bootstrap
+ .Group(new MultithreadEventLoopGroup())
+ .Channel()
+ .Option(ChannelOption.SoBroadcast, true)
+ .Option(ChannelOption.ConnectTimeout, TimeSpan.FromMilliseconds(TimeoutTime))
+ .Handler(new ActionChannelInitializer(channel =>
+ {
+ IChannelPipeline pipeline = channel.Pipeline;
- var cts = new CancellationTokenSource();
- cts.CancelAfter(TimeoutTime);
- await Task.Run(() => _socketClient.Connect(_host, _port), cts.Token);
+ pipeline.AddLast("handler", this);
+ }));
- if (_socketClient.Client.Connected)
+ Channel = await bootstrap.BindAsync(IPEndPoint.MinPort);
+
+ if (Channel.Active)
{
- _taskCancel = false;
Controller.SendStart();
- ReceiveMsgThreadStart();
logger.LogInformation("Udp client {ConnectionToken} connected", ConnectionToken);
return true;
}
@@ -138,7 +142,7 @@ namespace Modbus.Net
///
public override bool Disconnect()
{
- if (_socketClient == null)
+ if (Channel == null)
return true;
try
@@ -150,11 +154,14 @@ namespace Modbus.Net
catch (Exception err)
{
logger.LogError(err, "Udp client {ConnectionToken} disconnected exception", ConnectionToken);
+
+ RefreshErrorCount();
+
return false;
}
finally
{
- _socketClient = null;
+ Channel = null;
}
}
@@ -171,71 +178,53 @@ namespace Modbus.Net
RefreshSendCount();
logger.LogDebug("Udp client {ConnectionToken} send text len = {Length}", ConnectionToken, datagram.Length);
- logger.LogDebug($"Udp client {ConnectionToken} send: {String.Concat(datagram.Select(p => " " + p.ToString("X2")))}");
- await _socketClient.SendAsync(datagram, datagram.Length);
+ logger.LogDebug($"Udp client {ConnectionToken} send: {string.Concat(datagram.Select(p => " " + p.ToString("X2")))}");
+ IByteBuffer buffer = Unpooled.Buffer();
+ buffer.WriteBytes(datagram);
+ var packet = new DatagramPacket((IByteBuffer)buffer.Retain(), new IPEndPoint(IPAddress.Parse(_host), _port));
+ await Channel.WriteAndFlushAsync(packet);
}
catch (Exception err)
{
logger.LogError(err, "Udp client {ConnectionToken} send exception", ConnectionToken);
+
+ RefreshErrorCount();
+
Dispose();
}
}
- ///
- protected override void ReceiveMsgThreadStart()
- {
- _receiveThread = Task.Run(ReceiveMessage);
- }
-
- ///
- protected override void ReceiveMsgThreadStop()
- {
- _taskCancel = true;
- }
-
- ///
- /// 接收返回消息
- ///
- /// 返回的消息
- protected async Task ReceiveMessage()
+ ///
+ public override async void ChannelRead(IChannelHandlerContext ctx, object message)
{
try
{
- while (!_taskCancel)
+ if (message is DatagramPacket packet)
{
- if (_socketClient == null) break;
- var receive = await _socketClient.ReceiveAsync();
-
- var len = receive.Buffer.Length;
- // 异步接收回答
- if (len > 0)
+ var buffer = packet.Content;
+ byte[] msg = buffer.Array.Slice(buffer.ArrayOffset, buffer.ReadableBytes);
+ logger.LogDebug("Udp client {ConnectionToken} receive text len = {Length}", ConnectionToken,
+ msg.Length);
+ logger.LogDebug(
+ $"Udp client {ConnectionToken} receive: {string.Concat(msg.Select(p => " " + p.ToString("X2")))}");
+ var isMessageConfirmed = Controller.ConfirmMessage(msg);
+ if (isMessageConfirmed != null)
{
- if (receive.Buffer.Clone() is byte[] receiveBytes)
+ foreach (var confirmed in isMessageConfirmed)
{
- logger.LogDebug("Udp client {ConnectionToken} receive text len = {Length}", ConnectionToken,
- receiveBytes.Length);
- logger.LogDebug(
- $"Udp client {ConnectionToken} receive: {String.Concat(receiveBytes.Select(p => " " + p.ToString("X2")))}");
- var isMessageConfirmed = Controller.ConfirmMessage(receiveBytes);
- if (isMessageConfirmed != null)
+ if (confirmed.Item2 == false)
{
- foreach (var confirmed in isMessageConfirmed)
+ var sendMessage = InvokeReturnMessage(confirmed.Item1);
+ //主动传输事件
+ if (sendMessage != null)
{
- if (confirmed.Item2 == false)
- {
- var sendMessage = InvokeReturnMessage(confirmed.Item1);
- //主动传输事件
- if (sendMessage != null)
- {
- await SendMsgWithoutConfirm(sendMessage);
- }
- }
+ await SendMsgWithoutConfirm(sendMessage);
}
}
}
-
- RefreshReceiveCount();
}
+
+ RefreshReceiveCount();
}
}
catch (ObjectDisposedException)
@@ -245,7 +234,10 @@ namespace Modbus.Net
catch (Exception err)
{
logger.LogError(err, "Udp client {ConnectionToken} receive exception", ConnectionToken);
- //CloseClientSocket();
+
+ RefreshErrorCount();
+
+ await CloseClientSocket();
}
}
@@ -267,25 +259,25 @@ namespace Modbus.Net
logger.LogDebug("Udp client {ConnectionToken} error count: {ErrorCount}", ConnectionToken, _errorCount);
}
- private void CloseClientSocket()
+ private async Task CloseClientSocket()
{
try
{
Controller.SendStop();
Controller.Clear();
- ReceiveMsgThreadStop();
- if (_socketClient != null)
+ if (Channel != null)
{
- if (_socketClient.Client?.Connected == true)
+ if (Channel.Active)
{
- _socketClient.Client.Disconnect(false);
+ await Channel.CloseAsync();
}
- _socketClient.Close();
}
}
catch (Exception ex)
{
logger.LogError(ex, "Udp client {ConnectionToken} client close exception", ConnectionToken);
+
+ RefreshErrorCount();
}
}
}
diff --git a/Modbus.Net/Modbus.Net/Modbus.Net.csproj b/Modbus.Net/Modbus.Net/Modbus.Net.csproj
index 1e84a1d..a5e7f46 100644
--- a/Modbus.Net/Modbus.Net/Modbus.Net.csproj
+++ b/Modbus.Net/Modbus.Net/Modbus.Net.csproj
@@ -5,7 +5,7 @@
Modbus.Net
Modbus.Net
Modbus.Net
- 1.4.0-beta04
+ 1.4.1-beta05
Modbus.Net
Chris L.(Luo Sheng)
Hangzhou Delian Science Technology Co.,Ltd.
@@ -29,6 +29,7 @@
+