Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce a writeQueue for event tracking #124

Merged
merged 4 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Sources/Confidence/Confidence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,7 @@ extension Confidence {
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: clientSecret,
uploader: uploader,
storage: eventStorage,
flushPolicies: [SizeFlushPolicy(batchSize: 10)])
storage: eventStorage)
return Confidence(
clientSecret: clientSecret,
region: region,
Expand Down
50 changes: 34 additions & 16 deletions Sources/Confidence/EventSenderEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,54 @@ final class EventSenderEngineImpl: EventSenderEngine {
private let clientSecret: String
private let payloadMerger: PayloadMerger = PayloadMergerImpl()
private let semaphore = DispatchSemaphore(value: 1)
private let writeQueue: DispatchQueue

convenience init(
clientSecret: String,
uploader: ConfidenceClient,
storage: EventStorage
) {
self.init(
clientSecret: clientSecret,
uploader: uploader,
storage: storage,
flushPolicies: [SizeFlushPolicy(batchSize: 10)],
writeQueue: DispatchQueue(label: "ConfidenceWriteQueue")
)
}

init(
clientSecret: String,
uploader: ConfidenceClient,
storage: EventStorage,
flushPolicies: [FlushPolicy]
flushPolicies: [FlushPolicy],
writeQueue: DispatchQueue
) {
self.uploader = uploader
self.clientSecret = clientSecret
self.storage = storage
self.flushPolicies = flushPolicies + [ManualFlushPolicy()]
self.writeQueue = writeQueue

writeReqChannel.sink { [weak self] event in
guard let self = self else { return }
if event.name != manualFlushEvent.name { // skip storing flush events.
do {
try self.storage.writeEvent(event: event)
} catch {
writeReqChannel
.receive(on: self.writeQueue)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is new.
also, supporting to pass the write queue for testing purposes.

.sink { [weak self] event in
guard let self = self else { return }
if event.name != manualFlushEvent.name { // skip storing flush events.
do {
try self.storage.writeEvent(event: event)
} catch {
}
}
}

self.flushPolicies.forEach { policy in policy.hit(event: event) }
let shouldFlush = self.flushPolicies.contains { policy in policy.shouldFlush() }
self.flushPolicies.forEach { policy in policy.hit(event: event) }
let shouldFlush = self.flushPolicies.contains { policy in policy.shouldFlush() }

if shouldFlush {
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName)
self.flushPolicies.forEach { policy in policy.reset() }
if shouldFlush {
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName)
self.flushPolicies.forEach { policy in policy.reset() }
}
}
}
.store(in: &cancellables)
.store(in: &cancellables)

uploadReqChannel.sink { [weak self] _ in
guard let self = self else { return }
Expand Down
92 changes: 52 additions & 40 deletions Tests/ConfidenceTests/EventSenderEngineTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,29 @@ final class ImmidiateFlushPolicy: FlushPolicy {
}

final class EventSenderEngineTest: XCTestCase {
// swiftlint:disable implicitly_unwrapped_optional
var writeQueue: DispatchQueue!
var uploaderMock: EventUploaderMock!
var storageMock: EventStorageMock!
// swiftlint:enable implicitly_unwrapped_optional

override func setUp() async throws {
writeQueue = DispatchQueue(label: "ConfidenceWriteQueue")
uploaderMock = EventUploaderMock()
storageMock = EventStorageMock()
}

func testPayloadOnEmit() throws {
let flushPolicies = [MinSizeFlushPolicy(maxSize: 1)]
let uploader = EventUploaderMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploader,
storage: EventStorageMock(),
flushPolicies: flushPolicies
uploader: uploaderMock,
storage: storageMock,
flushPolicies: [MinSizeFlushPolicy(maxSize: 1)],
writeQueue: writeQueue
)

let expectation = XCTestExpectation(description: "Upload finished")
let cancellable = uploader.subject.sink { _ in
let cancellable = uploaderMock.subject.sink { _ in
expectation.fulfill()
}
eventSenderEngine.emit(
Expand All @@ -67,100 +78,102 @@ final class EventSenderEngineTest: XCTestCase {


wait(for: [expectation], timeout: 5)
XCTAssertEqual(try XCTUnwrap(uploader.calledRequest)[0].eventDefinition, "my_event")
XCTAssertEqual(try XCTUnwrap(uploader.calledRequest)[0].payload, NetworkStruct(fields: [
XCTAssertEqual(try XCTUnwrap(uploaderMock.calledRequest)[0].eventDefinition, "my_event")
XCTAssertEqual(try XCTUnwrap(uploaderMock.calledRequest)[0].payload, NetworkStruct(fields: [
"a": .number(0.0),
"message": .number(1.0)
]))
cancellable.cancel()
}

func testAddingEventsWithSizeFlushPolicyWorks() throws {
let flushPolicies = [MinSizeFlushPolicy(maxSize: 5)]
let uploader = EventUploaderMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploader,
storage: EventStorageMock(),
flushPolicies: flushPolicies
uploader: uploaderMock,
storage: storageMock,
flushPolicies: [MinSizeFlushPolicy(maxSize: 5)],
writeQueue: writeQueue
)

eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
// TODO: We need to wait for writeReqChannel to complete to make this test meaningful
XCTAssertNil(uploader.calledRequest)
XCTAssertNil(uploaderMock.calledRequest)
}

func testRemoveEventsFromStorageOnBadRequest() throws {
MockedClientURLProtocol.mockedOperation = .badRequest
let client = RemoteConfidenceClient(
let badRequestUploader = RemoteConfidenceClient(
options: ConfidenceClientOptions(credentials: ConfidenceClientCredentials.clientSecret(secret: "")),
session: MockedClientURLProtocol.mockedSession(),
metadata: ConfidenceMetadata(name: "", version: ""))

let flushPolicies = [ImmidiateFlushPolicy()]
let storage = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: client,
storage: storage,
flushPolicies: flushPolicies
uploader: badRequestUploader,
storage: storageMock,
flushPolicies: [ImmidiateFlushPolicy()],
writeQueue: writeQueue
)
eventSenderEngine.emit(eventName: "testEvent", message: ConfidenceStruct(), context: ConfidenceStruct())
let expectation = expectation(description: "events batched")
storage.eventsRemoved{
storageMock.eventsRemoved{
expectation.fulfill()
}
wait(for: [expectation], timeout: 2)

XCTAssertEqual(storage.isEmpty(), true)
XCTAssertEqual(storageMock.isEmpty(), true)
}

func testKeepEventsInStorageForRetry() throws {
MockedClientURLProtocol.mockedOperation = .needRetryLater
let client = RemoteConfidenceClient(
let retryLaterUploader = RemoteConfidenceClient(
options: ConfidenceClientOptions(credentials: ConfidenceClientCredentials.clientSecret(secret: "")),
session: MockedClientURLProtocol.mockedSession(),
metadata: ConfidenceMetadata(name: "", version: ""))

let flushPolicies = [ImmidiateFlushPolicy()]
let storage = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: client,
storage: storage,
flushPolicies: flushPolicies
uploader: retryLaterUploader,
storage: storageMock,
flushPolicies: [ImmidiateFlushPolicy()],
writeQueue: writeQueue
)

eventSenderEngine.emit(eventName: "testEvent", message: ConfidenceStruct(), context: ConfidenceStruct())

XCTAssertEqual(storage.isEmpty(), false)
writeQueue.sync {
XCTAssertEqual(storageMock.isEmpty(), false)
}
}

func testManualFlushWorks() throws {
let uploaderMock = EventUploaderMock()
let storageMock = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploaderMock,
storage: storageMock,
// no other flush policy is set which means that only manual flushes will trigger upload
flushPolicies: []
flushPolicies: [],
writeQueue: writeQueue
)

eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
eventSenderEngine.emit(eventName: "Hello", message: [:], context: [:])
XCTAssertEqual(storageMock.events.count, 4)
XCTAssertNil(uploaderMock.calledRequest)


writeQueue.sync {
XCTAssertEqual(storageMock.events.count, 4)
XCTAssertNil(uploaderMock.calledRequest)
}

eventSenderEngine.flush()

let expectation = XCTestExpectation(description: "Upload finished")
let uploadExpectation = XCTestExpectation(description: "Upload finished")
let cancellable = uploaderMock.subject.sink { _ in
expectation.fulfill()
uploadExpectation.fulfill()
}
wait(for: [expectation], timeout: 1)
wait(for: [uploadExpectation], timeout: 1)
let uploadRequest = uploaderMock.calledRequest
XCTAssertEqual(uploadRequest?.count, 4)

Expand All @@ -169,14 +182,13 @@ final class EventSenderEngineTest: XCTestCase {


func testManualFlushEventIsNotStored() throws {
let uploaderMock = EventUploaderMock()
let storageMock = EventStorageMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploaderMock,
storage: storageMock,
// no other flush policy is set which means that only manual flushes will trigger upload
flushPolicies: []
flushPolicies: [],
writeQueue: writeQueue
)

eventSenderEngine.flush()
Expand Down
Loading