Skip to content

Commit

Permalink
Revert Asyncness removal
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 10, 2022
1 parent 57005f8 commit 6be1b5a
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 29 deletions.
16 changes: 8 additions & 8 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, superse
currentToken, currentState

type ICache =
abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> unit
abstract member TryGet : key: string -> (StreamToken * 'state) option
abstract member UpdateIfNewer : key: string * options: CacheItemOptions * entry: CacheEntry<'state> -> Async<unit>
abstract member TryGet : key: string -> Async<(StreamToken * 'state) option>

namespace Equinox

Expand All @@ -41,15 +41,15 @@ type Cache(name, sizeMb : int) =
| RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative)

interface ICache with
member _.UpdateIfNewer(key, options, entry) =
member _.UpdateIfNewer(key, options, entry) = async {
let policy = toPolicy options
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x }

member _.TryGet key =
member _.TryGet key = async {
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x
| null -> return None
| :? CacheEntry<'state> as existingEntry -> return Some existingEntry.Value
| x -> return failwithf "TryGet Incompatible cache entry %A" x }
6 changes: 3 additions & 3 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
module Equinox.Core.Caching

type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> unit) =
type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'state, string, 'context>, updateCache : string -> StreamToken * 'state -> Async<unit>) =

let cache streamName inner = async {
let! tokenAndState = inner
updateCache streamName tokenAndState
do! updateCache streamName tokenAndState
return tokenAndState }

interface ICategory<'event, 'state, string, 'context> with
Expand All @@ -15,7 +15,7 @@ type internal Decorator<'event, 'state, 'context>(inner : ICategory<'event, 'sta
match! inner.TrySync(log, streamName, streamToken, state, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict (resync |> cache streamName)
| SyncResult.Written (token', state') ->
updateCache streamName (token', state')
do! updateCache streamName (token', state')
return SyncResult.Written (token', state') }

let applyCacheUpdatesWithSlidingExpiration
Expand Down
16 changes: 8 additions & 8 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1133,20 +1133,20 @@ module internal Caching =
mapUnfolds : Choice<unit, 'event list -> 'state -> 'event seq, 'event list -> 'state -> 'event list * 'event list>) =
let cache streamName inner = async {
let! tokenAndState = inner
updateCache streamName tokenAndState
do! updateCache streamName tokenAndState
return tokenAndState }
interface ICategory<'event, 'state, string, 'context> with
member _.Load(log, streamName, allowStale) : Async<StreamToken * 'state> =
match tryReadCache streamName with
| None -> category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName
| Some tokenAndState when allowStale -> async { return tokenAndState } // read already updated TTL, no need to write
| Some (token, state) -> category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName
member _.Load(log, streamName, allowStale) : Async<StreamToken * 'state> = async {
match! tryReadCache streamName with
| None -> return! category.Load(log, streamName, initial, checkUnfolds, fold, isOrigin) |> cache streamName
| Some tokenAndState when allowStale -> return tokenAndState // read already updated TTL, no need to write
| Some (token, state) -> return! category.Reload(log, streamName, token, state, fold, isOrigin) |> cache streamName }
member _.TrySync(log : ILogger, streamName, streamToken, state, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.Sync(log, streamName, streamToken, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds) with
| SyncResult.Conflict resync ->
return SyncResult.Conflict (cache streamName resync)
| SyncResult.Written (token', state') ->
updateCache streamName (token', state')
do! updateCache streamName (token', state')
return SyncResult.Written (token', state') }

module ConnectionString =
Expand Down Expand Up @@ -1430,7 +1430,7 @@ type CosmosStoreCategory<'event, 'state, 'context>
let createCategory _name : ICategory<_, _, string, 'context> =
let tryReadCache, updateCache =
match caching with
| CachingStrategy.NoCaching -> (fun _ -> None), fun _ _ -> ()
| CachingStrategy.NoCaching -> (fun _ -> async { return None }), fun _ _ -> async { () }
| CachingStrategy.SlidingWindow (cache, window) -> cache.TryGet, Caching.applyCacheUpdatesWithSlidingExpiration (cache, null) window
| CachingStrategy.FixedTimeSpan (cache, period) -> cache.TryGet, Caching.applyCacheUpdatesWithFixedTimeSpan (cache, null) period
let isOrigin, checkUnfolds, mapUnfolds =
Expand Down
10 changes: 5 additions & 5 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,11 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
member _.Load(log, streamName, allowStale) : Async<StreamToken * 'state> =
match readCache with
| None -> batched log streamName
| Some (cache : ICache, prefix : string) ->
match cache.TryGet(prefix + streamName) with
| None -> batched log streamName
| Some tokenAndState when allowStale -> async { return tokenAndState }
| Some (token, state) -> category.LoadFromToken fold state streamName token log
| Some (cache : ICache, prefix : string) -> async {
match! cache.TryGet(prefix + streamName) with
| None -> return! batched log streamName
| Some tokenAndState when allowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }
member _.TrySync(log, streamName, token, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.TrySync(log, fold, streamName, token, initialState, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
Expand Down
10 changes: 5 additions & 5 deletions src/Equinox.SqlStreamStore/SqlStreamStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -466,11 +466,11 @@ type private Folder<'event, 'state, 'context>(category : Category<'event, 'state
member _.Load(log, streamName, allowStale) : Async<StreamToken * 'state> =
match readCache with
| None -> batched log streamName
| Some (cache : ICache, prefix : string) ->
match cache.TryGet(prefix + streamName) with
| None -> batched log streamName
| Some tokenAndState when allowStale -> async { return tokenAndState }
| Some (token, state) -> category.LoadFromToken fold state streamName token log
| Some (cache : ICache, prefix : string) -> async {
match! cache.TryGet(prefix + streamName) with
| None -> return! batched log streamName
| Some tokenAndState when allowStale -> return tokenAndState
| Some (token, state) -> return! category.LoadFromToken fold state streamName token log }
member _.TrySync(log : ILogger, streamName, streamToken, initialState, events : 'event list, context) : Async<SyncResult<'state>> = async {
match! category.TrySync(log, fold, streamName, streamToken, initialState, events, context) with
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
Expand Down

0 comments on commit 6be1b5a

Please sign in to comment.