message caching #326

Merged
sarah merged 6 commits from cache into trunk 2022-01-20 21:22:50 +00:00
10 changed files with 202 additions and 109 deletions
Showing only changes of commit 793b6e2e1a - Show all commits

View File

@ -143,25 +143,10 @@ class CwtchNotifier {
var senderHandle = data['RemotePeer'];
var senderImage = data['Picture'];
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
var selectedConversation = appState.selectedProfile == data["ProfileOnion"] && appState.selectedConversation == identifier;
// We might not have received a contact created for this contact yet...
// In that case the **next** event we receive will actually update these values...
if (profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier) != null) {
if (appState.selectedProfile != data["ProfileOnion"] || appState.selectedConversation != identifier) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.unreadMessages++;
} else {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.newMarker++;
}
profileCN.getProfile(data["ProfileOnion"])?.contactList.updateLastMessageTime(identifier, DateTime.now());
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.updateMessageCache(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data["Data"]);
// We only ever see messages from authenticated peers.
// If the contact is marked as offline then override this - can happen when the contact is removed from the front
// end during syncing.
if (profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.isOnline() == false) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.status = "Authenticated";
}
}
profileCN.getProfile(data["ProfileOnion"])?.contactList.newMessage(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data["Data"], contenthash, selectedConversation, );
break;
case "PeerAcknowledgement":
@ -200,18 +185,12 @@ class CwtchNotifier {
var timestampSent = DateTime.tryParse(data['TimestampSent'])!;
var currentTotal = profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.totalMessages;
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
var selectedConversation = appState.selectedProfile == data["ProfileOnion"] && appState.selectedConversation == identifier;
// Only bother to do anything if we know about the group and the provided index is greater than our current total...
if (currentTotal != null && idx >= currentTotal) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.updateMessageCache(identifier, idx, timestampSent, senderHandle, senderImage, isAuto, data["Data"]);
//if not currently open
if (appState.selectedProfile != data["ProfileOnion"] || appState.selectedConversation != identifier) {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.unreadMessages++;
} else {
profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier)!.newMarker++;
}
// TODO: There are 2 timestamps associated with a new group message - time sent and time received.
// Sent refers to the time a profile alleges they sent a message
// Received refers to the time we actually saw the message from the server
@ -222,7 +201,8 @@ class CwtchNotifier {
// For now we perform some minimal checks on the sent timestamp to use to provide a useful ordering for honest contacts
// and ensure that malicious contacts in groups can only set this timestamp to a value within the range of `last seen message time`
// and `local now`.
profileCN.getProfile(data["ProfileOnion"])?.contactList.updateLastMessageTime(identifier, timestampSent.toLocal());
profileCN.getProfile(data["ProfileOnion"])?.contactList.newMessage(identifier, idx, timestampSent, senderHandle, senderImage, isAuto, data["Data"], contenthash, selectedConversation);
notificationManager.notify("New Message From Group!");
}
} else {

View File

@ -21,6 +21,8 @@ class ContactInfoState extends ChangeNotifier {
late Map<String, GlobalKey<MessageRowState>> keys;
int _newMarker = 0;
DateTime _newMarkerClearAt = DateTime.now();
//late List<MessageInfo?> messageCache;
late MessageCache messageCache;
// todo: a nicer way to model contacts, groups and other "entities"
late bool _isGroup;
@ -54,7 +56,8 @@ class ContactInfoState extends ChangeNotifier {
this._lastMessageTime = lastMessageTime == null ? DateTime.fromMillisecondsSinceEpoch(0) : lastMessageTime;
this._server = server;
this._archived = archived;
this.messageCache = List.empty(growable: true);
//this.messageCache = List.empty(growable: true);
this.messageCache = new MessageCache();
keys = Map<String, GlobalKey<MessageRowState>>();
}
@ -196,18 +199,31 @@ class ContactInfoState extends ChangeNotifier {
return ret;
}
void updateMessageCache(int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data) {
this.messageCache.insert(0, MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data));
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
if (!selectedConversation) {
unreadMessages++;
} else {
newMarker++;
}
dan marked this conversation as resolved
Review

why doesn't this notify listeners?

why doesn't this notify listeners?
this.messageCache.addNew(profileOnion, identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash);
this.totalMessages += 1;
// We only ever see messages from authenticated peers.
// If the contact is marked as offline then override this - can happen when the contact is removed from the front
// end during syncing.
if (isOnline() == false) {
status = "Authenticated";
}
}
void bumpMessageCache() {
this.messageCache.insert(0, null);
this.messageCache.bumpMessageCache();
this.totalMessages += 1;
}
void ackCache(int messageID) {
this.messageCache.firstWhere((element) => element?.metadata.messageID == messageID)?.metadata.ackd = true;
this.messageCache.ackCache(messageID);
notifyListeners();
}
}
}

