message caching #326

Merged
sarah merged 6 commits from cache into trunk 2022-01-20 21:22:50 +00:00
12 changed files with 226 additions and 106 deletions

View File

@ -1 +1 @@
2022-01-19-16-16-v1.5.4-11-g84d451f
2022-01-20-12-53-v1.5.4-14-g6865ec1

View File

@ -1 +1 @@
2022-01-19-21-15-v1.5.4-11-g84d451f
2022-01-20-17-53-v1.5.4-14-g6865ec1

View File

@ -143,25 +143,20 @@ class CwtchNotifier {
var senderHandle = data['RemotePeer'];
var senderImage = data['Picture'];
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
var selectedConversation = appState.selectedProfile == data["ProfileOnion"] && appState.selectedConversation == identifier;
// We might not have received a contact created for this contact yet...
// In that case the **next** event we receive will actually update these values...
if (profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier) != null) {
if (appState.selectedProfile != data["ProfileOnion"] || appState.selectedConversation != identifier) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.unreadMessages++;
} else {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.newMarker++;
}
profileCN.getProfile(data["ProfileOnion"])?.contactList.updateLastMessageTime(identifier, DateTime.now());
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.updateMessageCache(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data["Data"]);
// We only ever see messages from authenticated peers.
// If the contact is marked as offline then override this - can happen when the contact is removed from the front
// end during syncing.
if (profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.isOnline() == false) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.status = "Authenticated";
}
}
profileCN.getProfile(data["ProfileOnion"])?.contactList.newMessage(
identifier,
messageID,
timestamp,
senderHandle,
senderImage,
isAuto,
data["Data"],
contenthash,
selectedConversation,
);
break;
case "PeerAcknowledgement":
@ -200,18 +195,11 @@ class CwtchNotifier {
var timestampSent = DateTime.tryParse(data['TimestampSent'])!;
var currentTotal = profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.totalMessages;
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
var selectedConversation = appState.selectedProfile == data["ProfileOnion"] && appState.selectedConversation == identifier;
// Only bother to do anything if we know about the group and the provided index is greater than our current total...
if (currentTotal != null && idx >= currentTotal) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.updateMessageCache(identifier, idx, timestampSent, senderHandle, senderImage, isAuto, data["Data"]);
//if not currently open
if (appState.selectedProfile != data["ProfileOnion"] || appState.selectedConversation != identifier) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.unreadMessages++;
} else {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.newMarker++;
}
// TODO: There are 2 timestamps associated with a new group message - time sent and time received.
// Sent refers to the time a profile alleges they sent a message
// Received refers to the time we actually saw the message from the server
@ -222,7 +210,8 @@ class CwtchNotifier {
// For now we perform some minimal checks on the sent timestamp to use to provide a useful ordering for honest contacts
// and ensure that malicious contacts in groups can only set this timestamp to a value within the range of `last seen message time`
// and `local now`.
profileCN.getProfile(data["ProfileOnion"])?.contactList.updateLastMessageTime(identifier, timestampSent.toLocal());
profileCN.getProfile(data["ProfileOnion"])?.contactList.newMessage(identifier, idx, timestampSent, senderHandle, senderImage, isAuto, data["Data"], contenthash, selectedConversation);
notificationManager.notify("New Message From Group!");
}
} else {

View File

@ -19,9 +19,9 @@ class ContactInfoState extends ChangeNotifier {
late int _totalMessages = 0;
late DateTime _lastMessageTime;
late Map<String, GlobalKey<MessageRowState>> keys;
late List<MessageCache?> messageCache;
int _newMarker = 0;
DateTime _newMarkerClearAt = DateTime.now();
late MessageCache messageCache;
// todo: a nicer way to model contacts, groups and other "entities"
late bool _isGroup;
@ -55,7 +55,7 @@ class ContactInfoState extends ChangeNotifier {
this._lastMessageTime = lastMessageTime == null ? DateTime.fromMillisecondsSinceEpoch(0) : lastMessageTime;
this._server = server;
this._archived = archived;
this.messageCache = List.empty(growable: true);
this.messageCache = new MessageCache();
keys = Map<String, GlobalKey<MessageRowState>>();
}
@ -64,6 +64,7 @@ class ContactInfoState extends ChangeNotifier {
String get savePeerHistory => this._savePeerHistory;
String? get acnCircuit => this._acnCircuit;
set acnCircuit(String? acnCircuit) {
this._acnCircuit = acnCircuit;
notifyListeners();
@ -90,6 +91,7 @@ class ContactInfoState extends ChangeNotifier {
}
bool get isGroup => this._isGroup;
set isGroup(bool newVal) {
this._isGroup = newVal;
notifyListeners();
@ -110,12 +112,14 @@ class ContactInfoState extends ChangeNotifier {
}
String get status => this._status;
set status(String newVal) {
this._status = newVal;
notifyListeners();
}
int get unreadMessages => this._unreadMessages;
set unreadMessages(int newVal) {
// don't reset newMarker position when unreadMessages is being cleared
if (newVal > 0) {
@ -149,18 +153,21 @@ class ContactInfoState extends ChangeNotifier {
}
int get totalMessages => this._totalMessages;
set totalMessages(int newVal) {
this._totalMessages = newVal;
notifyListeners();
}
String get imagePath => this._imagePath;
set imagePath(String newVal) {
this._imagePath = newVal;
notifyListeners();
}
DateTime get lastMessageTime => this._lastMessageTime;
set lastMessageTime(DateTime newVal) {
this._lastMessageTime = newVal;
notifyListeners();
@ -197,18 +204,27 @@ class ContactInfoState extends ChangeNotifier {
return ret;
}
void updateMessageCache(int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data) {
this.messageCache.insert(0, MessageCache(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data));
this.totalMessages += 1;
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
dan marked this conversation as resolved
Review

why doesn't this notify listeners?

why doesn't this notify listeners?
if (!selectedConversation) {
unreadMessages++;
} else {
newMarker++;
}
void bumpMessageCache() {
this.messageCache.insert(0, null);
this.messageCache.addNew(profileOnion, identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash);
this.totalMessages += 1;
// We only ever see messages from authenticated peers.
// If the contact is marked as offline then override this - can happen when the contact is removed from the front
// end during syncing.
if (isOnline() == false) {
status = "Authenticated";
}
notifyListeners();
}
void ackCache(int messageID) {
this.messageCache.firstWhere((element) => element?.metadata.messageID == messageID)?.metadata.ackd = true;
this.messageCache.ackCache(messageID);
notifyListeners();
}
}

View File

@ -122,4 +122,9 @@ class ContactListState extends ChangeNotifier {
int idx = _contacts.indexWhere((element) => element.onion == byHandle);
return idx >= 0 ? _contacts[idx] : null;
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
getContact(identifier)?.newMessage(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash, selectedConversation);
updateLastMessageTime(identifier, DateTime.now());
}
}

View File

@ -1,9 +1,12 @@
import 'dart:convert';
import 'package:cwtch/config.dart';
import 'package:cwtch/cwtch/cwtch.dart';
import 'package:flutter/material.dart';
import 'package:flutter/widgets.dart';
import 'package:provider/provider.dart';
import '../main.dart';
import 'messagecache.dart';
import 'messages/filemessage.dart';
import 'messages/invitemessage.dart';
import 'messages/malformedmessage.dart';
@ -28,7 +31,9 @@ const GroupConversationHandleLength = 32;
abstract class Message {
MessageMetadata getMetadata();
Widget getWidget(BuildContext context, Key key);
Widget getPreviewWidget(BuildContext context);
}
@ -57,29 +62,110 @@ Message compileOverlay(MessageMetadata metadata, String messageData) {
}
}
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, int index, {bool byID = false}) {
abstract class CacheHandler {
MessageInfo? lookup(MessageCache cache);
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier);
void add(MessageCache cache, MessageInfo messageInfo, String contenthash);
}
class ByIndex implements CacheHandler {
int index;
ByIndex(this.index);
MessageInfo? lookup(MessageCache cache) {
return cache.getByIndex(index);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessage(profileOnion, conversationIdentifier, index);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.add(messageInfo, index, contenthash);
}
}
class ById implements CacheHandler {
int id;
ById(this.id);
MessageInfo? lookup(MessageCache cache) {
return cache.getById(id);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessageByID(profileOnion, conversationIdentifier, id);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.addUnindexed(messageInfo, contenthash);
}
}
class ByContentHash implements CacheHandler {
String hash;
ByContentHash(this.hash);
MessageInfo? lookup(MessageCache cache) {
return cache.getByContentHash(hash);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessageByContentHash(profileOnion, conversationIdentifier, hash);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.addUnindexed(messageInfo, contenthash);
}
}
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
// Hit cache
MessageInfo? messageInfo = getMessageInfoFromCache(context, profileOnion, conversationIdentifier, cacheHandler);
if (messageInfo != null) {
return Future.value(compileOverlay(messageInfo.metadata, messageInfo.wrapper));
}
// Fetch and Cache
var messageInfoFuture = fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler);
return messageInfoFuture.then((MessageInfo? messageInfo) {
if (messageInfo != null) {
return compileOverlay(messageInfo.metadata, messageInfo.wrapper);
} else {
return MalformedMessage(malformedMetadata);
}
});
}
MessageInfo? getMessageInfoFromCache(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
// Hit cache
try {
var cache = Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null && cache.length > index) {
if (cache[index] != null) {
return Future.value(compileOverlay(cache[index]!.metadata, cache[index]!.wrapper));
if (cache != null) {
MessageInfo? messageInfo = cacheHandler.lookup(cache);
if (messageInfo != null) {
return messageInfo;
}
}
} catch (e) {
EnvironmentConfig.debugLog("message handler exception on get from cache: $e");
// provider check failed...make an expensive call...
}
return null;
}
Future<MessageInfo?> fetchAndCacheMessageInfo(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
// Load and cache
try {
Future<dynamic> rawMessageEnvelopeFuture;
if (byID) {
rawMessageEnvelopeFuture = Provider.of<FlwtchState>(context, listen: false).cwtch.GetMessageByID(profileOnion, conversationIdentifier, index);
} else {
rawMessageEnvelopeFuture = Provider.of<FlwtchState>(context, listen: false).cwtch.GetMessage(profileOnion, conversationIdentifier, index);
}
rawMessageEnvelopeFuture = cacheHandler.fetch(Provider.of<FlwtchState>(context, listen: false).cwtch, profileOnion, conversationIdentifier);
return rawMessageEnvelopeFuture.then((dynamic rawMessageEnvelope) {
var metadata = MessageMetadata(profileOnion, conversationIdentifier, index, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
try {
dynamic messageWrapper = jsonDecode(rawMessageEnvelope);
// There are 2 conditions in which this error condition can be met:
@ -94,7 +180,7 @@ Future<Message> messageHandler(BuildContext context, String profileOnion, int co
if (messageWrapper['Message'] == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') {
return Future.delayed(Duration(seconds: 2), () {
print("Tail recursive call to messageHandler called. This should be a rare event. If you see multiples of this log over a short period of time please log it as a bug.");
return messageHandler(context, profileOnion, conversationIdentifier, -1, byID: byID).then((value) => value);
return fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler);
});
}
@ -107,16 +193,25 @@ Future<Message> messageHandler(BuildContext context, String profileOnion, int co
var ackd = messageWrapper['Acknowledged'];
var error = messageWrapper['Error'] != null;
var signature = messageWrapper['Signature'];
metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false);
var contenthash = messageWrapper['ContentHash'];
var localIndex = messageWrapper['LocalIndex'];
var metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false);
var messageInfo = new MessageInfo(metadata, messageWrapper['Message']);
return compileOverlay(metadata, messageWrapper['Message']);
var cache = Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null) {
cacheHandler.add(cache, messageInfo, contenthash);
}
return messageInfo;
} catch (e) {
EnvironmentConfig.debugLog("an error! " + e.toString());
return MalformedMessage(metadata);
EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString());
return null;
}
});
} catch (e) {
return Future.value(MalformedMessage(MessageMetadata(profileOnion, conversationIdentifier, -1, DateTime.now(), "", "", "", <String, String>{}, false, true, false)));
EnvironmentConfig.debugLog("message handler exeption on get message: $e");
return Future.value(null);
}
}
@ -139,12 +234,14 @@ class MessageMetadata extends ChangeNotifier {
dynamic get attributes => this._attributes;
bool get ackd => this._ackd;
set ackd(bool newVal) {
this._ackd = newVal;
notifyListeners();
}
bool get error => this._error;
set error(bool newVal) {
this._error = newVal;
notifyListeners();

View File

@ -1,7 +1,58 @@
import 'message.dart';
class MessageCache {
class MessageInfo {
final MessageMetadata metadata;
final String wrapper;
MessageCache(this.metadata, this.wrapper);
MessageInfo(this.metadata, this.wrapper);
}
class MessageCache {
late Map<int, MessageInfo> cache;
late List<int?> cacheByIndex;
late Map<String, int> cacheByHash;
MessageCache() {
cache = {};
cacheByIndex = List.empty(growable: true);
cacheByHash = {};
}
int get indexedLength => cacheByIndex.length;
MessageInfo? getById(int id) => cache[id];
MessageInfo? getByIndex(int index) {
if (index >= cacheByIndex.length) {
return null;
}
return cache[cacheByIndex[index]];
}
MessageInfo? getByContentHash(String contenthash) => cache[cacheByHash[contenthash]];
void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash) {
this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data);
this.cacheByIndex.insert(0, messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageID;
}
}
void add(MessageInfo messageInfo, int index, String? contenthash) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
this.cacheByIndex.insert(index, messageInfo.metadata.messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
}
}
void addUnindexed(MessageInfo messageInfo, String? contenthash) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
}
}
void ackCache(int messageID) {
cache[messageID]?.metadata.ackd = true;
}
}

