Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(examples): add writeAdvanced.js example #229

Merged
merged 12 commits into from
Aug 3, 2020
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
1. [#220](https://github.com/influxdata/influxdb-client-js/pull/220): Generate API constructor with InfluxDB parameter.
1. [#222](https://github.com/influxdata/influxdb-client-js/pull/222): Improve and validate code documentation.
1. [#224](https://github.com/influxdata/influxdb-client-js/pull/224): Generate API documentation for gh-pages.
1. [#224](https://github.com/influxdata/influxdb-client-js/pull/224): Include doc generation into CI and release process
1. [#224](https://github.com/influxdata/influxdb-client-js/pull/224): Include doc generation into CI and release process.
1. [#229](https://github.com/influxdata/influxdb-client-js/pull/229): Add writeAdvanced.js example.

## 1.5.0 [2020-07-17]

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ The following examples helps to start quickly with this client:

There are also more advanced [examples](./examples/README.md) that shows

- how to create a bucket
- how to execute parameterized queries
- how to use this client with InfluxDB 1.8+
- how to use this client in the browser
- how to process InfluxDB query results with RX Observables
- how to customize the way of how measurement points are written to InfluxDB

The client API documentation is available online at https://influxdata.github.io/influxdb-client-js/ .
The client API Reference Documentation is available online at https://influxdata.github.io/influxdb-client-js/ .

## Build Requirements

Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This directory contains javascript and typescript examples for node.js and brows
How to use forward compatibility APIs from InfluxDB 1.8.
- [rxjs-query.ts](./rxjs-query.ts)
Use [RxJS](https://rxjs.dev/) to query InfluxDB with [Flux](https://v2.docs.influxdata.com/v2.0/query-data/get-started/).
- [writeAdvanced.js](./writeAdvanced.js)
Shows how to control the way of how data points are written to InfluxDB.
- Browser examples
- Change `url` in [env.js](./env.js) to match your influxDB instance
- Change `token, org, bucket, username, password` variables in [./index.html](index.html) to match your influxDB instance
Expand Down
100 changes: 100 additions & 0 deletions examples/writeAdvanced.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env node
//////////////////////////////////////////////////////////////////////////
// Shows how to control the way of how points are written into InfluxDB //
//////////////////////////////////////////////////////////////////////////
/*
This example shows how to use the client's Write API to control the way of how points
are sent to InfluxDB server.

It is based on the simpler write.js example, it assumes that you are familiar with it.
The write.js example asynchronously writes points to InfluxDB and assumes that the library
takes care about retries upon failures and optimizes networking to send points in
batches and on background. This approach is good for sending various metrics from your
application, but it does not scale well when you need to import bigger amount of data. See
https://github.com/influxdata/influxdb-client-js/issues/213 for details.
*/

const {
InfluxDB,
Point,
flux,
fluxDuration,
DEFAULT_WriteOptions,
} = require('@influxdata/influxdb-client')
const {url, token, org, bucket} = require('./env')
const {hostname} = require('os')

console.log('*** WRITE POINTS ***')
/* points/lines are batched in order to minimize networking and increase performance */
const flushBatchSize = DEFAULT_WriteOptions.batchSize
/* count of demo data to import */
const demoCount = 10_000
/* name of demo measurement */
const demoMeasurement = 'temperature2'

// explains all write options
const writeOptions = {
/* the maximum points/line to send in a single batch to InfluxDB server */
batchSize: flushBatchSize + 1, // don't let automatically flush data
/* default tags to add to every point */
defaultTags: {location: hostname},
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 0,
/* maximum size of the retry buffer - it contains items that could not be sent for the first time */
maxBufferLines: 30_000,
/* the count of retries, the delays between retries follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 3,
/* maximum delay between retries in milliseconds */
maxRetryDelay: 15000,
/* minimum delay between retries in milliseconds */
minRetryDelay: 1000, // minimum delay between retries
/* a random value of up to retryJitter is added when scheduling next retry */
retryJitter: 1000,
// ... or you can customize what to do on write failures when using a writeFailed fn, see the API docs for details
// writeFailed: function(error, lines, failedAttempts){/** return promise or void */},
}

const influxDB = new InfluxDB({url, token})

async function importData() {
const writeApi = influxDB.getWriteApi(org, bucket, 'ns', writeOptions)
// import a bigger count of items
for (let i = 0; i < demoCount; i++) {
const point = new Point(demoMeasurement)
.tag('example', 'writeAdvanced.ts')
.floatField('value', 20 + Math.round(100 * Math.random()) / 10)
writeApi.writePoint(point)
// control the way of how data are flushed
if ((i + 1) % flushBatchSize === 0) {
console.log(`flush writeApi: chunk #${(i + 1) / flushBatchSize}`)
try {
await writeApi.flush()
} catch (e) {
console.error()
}
}
}

console.log(
'close writeApi: flush unwritten points, cancel scheduled retries'
)
await writeApi.close()

// print the count of items in the last 5 minutes
const start = fluxDuration('-5m')
const countQuery = flux`from(bucket: ${bucket})
|> range(start: ${start})
|> filter(fn: (r) => r._measurement == ${demoMeasurement})
|> count(column: "_value")`
const count = await influxDB
.getQueryApi(org)
.collectRows(countQuery, (row, tableMeta) =>
Number.parseInt(row[tableMeta.column('_value').index])
)
.then(results => results.reduce((acc, val) => acc + val, 0))
console.log(`Size of temperature2 measurement since '${start}': `, count)
}

importData()
.then(() => console.log('FINISHED'))
.catch(e => console.error('FINISHED', e))
5 changes: 3 additions & 2 deletions packages/core/src/InfluxDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export default class InfluxDB {
* and flushing windows. See {@link DEFAULT_WriteOptions} to see the defaults.
*
* See also {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/write.js | write.js example},
* {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/writeAdvanced.js | writeAdvanced.js example},
* and {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/index.html | browser example}.
*
* @param org - Specifies the destination organization for writes. Takes either the ID or Name interchangeably.
Expand Down Expand Up @@ -73,8 +74,8 @@ export default class InfluxDB {
* @remarks
* See also {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/query.ts | query.ts example},
* {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/queryWithParams.ts | queryWithParams.ts example},
* {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/index.html | browser example},
* and {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/rxjs-query.ts | rxjs-query.ts example}.
* {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/rxjs-query.ts | rxjs-query.ts example},
* and {@link https://github.com/influxdata/influxdb-client-js/blob/master/examples/index.html | browser example},
*
* @param org - organization
* @returns QueryApi instance
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/WriteApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ export default interface WriteApi {

/**
* Flushes pending writes to the server.
* @param withRetryBuffer - flush also all the scheduled retries
* @returns completition promise
*/
flush(): Promise<void>
flush(withRetryBuffer?: boolean): Promise<void>

/**
* Flushes this writer and cancels retries of write operations that failed.
Expand All @@ -60,6 +61,7 @@ export default interface WriteApi {
/**
* Unlike close, dispose simply quits without trying to flush
* the buffered data.
* @returns count of points that were not written to InfluxDB
*/
dispose(): void
dispose(): number
}
13 changes: 8 additions & 5 deletions packages/core/src/impl/WriteApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
(error as HttpError).statusCode >= 429)
) {
Logger.warn(
`Write to influx DB failed (remaining attempts: ${attempts -
`Write to InfluxDB failed (remaining attempts: ${attempts -
1}).`,
error
)
Expand All @@ -165,7 +165,7 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
reject(error)
return
}
Logger.error(`Write to influx DB failed.`, error)
Logger.error(`Write to InfluxDB failed.`, error)
reject(error)
},
complete(): void {
Expand Down Expand Up @@ -203,9 +203,11 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
this.writePoint(points[i])
}
}
async flush(): Promise<void> {
async flush(withRetryBuffer?: boolean): Promise<void> {
await this.writeBuffer.flush()
return await this.retryBuffer.flush()
if (withRetryBuffer) {
return await this.retryBuffer.flush()
}
}
close(): Promise<void> {
const retVal = this.writeBuffer.flush().finally(() => {
Expand All @@ -220,9 +222,10 @@ export default class WriteApiImpl implements WriteApi, PointSettings {
})
return retVal
}
dispose(): void {
dispose(): number {
this._clearFlushTimeout()
this.closed = true
return this.retryBuffer.close() + this.writeBuffer.length
}

// PointSettings
Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ export interface RetryDelayStrategyOptions {
* Options that configure strategy for retrying failed InfluxDB write operations.
*/
export interface WriteRetryOptions extends RetryDelayStrategyOptions {
/*
/**
* writeFailed is called to inform about write error
* @param this the instance of the API that failed
* @param error write error
* @param lines failed lines
* @param attempts a number of failed attempts to write the lines
* @return a Promise to force the API to not retry again and use the promise as a result of the flush operation,
* @param this - the instance of the API that failed
* @param error - write error
* @param lines - failed lines
* @param attempts - a number of failed attempts to write the lines
* @returns a Promise to force the API to use it as a result of the flush operation,
* void/undefined to continue with default retry mechanism
*/
writeFailed(
Expand Down
9 changes: 7 additions & 2 deletions packages/core/test/unit/WriteApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('WriteApi', () => {
})
it('can be closed and flushed without any data', async () => {
await subject.close().catch(e => expect.fail('should not happen', e))
await subject.flush().catch(e => expect.fail('should not happen', e))
await subject.flush(true).catch(e => expect.fail('should not happen', e))
})
it('fails on close without server connection', async () => {
subject.writeRecord('test value=1')
Expand Down Expand Up @@ -149,9 +149,14 @@ describe('WriteApi', () => {
useSubject({flushInterval: 0, maxRetries: 0, batchSize: 2})
subject.writeRecords(['test value=1', 'test value=2', 'test value=3'])
await new Promise(resolve => setTimeout(resolve, 10)) // wait for HTTP to finish
subject.dispose()
let count = subject.dispose()
expect(logs.error).to.length(1)
expect(logs.warn).to.length(0)
expect(count).equals(1)
count = subject.dispose() // dispose is idempotent
expect(logs.error).to.length(1) // no more errorrs
expect(logs.warn).to.length(0)
expect(count).equals(1)
})
})
describe('flush on background', () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/test/unit/query/FluxTableMetaData.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ describe('FluxTableMetaData', () => {
['string', '1', '1'],
['base64Binary', '1', '1'],
['dateTime', '1', '1'],
['dateTime', '', null],
['duration', '1', '1'],
['duration', '', null],
[undefined, '1', '1'],
]
for (const entry of serializationTable) {
Expand Down
2 changes: 1 addition & 1 deletion scripts/enhance-doc-index-md.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ function enhanceIndexMD(file) {
if (line.startsWith('## API Reference')) {
acc.push('')
acc.push(
`The is the API Reference Documentation of InfluxDB v2 JavaScript client version **${version}**.`
`Welcome to the API Reference Documentation of InfluxDB v2 JavaScript client version **${version}** _(${new Date().toISOString()})_.`
)
acc.push('Use this client library with InfluxDB 2.x and InfluxDB 1.8+.')
acc.push(
Expand Down