diff --git a/Makefile b/Makefile index db9a3c27bb..2d95b1c7d7 100644 --- a/Makefile +++ b/Makefile @@ -40,6 +40,7 @@ test: | test1 test2 v1: | wakunode1 example1 sim1 v2: | wakunode2 example2 wakubridge chat2 chat2bridge +v2light: | wakunode2 waku.nims: ln -s waku.nimble $@ @@ -144,12 +145,17 @@ testcommon: | build deps ############# ## Waku v2 ## ############# -.PHONY: test2 wakunode2 example2 sim2 scripts2 wakubridge chat2 chat2bridge +.PHONY: test2 wakunode2 example2 sim2 scripts2 wakubridge chat2 chat2bridge mytests test2: | build deps librln testcommon echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim test2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims + +mytests: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim mytests $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims + wakunode2: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakunode2 $(NIM_PARAMS) $(EXPERIMENTAL_PARAMS) waku.nims diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 2f4d0af295..ae7806195a 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -25,6 +25,7 @@ import ../../waku/common/sqlite, ../../waku/common/utils/nat, ../../waku/common/logging, + ../../waku/v2/config, ../../waku/v2/node/peer_manager, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -32,9 +33,6 @@ import ../../waku/v2/node/waku_node, ../../waku/v2/node/waku_metrics, ../../waku/v2/protocol/waku_archive, - ../../waku/v2/protocol/waku_archive/driver/queue_driver, - ../../waku/v2/protocol/waku_archive/driver/sqlite_driver, - ../../waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, ../../waku/v2/protocol/waku_archive/retention_policy, ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity, 
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time, @@ -50,7 +48,7 @@ import ../../waku/v2/utils/peers, ./wakunode2_setup_rest, ./wakunode2_setup_rpc, - ./config + ./wakunode2_archive_driver when defined(rln): import @@ -158,36 +156,6 @@ proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): SetupResult[Optio else: return err("unknown retention policy") -proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[ArchiveDriver] = - let db = ?setupDatabaseConnection(dbUrl) - - if db.isSome(): - # SQLite vacuum - # TODO: Run this only if the database engine is SQLite - let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get()) - debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount - - if vacuum and (pageCount > 0 and freelistCount > 0): - ?performSqliteVacuum(db.get()) - - # Database migration - if migrate: - ?archive_driver_sqlite_migrations.migrate(db.get()) - - if db.isSome(): - debug "setting up sqlite waku archive driver" - let res = SqliteDriver.new(db.get()) - if res.isErr(): - return err("failed to init sqlite archive driver: " & res.error) - - ok(res.value) - - else: - debug "setting up in-memory waku archive driver" - let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages - ok(driver) - - proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): SetupResult[seq[RemotePeerInfo]] = if dnsDiscovery and dnsDiscoveryUrl != "": @@ -642,7 +610,7 @@ when isMainModule: error "failed to configure the message store database connection", error=dbUrlValidationRes.error quit(QuitFailure) - let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration) + let archiveDriverRes = setupWakuArchiveDriver(conf) if archiveDriverRes.isOk(): archiveDriver = some(archiveDriverRes.get()) else: diff 
--git a/apps/wakunode2/wakunode2_archive_driver.nim b/apps/wakunode2/wakunode2_archive_driver.nim new file mode 100644 index 0000000000..4fec7d157d --- /dev/null +++ b/apps/wakunode2/wakunode2_archive_driver.nim @@ -0,0 +1,95 @@ +import + std/strutils, + chronicles, + stew/results, + ../../waku/v2/protocol/waku_archive, + ../../waku/v2/config, + ../../waku/v2/protocol/waku_archive/driver/sqlite_driver, + ../../waku/v2/protocol/waku_archive/driver/postgres_driver, + ../../waku/v2/protocol/waku_archive/driver/queue_driver, + ../../waku/v2/protocol/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, + ../../waku/common/sqlite + +type SetupResult[T] = Result[T, string] + +proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] = + ## SQLite database vacuuming + # TODO: Run vacuuming conditionally based on database page stats + # if (pageCount > 0 and freelistCount > 0): + + debug "starting sqlite database vacuuming" + + let resVacuum = db.vacuum() + if resVacuum.isErr(): + return err("failed to execute vacuum: " & resVacuum.error) + + debug "finished sqlite database vacuuming" + + ok() + +proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] = + let + pageSize = ?db.getPageSize() + pageCount = ?db.getPageCount() + freelistCount = ?db.getFreelistCount() + + ok((pageSize, pageCount, freelistCount)) + + +proc setupSqliteDriver(conf: WakuNodeConf, path: string): SetupResult[ArchiveDriver] = + let res = SqliteDatabase.new(path) + + if res.isErr(): + return err("could not create sqlite database") + + let database = res.get() + + # TODO: Run this only if the database engine is SQLite + let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(database) + debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount + + if conf.storeMessageDbVacuum and (pageCount > 0 and freelistCount > 0): + ?performSqliteVacuum(database) + +# Database migration + if 
conf.storeMessageDbMigration: + ?archive_driver_sqlite_migrations.migrate(database) + + debug "setting up sqlite waku archive driver" + let sqliteDriverRes = SqliteDriver.new(database) + if sqliteDriverRes.isErr(): + return err("failed to init sqlite archive driver: " & sqliteDriverRes.error) + + ok(sqliteDriverRes.value) + +proc setupPostgresDriver(conf: WakuNodeConf): SetupResult[ArchiveDriver] = + let res = PostgresDriver.new(conf) + if res.isErr(): + return err("could not create postgres driver") + + ok(res.value) + +proc setupWakuArchiveDriver*(conf: WakuNodeConf): SetupResult[ArchiveDriver] = + if conf.storeMessageDbUrl == "" or conf.storeMessageDbUrl == "none": + let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages + return ok(driver) + + let dbUrlParts = conf.storeMessageDbUrl.split("://", 1) + let + engine = dbUrlParts[0] + path = dbUrlParts[1] + + let connRes = case engine + of "sqlite": + setupSqliteDriver(conf, path) + of "postgres": + setupPostgresDriver(conf) + else: + return err("unknown database engine") + + if connRes.isErr(): + return err("failed to init connection: " & connRes.error) + ok(connRes.get()) + + diff --git a/apps/wakunode2/wakunode2_setup_rest.nim b/apps/wakunode2/wakunode2_setup_rest.nim index d7b2b3bdff..fc14c969d7 100644 --- a/apps/wakunode2/wakunode2_setup_rest.nim +++ b/apps/wakunode2/wakunode2_setup_rest.nim @@ -13,7 +13,7 @@ import ../../waku/v2/node/rest/debug/handlers as debug_api, ../../waku/v2/node/rest/relay/handlers as relay_api, ../../waku/v2/node/rest/relay/topic_cache, - ./config + ../../waku/v2/config logScope: diff --git a/apps/wakunode2/wakunode2_setup_rpc.nim b/apps/wakunode2/wakunode2_setup_rpc.nim index 53567a0db6..8fbd095ae2 100644 --- a/apps/wakunode2/wakunode2_setup_rpc.nim +++ b/apps/wakunode2/wakunode2_setup_rpc.nim @@ -15,7 +15,7 @@ import ../../waku/v2/node/jsonrpc/filter/handlers as filter_api, ../../waku/v2/node/jsonrpc/relay/handlers as relay_api, ../../waku/v2/node/jsonrpc/store/handlers as 
store_api, - ./config + ../../waku/v2/config logScope: topics = "wakunode jsonrpc" diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml new file mode 100644 index 0000000000..14fcdb1686 --- /dev/null +++ b/tests/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3.8" + + +services: + db: + image: postgres:9.6-alpine + restart: always + environment: + POSTGRES_PASSWORD: test123 + ports: + - "5432:5432" diff --git a/tests/my_tests.nim b/tests/my_tests.nim new file mode 100644 index 0000000000..7705b6d92d --- /dev/null +++ b/tests/my_tests.nim @@ -0,0 +1,5 @@ +## Waku v2 + +# Waku archive test suite +import + ./v2/waku_archive/test_driver_postgres diff --git a/tests/v2/waku_archive/test_driver_postgres.nim b/tests/v2/waku_archive/test_driver_postgres.nim new file mode 100644 index 0000000000..42687700a6 --- /dev/null +++ b/tests/v2/waku_archive/test_driver_postgres.nim @@ -0,0 +1,184 @@ +{.used.} + +import + std/sequtils, + testutils/unittests, + chronos +import + ../../../waku/v2/protocol/waku_archive, + ../../../waku/v2/config, + ../../../waku/v2/protocol/waku_archive/driver/postgres_driver, + ../../../waku/v2/protocol/waku_message, + ../testlib/common, + ../testlib/waku2 + +proc defaultConf : WakuNodeConf = + return WakuNodeConf( + storeMessageDbUrl: "postgres://postgres:test123@localhost:5432/postgres", + listenAddress: ValidIpAddress.init("127.0.0.1"), rpcAddress: ValidIpAddress.init("127.0.0.1"), restAddress: ValidIpAddress.init("127.0.0.1"), metricsServerAddress: ValidIpAddress.init("127.0.0.1")) + +suite "Postgres driver": + + test "init driver and database": + + ## When + let driverRes = PostgresDriver.new(defaultConf()) + + ## Then + require: + driverRes.isOk() + + let driver: ArchiveDriver = driverRes.tryGet() + require: + not driver.isNil() + + discard driverRes.get().reset() + let initRes = driverRes.get().init() + + require: + initRes.isOk() + + ## Cleanup + driver.close().expect("driver to close") + + test "insert a message": + ## Given + const 
contentTopic = "test-content-topic" + + let driverRes = PostgresDriver.new(defaultConf()) + + require: + driverRes.isOk() + + discard driverRes.get().reset() + discard driverRes.get().init() + + let driver: ArchiveDriver = driverRes.tryGet() + require: + not driver.isNil() + + + let msg = fakeWakuMessage(contentTopic=contentTopic) + + let computedDigest = computeDigest(msg) + ## When + let putRes = driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp) + + ## Then + require: + putRes.isOk() + + let storedMsg = driver.getAllMessages().tryGet() + require: + storedMsg.len == 1 + storedMsg.all do (item: auto) -> bool: + let (pubsubTopic, actualMsg, digest, storeTimestamp) = item + actualMsg.contentTopic == contentTopic and + pubsubTopic == DefaultPubsubTopic and + toHex(computedDigest.data) == toHex(digest) and + toHex(actualMsg.payload) == toHex(msg.payload) + + ## Cleanup + driver.close().expect("driver to close") + + test "insert and query message": + ## Given + const contentTopic1 = "test-content-topic-1" + const contentTopic2 = "test-content-topic-2" + const pubsubTopic1 = "pubsubtopic-1" + const pubsubTopic2 = "pubsubtopic-2" + + let driverRes = PostgresDriver.new(defaultConf()) + + require: + driverRes.isOk() + + discard driverRes.get().reset() + discard driverRes.get().init() + + let driver: ArchiveDriver = driverRes.tryGet() + require: + not driver.isNil() + + let msg1 = fakeWakuMessage(contentTopic=contentTopic1) + + ## When + var putRes = driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp) + + ## Then + require: + putRes.isOk() + + let msg2 = fakeWakuMessage(contentTopic=contentTopic2) + + ## When + putRes = driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp) + + ## Then + require: + putRes.isOk() + + let countMessagesRes = driver.getMessagesCount() + + require: + countMessagesRes.isOk() and + countMessagesRes.get() == 2 + + var messagesRes = driver.getMessages(contentTopic = @[contentTopic1]) + + require: + 
messagesRes.isOk() + + require: + messagesRes.get().len == 1 + + # Get both content topics, check ordering + messagesRes = driver.getMessages(contentTopic = @[contentTopic1, contentTopic2]) + + require: + messagesRes.isOk() + + require: + messagesRes.get().len == 2 and messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1 + + # Descending order + messagesRes = driver.getMessages(contentTopic = @[contentTopic1, contentTopic2], ascendingOrder = false) + + require: + messagesRes.isOk() + + require: + messagesRes.get().len == 2 and messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic2 + + # cursor + + let cursor = ArchiveCursor(storeTime: messagesRes.get()[0][3]) + # Get both content topics + messagesRes = driver.getMessages(contentTopic = @[contentTopic1, contentTopic2],cursor = some(cursor)) + + require: + messagesRes.isOk() + + require: + messagesRes.get().len == 1 + + # Get both content topics but one pubsub topic + messagesRes = driver.getMessages(contentTopic = @[contentTopic1, contentTopic2], pubsubTopic = some(pubsubTopic1)) + + require: + messagesRes.isOk() + + require: + messagesRes.get().len == 1 and messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1 + + # Limit + messagesRes = driver.getMessages(contentTopic = @[contentTopic1, contentTopic2], maxPageSize = 1) + + require: + messagesRes.isOk() + + require: + messagesRes.get().len == 1 + + ## Cleanup + driver.close().expect("driver to close") diff --git a/waku.nimble b/waku.nimble index a391b120eb..387f4cf178 100644 --- a/waku.nimble +++ b/waku.nimble @@ -81,6 +81,8 @@ task bridge, "Build Waku v1 - v2 bridge": task test2, "Build & run Waku v2 tests": test "all_tests_v2" +task mytests, "Build & run Waku v2 tests": + test "my_tests" task sim2, "Build Waku v2 simulation tools": buildBinary "quicksim2", "tools/simulation/", "-d:chronicles_log_level=DEBUG" diff --git a/apps/wakunode2/config.nim b/waku/v2/config.nim similarity index 100% rename from apps/wakunode2/config.nim 
rename to waku/v2/config.nim diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 9c8774a225..ada76c90bd 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -24,9 +24,14 @@ import libp2p/transports/tcptransport, libp2p/transports/wstransport import + ../config, ../protocol/waku_message, ../protocol/waku_relay, ../protocol/waku_archive, + ../protocol/waku_archive/driver/queue_driver, + ../protocol/waku_archive/driver/sqlite_driver, + ../protocol/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, + ../protocol/waku_archive/driver/postgres_driver, ../protocol/waku_store, ../protocol/waku_store/client as store_client, ../protocol/waku_filter, @@ -39,6 +44,7 @@ import ../protocol/waku_peer_exchange, ../utils/peers, ../utils/time, + ../../common/sqlite, ./peer_manager, ./wakuswitch @@ -651,6 +657,9 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte ## Waku archive + +type MountArchiveResult[T] = Result[T, string] + proc mountArchive*(node: WakuNode, driver: Option[ArchiveDriver], messageValidator: Option[MessageValidator], diff --git a/waku/v2/protocol/waku_archive/driver/postgres_driver.nim b/waku/v2/protocol/waku_archive/driver/postgres_driver.nim new file mode 100644 index 0000000000..9d4ee222d8 --- /dev/null +++ b/waku/v2/protocol/waku_archive/driver/postgres_driver.nim @@ -0,0 +1,248 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/db_postgres, + std/sequtils, + std/parseutils, + std/strformat, + std/nre, + std/options, + stew/[results,byteutils], + std/strutils + +import + ../../../protocol/waku_message, + ../../../utils/time, + ../../../config, + ../common, + ../driver + +export postgres_driver + +type PostgresDriver* = ref object of ArchiveDriver + connection : DbConn + preparedInsert: SqlPrepared + +proc dropTableQuery(): string = + "DROP TABLE messages" + +proc createTableQuery(): string = 
+ "CREATE TABLE IF NOT EXISTS messages (" & + " pubsubTopic VARCHAR NOT NULL," & + " contentTopic VARCHAR NOT NULL," & + " payload VARCHAR," & + " version INTEGER NOT NULL," & + " timestamp BIGINT NOT NULL," & + " id VARCHAR NOT NULL," & + " storedAt BIGINT NOT NULL," & + " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + ");" + +proc insertRow(): string = + """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);""" + + +proc new*(T: type PostgresDriver, conf: WakuNodeConf): ArchiveDriverResult[T] = + var host : string + var user : string + var password : string + var dbName : string + var port : string + var connectionString : string + var dbConn : DbConn + try: + let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""") + let matches = find(conf.storeMessageDbUrl,regex).get.captures + user = matches[0] + password = matches[1] + host = matches[2] + port = matches[3] + dbName = matches[4] + connectionString = "user={user} host={host} port={port} dbname={dbName} password={password}".fmt + except KeyError,InvalidUnicodeError, RegexInternalError, ValueError, StudyError, SyntaxError: + return err("could not parse postgres string") + + try: + dbConn = open("","", "", connectionString) + except DbError: + return err("could not connect to postgres") + + ok(PostgresDriver(connection: dbConn)) + +method reset*(s: PostgresDriver) : ArchiveDriverResult[void] = + try: + let res = s.connection.tryExec(sql(dropTableQuery())) + if not res: + return err("failed to reset database") + except DbError: + return err("failed to reset database") + ok() + +method init*(s: PostgresDriver) : ArchiveDriverResult[void] = + try: + let res = s.connection.tryExec(sql(createTableQuery())) + if not res: + return err("failed to migrate database") + s.preparedInsert = prepare(s.connection, "insertRow", sql(insertRow()), 7) + except DbError: + let + e = getCurrentException() + msg = 
getCurrentExceptionMsg() + exceptionMessage = "failed to init driver, got exception " & repr(e) & " with message " & msg + return err(exceptionMessage) + ok() + +method put*(s: PostgresDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] = + try: + let res = s.connection.tryExec(s.preparedInsert, toHex(digest.data), receivedTime, message.contentTopic, toHex(message.payload), pubsubTopic, int64(message.version), message.timestamp) + if not res: + return err("failed to insert into database") + except DbError: + return err("failed to insert into database") + + ok() + +proc extractRow(r: Row) : ArchiveDriverResult[ArchiveRow] = + var wakuMessage : WakuMessage + var timestamp : Timestamp + var version : uint + var pubSubTopic : string + var contentTopic : string + var storedAt : int64 + var digest : string + var payload : string + try: + storedAt = parseInt(r[0]) + contentTopic = r[1] + payload = parseHexStr(r[2]) + pubSubTopic = r[3] + version = parseUInt(r[4]) + timestamp = parseInt(r[5]) + digest = parseHexStr(r[6]) + except ValueError: + return err("could not parse timestamp") + wakuMessage.timestamp = timestamp + wakuMessage.version = uint32(version) + wakuMessage.contentTopic = contentTopic + wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high)) + + ok((pubSubTopic, wakuMessage, @(digest.toOpenArrayByte(0, digest.high)), storedAt)) + + +method getAllMessages*(s: PostgresDriver): ArchiveDriverResult[seq[ArchiveRow]] = + ## Retrieve all messages from the store. 
+ var rows : seq[Row] + var results : seq[ArchiveRow] + try: + rows = s.connection.getAllRows(sql("""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages ORDER BY storedAt ASC""")) + except DbError: + return err("failed to query rows") + + for r in rows: + let rowRes = extractRow(r) + if rowRes.isErr(): + return err("failed to extract row") + + results.add(rowRes.get()) + + ok(results) + +method getMessages*( + s: PostgresDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true +): ArchiveDriverResult[seq[ArchiveRow]] = + var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages""" + var statements : seq[string] + var args : seq[string] + + if contentTopic.len > 0: + let cstmt = "contentTopic IN (" & "?".repeat(contentTopic.len).join(",") & ")" + statements.add(cstmt) + for t in contentTopic: + args.add(t) + + if pubsubTopic.isSome(): + statements.add("pubsubTopic = ?") + args.add(pubsubTopic.get()) + + if cursor.isSome(): + let comp = if ascendingOrder: ">" else: "<" + statements.add("(storedAt, id) " & comp & " (?,?)") + args.add($cursor.get().storeTime) + args.add(toHex(cursor.get().digest.data)) + + if startTime.isSome(): + statements.add("storedAt >= ?") + args.add($startTime.get()) + + if endTime.isSome(): + statements.add("storedAt <= ?") + args.add($endTime.get()) + + if statements.len > 0: + query &= " WHERE " & statements.join(" AND ") + + var direction : string + if ascendingOrder: + direction = "ASC" + else: + direction = "DESC" + + query &= " ORDER BY storedAt " & direction & ", id " & direction + + query &= " LIMIT ?" 
+ args.add($maxPageSize) + + var rows : seq[Row] + var results : seq[ArchiveRow] + try: + rows = s.connection.getAllRows(sql(query), args) + except DbError: + return err("failed to query rows") + + for r in rows: + let rowRes = extractRow(r) + if rowRes.isErr(): + return err("failed to extract row") + + results.add(rowRes.get()) + + ok(results) + +method getMessagesCount*(s: PostgresDriver): ArchiveDriverResult[int64] = + var count : int64 + try: + let row = s.connection.getRow(sql("""SELECT COUNT(1) FROM messages""")) + count = parseInt(row[0]) + + except DbError: + return err("failed to query count") + except ValueError: + return err("failed to parse query count result") + + ok(count) + + +method getOldestMessageTimestamp*(s: PostgresDriver): ArchiveDriverResult[Timestamp] = + return err("not implemented") + +method getNewestMessageTimestamp*(s: PostgresDriver): ArchiveDriverResult[Timestamp] = + return err("not implemented") + + +method deleteMessagesOlderThanTimestamp*(s: PostgresDriver, ts: Timestamp): ArchiveDriverResult[void] = + return err("not implemented") + +method deleteOldestMessagesNotWithinLimit*(s: PostgresDriver, limit: int): ArchiveDriverResult[void] = + return err("not implemented") + +