diff --git a/CHANGELOG.md b/CHANGELOG.md index 612a6f74e..95bacf5d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.8.0 [unreleased] +### Bug Fixes + +1. [#264](https://github.com/influxdata/influxdb-client-js/pull/264): Require 204 status code in a write response. + ## 1.7.0 [2020-10-02] ### Features diff --git a/packages/core/src/impl/WriteApiImpl.ts b/packages/core/src/impl/WriteApiImpl.ts index 5d8bb9161..9f49758f9 100644 --- a/packages/core/src/impl/WriteApiImpl.ts +++ b/packages/core/src/impl/WriteApiImpl.ts @@ -5,7 +5,7 @@ import { WriteOptions, WritePrecisionType, } from '../options' -import {Transport, SendOptions} from '../transport' +import {Transport, SendOptions, Headers} from '../transport' import Logger from './Logger' import {HttpError, RetryDelayStrategy} from '../errors' import Point from '../Point' @@ -132,7 +132,11 @@ export default class WriteApiImpl implements WriteApi, PointSettings { const self: WriteApiImpl = this if (!this.closed && lines.length > 0) { return new Promise((resolve, reject) => { + let responseStatusCode: number | undefined this.transport.send(this.httpPath, lines.join('\n'), this.sendOptions, { + responseStarted(_headers: Headers, statusCode?: number): void { + responseStatusCode = statusCode + }, error(error: Error): void { const failedAttempts = self.writeOptions.maxRetries + 2 - attempts // call the writeFailed listener and check if we can retry @@ -170,7 +174,19 @@ export default class WriteApiImpl implements WriteApi, PointSettings { }, complete(): void { self.retryStrategy.success() - resolve() + // older implementations of transport do not report status code + if (responseStatusCode == 204 || responseStatusCode == undefined) { + resolve() + } else { + const error = new HttpError( + responseStatusCode, + `204 HTTP response status code expected, but ${responseStatusCode} returned`, + undefined, + '0' + ) + Logger.error(`Write to InfluxDB failed.`, error) + reject(error) + } }, }) }) @@ -212,7 +228,8 @@ export default class WriteApiImpl implements WriteApi, PointSettings { throw new Error('writeApi: already closed!') } for (let i = 0; i < points.length; i++) { - this.writePoint(points[i]) + const line = points[i].toLineProtocol(this) + if (line) this.writeBuffer.add(line) } } async flush(withRetryBuffer?: boolean): Promise { diff --git a/packages/core/src/impl/browser/FetchTransport.ts b/packages/core/src/impl/browser/FetchTransport.ts index 697e6bdc9..fd1d6f0e5 100644 --- a/packages/core/src/impl/browser/FetchTransport.ts +++ b/packages/core/src/impl/browser/FetchTransport.ts @@ -18,6 +18,7 @@ import {CLIENT_LIB_VERSION} from '../version' export default class FetchTransport implements Transport { chunkCombiner = pureJsChunkCombiner private defaultHeaders: {[key: string]: string} + private url: string constructor(private connectionOptions: ConnectionOptions) { this.defaultHeaders = { 'content-type': 'application/json; charset=utf-8', @@ -27,6 +28,18 @@ export default class FetchTransport implements Transport { this.defaultHeaders['Authorization'] = 'Token ' + this.connectionOptions.token } + this.url = String(this.connectionOptions.url) + if (this.url.endsWith('/')) { + this.url = this.url.substring(0, this.url.length - 1) + } + // https://github.com/influxdata/influxdb-client-js/issues/263 + // don't allow /api/v2 suffix to avoid future problems + if (this.url.endsWith('/api/v2')) { + this.url = this.url.substring(0, this.url.length - '/api/v2'.length) + Logger.warn( + `Please remove '/api/v2' context path from InfluxDB base url, using ${this.url} !` + ) + } } send( path: string, @@ -67,7 +80,7 @@ export default class FetchTransport implements Transport { headers[key] = [previous, value] } }) - observer.responseStarted(headers) + observer.responseStarted(headers, response.status) } if (response.status >= 300) { return response @@ -160,7 +173,7 @@ export default class FetchTransport implements Transport { options: SendOptions ): Promise { const {method, headers, ...other} = options - return fetch(`${this.connectionOptions.url}${path}`, { + return fetch(`${this.url}${path}`, { method: method, body: method === 'GET' || method === 'HEAD' diff --git a/packages/core/src/impl/completeCommunicationObserver.ts b/packages/core/src/impl/completeCommunicationObserver.ts index 57035d7a4..1282e7462 100644 --- a/packages/core/src/impl/completeCommunicationObserver.ts +++ b/packages/core/src/impl/completeCommunicationObserver.ts @@ -30,8 +30,9 @@ export default function completeCommunicationObserver( if (callbacks.complete) callbacks.complete() } }, - responseStarted: (headers: Headers): void => { - if (callbacks.responseStarted) callbacks.responseStarted(headers) + responseStarted: (headers: Headers, statusCode?: number): void => { + if (callbacks.responseStarted) + callbacks.responseStarted(headers, statusCode) }, } return retVal diff --git a/packages/core/src/impl/node/NodeHttpTransport.ts b/packages/core/src/impl/node/NodeHttpTransport.ts index ba60fc1be..2c562ab8d 100644 --- a/packages/core/src/impl/node/NodeHttpTransport.ts +++ b/packages/core/src/impl/node/NodeHttpTransport.ts @@ -16,6 +16,7 @@ import nodeChunkCombiner from './nodeChunkCombiner' import zlib from 'zlib' import completeCommunicationObserver from '../completeCommunicationObserver' import {CLIENT_LIB_VERSION} from '../version' +import Logger from '../Logger' const zlibOptions = { flush: zlib.Z_SYNC_FLUSH, @@ -67,6 +68,15 @@ export class NodeHttpTransport implements Transport { this.contextPath.length - 1 ) } + // https://github.com/influxdata/influxdb-client-js/issues/263 + // don't allow /api/v2 suffix to avoid future problems + if (this.contextPath == '/api/v2') { + Logger.warn( + `Please remove '/api/v2' context path from InfluxDB base url, using ${url.protocol}//${url.hostname}:${url.port} !` + ) + this.contextPath = '' + } + if (url.protocol === 'http:') { this.requestApi = http.request } else if (url.protocol === 'https:') { @@ -205,7 +215,7 @@ export class NodeHttpTransport implements Transport { res.on('aborted', () => { listeners.error(new AbortError()) }) - listeners.responseStarted(res.headers) + listeners.responseStarted(res.headers, res.statusCode) /* istanbul ignore next statusCode is optional in http.IncomingMessage */ const statusCode = res.statusCode ?? 600 const contentEncoding = res.headers['content-encoding'] diff --git a/packages/core/src/transport.ts b/packages/core/src/transport.ts index a6e51bdc3..d0f133644 100644 --- a/packages/core/src/transport.ts +++ b/packages/core/src/transport.ts @@ -24,8 +24,9 @@ export interface CommunicationObserver { /** * Informs about a start of response processing. * @param headers - response HTTP headers + * @param statusCode - response status code */ - responseStarted?: (headers: Headers) => void + responseStarted?: (headers: Headers, statusCode?: number) => void /** * Setups cancelllable for this communication. */ diff --git a/packages/core/test/unit/WriteApi.test.ts b/packages/core/test/unit/WriteApi.test.ts index 12f026c52..52a82e5ca 100644 --- a/packages/core/test/unit/WriteApi.test.ts +++ b/packages/core/test/unit/WriteApi.test.ts @@ -2,6 +2,7 @@ import {expect} from 'chai' import nock from 'nock' // WARN: nock must be imported before NodeHttpTransport, since it modifies node's http import { ClientOptions, + HttpError, WritePrecision, WriteOptions, Point, @@ -123,7 +124,7 @@ describe('WriteApi', () => { subject.writeRecord('test value=1') subject.writeRecords(['test value=2', 'test value=3']) // wait for http calls to finish - await new Promise(resolve => setTimeout(resolve, 10)) + await new Promise(resolve => setTimeout(resolve, 20)) await subject.close().then(() => { expect(logs.error).to.length(1) expect(logs.warn).length(3) // 3 warnings about write failure @@ -164,7 +165,7 @@ describe('WriteApi', () => { it('uses the pre-configured batchSize', async () => { useSubject({flushInterval: 0, maxRetries: 0, batchSize: 2}) subject.writeRecords(['test value=1', 'test value=2', 'test value=3']) - await new Promise(resolve => setTimeout(resolve, 10)) // wait for HTTP to finish + await new Promise(resolve => setTimeout(resolve, 20)) // wait for HTTP to finish let count = subject.dispose() expect(logs.error).to.length(1) expect(logs.warn).to.length(0) @@ -236,7 +237,7 @@ describe('WriteApi', () => { return [429, '', {'retry-after': '1'}] } else { messages.push(_requestBody.toString()) - return [200, '', {'retry-after': '1'}] + return [204, '', {'retry-after': '1'}] } }) .persist() @@ -249,6 +250,7 @@ describe('WriteApi', () => { await new Promise(resolve => setTimeout(resolve, 10)) // wait for background flush and HTTP to finish expect(logs.error).to.length(0) expect(logs.warn).to.length(1) + subject.writePoint(new Point()) // ignored, since it generates no line subject.writePoints([ new Point('test'), // will be ignored + warning new Point('test').floatField('value', 2), @@ -284,5 +286,25 @@ describe('WriteApi', () => { expect(lines[4]).to.be.equal('test,xtra=1 value=6 3000000') expect(lines[5]).to.be.equal('test,xtra=1 value=7 false') }) + it('fails on write response status not being exactly 204', async () => { + // required because of https://github.com/influxdata/influxdb-client-js/issues/263 + useSubject({flushInterval: 5, maxRetries: 0, batchSize: 10}) + nock(clientOptions.url) + .post(WRITE_PATH_NS) + .reply((_uri, _requestBody) => { + return [200, '', {}] + }) + .persist() + subject.writePoint(new Point('test').floatField('value', 1)) + await new Promise(resolve => setTimeout(resolve, 20)) // wait for background flush and HTTP to finish + expect(logs.error).has.length(1) + expect(logs.error[0][0]).equals('Write to InfluxDB failed.') + expect(logs.error[0][1]).instanceOf(HttpError) + expect(logs.error[0][1].statusCode).equals(200) + expect(logs.error[0][1].statusMessage).equals( + `204 HTTP response status code expected, but 200 returned` + ) + expect(logs.warn).deep.equals([]) + }) }) }) diff --git a/packages/core/test/unit/impl/browser/FetchTransport.test.ts b/packages/core/test/unit/impl/browser/FetchTransport.test.ts index 1af475908..5c1f91273 100644 --- a/packages/core/test/unit/impl/browser/FetchTransport.test.ts +++ b/packages/core/test/unit/impl/browser/FetchTransport.test.ts @@ -4,6 +4,7 @@ import {removeFetchApi, emulateFetchApi} from './emulateBrowser' import sinon from 'sinon' import {CLIENT_LIB_VERSION} from '../../../../src/impl/version' import {SendOptions, Cancellable} from '../../../../src' +import {CollectedLogs, collectLogging} from '../../../util' describe('FetchTransport', () => { afterEach(() => { @@ -11,6 +12,13 @@ describe('FetchTransport', () => { }) describe('constructor', () => { + let logs: CollectedLogs + beforeEach(() => { + logs = collectLogging.replace() + }) + afterEach(async () => { + collectLogging.after() + }) it('creates the transport with url', () => { const options = { url: 'http://test:8086', @@ -35,6 +43,28 @@ describe('FetchTransport', () => { }) expect(transport.connectionOptions).to.deep.equal(options) }) + it('ignore last slash / in url', () => { + const options = { + url: 'http://test:8086/', + token: 'a', + } + const transport: any = new FetchTransport(options) + expect(transport.url).equals('http://test:8086') + }) + it('ignore /api/v2 suffix in url', () => { + const options = { + url: 'http://test:8086/api/v2', + token: 'a', + } + const transport: any = new FetchTransport(options) + expect(transport.url).equals('http://test:8086') + expect(logs.warn).is.deep.equal([ + [ + "Please remove '/api/v2' context path from InfluxDB base url, using http://test:8086 !", + undefined, + ], + ]) + }) }) describe('request', () => { const transport = new FetchTransport({url: 'http://test:8086'}) @@ -323,10 +353,13 @@ describe('FetchTransport', () => { cancellable.cancel() expect(cancellable.isCancelled()).is.equal(true) } + if (url === 'error') { + expect(callbacks.responseStarted.callCount).equals(0) + } else { + expect(callbacks.responseStarted.callCount).equals(1) + expect(callbacks.responseStarted.args[0][1]).equals(status) + } const isError = url === 'error' || status !== 200 - expect(callbacks.responseStarted.callCount).equals( - url === 'error' ? 0 : 1 - ) expect(callbacks.error.callCount).equals(isError ? 1 : 0) expect(callbacks.complete.callCount).equals(isError ? 0 : 1) const customNext = url.startsWith('customNext') diff --git a/packages/core/test/unit/impl/node/NodeHttpTransport.test.ts b/packages/core/test/unit/impl/node/NodeHttpTransport.test.ts index d96a03e01..cbaa39a55 100644 --- a/packages/core/test/unit/impl/node/NodeHttpTransport.test.ts +++ b/packages/core/test/unit/impl/node/NodeHttpTransport.test.ts @@ -10,6 +10,7 @@ import sinon from 'sinon' import {Readable} from 'stream' import zlib from 'zlib' import {CLIENT_LIB_VERSION} from '../../../../src/impl/version' +import {CollectedLogs, collectLogging} from '../../../util' function sendTestData( connectionOptions: ConnectionOptions, @@ -41,6 +42,13 @@ const TEST_URL = 'http://test:8086' describe('NodeHttpTransport', () => { describe('constructor', () => { + let logs: CollectedLogs + beforeEach(() => { + logs = collectLogging.replace() + }) + afterEach(async () => { + collectLogging.after() + }) it('creates the transport from http url', () => { const transport: any = new NodeHttpTransport({ url: 'http://test:8086', @@ -101,6 +109,19 @@ describe('NodeHttpTransport', () => { }) ).to.throw() }) + it('warn about unsupported /api/v2 context path', () => { + const transport: any = new NodeHttpTransport({ + url: 'http://test:8086/api/v2', + }) + // don;t use context path at all + expect(transport.contextPath).equals('') + expect(logs.warn).is.deep.equal([ + [ + "Please remove '/api/v2' context path from InfluxDB base url, using http://test:8086 !", + undefined, + ], + ]) + }) }) describe('send', () => { beforeEach(() => { @@ -133,6 +154,7 @@ describe('NodeHttpTransport', () => { const responseData = 'yes' it(`works with options ${JSON.stringify(extras)}`, async () => { const nextFn = sinon.fake() + const responseStartedFn = sinon.fake() await new Promise((resolve, reject) => { const timeout = setTimeout( () => reject(new Error('timeouted')), @@ -183,9 +205,8 @@ describe('NodeHttpTransport', () => { '', {...extras, method: 'POST'}, { - next(data: any) { - nextFn(data) - }, + responseStarted: responseStartedFn, + next: nextFn, error(error: any) { clearTimeout(timeout) reject(new Error('No error expected!, but: ' + error)) @@ -202,16 +223,22 @@ describe('NodeHttpTransport', () => { if (extras.cancel) { cancellable.cancel() } - }) - .then(() => { - expect(nextFn.called) + }).then( + () => { if (!extras.cancel) { + expect(nextFn.callCount).equals(1) + expect(responseStartedFn.callCount).equals(1) + expect(responseStartedFn.args[0][1]).equals(200) expect(nextFn.args[0][0].toString()).to.equal(responseData) + } else { + expect(nextFn.callCount).equals(0) + expect(responseStartedFn.callCount).equals(0) } - }) - .catch(e => { + }, + e => { expect.fail(undefined, e, e.toString()) - }) + } + ) }) } })