using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ModBus.Net
{
    /// <summary>
    /// Holds the sentinel assigned to <see cref="TaskManager.GetCycle"/> to (re)start the
    /// polling timer while keeping the cycle that is already configured.
    /// </summary>
    public static class TimeRestore
    {
        public static int Restore = 0;
    }

    /// <summary>
    /// A task scheduler that runs its tasks on the thread pool while never letting more than
    /// a fixed number of worker delegates run at the same time.
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>
        /// Whether the current thread is processing work items.
        /// </summary>
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;

        /// <summary>
        /// The list of tasks to be executed.
        /// </summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)

        /// <summary>
        /// The maximum concurrency level allowed by this scheduler.
        /// </summary>
        private readonly int _maxDegreeOfParallelism;

        /// <summary>
        /// The number of worker delegates currently queued to, or running on, the thread pool.
        /// </summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">
        /// The maximum degree of parallelism provided by this scheduler.
        /// </param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1)
            {
                throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            }

            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>
        /// Queues a task to the scheduler.
        /// </summary>
        /// <param name="task">
        /// The task to be queued.
        /// </param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed. If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue.
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue (outside the lock).
                        base.TryExecuteTask(item);
                    }
                }
                finally
                {
                    // We're done processing items on the current thread.
                    _currentThreadIsProcessingItems = false;
                }
            }, null);
        }

        /// <summary>
        /// Attempts to execute the specified task on the current thread.
        /// </summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// Whether the task was already queued to this scheduler.
        /// </param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining.
            if (!_currentThreadIsProcessingItems)
            {
                return false;
            }

            // If the task was previously queued, remove it from the queue.
            if (taskWasPreviouslyQueued)
            {
                TryDequeue(task);
            }

            // Try to run the task.
            return base.TryExecuteTask(task);
        }

        /// <summary>
        /// Attempts to remove a previously scheduled task from the scheduler.
        /// </summary>
        /// <param name="task">
        /// The task to be removed.
        /// </param>
        /// <returns>
        /// Whether the task could be found and removed.
        /// </returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks)
            {
                return _tasks.Remove(task);
            }
        }

        /// <summary>
        /// Gets the maximum concurrency level supported by this scheduler.
        /// </summary>
        public sealed override int MaximumConcurrencyLevel
        {
            get { return _maxDegreeOfParallelism; }
        }

        /// <summary>
        /// Gets an enumerable of the tasks currently scheduled on this scheduler.
        /// </summary>
        /// <returns>
        /// A snapshot of the tasks currently scheduled.
        /// </returns>
        /// <exception cref="NotSupportedException">
        /// The queue lock could not be taken without blocking.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                // Never block here: only take the snapshot if the lock is free right now.
                Monitor.TryEnter(_tasks, ref lockTaken);
                if (lockTaken)
                {
                    return _tasks.ToArray();
                }

                throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken)
                {
                    Monitor.Exit(_tasks);
                }
            }
        }
    }

    /// <summary>
    /// Periodically polls a set of machines through a concurrency-limited scheduler and
    /// reports every poll result through the <see cref="ReturnValues"/> event.
    /// </summary>
    // NOTE(review): the generic type arguments in this file were lost before review.
    // The poll result type is reconstructed here as Dictionary<string, ReturnUnit> and the
    // machine id as int (it is compared against an int in RemoveMachineWithId) — confirm
    // both against BaseMachine.GetDatas / BaseMachine.Id.
    public class TaskManager
    {
        private readonly HashSet<BaseMachine> _machines; // protected by lock(_machines)
        private TaskFactory<Dictionary<string, ReturnUnit>> _tasks;
        private TaskScheduler _scheduler;
        private CancellationTokenSource _cts;
        private Timer _timer;

        private bool _keepConnect;

        /// <summary>
        /// Gets or sets the keep-connect flag that is pushed to every managed machine.
        /// Setting it first stops polling (see <see cref="TaskStop"/>).
        /// </summary>
        public bool KeepConnect
        {
            get { return _keepConnect; }
            set
            {
                TaskStop();
                _keepConnect = value;
                lock (_machines)
                {
                    foreach (var machine in _machines)
                    {
                        machine.KeepConnect = _keepConnect;
                    }
                }
            }
        }

        /// <summary>
        /// Signature of the <see cref="ReturnValues"/> handlers.
        /// </summary>
        /// <param name="returnValue">
        /// The machine id paired with the polled values; the value is null when the poll
        /// threw or was cancelled.
        /// </param>
        public delegate void ReturnValuesDelegate(KeyValuePair<int, Dictionary<string, ReturnUnit>> returnValue);

        /// <summary>
        /// Raised once per machine after each poll attempt.
        /// </summary>
        public event ReturnValuesDelegate ReturnValues;

        private int _getCycle;

        /// <summary>
        /// Gets or sets the polling cycle, in seconds (the value is multiplied by 1000 to
        /// obtain the timer period in milliseconds).
        /// <para>
        /// <see cref="Timeout.Infinite"/> pauses the timer without changing the stored cycle.
        /// A positive value stores the new cycle and restarts the timer. Zero
        /// (<see cref="TimeRestore.Restore"/>) restarts the timer with the stored cycle.
        /// Any other negative value, or a value equal to the stored cycle, is ignored.
        /// </para>
        /// </summary>
        public int GetCycle
        {
            get { return _getCycle; }
            set
            {
                // Handle "stop" before the no-change shortcut so that a stop request is
                // honoured even if the stored cycle happens to be Timeout.Infinite.
                if (value == Timeout.Infinite)
                {
                    if (_timer != null)
                    {
                        _timer.Change(Timeout.Infinite, Timeout.Infinite);
                    }
                    return;
                }

                if (value == _getCycle || value < 0)
                {
                    return;
                }

                if (_timer != null)
                {
                    _timer.Change(Timeout.Infinite, Timeout.Infinite);
                    _timer.Dispose();
                    _timer = null;
                }

                if (value > 0)
                {
                    _getCycle = value;
                }

                // First tick fires immediately; subsequent ticks every _getCycle seconds.
                _timer = new Timer(MaintainTasks, null, 0, _getCycle * 1000);
            }
        }

        /// <summary>
        /// Gets or sets the maximum number of polls that may run concurrently. Setting it
        /// stops polling and replaces the scheduler; the new scheduler is picked up by the
        /// next <see cref="TaskStart"/>.
        /// </summary>
        public int MaxRunningTasks
        {
            get { return _scheduler.MaximumConcurrencyLevel; }
            set
            {
                TaskStop();
                _scheduler = new LimitedConcurrencyLevelTaskScheduler(value);
            }
        }

        /// <summary>
        /// Creates a task manager. Polling does not begin until <see cref="TaskStart"/>.
        /// </summary>
        /// <param name="maxRunningTask">Maximum number of concurrently running polls.</param>
        /// <param name="getCycle">Polling cycle in seconds.</param>
        /// <param name="keepConnect">Keep-connect flag applied to every added machine.</param>
        public TaskManager(int maxRunningTask, int getCycle, bool keepConnect)
        {
            _scheduler = new LimitedConcurrencyLevelTaskScheduler(maxRunningTask);
            _machines = new HashSet<BaseMachine>(new BaseMachineEqualityComparer());
            _getCycle = getCycle;
            KeepConnect = keepConnect;
        }

        /// <summary>
        /// Adds a machine to the polled set after applying the current keep-connect flag.
        /// </summary>
        /// <param name="machine">The machine to add.</param>
        public void AddMachine(BaseMachine machine)
        {
            machine.KeepConnect = KeepConnect;
            lock (_machines)
            {
                _machines.Add(machine);
            }
        }

        /// <summary>
        /// Adds several machines to the polled set under a single lock.
        /// </summary>
        /// <param name="machines">The machines to add.</param>
        public void AddMachines(IEnumerable<BaseMachine> machines)
        {
            // AddMachine re-enters the same monitor; Monitor locks are reentrant.
            lock (_machines)
            {
                foreach (var machine in machines)
                {
                    AddMachine(machine);
                }
            }
        }

        /// <summary>
        /// Removes every machine whose connection token equals the given token.
        /// </summary>
        /// <param name="machineToken">The connection token to match.</param>
        public void RemoveMachineWithToken(string machineToken)
        {
            lock (_machines)
            {
                _machines.RemoveWhere(p => p.ConnectionToken == machineToken);
            }
        }

        /// <summary>
        /// Removes every machine whose id equals the given id.
        /// </summary>
        /// <param name="id">The machine id to match.</param>
        public void RemoveMachineWithId(int id)
        {
            lock (_machines)
            {
                _machines.RemoveWhere(p => p.Id == id);
            }
        }

        /// <summary>
        /// Removes the given machine from the polled set.
        /// </summary>
        /// <param name="machine">The machine to remove.</param>
        public void RemoveMachine(BaseMachine machine)
        {
            lock (_machines)
            {
                _machines.Remove(machine);
            }
        }

        /// <summary>
        /// Timer callback: runs one polling pass synchronously on the timer thread.
        /// </summary>
        /// <param name="sender">Timer state object; unused.</param>
        private void MaintainTasks(object sender)
        {
            AsyncHelper.RunSync(MaintainTasksAsync);
        }

        /// <summary>
        /// Polls every machine once, one after another, working on a snapshot of the set so
        /// the lock is not held while polling.
        /// </summary>
        private async Task MaintainTasksAsync()
        {
            HashSet<BaseMachine> saveMachines = new HashSet<BaseMachine>();
            lock (_machines)
            {
                saveMachines.UnionWith(_machines);
            }

            foreach (var machine in saveMachines)
            {
                await RunTask(machine);
            }
        }

        /// <summary>
        /// Stops any current polling, then starts polling with a fresh cancellation source
        /// and a task factory bound to the current scheduler.
        /// </summary>
        public void TaskStart()
        {
            TaskStop();
            _cts = new CancellationTokenSource();
            _tasks = new TaskFactory<Dictionary<string, ReturnUnit>>(
                _cts.Token, TaskCreationOptions.None, TaskContinuationOptions.None, _scheduler);
            GetCycle = TimeRestore.Restore;
        }

        /// <summary>
        /// Pauses the polling timer, signals cancellation to polls that have not started yet,
        /// and disconnects every machine.
        /// </summary>
        public void TaskStop()
        {
            GetCycle = Timeout.Infinite;

            if (_cts != null)
            {
                _cts.Cancel();
            }

            if (_machines != null)
            {
                lock (_machines)
                {
                    foreach (var machine in _machines)
                    {
                        machine.Disconnect();
                    }
                }
            }
        }

        /// <summary>
        /// Polls a single machine on the limited scheduler and reports the outcome through
        /// <see cref="ReturnValues"/>; any failure or cancellation is reported as null.
        /// </summary>
        /// <param name="machine">The machine to poll.</param>
        private async Task RunTask(BaseMachine machine)
        {
            try
            {
                // Link the per-poll timeout to the manager-wide source so that TaskStop()
                // also cancels polls that are still waiting in the scheduler queue; the
                // token only prevents a poll from starting, it does not abort a running one.
                // Disposing the source releases the timer created by CancelAfter.
                using (CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token))
                {
                    cts.CancelAfter(TimeSpan.FromSeconds(_getCycle * 2));
                    var ans = await _tasks.StartNew(machine.GetDatas, cts.Token);
                    OnReturnValues(machine.Id, ans);
                }
            }
            catch (Exception)
            {
                // Best effort by design: subscribers learn about the failure via a null value.
                OnReturnValues(machine.Id, null);
            }
        }

        /// <summary>
        /// Raises <see cref="ReturnValues"/> using a snapshot of the handler list so that a
        /// concurrent unsubscribe cannot cause a null dereference.
        /// </summary>
        /// <param name="machineId">Id of the polled machine.</param>
        /// <param name="values">The polled values, or null on failure.</param>
        private void OnReturnValues(int machineId, Dictionary<string, ReturnUnit> values)
        {
            ReturnValuesDelegate handler = ReturnValues;
            if (handler != null)
            {
                handler(new KeyValuePair<int, Dictionary<string, ReturnUnit>>(machineId, values));
            }
        }
    }
}