Commit 11806e33 authored by Andreas Müller's avatar Andreas Müller

Removed DeadLock on Dispose

parent ac21a3f9
Pipeline #36 passed with stage
in 31 seconds
using System;
using System.Threading;
using System.Threading.Tasks;
namespace AMWD.Modbus.Common.Util
{
// Source: https://stackoverflow.com/a/21851153
internal class FakeSynchronizationContext : SynchronizationContext
{
private static readonly ThreadLocal<FakeSynchronizationContext> context =
new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());
private FakeSynchronizationContext()
{
}
public static FakeSynchronizationContext Instance => context.Value;
public static void Execute(Action action)
{
var savedContext = Current;
SetSynchronizationContext(Instance);
try
{
action();
}
finally
{
SetSynchronizationContext(savedContext);
}
}
public static TResult Execute<TResult>(Func<TResult> action)
{
var savedContext = Current;
SetSynchronizationContext(Instance);
try
{
return action();
}
finally
{
SetSynchronizationContext(savedContext);
}
}
#region SynchronizationContext methods
public override SynchronizationContext CreateCopy()
{
return this;
}
public override void OperationStarted()
{
throw new NotImplementedException("OperationStarted");
}
public override void OperationCompleted()
{
throw new NotImplementedException("OperationCompleted");
}
public override void Post(SendOrPostCallback d, object state)
{
throw new NotImplementedException("Post");
}
public override void Send(SendOrPostCallback d, object state)
{
throw new NotImplementedException("Send");
}
#endregion SynchronizationContext methods
}
/// <summary>
/// Source: https://stackoverflow.com/a/21851153
/// </summary>
public static class FakeSynchronizationContextExtensions
{
/// <summary>
/// Transitions the underlying <see cref="Task"/> into the
/// <see cref="TaskStatus.RanToCompletion"/> state. The awaiting code is executed
/// asynchronously so it won't block the caller of this method.
/// </summary>
/// <typeparam name="T">The type of the result.</typeparam>
/// <param name="tcs"></param>
/// <param name="result">The result value to bind to this <see cref="Task"/>.</param>
public static void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
{
FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
}
/// <summary>
/// Attempts to transition the underlying <see cref="Task"/> into the
/// <see cref="TaskStatus.RanToCompletion"/> state. The awaiting code is executed
/// asynchronously so it won't block the caller of this method.
/// </summary>
/// <typeparam name="T">The type of the result.</typeparam>
/// <param name="tcs"></param>
/// <param name="result">The result value to bind to this <see cref="Task"/>.</param>
/// <returns>True if the operation was successful; otherwise, false.</returns>
public static bool TrySetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
{
return FakeSynchronizationContext.Execute(() => tcs.TrySetResult(result));
}
}
}
......@@ -38,8 +38,8 @@ namespace AMWD.Modbus.Tcp.Client
private Task receiveTask;
// Transaction handling
private readonly object syncLock = new object();
private ushort transactionId = 0;
private readonly SemaphoreSlim sendMutex = new SemaphoreSlim(1, 1);
private readonly ConcurrentDictionary<ushort, TaskCompletionSource<Response>> awaitedResponses = new ConcurrentDictionary<ushort, TaskCompletionSource<Response>>();
#endregion Fields
......@@ -979,7 +979,7 @@ namespace AMWD.Modbus.Tcp.Client
#region Private Methods
private async void ReceiveLoop(CancellationToken ct)
private async Task ReceiveLoop(CancellationToken ct)
{
logger?.LogInformation("ModbusClient.ReceiveLoop started");
var reported = false;
......@@ -1008,7 +1008,7 @@ namespace AMWD.Modbus.Tcp.Client
SpinWait.SpinUntil(() => stream.DataAvailable || ct.IsCancellationRequested);
if (ct.IsCancellationRequested)
{
continue;
break;
}
var bytes = new List<byte>();
......@@ -1024,7 +1024,7 @@ namespace AMWD.Modbus.Tcp.Client
while (expectedCount > 0 && !ct.IsCancellationRequested);
if (ct.IsCancellationRequested)
{
continue;
break;
}
var lenBytes = bytes.Skip(4).Take(2).ToArray();
......@@ -1044,13 +1044,13 @@ namespace AMWD.Modbus.Tcp.Client
while (expectedCount > 0 && !ct.IsCancellationRequested);
if (ct.IsCancellationRequested)
{
continue;
break;
}
var response = new Response(bytes.ToArray());
if (awaitedResponses.TryRemove(response.TransactionId, out var tcs))
{
tcs.TrySetResult(response);
tcs.TrySetResultAsync(response);
}
}
catch (ObjectDisposedException) when (ct.IsCancellationRequested)
......@@ -1059,7 +1059,7 @@ namespace AMWD.Modbus.Tcp.Client
}
catch (InvalidOperationException) when (isReconnecting)
{
// server broken
// connection broken
}
catch (Exception ex)
{
......@@ -1099,12 +1099,13 @@ namespace AMWD.Modbus.Tcp.Client
{
try
{
await sendMutex.WaitAsync(ct);
request.TransactionId = transactionId;
transactionId++;
lock (syncLock)
{
request.TransactionId = transactionId;
transactionId++;
awaitedResponses[request.TransactionId] = tcs;
awaitedResponses[request.TransactionId] = tcs;
}
logger?.LogTrace(request.ToString());
......@@ -1116,7 +1117,8 @@ namespace AMWD.Modbus.Tcp.Client
if (await Task.WhenAny(tcs.Task, Task.Delay(ReceiveTimeout, ct)) == tcs.Task && !ct.IsCancellationRequested)
{
logger?.LogTrace("ModbusClient.SendRequest - Response received");
return await tcs.Task;
var response = await tcs.Task;
return response;
}
}
}
......@@ -1124,10 +1126,6 @@ namespace AMWD.Modbus.Tcp.Client
{
logger?.LogError(ex, $"ModbusClient.SendRequest - Transaction {request.TransactionId}");
}
finally
{
sendMutex.Release();
}
return new Response(new byte[] { 0, 0, 0, 0, 0, 0 });
}
......@@ -1282,11 +1280,6 @@ namespace AMWD.Modbus.Tcp.Client
DisconnectInternal(true)
.GetAwaiter()
.GetResult();
if (logger is IDisposable dl)
{
dl?.Dispose();
}
}
#endregion IDisposable implementation
......
......@@ -683,7 +683,7 @@ namespace UnitTests
// Time for the scheduler to launch a thread to start the reconnect
private async Task EnsureWait()
{
await Task.Delay(10);
await Task.Delay(1);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment