Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-implement unread counts (again) #8059

Merged
merged 17 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ def __nonzero__(self) -> bool:
__bool__ = __nonzero__ # python3


@attr.s(slots=True, frozen=True)
# We can't freeze this class, because we need to update it after it's instantiated to
# update its unread count. This is because we calculate the unread count for a room only
# if there are updates for it, which we check after the instance has been created.
# This should not be a big deal because we update the notification counts afterwards as
# well anyway.
@attr.s(slots=True, frozen=False)
class JoinedSyncResult:
room_id = attr.ib(type=str)
timeline = attr.ib(type=TimelineBatch)
Expand All @@ -103,6 +108,7 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)

def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
Expand Down Expand Up @@ -938,15 +944,10 @@ async def unread_notifs_for_room_id(
receipt_type="m.read",
)

if last_unread_event_id:
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs

# There is no new information in this period, so your notification
# count is whatever it was last time.
return None
notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
return notifs

async def generate_sync_result(
self,
Expand Down Expand Up @@ -1894,6 +1895,7 @@ async def _generate_room_entry(
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=0,
)

if room_sync or always_include:
Expand All @@ -1903,6 +1905,8 @@ async def _generate_room_entry(
unread_notifications["notification_count"] = notifs["notify_count"]
unread_notifications["highlight_count"] = notifs["highlight_count"]

room_sync.unread_count = notifs["unread_count"]

sync_result_builder.joined.append(room_sync)

if batch.limited and since_token:
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def serialize(events):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
result["org.matrix.msc2654.unread_count"] = room.unread_count

return result

Expand Down
149 changes: 110 additions & 39 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,25 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._rotate_delay = 3
self._rotate_count = 10000

def _stream_ordering_from_event_id_and_room_id_txn(
self, txn: LoggingTransaction, event_id: str, room_id: str,
) -> int:
"""Retrieve the stream ordering for the given event.

Args:
event_id: The ID of the event to retrieve the stream ordering of.
room_id: The room the event was sent into.

Returns:
The stream ordering for this event.
"""
return self.db_pool.simple_select_one_onecol_txn(
txn=txn,
table="events",
keyvalues={"room_id": room_id, "event_id": event_id},
retcol="stream_ordering",
)

@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
Expand All @@ -113,19 +132,19 @@ def get_unread_event_push_actions_by_room_for_user(
return ret

def _get_unread_counts_by_receipt_txn(
self, txn, room_id, user_id, last_read_event_id
self, txn, room_id, user_id, last_read_event_id,
):
sql = (
"SELECT stream_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(sql, (room_id, last_read_event_id))
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}
if last_read_event_id is None:
stream_ordering = self.get_stream_ordering_for_local_membership_txn(
txn, user_id, room_id, Membership.JOIN,
)
else:
stream_ordering = self._stream_ordering_from_event_id_and_room_id_txn(
txn, last_read_event_id, room_id,
)

stream_ordering = results[0][0]
if stream_ordering is None:
return {"notify_count": 0, "unread_count": 0, "highlight_count": 0}

return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, stream_ordering
Expand All @@ -134,48 +153,100 @@ def _get_unread_counts_by_receipt_txn(
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):

# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
# We need to look specifically for events with notif = 1 because otherwise we'll
# count unread events (from MSC2654) that might not notify.
notify_count = self._get_count_from_push_actions_txn(
txn=txn,
user_id=user_id,
room_id=room_id,
stream_ordering=stream_ordering,
push_actions_column="notif",
push_summary_column="notif_count",
)

txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
notify_count = row[0] if row else 0
# Now get the number of highlights
highlight_count = self._get_count_from_push_actions_txn(
txn=txn,
user_id=user_id,
room_id=room_id,
stream_ordering=stream_ordering,
push_actions_column="highlight",
push_summary_column=None,
)

# Finally, get the number of unread messages
unread_count = self._get_count_from_push_actions_txn(
txn=txn,
user_id=user_id,
room_id=room_id,
stream_ordering=stream_ordering,
push_actions_column="unread",
push_summary_column="unread_count",
)

return {
"notify_count": notify_count,
"unread_count": unread_count,
"highlight_count": highlight_count,
}

def _get_count_from_push_actions_txn(
self,
txn: LoggingTransaction,
user_id: str,
room_id: str,
stream_ordering: int,
push_actions_column: str,
push_summary_column: Optional[str],
) -> int:
"""Counts the number of rows in event_push_actions with the given flag set to
true, and adds it up with its matching count in event_push_summary if any.

txn.execute(
"""
SELECT notif_count FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""",
(room_id, user_id, stream_ordering),
)
rows = txn.fetchall()
if rows:
notify_count += rows[0][0]
Args:
user_id: The user to calculate the count for.
room_id: The room to calculate the count for.
stream_ordering: The stream ordering to use in the conditional clause when
querying from event_push_actions.
push_actions_column: The column to filter by when querying from
event_push_actions. The filtering will be done on the condition
"[column] = 1".
push_summary_column: The count in event_push_summary to add the results from
the first query to. None if there is no count in the event_push_summary
table to add the results to.

# Now get the number of highlights
Returns:
The desired count.
"""
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" user_id = ?"
" AND room_id = ?"
" AND stream_ordering > ?"
" AND %s = 1"
)

txn.execute(sql, (user_id, room_id, stream_ordering))
txn.execute(
sql % push_actions_column,
(user_id, room_id, stream_ordering),
)
row = txn.fetchone()
highlight_count = row[0] if row else 0
count = row[0] if row else 0

if push_summary_column:
txn.execute(
"""
SELECT %s FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
""" % push_summary_column,
(room_id, user_id, stream_ordering),
)
rows = txn.fetchall()
if rows:
count += rows[0][0]

return {"notify_count": notify_count, "highlight_count": highlight_count}
return count

async def get_push_action_users_in_range(
self, min_stream_ordering, max_stream_ordering
Expand Down
36 changes: 35 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

import logging
from typing import Iterable, List, Set
from typing import Iterable, List, Optional, Set

from twisted.internet import defer

Expand Down Expand Up @@ -886,6 +886,40 @@ def _is_local_host_in_room_ignoring_users_txn(txn):
_is_local_host_in_room_ignoring_users_txn,
)

def get_stream_ordering_for_local_membership_txn(
self,
txn: LoggingTransaction,
user_id: str,
room_id: str,
membership: Membership,
) -> Optional[int]:
"""Get the stream ordering for a given local room membership.

Args:
user_id: The user ID to retrieve the membership for.
room_id: The room ID to retrieve the membership for.
membership: The membership to retrieve the stream ordering for.

Returns:
The stream ordering, or None if the membership wasn't found.
"""
txn.execute(
"""
SELECT stream_ordering FROM local_current_membership
LEFT JOIN events USING (event_id, room_id)
WHERE membership = ?
AND user_id = ?
AND room_id = ?
""",
(membership, user_id, room_id),
)
row = txn.fetchone()

if row is None:
return None

return row[0]


class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
Expand Down