View File

@ -122,4 +122,9 @@ class ContactListState extends ChangeNotifier {
int idx = _contacts.indexWhere((element) => element.onion == byHandle);
return idx >= 0 ? _contacts[idx] : null;
}
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
getContact(identifier)?.newMessage(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash, selectedConversation);
updateLastMessageTime(identifier, DateTime.now());
}
}

View File

@ -1,9 +1,11 @@
import 'dart:convert';
import 'package:cwtch/config.dart';
import 'package:flutter/material.dart';
import 'package:flutter/widgets.dart';
import 'package:provider/provider.dart';
import '../main.dart';
import 'messagecache.dart';
import 'messages/filemessage.dart';
import 'messages/invitemessage.dart';
import 'messages/malformedmessage.dart';
@ -28,7 +30,9 @@ const GroupConversationHandleLength = 32;
abstract class Message {
MessageMetadata getMetadata();
Widget getWidget(BuildContext context, Key key);
Widget getPreviewWidget(BuildContext context);
}
@ -57,48 +61,108 @@ Message compileOverlay(MessageMetadata metadata, String messageData) {
}
}
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, int index, {bool byID = false}) {
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier,
{bool byIndex = false, int? index, bool byID = false, int? id, bool byHash = false, String? hash}) {
var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
if (!byIndex && !byID && !byHash) {
dan marked this conversation as resolved Outdated
Outdated
Review

Use an enum to have the compiler do this check

Use an enum to have the compiler do this check
EnvironmentConfig.debugLog("Error calling messageHandler: one of byIndex, byID, byHash must be set");
return Future.value(MalformedMessage(malformedMetadata));
}
if ((byID && id == null) || (byIndex && index == null) || (byHash && hash == null)) {
dan marked this conversation as resolved Outdated
Outdated
Review

Use types to have the compiler enforce this coupling

Use types to have the compiler enforce this coupling
EnvironmentConfig.debugLog("Error calling messageHandler: byType needs corresponding value and it was not set");
return Future.value(MalformedMessage(malformedMetadata));
}
// Hit cache
MessageInfo? messageInfo = getMessageInfoFromCache(context, profileOnion, conversationIdentifier, byIndex: byIndex, index: index, byID: byID, id: id, byHash: byHash, hash: hash);
if (messageInfo != null) {
return Future.value(compileOverlay(messageInfo.metadata, messageInfo.wrapper));
}
// Fetch and Cache
var messageInfoFuture = fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, byIndex: byIndex, index: index, byID: byID, id: id, byHash: byHash, hash: hash);
return messageInfoFuture.then( (MessageInfo? messageInfo) {
if (messageInfo != null) {
return compileOverlay(messageInfo.metadata, messageInfo.wrapper);
} else {
return MalformedMessage(malformedMetadata);
}
});
}
MessageInfo? getMessageInfoFromCache(BuildContext context, String profileOnion, int conversationIdentifier,
{bool byIndex = false, int? index, bool byID = false, int? id, bool byHash = false, String? hash}) {
// Hit cache
try {
var cache = Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null && cache.length > index) {
if (cache[index] != null) {
return Future.value(compileOverlay(cache[index]!.metadata, cache[index]!.wrapper));
if (cache != null) {
MessageInfo? messageInfo = null;
if (byID) {
messageInfo = cache.getById(id!);
} else if (byHash) {
messageInfo = cache.getByContentHash(hash!);
} else {
messageInfo = cache.getByIndex(index!);
}
if (messageInfo != null) {
return messageInfo;
}
}
} catch (e) {
EnvironmentConfig.debugLog("message handler exception on get from cache: $e");
// provider check failed...make an expensive call...
}
return null;
}
Future<MessageInfo?> fetchAndCacheMessageInfo(BuildContext context, String profileOnion, int conversationIdentifier,
{bool byIndex = false, int? index, bool byID = false, int? id, bool byHash = false, String? hash}) {
// Load and cache
try {
Future<dynamic> rawMessageEnvelopeFuture;
if (byID) {
rawMessageEnvelopeFuture = Provider.of<FlwtchState>(context, listen: false).cwtch.GetMessageByID(profileOnion, conversationIdentifier, index);
rawMessageEnvelopeFuture = Provider
.of<FlwtchState>(context, listen: false)
.cwtch
.GetMessageByID(profileOnion, conversationIdentifier, id!);
} else if (byHash) {
rawMessageEnvelopeFuture = Provider
.of<FlwtchState>(context, listen: false)
.cwtch
.GetMessageByContentHash(profileOnion, conversationIdentifier, hash!);
} else {
rawMessageEnvelopeFuture = Provider.of<FlwtchState>(context, listen: false).cwtch.GetMessage(profileOnion, conversationIdentifier, index);
rawMessageEnvelopeFuture = Provider
.of<FlwtchState>(context, listen: false)
.cwtch
.GetMessage(profileOnion, conversationIdentifier, index!);
}
return rawMessageEnvelopeFuture.then((dynamic rawMessageEnvelope) {
var metadata = MessageMetadata(profileOnion, conversationIdentifier, index, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
try {
dynamic messageWrapper = jsonDecode(rawMessageEnvelope);
// There are 2 conditions in which this error condition can be met:
// 1. The application == nil, in which case this instance of the UI is already
// broken beyond repair, and will either be replaced by a new version, or requires a complete
// restart.
// 2. This index was incremented and we happened to fetch the timeline prior to the messages inclusion.
// This should be rare as Timeline addition/fetching is mutex protected and Dart itself will pipeline the
// calls to libCwtch-go - however because we use goroutines on the backend there is always a chance that one
// will find itself delayed.
// The second case is recoverable by tail-recursing this future.
// There are 2 conditions in which this error condition can be met:
// 1. The application == nil, in which case this instance of the UI is already
// broken beyond repair, and will either be replaced by a new version, or requires a complete
// restart.
// 2. This index was incremented and we happened to fetch the timeline prior to the messages inclusion.
// This should be rare as Timeline addition/fetching is mutex protected and Dart itself will pipeline the
// calls to libCwtch-go - however because we use goroutines on the backend there is always a chance that one
// will find itself delayed.
// The second case is recoverable by tail-recursing this future.
if (messageWrapper['Message'] == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') {
return Future.delayed(Duration(seconds: 2), () {
print("Tail recursive call to messageHandler called. This should be a rare event. If you see multiples of this log over a short period of time please log it as a bug.");
return messageHandler(context, profileOnion, conversationIdentifier, -1, byID: byID).then((value) => value);
return fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, byIndex: byIndex,
index: index,
byID: byID,
id: id,
byHash: byHash,
hash: hash).then((value) => value);
});
}
// Construct the initial metadata
// Construct the initial metadata
var messageID = messageWrapper['ID'];
var timestamp = DateTime.tryParse(messageWrapper['Timestamp'])!;
var senderHandle = messageWrapper['PeerID'];
@ -107,16 +171,47 @@ Future<Message> messageHandler(BuildContext context, String profileOnion, int co
var ackd = messageWrapper['Acknowledged'];
var error = messageWrapper['Error'] != null;
var signature = messageWrapper['Signature'];
metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false);
var contenthash = messageWrapper['ContentHash'];
var localIndex = messageWrapper['LocalIndex'];
var metadata = MessageMetadata(
profileOnion,
conversationIdentifier,
messageID,
timestamp,
senderHandle,
senderImage,
signature,
attributes,
ackd,
error,
false);
var messageInfo = new MessageInfo(metadata, messageWrapper['Message']);
return compileOverlay(metadata, messageWrapper['Message']);
var cache = Provider
.of<ProfileInfoState>(context, listen: false)
.contactList
.getContact(conversationIdentifier)
?.messageCache;
if (cache != null) {
if (byID) {
cache.addUnindexed(messageInfo, contenthash);
} else if (byHash) {
cache.addUnindexed(messageInfo, contenthash);
} else {
cache.add(messageInfo, index!, contenthash);
}
}
return messageInfo;
} catch (e) {
EnvironmentConfig.debugLog("an error! " + e.toString());
return MalformedMessage(metadata);
EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString());
return null;
}
});
} catch (e) {
return Future.value(MalformedMessage(MessageMetadata(profileOnion, conversationIdentifier, -1, DateTime.now(), "", "", "", <String, String>{}, false, true, false)));
EnvironmentConfig.debugLog("message handler exeption on get message: $e");
return Future.value(null);
}
}
@ -139,12 +234,14 @@ class MessageMetadata extends ChangeNotifier {
dynamic get attributes => this._attributes;
bool get ackd => this._ackd;
set ackd(bool newVal) {
this._ackd = newVal;
notifyListeners();
}
bool get error => this._error;
set error(bool newVal) {
this._error = newVal;
notifyListeners();

View File

@ -9,21 +9,51 @@ class MessageInfo {
class MessageCache {
late Map<int, MessageInfo> cache;
late List<int?> cacheByIndex;
late Map<String, int> cacheByHash;
MessageCache() {
this.cache = {};
this.cacheByIndex = List.empty(growable: true);
cache = {};
cacheByIndex = List.empty(growable: true);
cacheByHash = {};
}
int get indexedLength => cacheByIndex.length;
void addNew(int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data) {
MessageInfo? getById(int id) => cache[id];
MessageInfo? getByIndex(int index) {
if (index >= cacheByIndex.length) {
return null;
}
return cache[cacheByIndex[index]];
}
MessageInfo? getByContentHash(String contenthash) => cache[cacheByHash[contenthash]];
void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash) {
this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data);
this.cacheByIndex.insert(0, messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageID;
}
}
void add(MessageInfo messageInfo, int index, String? contenthash) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
this.cacheByIndex.insert(index, messageInfo.metadata.messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
}
}
void addUnindexed(MessageInfo messageInfo, String? contenthash) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
}
}
// TODO inserting nulls travel down list causing fails for all
void bumpMessageCache() {
this.messageCache.insert(0, null);
this.totalMessages += 1;
this.cacheByIndex.insert(0, null);
}
void ackCache(int messageID) {

View File

@ -9,6 +9,8 @@ import 'package:flutter/widgets.dart';
import 'package:provider/provider.dart';
import '../../main.dart';
import '../messagecache.dart';
import '../profile.dart';
class QuotedMessageStructure {
final String quotedHash;
@ -21,22 +23,6 @@ class QuotedMessageStructure {
};
}
class LocallyIndexedMessage {
final dynamic message;
final int index;
LocallyIndexedMessage(this.message, this.index);
LocallyIndexedMessage.fromJson(Map<String, dynamic> json)
: message = json['Message'],
index = json['LocalIndex'];
Map<String, dynamic> toJson() => {
'Message': message,
'LocalIndex': index,
};
}
class QuotedMessage extends Message {
final MessageMetadata metadata;
final String content;
@ -70,34 +56,11 @@ class QuotedMessage extends Message {
return MalformedBubble();
}
var quotedMessagePotentials = Provider.of<FlwtchState>(context).cwtch.GetMessageByContentHash(metadata.profileOnion, metadata.conversationIdentifier, message["quotedHash"]);
Future<LocallyIndexedMessage?> quotedMessage = quotedMessagePotentials.then((matchingMessages) {
if (matchingMessages == "[]") {
return null;
}
// reverse order the messages from newest to oldest and return the
// first matching message where it's index is less than the index of this
// message
try {
var list = (jsonDecode(matchingMessages) as List<dynamic>).map((data) => LocallyIndexedMessage.fromJson(data)).toList();
LocallyIndexedMessage candidate = list.reversed.first;
return candidate;
} catch (e) {
// Malformed Message will be returned...
return null;
}
});
return ChangeNotifierProvider.value(
value: this.metadata,
builder: (bcontext, child) {
return MessageRow(
QuotedMessageBubble(message["body"], quotedMessage.then((LocallyIndexedMessage? localIndex) {
if (localIndex != null) {
return messageHandler(bcontext, metadata.profileOnion, metadata.conversationIdentifier, localIndex.index);
}
return MalformedMessage(this.metadata);
})),
QuotedMessageBubble(message["body"], messageHandler(bcontext, metadata.profileOnion, metadata.conversationIdentifier, byHash: true, hash: message["quotedHash"])),
key: key);
});
} catch (e) {

View File

@ -132,7 +132,6 @@ class ProfileInfoState extends ChangeNotifier {
@override
void dispose() {
super.dispose();
print("profileinfostate.dispose()");
}
void updateFrom(String onion, String name, String picture, String contactsJson, String serverJson, bool online) {

View File

@ -225,7 +225,10 @@ class _MessageViewState extends State<MessageView> {
ctrlrCompose.clear();
focusNode.requestFocus();
Future.delayed(const Duration(milliseconds: 80), () {
Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(Provider.of<ContactInfoState>(context, listen: false).identifier)?.bumpMessageCache();
var profile = Provider.of<ContactInfoState>(context, listen: false).profileOnion;
var identifier = Provider.of<ContactInfoState>(context, listen: false).identifier;
//Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(Provider.of<ContactInfoState>(context, listen: false).identifier)?.bumpMessageCache();
fetchAndCacheMessageInfo(context, profile, identifier, byIndex: true, index: 0);
Provider.of<ContactInfoState>(context, listen: false).newMarker++;
// Resort the contact list...
Provider.of<ProfileInfoState>(context, listen: false).contactList.updateLastMessageTime(Provider.of<ContactInfoState>(context, listen: false).identifier, DateTime.now());
@ -282,7 +285,7 @@ class _MessageViewState extends State<MessageView> {
if (Provider.of<AppState>(context).selectedConversation != null && Provider.of<AppState>(context).selectedIndex != null) {
var quoted = FutureBuilder(
future:
messageHandler(context, Provider.of<AppState>(context).selectedProfile!, Provider.of<AppState>(context).selectedConversation!, Provider.of<AppState>(context).selectedIndex!, byID: true),
messageHandler(context, Provider.of<AppState>(context).selectedProfile!, Provider.of<AppState>(context).selectedConversation!, id: Provider.of<AppState>(context).selectedIndex!, byID: true),
builder: (context, snapshot) {
if (snapshot.hasData) {
var message = snapshot.data! as Message;

View File

@ -83,7 +83,7 @@ class _MessageListState extends State<MessageList> {
var messageIndex = index;
return FutureBuilder(
future: messageHandler(outerContext, profileOnion, contactHandle, messageIndex),
future: messageHandler(outerContext, profileOnion, contactHandle, byIndex: true, index: messageIndex),
builder: (context, snapshot) {
if (snapshot.hasData) {
var message = snapshot.data as Message;

View File

@ -220,8 +220,8 @@ class MessageRowState extends State<MessageRow> with SingleTickerProviderStateMi
)))));
var mark = Provider.of<ContactInfoState>(context).newMarker;
if (mark > 0 &&
Provider.of<ContactInfoState>(context).messageCache.length > mark &&
Provider.of<ContactInfoState>(context).messageCache[mark - 1]?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
Provider.of<ContactInfoState>(context).messageCache.indexedLength > mark &&
Provider.of<ContactInfoState>(context).messageCache.getByIndex(mark - 1)?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
return Column(crossAxisAlignment: fromMe ? CrossAxisAlignment.end : CrossAxisAlignment.start, children: [Align(alignment: Alignment.center, child: _bubbleNew()), mr]);
} else {
return mr;