message caching #326

Merged
sarah merged 6 commits from cache into trunk 2022-01-20 21:22:50 +00:00
12 changed files with 226 additions and 106 deletions

View File

@ -1 +1 @@
2022-01-19-16-16-v1.5.4-11-g84d451f
2022-01-20-12-53-v1.5.4-14-g6865ec1

View File

@ -1 +1 @@
2022-01-19-21-15-v1.5.4-11-g84d451f
2022-01-20-17-53-v1.5.4-14-g6865ec1

View File

@ -143,25 +143,20 @@ class CwtchNotifier {
var senderHandle = data['RemotePeer'];
var senderImage = data['Picture'];
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
var selectedConversation = appState.selectedProfile == data["ProfileOnion"] && appState.selectedConversation == identifier;
// We might not have received a contact created for this contact yet...
// In that case the **next** event we receive will actually update these values...
if (profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier) != null) {
if (appState.selectedProfile != data["ProfileOnion"] || appState.selectedConversation != identifier) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.unreadMessages++;
} else {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.newMarker++;
}
profileCN.getProfile(data["ProfileOnion"])?.contactList.updateLastMessageTime(identifier, DateTime.now());
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.updateMessageCache(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data["Data"]);
// We only ever see messages from authenticated peers.
// If the contact is marked as offline then override this - can happen when the contact is removed from the front
// end during syncing.
if (profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.isOnline() == false) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.status = "Authenticated";
}
}
profileCN.getProfile(data["ProfileOnion"])?.contactList.newMessage(
identifier,
messageID,
timestamp,
senderHandle,
senderImage,
isAuto,
data["Data"],
contenthash,
selectedConversation,
);
break;
case "PeerAcknowledgement":
@ -200,18 +195,11 @@ class CwtchNotifier {
var timestampSent = DateTime.tryParse(data['TimestampSent'])!;
var currentTotal = profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.totalMessages;
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
var selectedConversation = appState.selectedProfile == data["ProfileOnion"] && appState.selectedConversation == identifier;
// Only bother to do anything if we know about the group and the provided index is greater than our current total...
if (currentTotal != null && idx >= currentTotal) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.updateMessageCache(identifier, idx, timestampSent, senderHandle, senderImage, isAuto, data["Data"]);
//if not currently open
if (appState.selectedProfile != data["ProfileOnion"] || appState.selectedConversation != identifier) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.unreadMessages++;
} else {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.newMarker++;
}
// TODO: There are 2 timestamps associated with a new group message - time sent and time received.
// Sent refers to the time a profile alleges they sent a message
// Received refers to the time we actually saw the message from the server
@ -222,7 +210,8 @@ class CwtchNotifier {
// For now we perform some minimal checks on the sent timestamp to use to provide a useful ordering for honest contacts
// and ensure that malicious contacts in groups can only set this timestamp to a value within the range of `last seen message time`
// and `local now`.
profileCN.getProfile(data["ProfileOnion"])?.contactList.updateLastMessageTime(identifier, timestampSent.toLocal());
profileCN.getProfile(data["ProfileOnion"])?.contactList.newMessage(identifier, idx, timestampSent, senderHandle, senderImage, isAuto, data["Data"], contenthash, selectedConversation);
notificationManager.notify("New Message From Group!");
}
} else {

View File

@ -19,9 +19,9 @@ class ContactInfoState extends ChangeNotifier {
late int _totalMessages = 0;
late DateTime _lastMessageTime;
late Map<String, GlobalKey<MessageRowState>> keys;
late List<MessageCache?> messageCache;
int _newMarker = 0;
DateTime _newMarkerClearAt = DateTime.now();
late MessageCache messageCache;
// todo: a nicer way to model contacts, groups and other "entities"
late bool _isGroup;
@ -55,7 +55,7 @@ class ContactInfoState extends ChangeNotifier {
this._lastMessageTime = lastMessageTime == null ? DateTime.fromMillisecondsSinceEpoch(0) : lastMessageTime;
this._server = server;
this._archived = archived;
this.messageCache = List.empty(growable: true);
this.messageCache = new MessageCache();
keys = Map<String, GlobalKey<MessageRowState>>();
}
@ -64,6 +64,7 @@ class ContactInfoState extends ChangeNotifier {
String get savePeerHistory => this._savePeerHistory;
String? get acnCircuit => this._acnCircuit;
set acnCircuit(String? acnCircuit) {
this._acnCircuit = acnCircuit;
notifyListeners();
@ -90,6 +91,7 @@ class ContactInfoState extends ChangeNotifier {
}
bool get isGroup => this._isGroup;
set isGroup(bool newVal) {
this._isGroup = newVal;
notifyListeners();
@ -110,12 +112,14 @@ class ContactInfoState extends ChangeNotifier {
}
String get status => this._status;
set status(String newVal) {
this._status = newVal;
notifyListeners();
}
int get unreadMessages => this._unreadMessages;
set unreadMessages(int newVal) {
// don't reset newMarker position when unreadMessages is being cleared
if (newVal > 0) {
@ -149,18 +153,21 @@ class ContactInfoState extends ChangeNotifier {
}
int get totalMessages => this._totalMessages;
set totalMessages(int newVal) {
this._totalMessages = newVal;
notifyListeners();
}
String get imagePath => this._imagePath;
set imagePath(String newVal) {
this._imagePath = newVal;
notifyListeners();
}
DateTime get lastMessageTime => this._lastMessageTime;
set lastMessageTime(DateTime newVal) {
this._lastMessageTime = newVal;
notifyListeners();
@ -197,18 +204,27 @@ class ContactInfoState extends ChangeNotifier {
return ret;
}
void updateMessageCache(int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data) {
this.messageCache.insert(0, MessageCache(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data));
this.totalMessages += 1;
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
dan marked this conversation as resolved
Review

why doesn't this notify listeners?

why doesn't this notify listeners?
if (!selectedConversation) {
unreadMessages++;
} else {
newMarker++;
}
void bumpMessageCache() {
this.messageCache.insert(0, null);
this.messageCache.addNew(profileOnion, identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash);
this.totalMessages += 1;
// We only ever see messages from authenticated peers.
// If the contact is marked as offline then override this - can happen when the contact is removed from the front
// end during syncing.
if (isOnline() == false) {
status = "Authenticated";
}
notifyListeners();
}
void ackCache(int messageID) {
this.messageCache.firstWhere((element) => element?.metadata.messageID == messageID)?.metadata.ackd = true;
this.messageCache.ackCache(messageID);
notifyListeners();
}
}

View File

@ -122,4 +122,9 @@ class ContactListState extends ChangeNotifier {
int idx = _contacts.indexWhere((element) => element.onion == byHandle);
return idx >= 0 ? _contacts[idx] : null;
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
getContact(identifier)?.newMessage(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash, selectedConversation);
updateLastMessageTime(identifier, DateTime.now());
}
}

View File

@ -1,9 +1,12 @@
import 'dart:convert';
import 'package:cwtch/config.dart';
import 'package:cwtch/cwtch/cwtch.dart';
import 'package:flutter/material.dart';
import 'package:flutter/widgets.dart';
import 'package:provider/provider.dart';
import '../main.dart';
import 'messagecache.dart';
import 'messages/filemessage.dart';
import 'messages/invitemessage.dart';
import 'messages/malformedmessage.dart';
@ -28,7 +31,9 @@ const GroupConversationHandleLength = 32;
abstract class Message {
MessageMetadata getMetadata();
Widget getWidget(BuildContext context, Key key);
Widget getPreviewWidget(BuildContext context);
}
@ -57,29 +62,110 @@ Message compileOverlay(MessageMetadata metadata, String messageData) {
}
}
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, int index, {bool byID = false}) {
abstract class CacheHandler {
MessageInfo? lookup(MessageCache cache);
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier);
dan marked this conversation as resolved Outdated
Outdated
Review

Use an enum to have the compiler do this check

Use an enum to have the compiler do this check
void add(MessageCache cache, MessageInfo messageInfo, String contenthash);
}
class ByIndex implements CacheHandler {
dan marked this conversation as resolved Outdated
Outdated
Review

Use types to have the compiler enforce this coupling

Use types to have the compiler enforce this coupling
int index;
ByIndex(this.index);
MessageInfo? lookup(MessageCache cache) {
return cache.getByIndex(index);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessage(profileOnion, conversationIdentifier, index);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.add(messageInfo, index, contenthash);
}
}
class ById implements CacheHandler {
int id;
ById(this.id);
MessageInfo? lookup(MessageCache cache) {
return cache.getById(id);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessageByID(profileOnion, conversationIdentifier, id);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.addUnindexed(messageInfo, contenthash);
}
}
class ByContentHash implements CacheHandler {
String hash;
ByContentHash(this.hash);
MessageInfo? lookup(MessageCache cache) {
return cache.getByContentHash(hash);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessageByContentHash(profileOnion, conversationIdentifier, hash);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.addUnindexed(messageInfo, contenthash);
}
}
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
// Hit cache
MessageInfo? messageInfo = getMessageInfoFromCache(context, profileOnion, conversationIdentifier, cacheHandler);
if (messageInfo != null) {
return Future.value(compileOverlay(messageInfo.metadata, messageInfo.wrapper));
}
// Fetch and Cache
var messageInfoFuture = fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler);
return messageInfoFuture.then((MessageInfo? messageInfo) {
if (messageInfo != null) {
return compileOverlay(messageInfo.metadata, messageInfo.wrapper);
} else {
return MalformedMessage(malformedMetadata);
}
});
}
MessageInfo? getMessageInfoFromCache(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
// Hit cache
try {
var cache = Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null && cache.length > index) {
if (cache[index] != null) {
return Future.value(compileOverlay(cache[index]!.metadata, cache[index]!.wrapper));
if (cache != null) {
MessageInfo? messageInfo = cacheHandler.lookup(cache);
if (messageInfo != null) {
return messageInfo;
}
}
} catch (e) {
EnvironmentConfig.debugLog("message handler exception on get from cache: $e");
// provider check failed...make an expensive call...
}
return null;
}
Future<MessageInfo?> fetchAndCacheMessageInfo(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
// Load and cache
try {
Future<dynamic> rawMessageEnvelopeFuture;
if (byID) {
rawMessageEnvelopeFuture = Provider.of<FlwtchState>(context, listen: false).cwtch.GetMessageByID(profileOnion, conversationIdentifier, index);
} else {
rawMessageEnvelopeFuture = Provider.of<FlwtchState>(context, listen: false).cwtch.GetMessage(profileOnion, conversationIdentifier, index);
}
rawMessageEnvelopeFuture = cacheHandler.fetch(Provider.of<FlwtchState>(context, listen: false).cwtch, profileOnion, conversationIdentifier);
return rawMessageEnvelopeFuture.then((dynamic rawMessageEnvelope) {
var metadata = MessageMetadata(profileOnion, conversationIdentifier, index, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
try {
dynamic messageWrapper = jsonDecode(rawMessageEnvelope);
// There are 2 conditions in which this error condition can be met:
@ -94,7 +180,7 @@ Future<Message> messageHandler(BuildContext context, String profileOnion, int co
if (messageWrapper['Message'] == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') {
return Future.delayed(Duration(seconds: 2), () {
print("Tail recursive call to messageHandler called. This should be a rare event. If you see multiples of this log over a short period of time please log it as a bug.");
return messageHandler(context, profileOnion, conversationIdentifier, -1, byID: byID).then((value) => value);
return fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler);
});
}
@ -107,16 +193,25 @@ Future<Message> messageHandler(BuildContext context, String profileOnion, int co
var ackd = messageWrapper['Acknowledged'];
var error = messageWrapper['Error'] != null;
var signature = messageWrapper['Signature'];
metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false);
var contenthash = messageWrapper['ContentHash'];
var localIndex = messageWrapper['LocalIndex'];
var metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false);
var messageInfo = new MessageInfo(metadata, messageWrapper['Message']);
return compileOverlay(metadata, messageWrapper['Message']);
var cache = Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null) {
cacheHandler.add(cache, messageInfo, contenthash);
}
return messageInfo;
} catch (e) {
EnvironmentConfig.debugLog("an error! " + e.toString());
return MalformedMessage(metadata);
EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString());
return null;
}
});
} catch (e) {
return Future.value(MalformedMessage(MessageMetadata(profileOnion, conversationIdentifier, -1, DateTime.now(), "", "", "", <String, String>{}, false, true, false)));
EnvironmentConfig.debugLog("message handler exeption on get message: $e");
return Future.value(null);
}
}
@ -139,12 +234,14 @@ class MessageMetadata extends ChangeNotifier {
dynamic get attributes => this._attributes;
bool get ackd => this._ackd;
set ackd(bool newVal) {
this._ackd = newVal;
notifyListeners();
}
bool get error => this._error;
set error(bool newVal) {
this._error = newVal;
notifyListeners();

View File

@ -1,7 +1,58 @@
import 'message.dart';
class MessageCache {
class MessageInfo {
final MessageMetadata metadata;
final String wrapper;
MessageCache(this.metadata, this.wrapper);
MessageInfo(this.metadata, this.wrapper);
}
class MessageCache {
late Map<int, MessageInfo> cache;
late List<int?> cacheByIndex;
late Map<String, int> cacheByHash;
MessageCache() {
cache = {};
cacheByIndex = List.empty(growable: true);
cacheByHash = {};
}
int get indexedLength => cacheByIndex.length;
MessageInfo? getById(int id) => cache[id];
MessageInfo? getByIndex(int index) {
if (index >= cacheByIndex.length) {
return null;
}
return cache[cacheByIndex[index]];
}
MessageInfo? getByContentHash(String contenthash) => cache[cacheByHash[contenthash]];
void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash) {
this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data);
this.cacheByIndex.insert(0, messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageID;
}
}
void add(MessageInfo messageInfo, int index, String? contenthash) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
this.cacheByIndex.insert(index, messageInfo.metadata.messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
}
}
void addUnindexed(MessageInfo messageInfo, String? contenthash) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
}
}
void ackCache(int messageID) {
cache[messageID]?.metadata.ackd = true;
}
}

