From 9b34aaae0a0923baf8ed7bd4cba39db61298717c Mon Sep 17 00:00:00 2001 From: luosheng Date: Thu, 23 Mar 2023 16:15:57 +0800 Subject: [PATCH] Use DotNetty for better TCP and UDP experience --- .../Modbus.Net.Modbus.NA200H.csproj | 2 +- .../Modbus.Net.Modbus.csproj | 2 +- .../ModbusAsciiInTcpProtocolLinker.cs | 2 +- .../ModbusAsciiInUdpProtocolLinker.cs | 2 +- .../ModbusRtuInTcpProtocolLinker.cs | 2 +- .../ModbusRtuInUdpProtocolLinker.cs | 2 +- .../ModbusTcpProtocolLinker.cs | 2 +- .../ModbusUdpProtocolLinker.cs | 4 +- .../Modbus.Net.OPC/Modbus.Net.OPC.csproj | 2 +- .../Modbus.Net.Siemens.csproj | 2 +- .../SiemensTcpProtocolLinker.cs | 2 +- .../Connector/EventHandlerConnector.cs | 172 +++++++++++++++++ .../Modbus.Net/Connector/TcpConnector.cs | 181 +++++++----------- .../Modbus.Net/Connector/UdpConnector.cs | 148 +++++++------- Modbus.Net/Modbus.Net/Modbus.Net.csproj | 3 +- 15 files changed, 327 insertions(+), 201 deletions(-) create mode 100644 Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs diff --git a/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj b/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj index 489b6c8..2ffb238 100644 --- a/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj +++ b/Modbus.Net/Modbus.Net.Modbus.NA200H/Modbus.Net.Modbus.NA200H.csproj @@ -5,7 +5,7 @@ Modbus.Net.Modbus.NA200H Modbus.Net.Modbus.NA200H Modbus.Net.Modbus.NA200H - 1.4.0-beta04 + 1.4.1-beta05 Chris L.(Luo Sheng) Hangzhou Delian Science Technology Co.,Ltd. Modbus.Net.Modbus diff --git a/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj b/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj index e81085e..8f7111d 100644 --- a/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj +++ b/Modbus.Net/Modbus.Net.Modbus/Modbus.Net.Modbus.csproj @@ -5,7 +5,7 @@ Modbus.Net.Modbus Modbus.Net.Modbus Modbus.Net.Modbus - 1.4.0-beta04 + 1.4.1-beta05 Chris L.(Luo Sheng) Hangzhou Delian Science Technology Co.,Ltd. Modbus.Net.Modbus diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs index 2b4083c..ad60e7f 100644 --- a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInTcpProtocolLinker.cs @@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus public ModbusAsciiInTcpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs index 63a336b..f8217ba 100644 --- a/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Modbus/ModbusAsciiInUdpProtocolLinker.cs @@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus public ModbusAsciiInUdpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[0] != 0x3a) return 0; for (int i = 1; i < content.Length; i++) { if (content[i - 1] == 0x0D && content[i] == 0x0A) return i + 1; } return -1; }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs index 68e3709..76685d9 100644 --- a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInTcpProtocolLinker.cs @@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus public ModbusRtuInTcpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs index 58bba98..3694360 100644 --- a/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Modbus/ModbusRtuInUdpProtocolLinker.cs @@ -24,7 +24,7 @@ namespace Modbus.Net.Modbus public ModbusRtuInUdpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: content => { if (content[1] == 5 || content[1] == 6 || content[1] == 15 || content[1] == 16 || content[1] == 21) return 8; else return DuplicateWithCount.GetDuplcateFunc(new List { 2 }, 5).Invoke(content); }, waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs index 72c408e..bae4855 100644 --- a/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Modbus/ModbusTcpProtocolLinker.cs @@ -23,7 +23,7 @@ namespace Modbus.Net.Modbus /// 端口 public ModbusTcpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs index 8d19711..7e4901a 100644 --- a/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Modbus/ModbusUdpProtocolLinker.cs @@ -12,7 +12,7 @@ namespace Modbus.Net.Modbus /// /// IP地址 public ModbusUdpProtocolLinker(string ip) - : this(ip, int.Parse(ConfigurationReader.GetValue("UDP:" + ip, "ModbusPort"))) + : this(ip, int.Parse(ConfigurationReader.GetValueDirect("UDP:" + ip, "ModbusPort") ?? ConfigurationReader.GetValueDirect("UDP:Modbus", "ModbusPort"))) { } @@ -23,7 +23,7 @@ namespace Modbus.Net.Modbus /// 端口 public ModbusUdpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new FifoController(int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "FetchSleepTime")), lengthCalc: DuplicateWithCount.GetDuplcateFunc(new List { 4, 5 }, 6), waitingListMaxCount: ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("UDP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj b/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj index b83affa..b901e52 100644 --- a/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj +++ b/Modbus.Net/Modbus.Net.OPC/Modbus.Net.OPC.csproj @@ -5,7 +5,7 @@ Modbus.Net.OPC Modbus.Net.OPC Modbus.Net.OPC - 1.4.0-beta04 + 1.4.1-beta05 Chris L.(Luo Sheng) Hangzhou Delian Science Technology Co.,Ltd. Modbus.Net.OPC diff --git a/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj b/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj index 7d8721f..9e7d830 100644 --- a/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj +++ b/Modbus.Net/Modbus.Net.Siemens/Modbus.Net.Siemens.csproj @@ -5,7 +5,7 @@ Modbus.Net.Siemens Modbus.Net.Siemens Modbus.Net.Siemens - 1.4.0-beta04 + 1.4.1-beta05 Chris L.(Luo Sheng) Hangzhou Delian Science Technology Co.,Ltd. Modbus.Net Siemens Profinet Implementation diff --git a/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs b/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs index ca2997f..6efe9a2 100644 --- a/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs +++ b/Modbus.Net/Modbus.Net.Siemens/SiemensTcpProtocolLinker.cs @@ -25,7 +25,7 @@ namespace Modbus.Net.Siemens public SiemensTcpProtocolLinker(string ip, int port) : base(ip, port) { - ((BaseConnector)BaseConnector).AddController(new MatchDirectlySendController(new ICollection<(int, int)>[] { new List<(int, int)> { (11, 11), (12, 12) } }, DuplicateWithCount.GetDuplcateFunc(new List { 2, 3 }, 0), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); + ((EventHandlerConnector)BaseConnector).AddController(new MatchDirectlySendController(new ICollection<(int, int)>[] { new List<(int, int)> { (11, 11), (12, 12) } }, DuplicateWithCount.GetDuplcateFunc(new List { 2, 3 }, 0), waitingListMaxCount: ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount") != null ? int.Parse(ConfigurationReader.GetValue("TCP:" + ip + ":" + port, "WaitingListCount")) : null)); } /// diff --git a/Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs b/Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs new file mode 100644 index 0000000..cdafd1e --- /dev/null +++ b/Modbus.Net/Modbus.Net/Connector/EventHandlerConnector.cs @@ -0,0 +1,172 @@ +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Logging; +using Nito.AsyncEx; +using System; +using System.Threading.Tasks; + +namespace Modbus.Net +{ + /// + public abstract class EventHandlerConnector : EventHandlerConnector + { + /// + public override bool IsSharable => true; + + private static readonly ILogger logger = LogProvider.CreateLogger(); + + /// + /// 发送锁 + /// + protected abstract AsyncLock Lock { get; } + + /// + /// 是否为全双工 + /// + public bool IsFullDuplex { get; } + + /// + /// 发送超时时间 + /// + protected abstract int TimeoutTime { get; set; } + + /// + /// 构造器 + /// + /// 发送超时时间 + /// 是否为全双工 + protected EventHandlerConnector(int timeoutTime = 10000, bool isFullDuplex = true) + { + IsFullDuplex = isFullDuplex; + if (timeoutTime < -1) timeoutTime = -1; + TimeoutTime = timeoutTime; + } + + /// + public override async Task SendMsgAsync(byte[] message) + { + var ans = await SendMsgInner(message); + if (ans == null) return new byte[0]; + return ans.ReceiveMessage; + } + + /// + /// 发送内部 + /// + /// 发送的信息 + /// 发送信息的定义 + protected async Task SendMsgInner(byte[] message) + { + IDisposable asyncLock = null; + try + { + var messageSendingdef = Controller.AddMessage(message); + if (messageSendingdef != null) + { + if (!IsFullDuplex) + { + asyncLock = await Lock.LockAsync(); + } + var success = messageSendingdef.SendMutex.WaitOne(TimeoutTime); + if (success) + { + await SendMsgWithoutConfirm(message); + success = messageSendingdef.ReceiveMutex.WaitOne(TimeoutTime); + if (success) + { + return messageSendingdef; + } + } + Controller.ForceRemoveWaitingMessage(messageSendingdef); + } + logger.LogInformation("Message is waiting in {0}. Cancel!", ConnectionToken); + return null; + } + catch (Exception e) + { + logger.LogError(e, "Connector {0} Send Error.", ConnectionToken); + return null; + } + finally + { + asyncLock?.Dispose(); + } + } + + /// + public override void ChannelReadComplete(IChannelHandlerContext ctx) + { + ctx.Flush(); + } + + /// + public override void ExceptionCaught(IChannelHandlerContext ctx, Exception e) + { + logger.LogError(e, e.ToString()); + ctx.CloseAsync(); + } + } + + /// + /// 基础的协议连接类 + /// + public abstract class EventHandlerConnector : ChannelHandlerAdapter, IConnector where TParamIn : class + { + /// + /// 数据返回代理参数 + /// + /// + /// + /// + public delegate MessageReturnCallbackArgs MessageReturnDelegate(object sender, MessageReturnArgs args); + + /// + /// 数据返回代理 + /// + public event MessageReturnDelegate MessageReturn; + + /// + /// 增加传输控制器 + /// + /// 传输控制器 + public void AddController(IController controller) + { + Controller = controller; + } + + /// + /// 传输控制器 + /// + protected virtual IController Controller { get; set; } + + /// + public abstract string ConnectionToken { get; } + + /// + public abstract bool IsConnected { get; } + + /// + public abstract Task ConnectAsync(); + + /// + public abstract bool Disconnect(); + + /// + public abstract Task SendMsgAsync(TParamIn message); + + /// + /// 发送数据,不确认 + /// + /// 需要发送的数据 + protected abstract Task SendMsgWithoutConfirm(TParamIn message); + + /// + /// 数据返回代理函数 + /// + /// + /// + protected TParamIn InvokeReturnMessage(TParamOut receiveMessage) + { + return MessageReturn?.Invoke(this, new MessageReturnArgs { ReturnMessage = receiveMessage })?.SendMessage; + } + } +} \ No newline at end of file diff --git a/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs b/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs index 081a6ad..11fcd91 100644 --- a/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs +++ b/Modbus.Net/Modbus.Net/Connector/TcpConnector.cs @@ -1,9 +1,13 @@ -using Microsoft.Extensions.Logging; +using DotNetty.Buffers; +using DotNetty.Common.Utilities; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using Microsoft.Extensions.Logging; using Nito.AsyncEx; using System; using System.Linq; -using System.Net.Sockets; -using System.Threading; +using System.Net; using System.Threading.Tasks; namespace Modbus.Net @@ -12,29 +16,19 @@ namespace Modbus.Net /// Socket收发类 /// 作者:本类来源于CSDN,并由罗圣(Chris L.)根据实际需要修改 /// - public class TcpConnector : BaseConnector, IDisposable + public class TcpConnector : EventHandlerConnector, IDisposable { private static readonly ILogger logger = LogProvider.CreateLogger(); private readonly string _host; private readonly int _port; - /// - /// 1MB 的接收缓冲区 - /// - private readonly byte[] _receiveBuffer = new byte[1024]; - private int _errorCount; private int _receiveCount; private int _sendCount; - private TcpClient _socketClient; - - private int _timeoutTime; - - private Task _receiveThread; - private bool _taskCancel = false; + private IChannel Channel { get; set; } /// /// 构造器 @@ -53,20 +47,10 @@ namespace Modbus.Net public override string ConnectionToken => _host; /// - protected override int TimeoutTime - { - get => - _timeoutTime; - set - { - _timeoutTime = value; - if (_socketClient != null) - _socketClient.ReceiveTimeout = _timeoutTime; - } - } + protected override int TimeoutTime { get; set; } /// - public override bool IsConnected => _socketClient?.Client != null && _socketClient.Connected; + public override bool IsConnected => Channel?.Open == true; /// protected override AsyncLock Lock { get; } = new AsyncLock(); @@ -92,10 +76,10 @@ namespace Modbus.Net // Release managed resources } // Release unmanaged resources - if (_socketClient != null) + if (Channel != null) { - CloseClientSocket(); - _socketClient = null; + CloseClientSocket().Wait(); + Channel = null; logger.LogDebug("Tcp client {ConnectionToken} Disposed", ConnectionToken); } } @@ -114,28 +98,31 @@ namespace Modbus.Net { using (await Lock.LockAsync()) { - if (_socketClient != null) + if (Channel != null) { - if (_socketClient.Connected) + if (Channel.Open) return true; } try { - _socketClient = new TcpClient - { - SendTimeout = TimeoutTime, - ReceiveTimeout = TimeoutTime - }; + var bootstrap = new Bootstrap(); + bootstrap + .Group(new MultithreadEventLoopGroup()) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Option(ChannelOption.ConnectTimeout, TimeSpan.FromMilliseconds(TimeoutTime)) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; - var cts = new CancellationTokenSource(); - cts.CancelAfter(TimeoutTime); - await _socketClient.ConnectAsync(_host, _port).WithCancellation(cts.Token); + pipeline.AddLast("handler", this); + })); - if (_socketClient.Connected) + Channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(_host), _port)); + + if (Channel.Open) { - _taskCancel = false; Controller.SendStart(); - ReceiveMsgThreadStart(); logger.LogInformation("Tcp client {ConnectionToken} connected", ConnectionToken); return true; } @@ -146,6 +133,9 @@ namespace Modbus.Net catch (Exception err) { logger.LogError(err, "Tcp client {ConnectionToken} connect exception", ConnectionToken); + + RefreshErrorCount(); + Dispose(); return false; } @@ -155,7 +145,7 @@ namespace Modbus.Net /// public override bool Disconnect() { - if (_socketClient == null) + if (Channel.Open) return true; try @@ -167,11 +157,14 @@ namespace Modbus.Net catch (Exception err) { logger.LogError(err, "Tcp client {ConnectionToken} disconnected exception", ConnectionToken); + + RefreshErrorCount(); + return false; } finally { - _socketClient = null; + Channel = null; } } @@ -185,75 +178,54 @@ namespace Modbus.Net if (!IsConnected) await ConnectAsync(); - var stream = _socketClient.GetStream(); - RefreshSendCount(); logger.LogDebug("Tcp client {ConnectionToken} send text len = {Length}", ConnectionToken, datagram.Length); - logger.LogDebug($"Tcp client {ConnectionToken} send: {String.Concat(datagram.Select(p => " " + p.ToString("X2")))}"); - await stream.WriteAsync(datagram, 0, datagram.Length); + logger.LogDebug($"Tcp client {ConnectionToken} send: {string.Concat(datagram.Select(p => " " + p.ToString("X2")))}"); + IByteBuffer buffer = Unpooled.Buffer(); + buffer.WriteBytes(datagram); + await Channel.WriteAndFlushAsync(buffer); } catch (Exception err) { logger.LogError(err, "Tcp client {ConnectionToken} send exception", ConnectionToken); + + RefreshErrorCount(); + Dispose(); } } - /// - protected override void ReceiveMsgThreadStart() - { - _receiveThread = Task.Run(ReceiveMessage); - } - - /// - protected override void ReceiveMsgThreadStop() - { - _taskCancel = true; - } - - /// - /// 接收返回消息 - /// - /// 返回的消息 - protected async Task ReceiveMessage() + /// + public override async void ChannelRead(IChannelHandlerContext context, object message) { try { - while (!_taskCancel) + if (message is IByteBuffer buffer) { - if (_socketClient == null) break; - NetworkStream stream = _socketClient.GetStream(); - var len = await stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); - stream.Flush(); - - // 异步接收回答 - if (len > 0) + byte[] msg = buffer.Array.Slice(buffer.ArrayOffset, buffer.ReadableBytes); + logger.LogDebug("Tcp client {ConnectionToken} receive text len = {Length}", ConnectionToken, + msg.Length); + logger.LogDebug( + $"Tcp client {ConnectionToken} receive: {string.Concat(msg.Select(p => " " + p.ToString("X2")))}"); + var isMessageConfirmed = Controller.ConfirmMessage(msg); + if (isMessageConfirmed != null) { - byte[] receiveBytes = CheckReplyDatagram(len); - logger.LogDebug("Tcp client {ConnectionToken} receive text len = {Length}", ConnectionToken, - receiveBytes.Length); - logger.LogDebug( - $"Tcp client {ConnectionToken} receive: {String.Concat(receiveBytes.Select(p => " " + p.ToString("X2")))}"); - var isMessageConfirmed = Controller.ConfirmMessage(receiveBytes); - if (isMessageConfirmed != null) + foreach (var confirmed in isMessageConfirmed) { - foreach (var confirmed in isMessageConfirmed) + if (confirmed.Item2 == false) { - if (confirmed.Item2 == false) + var sendMessage = InvokeReturnMessage(confirmed.Item1); + //主动传输事件 + if (sendMessage != null) { - var sendMessage = InvokeReturnMessage(confirmed.Item1); - //主动传输事件 - if (sendMessage != null) - { - await SendMsgWithoutConfirm(sendMessage); - } + await SendMsgWithoutConfirm(sendMessage); } } } - - RefreshReceiveCount(); } + + RefreshReceiveCount(); } } catch (ObjectDisposedException) @@ -263,23 +235,11 @@ namespace Modbus.Net catch (Exception err) { logger.LogError(err, "Tcp client {ConnectionToken} receive exception", ConnectionToken); - //CloseClientSocket(); - } - } - /// - /// 接收消息,并转换成字符串 - /// - /// 消息的长度 - private byte[] CheckReplyDatagram(int len) - { - var replyMessage = new byte[len]; - Array.Copy(_receiveBuffer, replyMessage, len); - - if (len <= 0) RefreshErrorCount(); - return replyMessage; + await CloseClientSocket(); + } } private void RefreshSendCount() @@ -300,24 +260,25 @@ namespace Modbus.Net logger.LogDebug("Tcp client {ConnectionToken} error count: {ErrorCount}", ConnectionToken, _errorCount); } - private void CloseClientSocket() + private async Task CloseClientSocket() { try { Controller.SendStop(); Controller.Clear(); - ReceiveMsgThreadStop(); - if (_socketClient != null) + if (Channel != null) { - if (_socketClient.Connected) + if (Channel.Open) { - _socketClient.Close(); + await Channel.CloseAsync(); } } } catch (Exception ex) { logger.LogError(ex, "Tcp client {ConnectionToken} client close exception", ConnectionToken); + + RefreshErrorCount(); } } } diff --git a/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs b/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs index c1775a8..8ca5927 100644 --- a/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs +++ b/Modbus.Net/Modbus.Net/Connector/UdpConnector.cs @@ -1,9 +1,14 @@ -using Microsoft.Extensions.Logging; +using DotNetty.Buffers; +using DotNetty.Common.Utilities; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using Microsoft.Extensions.Logging; using Nito.AsyncEx; using System; using System.Linq; +using System.Net; using System.Net.Sockets; -using System.Threading; using System.Threading.Tasks; namespace Modbus.Net @@ -11,27 +16,19 @@ namespace Modbus.Net /// /// Udp收发类 /// - public class UdpConnector : BaseConnector, IDisposable + public class UdpConnector : EventHandlerConnector, IDisposable { private static readonly ILogger logger = LogProvider.CreateLogger(); private readonly string _host; private readonly int _port; - /// - /// 1MB 的接收缓冲区 - /// - private readonly byte[] _receiveBuffer = new byte[1024]; - private int _errorCount; private int _receiveCount; private int _sendCount; - private UdpClient _socketClient; - - private Task _receiveThread; - private bool _taskCancel = false; + private IChannel Channel { get; set; } /// /// 构造器 @@ -53,7 +50,7 @@ namespace Modbus.Net protected override int TimeoutTime { get; set; } /// - public override bool IsConnected => _socketClient?.Client != null && _socketClient.Client.Connected; + public override bool IsConnected => Channel != null && Channel.Active; /// protected override AsyncLock Lock { get; } = new AsyncLock(); @@ -79,10 +76,10 @@ namespace Modbus.Net // Release managed resources } // Release unmanaged resources - if (_socketClient != null) + if (Channel != null) { - CloseClientSocket(); - _socketClient = null; + CloseClientSocket().Wait(); + Channel = null; logger.LogDebug("Udp client {ConnectionToken} Disposed", ConnectionToken); } } @@ -101,23 +98,30 @@ namespace Modbus.Net { using (await Lock.LockAsync()) { - if (_socketClient != null) + if (Channel != null) { return true; } try { - _socketClient = new UdpClient(); + var bootstrap = new Bootstrap(); + bootstrap + .Group(new MultithreadEventLoopGroup()) + .Channel() + .Option(ChannelOption.SoBroadcast, true) + .Option(ChannelOption.ConnectTimeout, TimeSpan.FromMilliseconds(TimeoutTime)) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; - var cts = new CancellationTokenSource(); - cts.CancelAfter(TimeoutTime); - await Task.Run(() => _socketClient.Connect(_host, _port), cts.Token); + pipeline.AddLast("handler", this); + })); - if (_socketClient.Client.Connected) + Channel = await bootstrap.BindAsync(IPEndPoint.MinPort); + + if (Channel.Active) { - _taskCancel = false; Controller.SendStart(); - ReceiveMsgThreadStart(); logger.LogInformation("Udp client {ConnectionToken} connected", ConnectionToken); return true; } @@ -138,7 +142,7 @@ namespace Modbus.Net /// public override bool Disconnect() { - if (_socketClient == null) + if (Channel == null) return true; try @@ -150,11 +154,14 @@ namespace Modbus.Net catch (Exception err) { logger.LogError(err, "Udp client {ConnectionToken} disconnected exception", ConnectionToken); + + RefreshErrorCount(); + return false; } finally { - _socketClient = null; + Channel = null; } } @@ -171,71 +178,53 @@ namespace Modbus.Net RefreshSendCount(); logger.LogDebug("Udp client {ConnectionToken} send text len = {Length}", ConnectionToken, datagram.Length); - logger.LogDebug($"Udp client {ConnectionToken} send: {String.Concat(datagram.Select(p => " " + p.ToString("X2")))}"); - await _socketClient.SendAsync(datagram, datagram.Length); + logger.LogDebug($"Udp client {ConnectionToken} send: {string.Concat(datagram.Select(p => " " + p.ToString("X2")))}"); + IByteBuffer buffer = Unpooled.Buffer(); + buffer.WriteBytes(datagram); + var packet = new DatagramPacket((IByteBuffer)buffer.Retain(), new IPEndPoint(IPAddress.Parse(_host), _port)); + await Channel.WriteAndFlushAsync(packet); } catch (Exception err) { logger.LogError(err, "Udp client {ConnectionToken} send exception", ConnectionToken); + + RefreshErrorCount(); + Dispose(); } } - /// - protected override void ReceiveMsgThreadStart() - { - _receiveThread = Task.Run(ReceiveMessage); - } - - /// - protected override void ReceiveMsgThreadStop() - { - _taskCancel = true; - } - - /// - /// 接收返回消息 - /// - /// 返回的消息 - protected async Task ReceiveMessage() + /// + public override async void ChannelRead(IChannelHandlerContext ctx, object message) { try { - while (!_taskCancel) + if (message is DatagramPacket packet) { - if (_socketClient == null) break; - var receive = await _socketClient.ReceiveAsync(); - - var len = receive.Buffer.Length; - // 异步接收回答 - if (len > 0) + var buffer = packet.Content; + byte[] msg = buffer.Array.Slice(buffer.ArrayOffset, buffer.ReadableBytes); + logger.LogDebug("Udp client {ConnectionToken} receive text len = {Length}", ConnectionToken, + msg.Length); + logger.LogDebug( + $"Udp client {ConnectionToken} receive: {string.Concat(msg.Select(p => " " + p.ToString("X2")))}"); + var isMessageConfirmed = Controller.ConfirmMessage(msg); + if (isMessageConfirmed != null) { - if (receive.Buffer.Clone() is byte[] receiveBytes) + foreach (var confirmed in isMessageConfirmed) { - logger.LogDebug("Udp client {ConnectionToken} receive text len = {Length}", ConnectionToken, - receiveBytes.Length); - logger.LogDebug( - $"Udp client {ConnectionToken} receive: {String.Concat(receiveBytes.Select(p => " " + p.ToString("X2")))}"); - var isMessageConfirmed = Controller.ConfirmMessage(receiveBytes); - if (isMessageConfirmed != null) + if (confirmed.Item2 == false) { - foreach (var confirmed in isMessageConfirmed) + var sendMessage = InvokeReturnMessage(confirmed.Item1); + //主动传输事件 + if (sendMessage != null) { - if (confirmed.Item2 == false) - { - var sendMessage = InvokeReturnMessage(confirmed.Item1); - //主动传输事件 - if (sendMessage != null) - { - await SendMsgWithoutConfirm(sendMessage); - } - } + await SendMsgWithoutConfirm(sendMessage); } } } - - RefreshReceiveCount(); } + + RefreshReceiveCount(); } } catch (ObjectDisposedException) @@ -245,7 +234,10 @@ namespace Modbus.Net catch (Exception err) { logger.LogError(err, "Udp client {ConnectionToken} receive exception", ConnectionToken); - //CloseClientSocket(); + + RefreshErrorCount(); + + await CloseClientSocket(); } } @@ -267,25 +259,25 @@ namespace Modbus.Net logger.LogDebug("Udp client {ConnectionToken} error count: {ErrorCount}", ConnectionToken, _errorCount); } - private void CloseClientSocket() + private async Task CloseClientSocket() { try { Controller.SendStop(); Controller.Clear(); - ReceiveMsgThreadStop(); - if (_socketClient != null) + if (Channel != null) { - if (_socketClient.Client?.Connected == true) + if (Channel.Active) { - _socketClient.Client.Disconnect(false); + await Channel.CloseAsync(); } - _socketClient.Close(); } } catch (Exception ex) { logger.LogError(ex, "Udp client {ConnectionToken} client close exception", ConnectionToken); + + RefreshErrorCount(); } } } diff --git a/Modbus.Net/Modbus.Net/Modbus.Net.csproj b/Modbus.Net/Modbus.Net/Modbus.Net.csproj index 1e84a1d..a5e7f46 100644 --- a/Modbus.Net/Modbus.Net/Modbus.Net.csproj +++ b/Modbus.Net/Modbus.Net/Modbus.Net.csproj @@ -5,7 +5,7 @@ Modbus.Net Modbus.Net Modbus.Net - 1.4.0-beta04 + 1.4.1-beta05 Modbus.Net Chris L.(Luo Sheng) Hangzhou Delian Science Technology Co.,Ltd. @@ -29,6 +29,7 @@ +