maxRunningTask reopen

This commit is contained in:
parallelbgls
2016-09-14 11:10:25 +08:00
parent c0984d5c3f
commit 4da73f2055

View File

@@ -23,7 +23,7 @@ namespace Modbus.Net
public static int Restore = 0; public static int Restore = 0;
} }
/*public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{ {
/// <summary> /// <summary>
/// Whether the current thread is processing work items. /// Whether the current thread is processing work items.
@@ -172,7 +172,7 @@ namespace Modbus.Net
if (lockTaken) Monitor.Exit(_tasks); if (lockTaken) Monitor.Exit(_tasks);
} }
} }
}*/ }
public class TaskManager public class TaskManager
{ {
@@ -184,9 +184,10 @@ namespace Modbus.Net
/// 不在运行的设备 /// 不在运行的设备
/// </summary> /// </summary>
private HashSet<BaseMachine> _unlinkedMachines; private HashSet<BaseMachine> _unlinkedMachines;
//private TaskFactory<Dictionary<string,ReturnUnit>> _tasks;
//private TaskScheduler _scheduler; private TaskFactory _tasks;
//private CancellationTokenSource _cts; private TaskScheduler _scheduler;
private CancellationTokenSource _cts;
/// <summary> /// <summary>
/// 正常读取的计时器 /// 正常读取的计时器
@@ -309,7 +310,7 @@ namespace Modbus.Net
public MachineGetDataType GetDataType { get; set; } public MachineGetDataType GetDataType { get; set; }
public MachineSetDataType SetDataType { get; set; } public MachineSetDataType SetDataType { get; set; }
/*public int MaxRunningTasks public int MaxRunningTasks
{ {
get { return _scheduler.MaximumConcurrencyLevel; } get { return _scheduler.MaximumConcurrencyLevel; }
set set
@@ -317,7 +318,7 @@ namespace Modbus.Net
TaskStop(); TaskStop();
_scheduler = new LimitedConcurrencyLevelTaskScheduler(value); _scheduler = new LimitedConcurrencyLevelTaskScheduler(value);
} }
}*/ }
/// <summary> /// <summary>
/// 构造一个TaskManager /// 构造一个TaskManager
@@ -326,9 +327,9 @@ namespace Modbus.Net
/// <param name="getCycle">读取数据的时间间隔(秒)</param> /// <param name="getCycle">读取数据的时间间隔(秒)</param>
/// <param name="keepConnect">读取数据后是否保持连接</param> /// <param name="keepConnect">读取数据后是否保持连接</param>
/// <param name="dataType">获取与设置数据的方式</param> /// <param name="dataType">获取与设置数据的方式</param>
public TaskManager(/*int maxRunningTask,*/ int getCycle, bool keepConnect, MachineDataType dataType = MachineDataType.CommunicationTag) public TaskManager(int maxRunningTask, int getCycle, bool keepConnect, MachineDataType dataType = MachineDataType.CommunicationTag)
{ {
//_scheduler = new LimitedConcurrencyLevelTaskScheduler(maxRunningTask); _scheduler = new LimitedConcurrencyLevelTaskScheduler(maxRunningTask);
_machines = new HashSet<BaseMachine>(new BaseMachineEqualityComparer()); _machines = new HashSet<BaseMachine>(new BaseMachineEqualityComparer());
_unlinkedMachines = new HashSet<BaseMachine>(new BaseMachineEqualityComparer()); _unlinkedMachines = new HashSet<BaseMachine>(new BaseMachineEqualityComparer());
_getCycle = getCycle; _getCycle = getCycle;
@@ -480,16 +481,28 @@ namespace Modbus.Net
/// <returns></returns> /// <returns></returns>
private async Task MaintainTasksAsync() private async Task MaintainTasksAsync()
{ {
HashSet<BaseMachine> saveMachines = new HashSet<BaseMachine>(); try
IEnumerable<BaseMachine> saveMachinesEnum;
lock (_machines)
{ {
saveMachines.UnionWith(_machines); var tasks = new List<Task>();
saveMachinesEnum = saveMachines.ToList(); HashSet<BaseMachine> saveMachines = new HashSet<BaseMachine>();
IEnumerable<BaseMachine> saveMachinesEnum;
lock (_machines)
{
saveMachines.UnionWith(_machines);
saveMachinesEnum = saveMachines.ToList();
}
foreach (var machine in saveMachinesEnum)
{
CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(_getCycle * 10));
var task = _tasks.StartNew(() => RunTask(machine).WithCancellation(cts.Token));
tasks.Add(task);
}
await Task.WhenAll(tasks);
} }
foreach (var machine in saveMachinesEnum) catch
{ {
await RunTask(machine); return;
} }
} }
@@ -499,23 +512,26 @@ namespace Modbus.Net
/// <returns></returns> /// <returns></returns>
private async Task MaintainTasks2Async() private async Task MaintainTasks2Async()
{ {
var tasks = new List<Task>();
HashSet<BaseMachine> saveMachines = new HashSet<BaseMachine>(); HashSet<BaseMachine> saveMachines = new HashSet<BaseMachine>();
lock (_unlinkedMachines) lock (_unlinkedMachines)
{ {
saveMachines.UnionWith(_unlinkedMachines); saveMachines.UnionWith(_unlinkedMachines);
} }
foreach (var machine in saveMachines) try
{ {
try foreach (var machine in saveMachines)
{ {
CancellationTokenSource cts = new CancellationTokenSource(); CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(_getCycle * 10)); cts.CancelAfter(TimeSpan.FromSeconds(_getCycle * 10));
await RunTask(machine).WithCancellation(cts.Token); var task = _tasks.StartNew(() => RunTask(machine).WithCancellation(cts.Token));
} tasks.Add(task);
catch
{
return;
} }
await Task.WhenAll();
}
catch
{
return;
} }
} }
@@ -543,8 +559,8 @@ namespace Modbus.Net
public void TaskStart() public void TaskStart()
{ {
TaskStop(); TaskStop();
//_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
//_tasks = new TaskFactory<Dictionary<string,ReturnUnit>>(_cts.Token, TaskCreationOptions.None, TaskContinuationOptions.None, _scheduler); _tasks = new TaskFactory(_cts.Token, TaskCreationOptions.None, TaskContinuationOptions.None, _scheduler);
GetCycle = TimeRestore.Restore; GetCycle = TimeRestore.Restore;
} }
@@ -556,10 +572,10 @@ namespace Modbus.Net
lock (_machines) lock (_machines)
{ {
GetCycle = Timeout.Infinite; GetCycle = Timeout.Infinite;
//if (_cts != null) if (_cts != null)
//{ {
// _cts.Cancel(); _cts.Cancel();
//} }
if (_machines != null) if (_machines != null)
{ {
foreach (var machine in _machines) foreach (var machine in _machines)
@@ -567,7 +583,7 @@ namespace Modbus.Net
machine.Disconnect(); machine.Disconnect();
} }
} }
//_tasks = null; _tasks = null;
} }
} }