View File

@ -9,6 +9,8 @@ import 'package:flutter/widgets.dart';
import 'package:provider/provider.dart';
import '../../main.dart';
import '../messagecache.dart';
import '../profile.dart';
class QuotedMessageStructure {
final String quotedHash;
@ -21,22 +23,6 @@ class QuotedMessageStructure {
};
}
class LocallyIndexedMessage {
final dynamic message;
final int index;
LocallyIndexedMessage(this.message, this.index);
LocallyIndexedMessage.fromJson(Map<String, dynamic> json)
: message = json['Message'],
index = json['LocalIndex'];
Map<String, dynamic> toJson() => {
'Message': message,
'LocalIndex': index,
};
}
class QuotedMessage extends Message {
final MessageMetadata metadata;
final String content;
@ -70,35 +56,10 @@ class QuotedMessage extends Message {
return MalformedBubble();
}
var quotedMessagePotentials = Provider.of<FlwtchState>(context).cwtch.GetMessageByContentHash(metadata.profileOnion, metadata.conversationIdentifier, message["quotedHash"]);
Future<LocallyIndexedMessage?> quotedMessage = quotedMessagePotentials.then((matchingMessages) {
if (matchingMessages == "[]") {
return null;
}
// reverse order the messages from newest to oldest and return the
// first matching message where it's index is less than the index of this
// message
try {
var list = (jsonDecode(matchingMessages) as List<dynamic>).map((data) => LocallyIndexedMessage.fromJson(data)).toList();
LocallyIndexedMessage candidate = list.reversed.first;
return candidate;
} catch (e) {
// Malformed Message will be returned...
return null;
}
});
return ChangeNotifierProvider.value(
value: this.metadata,
builder: (bcontext, child) {
return MessageRow(
QuotedMessageBubble(message["body"], quotedMessage.then((LocallyIndexedMessage? localIndex) {
if (localIndex != null) {
return messageHandler(bcontext, metadata.profileOnion, metadata.conversationIdentifier, localIndex.index);
}
return MalformedMessage(this.metadata);
})),
key: key);
return MessageRow(QuotedMessageBubble(message["body"], messageHandler(bcontext, metadata.profileOnion, metadata.conversationIdentifier, ByContentHash(message["quotedHash"]))), key: key);
});
} catch (e) {
return MalformedBubble();

View File

@ -132,7 +132,6 @@ class ProfileInfoState extends ChangeNotifier {
@override
void dispose() {
super.dispose();
print("profileinfostate.dispose()");
}
void updateFrom(String onion, String name, String picture, String contactsJson, String serverJson, bool online) {

View File

@ -225,8 +225,11 @@ class _MessageViewState extends State<MessageView> {
ctrlrCompose.clear();
focusNode.requestFocus();
Future.delayed(const Duration(milliseconds: 80), () {
Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(Provider.of<ContactInfoState>(context, listen: false).identifier)?.bumpMessageCache();
var profile = Provider.of<ContactInfoState>(context, listen: false).profileOnion;
var identifier = Provider.of<ContactInfoState>(context, listen: false).identifier;
fetchAndCacheMessageInfo(context, profile, identifier, ByIndex(0));
Provider.of<ContactInfoState>(context, listen: false).newMarker++;
Provider.of<ContactInfoState>(context, listen: false).totalMessages += 1;
// Resort the contact list...
Provider.of<ProfileInfoState>(context, listen: false).contactList.updateLastMessageTime(Provider.of<ContactInfoState>(context, listen: false).identifier, DateTime.now());
});
@ -281,8 +284,7 @@ class _MessageViewState extends State<MessageView> {
var children;
if (Provider.of<AppState>(context).selectedConversation != null && Provider.of<AppState>(context).selectedIndex != null) {
var quoted = FutureBuilder(
future:
messageHandler(context, Provider.of<AppState>(context).selectedProfile!, Provider.of<AppState>(context).selectedConversation!, Provider.of<AppState>(context).selectedIndex!, byID: true),
future: messageHandler(context, Provider.of<AppState>(context).selectedProfile!, Provider.of<AppState>(context).selectedConversation!, ById(Provider.of<AppState>(context).selectedIndex!)),
builder: (context, snapshot) {
if (snapshot.hasData) {
var message = snapshot.data! as Message;

View File

@ -83,7 +83,7 @@ class _MessageListState extends State<MessageList> {
var messageIndex = index;
return FutureBuilder(
future: messageHandler(outerContext, profileOnion, contactHandle, messageIndex),
future: messageHandler(outerContext, profileOnion, contactHandle, ByIndex(messageIndex)),
builder: (context, snapshot) {
if (snapshot.hasData) {
var message = snapshot.data as Message;

View File

@ -220,8 +220,8 @@ class MessageRowState extends State<MessageRow> with SingleTickerProviderStateMi
)))));
var mark = Provider.of<ContactInfoState>(context).newMarker;
if (mark > 0 &&
Provider.of<ContactInfoState>(context).messageCache.length > mark &&
Provider.of<ContactInfoState>(context).messageCache[mark - 1]?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
Provider.of<ContactInfoState>(context).messageCache.indexedLength > mark &&
Provider.of<ContactInfoState>(context).messageCache.getByIndex(mark - 1)?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
return Column(crossAxisAlignment: fromMe ? CrossAxisAlignment.end : CrossAxisAlignment.start, children: [Align(alignment: Alignment.center, child: _bubbleNew()), mr]);
} else {
return mr;