This repository has been archived by the owner on Dec 18, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 445
/
Copy pathHttpConnection.cs
469 lines (402 loc) · 17.8 KB
/
HttpConnection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Http;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Connections.Client.Internal;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Microsoft.AspNetCore.Http.Connections.Client
{
public partial class HttpConnection : ConnectionContext, IConnectionInherentKeepAliveFeature
{
// Timeout assigned to the HttpClient built by CreateHttpClient.
private static readonly TimeSpan HttpClientTimeout = TimeSpan.FromSeconds(120);
#if !NETCOREAPP2_1
// Minimum OS version that IsWebSocketsSupported accepts on Windows (compared against Environment.OSVersion.Version).
private static readonly Version Windows8Version = new Version(6, 2);
#endif
private readonly ILogger _logger;
// Held by StartAsyncCore and DisposeAsyncCore while they run, so starting and disposing do not overlap.
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);
// Set once SelectAndStartTransport has completed successfully.
private bool _started;
// Set by DisposeAsyncCore; CheckDisposed throws once this is true.
private bool _disposed;
// True when the running transport is long polling; surfaced through IConnectionInherentKeepAliveFeature.
private bool _hasInherentKeepAlive;
// Left null when the configured transports are exactly WebSockets (see the constructor).
private readonly HttpClient _httpClient;
private readonly HttpConnectionOptions _httpConnectionOptions;
// The transport that was successfully started; null until then.
private ITransport _transport;
private readonly ITransportFactory _transportFactory;
// Connection id taken from the most recent negotiation response.
private string _connectionId;
// Logging scope state; its ConnectionId is filled in after negotiation.
private readonly ConnectionLogScope _logScope;
// Handle returned by _logger.BeginScope; disposed in DisposeAsyncCore.
private readonly IDisposable _scopeDisposable;
private readonly ILoggerFactory _loggerFactory;
/// <summary>
/// Gets the duplex pipe of the running transport.
/// </summary>
/// <exception cref="ObjectDisposedException">The connection has been disposed.</exception>
/// <exception cref="InvalidOperationException">No transport has been started yet.</exception>
/// <exception cref="NotSupportedException">Always thrown by the setter.</exception>
public override IDuplexPipe Transport
{
    get
    {
        CheckDisposed();

        // No transport means the connection has not been started successfully.
        return _transport ?? throw new InvalidOperationException($"Cannot access the {nameof(Transport)} pipe before the connection has started.");
    }
    set
    {
        throw new NotSupportedException("The transport pipe isn't settable.");
    }
}
// Feature collection for this connection; the constructor registers this instance as IConnectionInherentKeepAliveFeature.
public override IFeatureCollection Features { get; } = new FeatureCollection();
// NOTE(review): nothing in this section assigns this property; the negotiated id is stored in the private
// _connectionId field instead. Confirm whether callers are expected to read the id from here.
public override string ConnectionId { get; set; }
// Per-connection item bag, initialised to an empty ConnectionItems.
public override IDictionary<object, object> Items { get; set; } = new ConnectionItems();
// Explicit interface implementation backed by _hasInherentKeepAlive, which StartTransport sets.
bool IConnectionInherentKeepAliveFeature.HasInherentKeepAlive => _hasInherentKeepAlive;
/// <summary>
/// Creates a connection to <paramref name="url"/> with all transports (HttpTransports.All) enabled and no logger factory.
/// </summary>
/// <param name="url">The server URL. A null value results in an ArgumentNullException from CreateHttpOptions.</param>
public HttpConnection(Uri url)
: this(url, HttpTransports.All)
{ }
/// <summary>
/// Creates a connection to <paramref name="url"/> restricted to the given transports, with no logger factory.
/// </summary>
/// <param name="url">The server URL. A null value results in an ArgumentNullException from CreateHttpOptions.</param>
/// <param name="transports">The transports the client is allowed to use.</param>
public HttpConnection(Uri url, HttpTransportType transports)
: this(url, transports, loggerFactory: null)
{
}
/// <summary>
/// Creates a connection to <paramref name="url"/> restricted to the given transports.
/// </summary>
/// <param name="url">The server URL. A null value results in an ArgumentNullException from CreateHttpOptions.</param>
/// <param name="transports">The transports the client is allowed to use.</param>
/// <param name="loggerFactory">Logger factory; the options-based constructor substitutes NullLoggerFactory.Instance when this is null.</param>
public HttpConnection(Uri url, HttpTransportType transports, ILoggerFactory loggerFactory)
: this(CreateHttpOptions(url, transports), loggerFactory)
{
}
/// <summary>
/// Builds the options object used by the URL-based constructors.
/// </summary>
/// <param name="url">The server URL; must not be null.</param>
/// <param name="transports">The transports the client is allowed to use.</param>
/// <returns>Options carrying the given URL and transports.</returns>
/// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
private static HttpConnectionOptions CreateHttpOptions(Uri url, HttpTransportType transports)
{
    // Validate first so that nothing is constructed for a bad argument.
    if (url == null)
    {
        throw new ArgumentNullException(nameof(url));
    }

    var options = new HttpConnectionOptions();
    options.Url = url;
    options.Transports = transports;
    return options;
}
/// <summary>
/// Creates a connection from a fully specified options object.
/// </summary>
/// <param name="httpConnectionOptions">Connection options; must not be null and must have a URL.</param>
/// <param name="loggerFactory">Logger factory; NullLoggerFactory.Instance is used when this is null.</param>
/// <exception cref="ArgumentNullException"><paramref name="httpConnectionOptions"/> is null.</exception>
/// <exception cref="ArgumentException">The options do not specify a URL.</exception>
public HttpConnection(HttpConnectionOptions httpConnectionOptions, ILoggerFactory loggerFactory)
{
    // Fail with a descriptive argument exception instead of a NullReferenceException on the Url access below.
    if (httpConnectionOptions == null)
    {
        throw new ArgumentNullException(nameof(httpConnectionOptions));
    }

    if (httpConnectionOptions.Url == null)
    {
        throw new ArgumentException("Options does not have a URL specified.", nameof(httpConnectionOptions));
    }

    _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
    _logger = _loggerFactory.CreateLogger<HttpConnection>();
    // Must be assigned before CreateHttpClient runs, because that method reads _httpConnectionOptions.
    _httpConnectionOptions = httpConnectionOptions;

    // No HttpClient is created when the configured transports are exactly WebSockets;
    // SelectAndStartTransport skips negotiation in that case.
    if (httpConnectionOptions.Transports != HttpTransportType.WebSockets)
    {
        _httpClient = CreateHttpClient();
    }

    _transportFactory = new DefaultTransportFactory(httpConnectionOptions.Transports, _loggerFactory, _httpClient, httpConnectionOptions);
    _logScope = new ConnectionLogScope();
    _scopeDisposable = _logger.BeginScope(_logScope);

    // Expose the inherent keep-alive flag through the feature collection.
    Features.Set<IConnectionInherentKeepAliveFeature>(this);
}
// Used by unit tests
// Runs the public options-based constructor first (which builds a DefaultTransportFactory) and then
// replaces _transportFactory with the supplied factory.
internal HttpConnection(HttpConnectionOptions httpConnectionOptions, ILoggerFactory loggerFactory, ITransportFactory transportFactory)
: this(httpConnectionOptions, loggerFactory)
{
_transportFactory = transportFactory;
}
/// <summary>
/// Starts the connection using the <c>TransferFormat.Binary</c> transfer format.
/// </summary>
/// <returns>A task that completes when the start attempt has finished.</returns>
public async Task StartAsync() => await StartAsync(TransferFormat.Binary);
/// <summary>
/// Starts the connection using the requested transfer format.
/// </summary>
/// <param name="transferFormat">The transfer format the selected transport must support.</param>
/// <returns>A task that completes when the start attempt has finished.</returns>
public async Task StartAsync(TransferFormat transferFormat)
    => await StartAsyncCore(transferFormat).ForceAsync();
/// <summary>
/// Starts the connection once; later calls are logged and skipped.
/// </summary>
/// <param name="transferFormat">The transfer format the selected transport must support.</param>
/// <exception cref="ObjectDisposedException">The connection has been disposed.</exception>
private async Task StartAsyncCore(TransferFormat transferFormat)
{
    CheckDisposed();

    // Already started: skip without taking the lock.
    if (_started)
    {
        Log.SkippingStart(_logger);
        return;
    }

    await _connectionLock.WaitAsync();
    try
    {
        // State may have changed while waiting for the lock, so both checks are repeated.
        CheckDisposed();

        if (!_started)
        {
            Log.Starting(_logger);
            await SelectAndStartTransport(transferFormat);

            // Only flagged as started once a transport is actually running.
            _started = true;
            Log.Started(_logger);
        }
        else
        {
            Log.SkippingStart(_logger);
        }
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Stops the transport (if one was started) and releases the resources owned by this connection.
/// </summary>
/// <returns>A task that completes when disposal has finished.</returns>
public async Task DisposeAsync()
    => await DisposeAsyncCore().ForceAsync();
// Stops the running transport (when the connection was started), disposes the HttpClient and the
// logging scope, and marks the connection as disposed. Calls made after disposal return immediately.
private async Task DisposeAsyncCore()
{
// Already disposed: nothing to do, and no need to take the lock.
if (_disposed)
{
return;
}
await _connectionLock.WaitAsync();
try
{
// Re-check under the lock; another caller may have disposed while this one was waiting.
if (!_disposed && _started)
{
Log.DisposingHttpConnection(_logger);
// Stop the transport, but we don't care if it throws.
// The transport should also have completed the pipe with this exception.
try
{
await _transport.StopAsync();
}
catch (Exception ex)
{
Log.TransportThrewExceptionOnStop(_logger, ex);
}
Log.Disposed(_logger);
}
else
{
Log.SkippingDispose(_logger);
}
// _httpClient is null when only WebSockets is configured, hence the null-conditional call.
_httpClient?.Dispose();
}
finally
{
// Runs even if the try block above throws: dispose the logging scope and flag the connection
// as disposed exactly once, then always release the lock.
if (!_disposed)
{
_scopeDisposable.Dispose();
_disposed = true;
}
_connectionLock.Release();
}
}
/// <summary>
/// Picks a transport and starts it. When the configured transports are exactly WebSockets the
/// negotiate step is skipped; otherwise the server is asked which transports it offers and they
/// are tried in the order returned until one starts.
/// </summary>
/// <param name="transferFormat">The transfer format the selected transport must support.</param>
/// <exception cref="InvalidOperationException">
/// No transport could be started. When individual attempts failed, their exceptions are available
/// through <c>InnerException</c> as an <see cref="AggregateException"/>.
/// </exception>
private async Task SelectAndStartTransport(TransferFormat transferFormat)
{
    // Failures of individual transport attempts, kept so the final exception can report why
    // the connection could not be established instead of only logging the causes.
    List<Exception> transportFailures = null;

    if (_httpConnectionOptions.Transports == HttpTransportType.WebSockets)
    {
        Log.StartingTransport(_logger, _httpConnectionOptions.Transports, _httpConnectionOptions.Url);
        await StartTransport(_httpConnectionOptions.Url, _httpConnectionOptions.Transports, transferFormat);
    }
    else
    {
        var negotiationResponse = await GetNegotiationResponse();

        // This should only need to happen once
        var connectUrl = CreateConnectUrl(_httpConnectionOptions.Url, negotiationResponse.ConnectionId);

        // We're going to search for the transfer format as a string because we don't want to parse
        // all the transfer formats in the negotiation response, and we want to allow transfer formats
        // we don't understand in the negotiate response.
        var transferFormatString = transferFormat.ToString();

        // The enumeration keeps running over the transports of the first response even after
        // negotiationResponse is cleared below; only the connection id/URL is refreshed.
        foreach (var transport in negotiationResponse.AvailableTransports)
        {
            if (!Enum.TryParse<HttpTransportType>(transport.Transport, out var transportType))
            {
                Log.TransportNotSupported(_logger, transport.Transport);
                continue;
            }

            if (transportType == HttpTransportType.WebSockets && !IsWebSocketsSupported())
            {
                Log.WebSocketsNotSupportedByOperatingSystem(_logger);
                continue;
            }

            try
            {
                if ((transportType & _httpConnectionOptions.Transports) == 0)
                {
                    Log.TransportDisabledByClient(_logger, transportType);
                }
                else if (!transport.TransferFormats.Contains(transferFormatString, StringComparer.Ordinal))
                {
                    Log.TransportDoesNotSupportTransferFormat(_logger, transportType, transferFormat);
                }
                else
                {
                    // The negotiation response gets cleared in the fallback scenario.
                    if (negotiationResponse == null)
                    {
                        negotiationResponse = await GetNegotiationResponse();
                        connectUrl = CreateConnectUrl(_httpConnectionOptions.Url, negotiationResponse.ConnectionId);
                    }

                    Log.StartingTransport(_logger, transportType, connectUrl);
                    await StartTransport(connectUrl, transportType, transferFormat);
                    break;
                }
            }
            catch (Exception ex)
            {
                Log.TransportFailed(_logger, transportType, ex);

                if (transportFailures == null)
                {
                    transportFailures = new List<Exception>();
                }
                transportFailures.Add(ex);

                // Try the next transport
                // Clear the negotiation response so we know to re-negotiate.
                negotiationResponse = null;
            }
        }
    }

    if (_transport == null)
    {
        // Same exception type and message as before; the recorded failures (if any) ride along as the inner exception.
        Exception cause = transportFailures == null ? null : new AggregateException(transportFailures);
        throw new InvalidOperationException("Unable to connect to the server with any of the available transports.", cause);
    }
}
/// <summary>
/// Sends a POST to the "negotiate" endpoint below <paramref name="url"/> and parses the response body.
/// </summary>
/// <param name="url">Base URL of the connection endpoint.</param>
/// <param name="httpClient">Client used to send the request.</param>
/// <param name="logger">Logger that receives all messages emitted by this method.</param>
/// <returns>The parsed negotiation response.</returns>
/// <remarks>Any exception is logged through <paramref name="logger"/> and then rethrown unchanged.</remarks>
private async Task<NegotiationResponse> Negotiate(Uri url, HttpClient httpClient, ILogger logger)
{
    try
    {
        // Get a connection ID from the server
        Log.EstablishingConnection(logger, url);

        var urlBuilder = new UriBuilder(url);

        // Ordinal comparison: this is a URL path separator, not culture-sensitive text.
        if (!urlBuilder.Path.EndsWith("/", StringComparison.Ordinal))
        {
            urlBuilder.Path += "/";
        }
        urlBuilder.Path += "negotiate";

        using (var request = new HttpRequestMessage(HttpMethod.Post, urlBuilder.Uri))
        {
            // Corefx changed the default version and High Sierra curlhandler tries to upgrade request
            request.Version = new Version(1, 1);

            // ResponseHeadersRead instructs SendAsync to return once headers are read
            // rather than buffer the entire response. This gives a small perf boost.
            // Note that it is important to dispose of the response when doing this to
            // avoid leaving the connection open.
            using (var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
            {
                response.EnsureSuccessStatusCode();

                NegotiationResponse negotiateResponse;
                using (var responseStream = await response.Content.ReadAsStreamAsync())
                {
                    negotiateResponse = NegotiateProtocol.ParseResponse(responseStream);
                }

                // Use the logger passed in, consistent with every other log call in this method.
                Log.ConnectionEstablished(logger, negotiateResponse.ConnectionId);
                return negotiateResponse;
            }
        }
    }
    catch (Exception ex)
    {
        Log.ErrorWithNegotiation(logger, url, ex);
        throw;
    }
}
/// <summary>
/// Builds the URL a transport connects to by appending the connection id as an <c>id</c> query parameter.
/// </summary>
/// <param name="url">Base URL of the connection endpoint.</param>
/// <param name="connectionId">Connection id returned by the server's negotiate response.</param>
/// <returns><paramref name="url"/> with <c>id=&lt;connectionId&gt;</c> appended to its query string.</returns>
/// <exception cref="FormatException"><paramref name="connectionId"/> is null, empty or whitespace.</exception>
private static Uri CreateConnectUrl(Uri url, string connectionId)
{
    if (string.IsNullOrWhiteSpace(connectionId))
    {
        throw new FormatException("Invalid connection id.");
    }

    // The id comes from the server response; escape it so characters such as '&', '=', '+' or '#'
    // cannot alter the query string. Ids made only of unreserved characters are left unchanged.
    return Utils.AppendQueryString(url, "id=" + Uri.EscapeDataString(connectionId));
}
/// <summary>
/// Creates a transport of the requested type and starts it against <paramref name="connectUrl"/>.
/// The transport is only published to <c>_transport</c> once it has started successfully.
/// </summary>
/// <param name="connectUrl">URL the transport connects to.</param>
/// <param name="transportType">The transport to create.</param>
/// <param name="transferFormat">The transfer format passed to the transport.</param>
/// <remarks>A failure to start is logged, <c>_transport</c> is reset to null, and the exception is rethrown.</remarks>
private async Task StartTransport(Uri connectUrl, HttpTransportType transportType, TransferFormat transferFormat)
{
    var candidate = _transportFactory.CreateTransport(transportType);

    try
    {
        await candidate.StartAsync(connectUrl, transferFormat);
    }
    catch (Exception startError)
    {
        Log.ErrorStartingTransport(_logger, transportType, startError);
        _transport = null;
        throw;
    }

    // Only long polling reports inherent keep-alive (exposed through IConnectionInherentKeepAliveFeature).
    var isLongPolling = transportType == HttpTransportType.LongPolling;
    _hasInherentKeepAlive = isLongPolling;

    // Publish the transport only now that it is definitely running.
    _transport = candidate;
    Log.TransportStarted(_logger, transportType);
}
/// <summary>
/// Builds the HttpClient used by this connection from the configured options.
/// </summary>
/// <returns>A client with the handler chain, timeout and default headers applied.</returns>
/// <exception cref="InvalidOperationException">The configured HttpMessageHandlerFactory returned null.</exception>
private HttpClient CreateHttpClient()
{
    var options = _httpConnectionOptions;
    var innerHandler = new HttpClientHandler();
    HttpMessageHandler pipeline = innerHandler;

    if (options != null)
    {
        // Copy over only the settings that were explicitly configured.
        if (options.Proxy != null)
        {
            innerHandler.Proxy = options.Proxy;
        }

        if (options.Cookies != null)
        {
            innerHandler.CookieContainer = options.Cookies;
        }

        if (options.ClientCertificates != null)
        {
            innerHandler.ClientCertificates.AddRange(options.ClientCertificates);
        }

        if (options.UseDefaultCredentials != null)
        {
            innerHandler.UseDefaultCredentials = options.UseDefaultCredentials.Value;
        }

        if (options.Credentials != null)
        {
            innerHandler.Credentials = options.Credentials;
        }

        // Let user code wrap or replace the handler; it must hand something back.
        if (options.HttpMessageHandlerFactory != null)
        {
            pipeline = options.HttpMessageHandlerFactory(innerHandler)
                ?? throw new InvalidOperationException("Configured HttpMessageHandlerFactory did not return a value.");
        }

        // Apply the authorization header in a handler instead of a default header because it can change with each request
        if (options.AccessTokenProvider != null)
        {
            pipeline = new AccessTokenHttpMessageHandler(pipeline, options.AccessTokenProvider);
        }
    }

    // The logging handler is added last, after the user factory has run, so it always ends up outermost.
    pipeline = new LoggingHttpMessageHandler(pipeline, _loggerFactory);

    var client = new HttpClient(pipeline)
    {
        Timeout = HttpClientTimeout
    };

    var defaultHeaders = client.DefaultRequestHeaders;

    // Start with the user agent header
    defaultHeaders.UserAgent.Add(Constants.UserAgentHeader);

    // Then any headers configured on the options.
    var configuredHeaders = options?.Headers;
    if (configuredHeaders != null)
    {
        foreach (var header in configuredHeaders)
        {
            defaultHeaders.Add(header.Key, header.Value);
        }
    }

    // Replace any configured X-Requested-With value.
    // Tell auth middleware to 401 instead of redirecting
    defaultHeaders.Remove("X-Requested-With");
    defaultHeaders.Add("X-Requested-With", "XMLHttpRequest");

    return client;
}
/// <summary>
/// Throws if this connection has already been disposed.
/// </summary>
/// <exception cref="ObjectDisposedException">The connection has been disposed.</exception>
private void CheckDisposed()
{
    if (!_disposed)
    {
        return;
    }

    throw new ObjectDisposedException(nameof(HttpConnection));
}
/// <summary>
/// Reports whether the WebSockets transport may be attempted in the current environment.
/// </summary>
/// <returns>
/// Always true when compiled for NETCOREAPP2_1; otherwise true on non-Windows platforms and on
/// Windows when the reported OS version is at least <c>Windows8Version</c>.
/// </returns>
private static bool IsWebSocketsSupported()
{
#if NETCOREAPP2_1
    // .NET Core 2.1 and above has a managed implementation
    return true;
#else
    // Other OSes are assumed to have websockets; Windows needs to be Windows 8 or above.
    return !RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
        || Environment.OSVersion.Version >= Windows8Version;
#endif
}
/// <summary>
/// Negotiates with the server at the configured URL and records the returned connection id
/// on this instance and on the logging scope.
/// </summary>
/// <returns>The negotiation response returned by <c>Negotiate</c>.</returns>
private async Task<NegotiationResponse> GetNegotiationResponse()
{
    var response = await Negotiate(_httpConnectionOptions.Url, _httpClient, _logger);

    // Keep the field and the log scope in sync with the freshly negotiated id.
    var negotiatedId = response.ConnectionId;
    _connectionId = negotiatedId;
    _logScope.ConnectionId = negotiatedId;

    return response;
}
}
}