-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathCassandraStatements.scala
111 lines (95 loc) · 3.45 KB
/
CassandraStatements.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package akka.persistence.cassandra.journal
trait CassandraStatements {
def config: CassandraJournalConfig
def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${config.keyspace}
WITH REPLICATION = { 'class' : ${config.replicationStrategy} }
"""
def createConfigTable = s"""
CREATE TABLE IF NOT EXISTS ${configTableName} (
property text primary key, value text)
"""
def createTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
used boolean static,
persistence_id text,
partition_nr bigint,
sequence_nr bigint,
timestamp timeuuid,
timebucket text,
message blob,
tag1 text,
tag2 text,
tag3 text,
PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket))
WITH gc_grace_seconds =${config.gc_grace_seconds}
AND compaction = ${config.tableCompactionStrategy.asCQL}
"""
def createMetatdataTable = s"""
CREATE TABLE IF NOT EXISTS ${metadataTableName}(
persistence_id text PRIMARY KEY,
deleted_to bigint,
properties map<text,text>)
"""
def createEventsByTagMaterializedView(tagId: Int) = s"""
CREATE MATERIALIZED VIEW IF NOT EXISTS $eventsByTagViewName$tagId AS
SELECT tag$tagId, timebucket, timestamp, persistence_id, partition_nr, sequence_nr, message
FROM $tableName
WHERE persistence_id IS NOT NULL AND partition_nr IS NOT NULL AND sequence_nr IS NOT NULL
AND tag$tagId IS NOT NULL AND timestamp IS NOT NULL AND timebucket IS NOT NULL
PRIMARY KEY ((tag$tagId, timebucket), timestamp, persistence_id, partition_nr, sequence_nr)
WITH CLUSTERING ORDER BY (timestamp ASC)
"""
def writeMessage = s"""
INSERT INTO ${tableName} (persistence_id, partition_nr, sequence_nr, timestamp, timebucket, tag1, tag2, tag3, message, used)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, true)
"""
def deleteMessage = s"""
DELETE FROM ${tableName} WHERE
persistence_id = ? AND
partition_nr = ? AND
sequence_nr = ?
"""
def selectMessages = s"""
SELECT * FROM ${tableName} WHERE
persistence_id = ? AND
partition_nr = ? AND
sequence_nr >= ? AND
sequence_nr <= ?
"""
def selectInUse = s"""
SELECT used from ${tableName} WHERE
persistence_id = ? AND
partition_nr = ?
"""
def selectConfig = s"""
SELECT * FROM ${configTableName}
"""
def writeConfig = s"""
INSERT INTO ${configTableName}(property, value) VALUES(?, ?) IF NOT EXISTS
"""
def selectHighestSequenceNr = s"""
SELECT sequence_nr, used FROM ${tableName} WHERE
persistence_id = ? AND
partition_nr = ?
ORDER BY sequence_nr
DESC LIMIT 1
"""
def selectDeletedTo = s"""
SELECT deleted_to FROM ${metadataTableName} WHERE
persistence_id = ?
"""
def insertDeletedTo = s"""
INSERT INTO ${metadataTableName} (persistence_id, deleted_to)
VALUES ( ?, ? )
"""
def writeInUse =
s"""
INSERT INTO ${tableName} (persistence_id, partition_nr, used)
VALUES(?, ?, true)
"""
private def tableName = s"${config.keyspace}.${config.table}"
private def configTableName = s"${config.keyspace}.${config.configTable}"
private def metadataTableName = s"${config.keyspace}.${config.metadataTable}"
private def eventsByTagViewName = s"${config.keyspace}.${config.eventsByTagView}"
}