View File

@ -9,6 +9,8 @@ import 'package:flutter/widgets.dart';
import 'package:provider/provider.dart';
import '../../main.dart';
import '../messagecache.dart';
import '../profile.dart';
class QuotedMessageStructure {
final String quotedHash;
@ -21,22 +23,6 @@ class QuotedMessageStructure {
};
}
class LocallyIndexedMessage {
final dynamic message;
final int index;
LocallyIndexedMessage(this.message, this.index);
LocallyIndexedMessage.fromJson(Map<String, dynamic> json)
: message = json['Message'],
index = json['LocalIndex'];
Map<String, dynamic> toJson() => {
'Message': message,
'LocalIndex': index,
};
}
class QuotedMessage extends Message {
final MessageMetadata metadata;
final String content;
@ -70,35 +56,10 @@ class QuotedMessage extends Message {
return MalformedBubble();
}
var quotedMessagePotentials = Provider.of<FlwtchState>(context).cwtch.GetMessageByContentHash(metadata.profileOnion, metadata.conversationIdentifier, message["quotedHash"]);
Future<LocallyIndexedMessage?> quotedMessage = quotedMessagePotentials.then((matchingMessages) {
if (matchingMessages == "[]") {
return null;
}
// reverse order the messages from newest to oldest and return the
// first matching message where it's index is less than the index of this
// message
try {
var list = (jsonDecode(matchingMessages) as List<dynamic>).map((data) => LocallyIndexedMessage.fromJson(data)).toList();
LocallyIndexedMessage candidate = list.reversed.first;
return candidate;
} catch (e) {
// Malformed Message will be returned...
return null;
}
});
return ChangeNotifierProvider.value(
value: this.metadata,
builder: (bcontext, child) {
return MessageRow(
QuotedMessageBubble(message["body"], quotedMessage.then((LocallyIndexedMessage? localIndex) {
if (localIndex != null) {
return messageHandler(bcontext, metadata.profileOnion, metadata.conversationIdentifier, localIndex.index);
}
return MalformedMessage(this.metadata);
})),
key: key);
return MessageRow(QuotedMessageBubble(message["body"], messageHandler(bcontext, metadata.profileOnion, metadata.conversationIdentifier, ByContentHash(message["quotedHash"]))), key: key);
});
} catch (e) {
return MalformedBubble();

View File

@ -132,7 +132,6 @@ class ProfileInfoState extends ChangeNotifier {
@override
void dispose() {
super.dispose();
print("profileinfostate.dispose()");
}
void updateFrom(String onion, String name, String picture, String contactsJson, String serverJson, bool online) {

View File

@ -225,8 +225,11 @@ class _MessageViewState extends State<MessageView> {
ctrlrCompose.clear();
focusNode.requestFocus();
Future.delayed(const Duration(milliseconds: 80), () {
Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(Provider.of<ContactInfoState>(context, listen: false).identifier)?.bumpMessageCache();
var profile = Provider.of<ContactInfoState>(context, listen: false).profileOnion;
var identifier = Provider.of<ContactInfoState>(context, listen: false).identifier;
fetchAndCacheMessageInfo(context, profile, identifier, ByIndex(0));
Provider.of<ContactInfoState>(context, listen: false).newMarker++;
Provider.of<ContactInfoState>(context, listen: false).totalMessages += 1;
// Resort the contact list...
Provider.of<ProfileInfoState>(context, listen: false).contactList.updateLastMessageTime(Provider.of<ContactInfoState>(context, listen: false).identifier, DateTime.now());
});
@ -281,8 +284,7 @@ class _MessageViewState extends State<MessageView> {
var children;
if (Provider.of<AppState>(context).selectedConversation != null && Provider.of<AppState>(context).selectedIndex != null) {
var quoted = FutureBuilder(
future:
messageHandler(context, Provider.of<AppState>(context).selectedProfile!, Provider.of<AppState>(context).selectedConversation!, Provider.of<AppState>(context).selectedIndex!, byID: true),
future: messageHandler(context, Provider.of<AppState>(context).selectedProfile!, Provider.of<AppState>(context).selectedConversation!, ById(Provider.of<AppState>(context).selectedIndex!)),
builder: (context, snapshot) {
if (snapshot.hasData) {
var message = snapshot.data! as Message;

View File

@ -83,7 +83,7 @@ class _MessageListState extends State<MessageList> {
var messageIndex = index;
return FutureBuilder(
future: messageHandler(outerContext, profileOnion, contactHandle, messageIndex),
future: messageHandler(outerContext, profileOnion, contactHandle, ByIndex(messageIndex)),
builder: (context, snapshot) {
if (snapshot.hasData) {
var message = snapshot.data as Message;

View File

@ -220,8 +220,8 @@ class MessageRowState extends State<MessageRow> with SingleTickerProviderStateMi
)))));
var mark = Provider.of<ContactInfoState>(context).newMarker;
if (mark > 0 &&
Provider.of<ContactInfoState>(context).messageCache.length > mark &&
Provider.of<ContactInfoState>(context).messageCache[mark - 1]?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
Provider.of<ContactInfoState>(context).messageCache.indexedLength > mark &&
Provider.of<ContactInfoState>(context).messageCache.getByIndex(mark - 1)?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
return Column(crossAxisAlignment: fromMe ? CrossAxisAlignment.end : CrossAxisAlignment.start, children: [Align(alignment: Alignment.center, child: _bubbleNew()), mr]);
} else {
return mr;