From ff3e60a7506810647ee71417c3b23d8f202f045f Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Wed, 23 Mar 2022 16:08:19 -0700 Subject: [PATCH] message cache allows index locking, rework messageHandler to use bulk fetching, sendMessage flow with no sleep; move some core getMessages/SendMessage handlers from FlwtchWorker to MainActivity --- .../kotlin/im/cwtch/flwtch/FlwtchWorker.kt | 20 +- .../kotlin/im/cwtch/flwtch/MainActivity.kt | 176 ++++++++------ lib/cwtch/cwtch.dart | 11 +- lib/cwtch/cwtchNotifier.dart | 4 +- lib/cwtch/ffi.dart | 52 +++- lib/cwtch/gomobile.dart | 17 +- lib/models/contact.dart | 5 +- lib/models/contactlist.dart | 2 +- lib/models/message.dart | 230 ++++++++++-------- lib/models/messagecache.dart | 121 +++++++-- lib/models/profile.dart | 2 +- lib/views/messageview.dart | 45 ++-- lib/widgets/messagerow.dart | 7 +- 13 files changed, 434 insertions(+), 258 deletions(-) diff --git a/android/app/src/main/kotlin/im/cwtch/flwtch/FlwtchWorker.kt b/android/app/src/main/kotlin/im/cwtch/flwtch/FlwtchWorker.kt index 35980b5e..32d0a7bd 100644 --- a/android/app/src/main/kotlin/im/cwtch/flwtch/FlwtchWorker.kt +++ b/android/app/src/main/kotlin/im/cwtch/flwtch/FlwtchWorker.kt @@ -290,25 +290,7 @@ class FlwtchWorker(context: Context, parameters: WorkerParameters) : val conversation = a.getInt("conversation").toLong() Cwtch.unblockContact(profile, conversation) } - "SendMessage" -> { - val profile = (a.get("ProfileOnion") as? String) ?: "" - val conversation = a.getInt("conversation").toLong() - val message = (a.get("message") as? String) ?: "" - Log.i(TAG, "SendMessage: $message") - Cwtch.sendMessage(profile, conversation, message) - } - "SendInvitation" -> { - val profile = (a.get("ProfileOnion") as? String) ?: "" - val conversation = a.getInt("conversation").toLong() - val target = a.getInt("target").toLong() - Cwtch.sendInvitation(profile, conversation, target) - } - "ShareFile" -> { - val profile = (a.get("ProfileOnion") as? String) ?: "" - val conversation = a.getInt("conversation").toLong() - val filepath = (a.get("filepath") as? String) ?: "" - Cwtch.shareFile(profile, conversation, filepath) - } + "DownloadFile" -> { val profile = (a.get("ProfileOnion") as? String) ?: "" val conversation = a.getInt("conversation").toLong() diff --git a/android/app/src/main/kotlin/im/cwtch/flwtch/MainActivity.kt b/android/app/src/main/kotlin/im/cwtch/flwtch/MainActivity.kt index 383887bb..48f5f9a1 100644 --- a/android/app/src/main/kotlin/im/cwtch/flwtch/MainActivity.kt +++ b/android/app/src/main/kotlin/im/cwtch/flwtch/MainActivity.kt @@ -35,6 +35,9 @@ import android.os.Environment import android.database.Cursor import android.provider.MediaStore +import cwtch.Cwtch + + class MainActivity: FlutterActivity() { override fun provideSplashScreen(): SplashScreen? = SplashView() @@ -168,84 +171,117 @@ class MainActivity: FlutterActivity() { // receives messages from the ForegroundService (which provides, ironically enough, the backend) private fun handleCwtch(@NonNull call: MethodCall, @NonNull result: Result) { var method = call.method + // todo change usage patern to match that in FlwtchWorker + // Unsafe for anything using int args, causes access time attempt to cast to string which will fail val argmap: Map = call.arguments as Map // the frontend calls Start every time it fires up, but we don't want to *actually* call Cwtch.Start() // in case the ForegroundService is still running. in both cases, however, we *do* want to re-register // the eventbus listener. - if (call.method == "Start") { - val uniqueTag = argmap["torPath"] ?: "nullEventBus" + when (call.method) { + "Start" -> { + val uniqueTag = argmap["torPath"] ?: "nullEventBus" - // note: because the ForegroundService is specified as UniquePeriodicWork, it can't actually get - // accidentally duplicated. however, we still need to manually check if it's running or not, so - // that we can divert this method call to ReconnectCwtchForeground instead if so. - val works = WorkManager.getInstance(this).getWorkInfosByTag(WORKER_TAG).get() - for (workInfo in works) { - WorkManager.getInstance(this).cancelWorkById(workInfo.id) - } - WorkManager.getInstance(this).pruneWork() - - Log.i("MainActivity.kt", "Start() launching foregroundservice") - // this is where the eventbus ForegroundService gets launched. WorkManager should keep it alive after this - val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, call.method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build() - // 15 minutes is the shortest interval you can request - val workRequest = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES).setInputData(data).addTag(WORKER_TAG).addTag(uniqueTag).build() - WorkManager.getInstance(this).enqueueUniquePeriodicWork("req_$uniqueTag", ExistingPeriodicWorkPolicy.REPLACE, workRequest) - return - } else if (call.method == "CreateDownloadableFile") { - this.dlToProfile = argmap["ProfileOnion"] ?: "" - this.dlToHandle = argmap["handle"] ?: "" - val suggestedName = argmap["filename"] ?: "filename.ext" - this.dlToFileKey = argmap["filekey"] ?: "" - val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply { - addCategory(Intent.CATEGORY_OPENABLE) - type = "application/octet-stream" - putExtra(Intent.EXTRA_TITLE, suggestedName) - } - startActivityForResult(intent, FILEPICKER_REQUEST_CODE) - return - } else if (call.method == "ExportPreviewedFile") { - this.exportFromPath = argmap["Path"] ?: "" - val suggestion = argmap["FileName"] ?: "filename.ext" - var imgType = "jpeg" - if (suggestion.endsWith("png")) { - imgType = "png" - } else if (suggestion.endsWith("webp")) { - imgType = "webp" - } else if (suggestion.endsWith("bmp")) { - imgType = "bmp" - } else if (suggestion.endsWith("gif")) { - imgType = "gif" - } - val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply { - addCategory(Intent.CATEGORY_OPENABLE) - type = "image/" + imgType - putExtra(Intent.EXTRA_TITLE, suggestion) - } - startActivityForResult(intent, PREVIEW_EXPORT_REQUEST_CODE) - return - } else if (call.method == "ExportProfile") { - this.exportFromPath = argmap["file"] ?: "" - val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply { - addCategory(Intent.CATEGORY_OPENABLE) - type = "application/gzip" - putExtra(Intent.EXTRA_TITLE, argmap["file"]) - } - startActivityForResult(intent, PROFILE_EXPORT_REQUEST_CODE) - } - - // ...otherwise fallthru to a normal ffi method call (and return the result using the result callback) - val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build() - val workRequest = OneTimeWorkRequestBuilder().setInputData(data).build() - WorkManager.getInstance(this).enqueue(workRequest) - WorkManager.getInstance(applicationContext).getWorkInfoByIdLiveData(workRequest.id).observe( - this, Observer { workInfo -> - if (workInfo != null && workInfo.state == WorkInfo.State.SUCCEEDED) { - val res = workInfo.outputData.keyValueMap.toString() - result.success(workInfo.outputData.getString("result")) + // note: because the ForegroundService is specified as UniquePeriodicWork, it can't actually get + // accidentally duplicated. however, we still need to manually check if it's running or not, so + // that we can divert this method call to ReconnectCwtchForeground instead if so. + val works = WorkManager.getInstance(this).getWorkInfosByTag(WORKER_TAG).get() + for (workInfo in works) { + WorkManager.getInstance(this).cancelWorkById(workInfo.id) } + WorkManager.getInstance(this).pruneWork() + + Log.i("MainActivity.kt", "Start() launching foregroundservice") + // this is where the eventbus ForegroundService gets launched. WorkManager should keep it alive after this + val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, call.method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build() + // 15 minutes is the shortest interval you can request + val workRequest = PeriodicWorkRequestBuilder(15, TimeUnit.MINUTES).setInputData(data).addTag(WORKER_TAG).addTag(uniqueTag).build() + WorkManager.getInstance(this).enqueueUniquePeriodicWork("req_$uniqueTag", ExistingPeriodicWorkPolicy.REPLACE, workRequest) } - ) + "CreateDownloadableFile" -> { + this.dlToProfile = argmap["ProfileOnion"] ?: "" + this.dlToHandle = argmap["handle"] ?: "" + val suggestedName = argmap["filename"] ?: "filename.ext" + this.dlToFileKey = argmap["filekey"] ?: "" + val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply { + addCategory(Intent.CATEGORY_OPENABLE) + type = "application/octet-stream" + putExtra(Intent.EXTRA_TITLE, suggestedName) + } + startActivityForResult(intent, FILEPICKER_REQUEST_CODE) + } + "ExportPreviewedFile" -> { + this.exportFromPath = argmap["Path"] ?: "" + val suggestion = argmap["FileName"] ?: "filename.ext" + var imgType = "jpeg" + if (suggestion.endsWith("png")) { + imgType = "png" + } else if (suggestion.endsWith("webp")) { + imgType = "webp" + } else if (suggestion.endsWith("bmp")) { + imgType = "bmp" + } else if (suggestion.endsWith("gif")) { + imgType = "gif" + } + val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply { + addCategory(Intent.CATEGORY_OPENABLE) + type = "image/" + imgType + putExtra(Intent.EXTRA_TITLE, suggestion) + } + startActivityForResult(intent, PREVIEW_EXPORT_REQUEST_CODE) + } + "ExportProfile" -> { + this.exportFromPath = argmap["file"] ?: "" + val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply { + addCategory(Intent.CATEGORY_OPENABLE) + type = "application/gzip" + putExtra(Intent.EXTRA_TITLE, argmap["file"]) + } + startActivityForResult(intent, PROFILE_EXPORT_REQUEST_CODE) + } + "GetMessages" -> { + Log.d("MainActivity.kt", "Cwtch GetMessages") + + val profile = argmap["ProfileOnion"] ?: "" + val conversation: Int = call.argument("conversation") ?: 0 + val indexI: Int = call.argument("index") ?: 0 + val count: Int = call.argument("count") ?: 1 + + result.success(Cwtch.getMessages(profile, conversation.toLong(), indexI.toLong(), count.toLong())) + } + "SendMessage" -> { + val profile: String = call.argument("ProfileOnion") ?: "" + val conversation: Int = call.argument("conversation") ?: 0 + val message: String = call.argument("message") ?: "" + result.success(Cwtch.sendMessage(profile, conversation.toLong(), message)) + } + "SendInvitation" -> { + val profile: String = call.argument("ProfileOnion") ?: "" + val conversation: Int = call.argument("conversation") ?: 0 + val target: Int = call.argument("target") ?: 0 + result.success(Cwtch.sendInvitation(profile, conversation.toLong(), target.toLong())) + } + "ShareFile" -> { + val profile: String = call.argument("ProfileOnion") ?: "" + val conversation: Int = call.argument("conversation") ?: 0 + val filepath: String = call.argument("filepath") ?: "" + result.success(Cwtch.shareFile(profile, conversation.toLong(), filepath)) + } + else -> { + // ...otherwise fallthru to a normal ffi method call (and return the result using the result callback) + val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build() + val workRequest = OneTimeWorkRequestBuilder().setInputData(data).build() + WorkManager.getInstance(this).enqueue(workRequest) + WorkManager.getInstance(applicationContext).getWorkInfoByIdLiveData(workRequest.id).observe( + this, Observer { workInfo -> + if (workInfo != null && workInfo.state == WorkInfo.State.SUCCEEDED) { + val res = workInfo.outputData.keyValueMap.toString() + result.success(workInfo.outputData.getString("result")) + } + } + ) + } + } } // using onresume/onstop for broadcastreceiver because of extended discussion on https://stackoverflow.com/questions/7439041/how-to-unregister-broadcastreceiver diff --git a/lib/cwtch/cwtch.dart b/lib/cwtch/cwtch.dart index e4a65896..f675a884 100644 --- a/lib/cwtch/cwtch.dart +++ b/lib/cwtch/cwtch.dart @@ -48,12 +48,15 @@ abstract class Cwtch { Future GetMessageByContentHash(String profile, int handle, String contentHash); // ignore: non_constant_identifier_names - void SendMessage(String profile, int handle, String message); - // ignore: non_constant_identifier_names - void SendInvitation(String profile, int handle, int target); + Future GetMessages(String profile, int handle, int index, int count); // ignore: non_constant_identifier_names - void ShareFile(String profile, int handle, String filepath); + Future SendMessage(String profile, int handle, String message); + // ignore: non_constant_identifier_names + Future SendInvitation(String profile, int handle, int target); + + // ignore: non_constant_identifier_names + Future ShareFile(String profile, int handle, String filepath); // ignore: non_constant_identifier_names void DownloadFile(String profile, int handle, String filepath, String manifestpath, String filekey); // android-only diff --git a/lib/cwtch/cwtchNotifier.dart b/lib/cwtch/cwtchNotifier.dart index dbeb6c7a..3f3b4951 100644 --- a/lib/cwtch/cwtchNotifier.dart +++ b/lib/cwtch/cwtchNotifier.dart @@ -159,7 +159,7 @@ class CwtchNotifier { var senderHandle = data['RemotePeer']; var senderImage = data['picture']; var isAuto = data['Auto'] == "true"; - String? contenthash = data['ContentHash']; + String contenthash = data['ContentHash']; var selectedProfile = appState.selectedProfile == data["ProfileOnion"]; var selectedConversation = selectedProfile && appState.selectedConversation == identifier; var notification = data["notification"]; @@ -211,7 +211,7 @@ class CwtchNotifier { var contact = profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier); var currentTotal = contact!.totalMessages; var isAuto = data['Auto'] == "true"; - String? contenthash = data['ContentHash']; + String contenthash = data['ContentHash']; var selectedProfile = appState.selectedProfile == data["ProfileOnion"]; var selectedConversation = selectedProfile && appState.selectedConversation == identifier; var notification = data["notification"]; diff --git a/lib/cwtch/ffi.dart b/lib/cwtch/ffi.dart index a1c8fe2e..02a02486 100644 --- a/lib/cwtch/ffi.dart +++ b/lib/cwtch/ffi.dart @@ -61,6 +61,9 @@ typedef VoidFromStringIntFn = void Function(Pointer, int, int); typedef get_json_blob_string_function = Pointer Function(Pointer str, Int32 length); typedef GetJsonBlobStringFn = Pointer Function(Pointer str, int len); +typedef get_json_blob_from_string_int_string_function = Pointer Function(Pointer, Int32 , Int32, Pointer, Int32); +typedef GetJsonBlobFromStrIntStrFn = Pointer Function(Pointer, int, int, Pointer, int); + //func GetMessage(profile_ptr *C.char, profile_len C.int, handle_ptr *C.char, handle_len C.int, message_index C.int) *C.char { typedef get_json_blob_from_str_str_int_function = Pointer Function(Pointer, Int32, Pointer, Int32, Int32); typedef GetJsonBlobFromStrStrIntFn = Pointer Function(Pointer, int, Pointer, int, int); @@ -68,6 +71,9 @@ typedef GetJsonBlobFromStrStrIntFn = Pointer Function(Pointer, int, typedef get_json_blob_from_str_int_int_function = Pointer Function(Pointer, Int32, Int32, Int32); typedef GetJsonBlobFromStrIntIntFn = Pointer Function(Pointer, int, int, int); +typedef get_json_blob_from_str_int_int_int_function = Pointer Function(Pointer, Int32, Int32, Int32, Int32); +typedef GetJsonBlobFromStrIntIntIntFn = Pointer Function(Pointer, int, int, int, int); + typedef get_json_blob_from_str_int_string_function = Pointer Function(Pointer, Int32, Int32, Pointer, Int32); typedef GetJsonBlobFromStrIntStringFn = Pointer Function( Pointer, @@ -300,6 +306,19 @@ class CwtchFfi implements Cwtch { return jsonMessage; } + // ignore: non_constant_identifier_names + Future GetMessages(String profile, int handle, int index, int count) async { + var getMessagesC = library.lookup>("c_GetMessages"); + // ignore: non_constant_identifier_names + final GetMessages = getMessagesC.asFunction(); + final utf8profile = profile.toNativeUtf8(); + Pointer jsonMessageBytes = GetMessages(utf8profile, utf8profile.length, handle, index, count); + String jsonMessage = jsonMessageBytes.toDartString(); + _UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes); + malloc.free(utf8profile); + return jsonMessage; + } + @override // ignore: non_constant_identifier_names void SendProfileEvent(String onion, String json) { @@ -359,39 +378,48 @@ class CwtchFfi implements Cwtch { @override // ignore: non_constant_identifier_names - void SendMessage(String profileOnion, int contactHandle, String message) { - var sendMessage = library.lookup>("c_SendMessage"); + Future SendMessage(String profileOnion, int contactHandle, String message) async { + var sendMessage = library.lookup>("c_SendMessage"); // ignore: non_constant_identifier_names - final SendMessage = sendMessage.asFunction(); + final SendMessage = sendMessage.asFunction(); final u1 = profileOnion.toNativeUtf8(); final u3 = message.toNativeUtf8(); - SendMessage(u1, u1.length, contactHandle, u3, u3.length); + Pointer jsonMessageBytes = SendMessage(u1, u1.length, contactHandle, u3, u3.length); + String jsonMessage = jsonMessageBytes.toDartString(); + _UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes); malloc.free(u1); malloc.free(u3); + return jsonMessage; } @override // ignore: non_constant_identifier_names - void SendInvitation(String profileOnion, int contactHandle, int target) { - var sendInvitation = library.lookup>("c_SendInvitation"); + Future SendInvitation(String profileOnion, int contactHandle, int target) async { + var sendInvitation = library.lookup>("c_SendInvitation"); // ignore: non_constant_identifier_names - final SendInvitation = sendInvitation.asFunction(); + final SendInvitation = sendInvitation.asFunction(); final u1 = profileOnion.toNativeUtf8(); - SendInvitation(u1, u1.length, contactHandle, target); + Pointer jsonMessageBytes = SendInvitation(u1, u1.length, contactHandle, target); + String jsonMessage = jsonMessageBytes.toDartString(); + _UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes); malloc.free(u1); + return jsonMessage; } @override // ignore: non_constant_identifier_names - void ShareFile(String profileOnion, int contactHandle, String filepath) { - var shareFile = library.lookup>("c_ShareFile"); + Future ShareFile(String profileOnion, int contactHandle, String filepath) async { + var shareFile = library.lookup>("c_ShareFile"); // ignore: non_constant_identifier_names - final ShareFile = shareFile.asFunction(); + final ShareFile = shareFile.asFunction(); final u1 = profileOnion.toNativeUtf8(); final u3 = filepath.toNativeUtf8(); - ShareFile(u1, u1.length, contactHandle, u3, u3.length); + Pointer jsonMessageBytes = ShareFile(u1, u1.length, contactHandle, u3, u3.length); + String jsonMessage = jsonMessageBytes.toDartString(); + _UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes); malloc.free(u1); malloc.free(u3); + return jsonMessage; } @override diff --git a/lib/cwtch/gomobile.dart b/lib/cwtch/gomobile.dart index fc3db6f7..ded0b36c 100644 --- a/lib/cwtch/gomobile.dart +++ b/lib/cwtch/gomobile.dart @@ -94,6 +94,11 @@ class CwtchGomobile implements Cwtch { return cwtchPlatform.invokeMethod("GetMessageByID", {"ProfileOnion": profile, "conversation": conversation, "id": id}); } + // ignore: non_constant_identifier_names + Future GetMessages(String profile, int conversation, int index, int count) { + return cwtchPlatform.invokeMethod("GetMessages", {"ProfileOnion": profile, "conversation": conversation, "index": index, "count": count}); + } + @override // ignore: non_constant_identifier_names void SendProfileEvent(String onion, String jsonEvent) { @@ -129,20 +134,20 @@ class CwtchGomobile implements Cwtch { @override // ignore: non_constant_identifier_names - void SendMessage(String profileOnion, int conversation, String message) { - cwtchPlatform.invokeMethod("SendMessage", {"ProfileOnion": profileOnion, "conversation": conversation, "message": message}); + Future SendMessage(String profileOnion, int conversation, String message) { + return cwtchPlatform.invokeMethod("SendMessage", {"ProfileOnion": profileOnion, "conversation": conversation, "message": message}); } @override // ignore: non_constant_identifier_names - void SendInvitation(String profileOnion, int conversation, int target) { - cwtchPlatform.invokeMethod("SendInvitation", {"ProfileOnion": profileOnion, "conversation": conversation, "target": target}); + Future SendInvitation(String profileOnion, int conversation, int target) { + return cwtchPlatform.invokeMethod("SendInvitation", {"ProfileOnion": profileOnion, "conversation": conversation, "target": target}); } @override // ignore: non_constant_identifier_names - void ShareFile(String profileOnion, int conversation, String filepath) { - cwtchPlatform.invokeMethod("ShareFile", {"ProfileOnion": profileOnion, "conversation": conversation, "filepath": filepath}); + Future ShareFile(String profileOnion, int conversation, String filepath) { + return cwtchPlatform.invokeMethod("ShareFile", {"ProfileOnion": profileOnion, "conversation": conversation, "filepath": filepath}); } @override diff --git a/lib/models/contact.dart b/lib/models/contact.dart index 87976811..c0d3f792 100644 --- a/lib/models/contact.dart +++ b/lib/models/contact.dart @@ -82,7 +82,7 @@ class ContactInfoState extends ChangeNotifier { this._server = server; this._archived = archived; this._notificationPolicy = notificationPolicyFromString(notificationPolicy); - this.messageCache = new MessageCache(); + this.messageCache = new MessageCache(_totalMessages); keys = Map>(); } @@ -183,6 +183,7 @@ class ContactInfoState extends ChangeNotifier { set totalMessages(int newVal) { this._totalMessages = newVal; + this.messageCache.storageMessageCount = newVal; notifyListeners(); } @@ -251,7 +252,7 @@ class ContactInfoState extends ChangeNotifier { return ret; } - void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) { + void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash, bool selectedConversation) { if (!selectedConversation) { unreadMessages++; } else { diff --git a/lib/models/contactlist.dart b/lib/models/contactlist.dart index f4aaadcf..92ecc12d 100644 --- a/lib/models/contactlist.dart +++ b/lib/models/contactlist.dart @@ -123,7 +123,7 @@ class ContactListState extends ChangeNotifier { return idx >= 0 ? _contacts[idx] : null; } - void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) { + void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash, bool selectedConversation) { getContact(identifier)?.newMessage(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash, selectedConversation); updateLastMessageTime(identifier, DateTime.now()); } diff --git a/lib/models/message.dart b/lib/models/message.dart index 4c6956d3..89a380af 100644 --- a/lib/models/message.dart +++ b/lib/models/message.dart @@ -63,9 +63,11 @@ Message compileOverlay(MessageMetadata metadata, String messageData) { } abstract class CacheHandler { - MessageInfo? lookup(MessageCache cache); - Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier); - void add(MessageCache cache, MessageInfo messageInfo, String contenthash); + //Future lookup(MessageCache cache); + //Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache); + + Future get(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache); + //void add(MessageCache cache, MessageInfo messageInfo); } class ByIndex implements CacheHandler { @@ -73,16 +75,44 @@ class ByIndex implements CacheHandler { ByIndex(this.index); - MessageInfo? lookup(MessageCache cache) { + Future lookup(MessageCache cache) async { + var msg = cache.getByIndex(index); + return msg; + } + + Future get( Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async { + var chunk = 40; + if (chunk > cache.storageMessageCount - index) { + chunk = cache.storageMessageCount - index; + } + + if (index < cache.cacheByIndex.length) { + return cache.getByIndex(index); + } + cache.lockIndexs(index, index+chunk); + var msgs = await cwtch.GetMessages(profileOnion, conversationIdentifier, index, chunk); + int i = 0; // declared here for use in finally to unlock + try { + List messagesWrapper = jsonDecode(msgs); + + for(; i < messagesWrapper.length; i++) { + var messageInfo = messageWrapperToInfo(profileOnion, conversationIdentifier, messagesWrapper[i]); + cache.addIndexed(messageInfo, index + i); + } + //messageWrapperToInfo + } catch (e, stacktrace) { + EnvironmentConfig.debugLog("Error: Getting indexed messages $index to ${index+chunk} failed parsing: " + e.toString() + " " + stacktrace.toString()); + } finally { + // todo unlock remaining and mark malformed + if (i != chunk) { + cache.malformIndexes(index+i, index+chunk); + } + } return cache.getByIndex(index); } - Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) { - return cwtch.GetMessage(profileOnion, conversationIdentifier, index); - } - - void add(MessageCache cache, MessageInfo messageInfo, String contenthash) { - cache.add(messageInfo, index, contenthash); + void add(MessageCache cache, MessageInfo messageInfo) { + cache.addIndexed(messageInfo, index); } } @@ -91,17 +121,28 @@ class ById implements CacheHandler { ById(this.id); - MessageInfo? lookup(MessageCache cache) { - return cache.getById(id); + Future lookup(MessageCache cache) { + return Future.value(cache.getById(id)); } - Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) { - return cwtch.GetMessageByID(profileOnion, conversationIdentifier, id); + Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async { + var rawMessageEnvelope = await cwtch.GetMessageByID(profileOnion, conversationIdentifier, id); + var messageInfo = messageJsonToInfo(profileOnion, conversationIdentifier, rawMessageEnvelope); + if (messageInfo == null) { + return Future.value(null); + } + cache.addUnindexed(messageInfo); + return Future.value(messageInfo); } - void add(MessageCache cache, MessageInfo messageInfo, String contenthash) { - cache.addUnindexed(messageInfo, contenthash); + Future get(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async { + var messageInfo = await lookup(cache); + if (messageInfo != null) { + return Future.value(messageInfo); + } + return fetch(cwtch, profileOnion, conversationIdentifier, cache); } + } class ByContentHash implements CacheHandler { @@ -109,113 +150,91 @@ class ByContentHash implements CacheHandler { ByContentHash(this.hash); - MessageInfo? lookup(MessageCache cache) { - return cache.getByContentHash(hash); + Future lookup(MessageCache cache) { + return Future.value(cache.getByContentHash(hash)); } - Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) { - return cwtch.GetMessageByContentHash(profileOnion, conversationIdentifier, hash); - } - - void add(MessageCache cache, MessageInfo messageInfo, String contenthash) { - cache.addUnindexed(messageInfo, contenthash); - } -} - -Future messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) { - var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", {}, false, true, false); - // Hit cache - MessageInfo? messageInfo = getMessageInfoFromCache(context, profileOnion, conversationIdentifier, cacheHandler); - if (messageInfo != null) { - return Future.value(compileOverlay(messageInfo.metadata, messageInfo.wrapper)); - } - - // Fetch and Cache - var messageInfoFuture = fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler); - return messageInfoFuture.then((MessageInfo? messageInfo) { - if (messageInfo != null) { - return compileOverlay(messageInfo.metadata, messageInfo.wrapper); - } else { - return MalformedMessage(malformedMetadata); + Future fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async { + var rawMessageEnvelope = await cwtch.GetMessageByContentHash(profileOnion, conversationIdentifier, hash); + var messageInfo = messageJsonToInfo(profileOnion, conversationIdentifier, rawMessageEnvelope); + if (messageInfo == null) { + return Future.value(null); } - }); + cache.addUnindexed(messageInfo); + return Future.value(messageInfo); + } + + Future get( Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async { + var messageInfo = await lookup(cache); + if (messageInfo != null) { + return Future.value(messageInfo); + } + return fetch(cwtch, profileOnion, conversationIdentifier, cache); + } } -MessageInfo? getMessageInfoFromCache(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) { - // Hit cache +Future messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) async { + var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", {}, false, true, false, ""); + var cwtch = Provider.of(context, listen: false).cwtch; + + MessageCache? cache; try { - var cache = Provider.of(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache; - if (cache != null) { - MessageInfo? messageInfo = cacheHandler.lookup(cache); - if (messageInfo != null) { - return messageInfo; - } + cache = Provider + .of(context, listen: false) + .contactList + .getContact(conversationIdentifier) + ?.messageCache; + if (cache == null) { + EnvironmentConfig.debugLog("error: cannot get message cache for profile: $profileOnion conversation: $conversationIdentifier"); + return MalformedMessage(malformedMetadata); } } catch (e) { EnvironmentConfig.debugLog("message handler exception on get from cache: $e"); // provider check failed...make an expensive call... + return MalformedMessage(malformedMetadata); + } + + MessageInfo? messageInfo = await cacheHandler.get(cwtch, profileOnion, conversationIdentifier, cache); + + if (messageInfo != null) { + return compileOverlay(messageInfo.metadata, messageInfo.wrapper); + } else { + return MalformedMessage(malformedMetadata); } - return null; } -Future fetchAndCacheMessageInfo(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) { -// Load and cache - var profileInfostate = Provider.of(context, listen: false); +MessageInfo? messageJsonToInfo(String profileOnion, int conversationIdentifier, dynamic messageJson) { try { - Future rawMessageEnvelopeFuture; + dynamic messageWrapper = jsonDecode(messageJson); - rawMessageEnvelopeFuture = cacheHandler.fetch(Provider.of(context, listen: false).cwtch, profileOnion, conversationIdentifier); + if (messageWrapper == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') { + return null; + } - return rawMessageEnvelopeFuture.then((dynamic rawMessageEnvelope) { - try { - dynamic messageWrapper = jsonDecode(rawMessageEnvelope); - // There are 2 conditions in which this error condition can be met: - // 1. The application == nil, in which case this instance of the UI is already - // broken beyond repair, and will either be replaced by a new version, or requires a complete - // restart. - // 2. This index was incremented and we happened to fetch the timeline prior to the messages inclusion. - // This should be rare as Timeline addition/fetching is mutex protected and Dart itself will pipeline the - // calls to libCwtch-go - however because we use goroutines on the backend there is always a chance that one - // will find itself delayed. - // The second case is recoverable by tail-recursing this future. - if (messageWrapper['Message'] == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') { - return Future.delayed(Duration(seconds: 2), () { - print("Tail recursive call to messageHandler called. This should be a rare event. If you see multiples of this log over a short period of time please log it as a bug."); - return fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler); - }); - } - - // Construct the initial metadata - var messageID = messageWrapper['ID']; - var timestamp = DateTime.tryParse(messageWrapper['Timestamp'])!; - var senderHandle = messageWrapper['PeerID']; - var senderImage = messageWrapper['ContactImage']; - var attributes = messageWrapper['Attributes']; - var ackd = messageWrapper['Acknowledged']; - var error = messageWrapper['Error'] != null; - var signature = messageWrapper['Signature']; - var contenthash = messageWrapper['ContentHash']; - var localIndex = messageWrapper['LocalIndex']; - var metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false); - var messageInfo = new MessageInfo(metadata, messageWrapper['Message']); - - var cache = profileInfostate.contactList.getContact(conversationIdentifier)?.messageCache; - if (cache != null) { - cacheHandler.add(cache, messageInfo, contenthash); - } - - return messageInfo; - } catch (e, stacktrace) { - EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString() + " " + stacktrace.toString()); - return null; - } - }); - } catch (e) { - EnvironmentConfig.debugLog("message handler exeption on get message: $e"); - return Future.value(null); + return messageWrapperToInfo(profileOnion, conversationIdentifier, messageWrapper); + } catch (e, stacktrace) { + EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString() + " " + stacktrace.toString()); + return null; } } +MessageInfo messageWrapperToInfo(String profileOnion, int conversationIdentifier, dynamic messageWrapper) { + // Construct the initial metadata + var messageID = messageWrapper['ID']; + var timestamp = DateTime.tryParse(messageWrapper['Timestamp'])!; + var senderHandle = messageWrapper['PeerID']; + var senderImage = messageWrapper['ContactImage']; + var attributes = messageWrapper['Attributes']; + var ackd = messageWrapper['Acknowledged']; + var error = messageWrapper['Error'] != null; + var signature = messageWrapper['Signature']; + var contenthash = messageWrapper['ContentHash']; + var metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false, contenthash); + var messageInfo = new MessageInfo(metadata, messageWrapper['Message']); + + return messageInfo; +} + class MessageMetadata extends ChangeNotifier { // meta-metadata final String profileOnion; @@ -231,6 +250,7 @@ class MessageMetadata extends ChangeNotifier { final bool isAuto; final String? signature; + final String contenthash; dynamic get attributes => this._attributes; @@ -249,5 +269,5 @@ class MessageMetadata extends ChangeNotifier { } MessageMetadata( - this.profileOnion, this.conversationIdentifier, this.messageID, this.timestamp, this.senderHandle, this.senderImage, this.signature, this._attributes, this._ackd, this._error, this.isAuto); + this.profileOnion, this.conversationIdentifier, this.messageID, this.timestamp, this.senderHandle, this.senderImage, this.signature, this._attributes, this._ackd, this._error, this.isAuto, this.contenthash); } diff --git a/lib/models/messagecache.dart b/lib/models/messagecache.dart index b357ad52..d45b7036 100644 --- a/lib/models/messagecache.dart +++ b/lib/models/messagecache.dart @@ -1,58 +1,147 @@ +import 'dart:async'; + import 'package:flutter/foundation.dart'; import 'message.dart'; class MessageInfo { - final MessageMetadata metadata; - final String wrapper; + late MessageMetadata metadata; + late String wrapper; + MessageInfo(this.metadata, this.wrapper); } +class LocalIndexMessage { + late bool cacheOnly; + late bool isLoading; + late Future loaded; + late Completer loader; + + late int? messageId; + + + LocalIndexMessage(int? messageId, {cacheOnly = false, isLoading = false}) { + this.messageId = messageId; + this.cacheOnly = cacheOnly; + this.isLoading = isLoading; + if (isLoading) { + loader = Completer(); + loaded = loader.future; + } + } + + void finishLoad(int messageId) { + this.messageId = messageId; + isLoading = false; + loader.complete(true); + } + + void failLoad() { + this.messageId = null; + isLoading = false; + loader.complete(true); + } + + Future waitForLoad() { + return loaded; + } + + Future get() async { + if (isLoading) { + await waitForLoad(); + } + return messageId; + } +} + +// Message cache stores messages for use by the UI and uses MessageHandler and associated ByX loaders +// the cache stores messages in a cache indexed by their storage Id, and has two secondary indexes into it, content hash, and local index +// Index is the primary way to access the cache as it is a sequential ordered access and is used by the message pane +// contentHash is used for fetching replies +// by Id is used when composing a reply +// cacheByIndex supports additional features than just a direct index into the cache (byID) +// it allows locking of ranges in order to support bulk sequential loading (see ByIndex in message.dart) +// cacheByIndex allows allows inserting temporarily non storage backed messages so that Send Message can be respected instantly and then updated upon insertion into backend +// the message cache needs storageMessageCount maintained by the system so it can inform bulk loading when it's reaching the end of fetchable messages class MessageCache extends ChangeNotifier { + // cache of MessageId to Message late Map cache; - late List cacheByIndex; + + // local index to MessageId + late List cacheByIndex; + + // map of content hash to MessageId late Map cacheByHash; - MessageCache() { + late int _storageMessageCount; + + MessageCache(int storageMessageCount) { cache = {}; cacheByIndex = List.empty(growable: true); cacheByHash = {}; + this._storageMessageCount = storageMessageCount; } int get indexedLength => cacheByIndex.length; + int get storageMessageCount => _storageMessageCount; + set storageMessageCount(int newval) { + this._storageMessageCount = newval; + } + MessageInfo? getById(int id) => cache[id]; - MessageInfo? getByIndex(int index) { + + Future getByIndex(int index) async { if (index >= cacheByIndex.length) { return null; } - return cache[cacheByIndex[index]]; + var id = await cacheByIndex[index].get(); + if (id == null) { + return Future.value(null); + } + return cache[id]; } MessageInfo? getByContentHash(String contenthash) => cache[cacheByHash[contenthash]]; - void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash) { - this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data); - this.cacheByIndex.insert(0, messageID); + void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash) { + this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto, contenthash), data); + this.cacheByIndex.insert(0, LocalIndexMessage(messageID)); if (contenthash != null && contenthash != "") { this.cacheByHash[contenthash] = messageID; } notifyListeners(); } - void add(MessageInfo messageInfo, int index, String? contenthash) { + void lockIndexs(int start, int end) { + for(var i = start; i < end; i++) { + this.cacheByIndex.insert(i, LocalIndexMessage(null, isLoading: true)); + } + } + + void malformIndexes(int start, int end) { + for(var i = start; i < end; i++) { + this.cacheByIndex[i].failLoad(); + } + } + + void addIndexed(MessageInfo messageInfo, int index) { this.cache[messageInfo.metadata.messageID] = messageInfo; - this.cacheByIndex.insert(index, messageInfo.metadata.messageID); - if (contenthash != null && contenthash != "") { - this.cacheByHash[contenthash] = messageInfo.metadata.messageID; + if (index < this.cacheByIndex.length ) { + this.cacheByIndex[index].finishLoad(messageInfo.metadata.messageID); + } else { + this.cacheByIndex.insert(index, LocalIndexMessage(messageInfo.metadata.messageID)); + } + if (messageInfo.metadata.contenthash != "") { + this.cacheByHash[messageInfo.metadata.contenthash] = messageInfo.metadata.messageID; } notifyListeners(); } - void addUnindexed(MessageInfo messageInfo, String? contenthash) { + void addUnindexed(MessageInfo messageInfo) { this.cache[messageInfo.metadata.messageID] = messageInfo; - if (contenthash != null && contenthash != "") { - this.cacheByHash[contenthash] = messageInfo.metadata.messageID; + if (messageInfo.metadata.contenthash != "") { + this.cacheByHash[messageInfo.metadata.contenthash] = messageInfo.metadata.messageID; } notifyListeners(); } diff --git a/lib/models/profile.dart b/lib/models/profile.dart index 6b7f6b0f..174193c4 100644 --- a/lib/models/profile.dart +++ b/lib/models/profile.dart @@ -202,7 +202,7 @@ class ProfileInfoState extends ChangeNotifier { } void newMessage( - int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedProfile, bool selectedConversation) { + int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash, bool selectedProfile, bool selectedConversation) { if (!selectedProfile) { unreadMessages++; notifyListeners(); diff --git a/lib/views/messageview.dart b/lib/views/messageview.dart index 204a455f..187967cd 100644 --- a/lib/views/messageview.dart +++ b/lib/views/messageview.dart @@ -229,17 +229,15 @@ class _MessageViewState extends State { ChatMessage cm = new ChatMessage(o: QuotedMessageOverlay, d: quotedMessage); Provider.of(context, listen: false) .cwtch - .SendMessage(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, jsonEncode(cm)); + .SendMessage(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, jsonEncode(cm)).then(_sendMessageHandler); } catch (e) {} Provider.of(context, listen: false).selectedIndex = null; - _sendMessageHelper(); }); } else { ChatMessage cm = new ChatMessage(o: TextMessageOverlay, d: ctrlrCompose.value.text); Provider.of(context, listen: false) .cwtch - .SendMessage(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, jsonEncode(cm)); - _sendMessageHelper(); + .SendMessage(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, jsonEncode(cm)).then(_sendMessageHandler); } } } @@ -247,29 +245,40 @@ class _MessageViewState extends State { void _sendInvitation([String? ignoredParam]) { Provider.of(context, listen: false) .cwtch - .SendInvitation(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, this.selectedContact); - _sendMessageHelper(); + .SendInvitation(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, this.selectedContact).then(_sendMessageHandler); } void _sendFile(String filePath) { + Provider.of(context, listen: false) .cwtch - .ShareFile(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, filePath); - _sendMessageHelper(); + .ShareFile(Provider.of(context, listen: false).profileOnion, Provider.of(context, listen: false).identifier, filePath).then(_sendMessageHandler); } - void _sendMessageHelper() { + void _sendMessageHandler(dynamic messageJson) { + var cache = Provider.of(context, listen: false).messageCache; + var profileOnion = Provider.of(context, listen: false).profileOnion; + var identifier = Provider.of(context, listen: false).identifier; + var profile = Provider.of(context, listen: false); + + var messageInfo = messageJsonToInfo(profileOnion, identifier, messageJson); + if (messageInfo != null) { + profile.newMessage( + messageInfo.metadata.conversationIdentifier, + messageInfo.metadata.messageID, + messageInfo.metadata.timestamp, + messageInfo.metadata.senderHandle, + messageInfo.metadata.senderImage ?? "", + messageInfo.metadata.isAuto, + messageInfo.wrapper, + messageInfo.metadata.contenthash, + true, + true, + ); + } + ctrlrCompose.clear(); focusNode.requestFocus(); - Future.delayed(const Duration(milliseconds: 80), () { - var profile = Provider.of(context, listen: false).profileOnion; - var identifier = Provider.of(context, listen: false).identifier; - fetchAndCacheMessageInfo(context, profile, identifier, ByIndex(0)); - Provider.of(context, listen: false).newMarker++; - Provider.of(context, listen: false).totalMessages += 1; - // Resort the contact list... - Provider.of(context, listen: false).contactList.updateLastMessageTime(Provider.of(context, listen: false).identifier, DateTime.now()); - }); } Widget _buildComposeBox() { diff --git a/lib/widgets/messagerow.dart b/lib/widgets/messagerow.dart index 84efcf1c..14cf5393 100644 --- a/lib/widgets/messagerow.dart +++ b/lib/widgets/messagerow.dart @@ -223,10 +223,13 @@ class MessageRowState extends State with SingleTickerProviderStateMi mainAxisAlignment: MainAxisAlignment.center, children: widgetRow, ))))); + // TODO calculate newMark ID in CIS so we dont have to get here var mark = Provider.of(context).newMarker; + //var mi = await Provider.of(context).messageCache.getByIndex(mark - 1); + var markMatch = false; //mi?.metadata.messageID == Provider.of(context).messageID; if (mark > 0 && - Provider.of(context).messageCache.indexedLength > mark && - Provider.of(context).messageCache.getByIndex(mark - 1)?.metadata.messageID == Provider.of(context).messageID) { + Provider.of(context).messageCache.indexedLength > mark && markMatch) + { return Column(crossAxisAlignment: fromMe ? CrossAxisAlignment.end : CrossAxisAlignment.start, children: [Align(alignment: Alignment.center, child: _bubbleNew()), mr]); } else { return mr;