Skip to content

Commit

Permalink
Backport #323
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 12, 2022
1 parent b24c6d5 commit d6a161e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- `eqx dump`/`Equinox.Tool`: Add `-F` option to opt out of pretty printing unfolds [#319](https://github.com/jet/equinox/pull/319)
- `eqx dump`/`Equinox.Tool`: Show payload statistics [#323](https://github.com/jet/equinox/pull/323)
- `eqx dump`/`Equinox.Tool`: Add `-B` option to prevent assuming UTF-8 bodies [#323](https://github.com/jet/equinox/pull/323)

- ### Changed

Expand Down
51 changes: 33 additions & 18 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ and [<NoComparison; NoEquality>]StatsArguments =
and [<NoComparison; NoEquality>]DumpArguments =
| [<AltCommandLine "-s">] Stream of FsCodec.StreamName
| [<AltCommandLine "-C"; Unique>] Correlation
| [<AltCommandLine "-B"; Unique>] Blobs
| [<AltCommandLine "-J"; Unique>] JsonSkip
| [<AltCommandLine "-P"; Unique>] Pretty
| [<AltCommandLine "-F"; Unique>] FlattenUnfolds
Expand All @@ -104,7 +105,8 @@ and [<NoComparison; NoEquality>]DumpArguments =
member a.Usage = a |> function
| Stream _ -> "Specify stream(s) to dump."
| Correlation -> "Include Correlation/Causation identifiers"
| JsonSkip -> "Don't attempt to decode JSON"
| Blobs -> "Don't assume Data/Metadata is UTF-8 text"
| JsonSkip -> "Don't assume Data/Metadata is JSON"
| Pretty -> "Pretty print the JSON over multiple lines"
| FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds"
| TimeRegular -> "Don't humanize time intervals between events"
Expand Down Expand Up @@ -236,6 +238,16 @@ let createStoreLog verbose verboseConsole maybeSeqEndpoint =
let c = match maybeSeqEndpoint with None -> c | Some endpoint -> c.WriteTo.Seq(endpoint)
c.CreateLogger() :> ILogger

let dumpStats storeConfig log =
match storeConfig with
| Some (Storage.StorageConfig.Cosmos _) ->
Equinox.CosmosStore.Core.Log.InternalMetrics.dump log
| Some (Storage.StorageConfig.Es _) ->
Equinox.EventStore.Log.InternalMetrics.dump log
| Some (Storage.StorageConfig.Sql _) ->
Equinox.SqlStreamStore.Log.InternalMetrics.dump log
| _ -> ()

module LoadTest =

open Equinox.Tools.TestHarness
Expand Down Expand Up @@ -297,15 +309,7 @@ module LoadTest =
for r in results do
resultFile.Information("Aggregate: {aggregate}", r)
log.Information("Run completed; Current memory allocation: {bytes:n2} MiB", (GC.GetTotalMemory(true) |> float) / 1024./1024.)

match storeConfig with
| Some (Storage.StorageConfig.Cosmos _) ->
Equinox.CosmosStore.Core.Log.InternalMetrics.dump log
| Some (Storage.StorageConfig.Es _) ->
Equinox.EventStore.Log.InternalMetrics.dump log
| Some (Storage.StorageConfig.Sql _) ->
Equinox.SqlStreamStore.Log.InternalMetrics.dump log
| _ -> ()
dumpStats storeConfig log

let createDomainLog verbose verboseConsole maybeSeqEndpoint =
let c = LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext()
Expand Down Expand Up @@ -393,7 +397,7 @@ module Dump =
let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq
let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog)
let doU, doE = not (args.Contains EventsOnly), not (args.Contains UnfoldsOnly)
let doC, doJ, doT = args.Contains Correlation, not (args.Contains JsonSkip), not (args.Contains TimeRegular)
let doC, doJ, doS, doT = args.Contains Correlation, not (args.Contains JsonSkip), not (args.Contains Blobs), not (args.Contains TimeRegular)
let cat = Services.StreamResolver(storeConfig)

let streams = args.GetResults DumpArguments.Stream
Expand All @@ -405,15 +409,21 @@ module Dump =
let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold), fun _state -> failwith "no snapshot required"
let formatUnfolds, formatEvents =
let indentedOptions = FsCodec.SystemTextJson.Options.Create(indent = true)
let prettify : string -> _ = System.Text.Json.JsonDocument.Parse >> fun d -> System.Text.Json.JsonSerializer.Serialize(d, indentedOptions)
let prettify (json : string) =
use parsed = System.Text.Json.JsonDocument.Parse json
System.Text.Json.JsonSerializer.Serialize(parsed, indentedOptions)
if args.Contains FlattenUnfolds then id else prettify
, if args.Contains Pretty then prettify else id
let mutable payloadBytes = 0
let render format (data : byte[]) =
try match data with
| null | [||] -> null
| _ when doJ -> System.Text.Encoding.UTF8.GetString data |> format
| _ -> $"(%d{System.Text.Encoding.UTF8.GetString(data).Length} chars)"
with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "JSON Parse failure - use SkipJson option to inhibit"); reraise()
payloadBytes <- payloadBytes + data.Length
if data = null then null
elif not doS then $"%6d{data.Length}b"
else try let s = System.Text.Encoding.UTF8.GetString data
if doJ then try format s
with e -> log.ForContext("str", s).Warning(e, "JSON Parse failure - use --JsonSkip option to inhibit"); reraise()
else $"(%d{s.Length} chars)"
with e -> log.Warning(e, "UTF-8 Parse failure - use --Blobs option to inhibit"); reraise()
let readStream (streamName : FsCodec.StreamName) = async {
let stream = cat.Resolve(idCodec, fold, initial, isOriginAndSnapshot) streamName
let! _token, events = stream.Load(storeLog)
Expand All @@ -438,6 +448,11 @@ module Dump =
|> Seq.map readStream
|> Async.Parallel
|> Async.Ignore<unit[]>
|> Async.RunSynchronously

log.Information("Total Payload {kib:n1}KiB", float payloadBytes / 1024.)
if verboseConsole then
dumpStats (Some storeConfig) log

[<EntryPoint>]
let main argv =
Expand All @@ -451,7 +466,7 @@ let main argv =
try match args.GetSubCommand() with
| Init iargs -> CosmosInit.containerAndOrDb log iargs |> Async.RunSynchronously
| Config cargs -> SqlInit.databaseOrSchema log cargs |> Async.RunSynchronously
| Dump dargs -> Dump.run (log, verboseConsole, maybeSeq) dargs |> Async.RunSynchronously
| Dump dargs -> Dump.run (log, verboseConsole, maybeSeq) dargs
| Stats sargs -> CosmosStats.run (log, verboseConsole, maybeSeq) sargs |> Async.RunSynchronously
| Run rargs ->
let reportFilename = args.GetResult(LogFile, programName + ".log") |> fun n -> System.IO.FileInfo(n).FullName
Expand Down

0 comments on commit d6a161e

Please sign in to comment.