message cache allows index locking, rework messageHandler to use bulk fetching, sendMessage flow with no sleep; move some core getMessages/SendMessage handlers from FlwtchWorker to MainActivity

This commit is contained in:
Dan Ballard 2022-03-23 16:08:19 -07:00
parent 5a1c66bc25
commit ff3e60a750
13 changed files with 434 additions and 258 deletions

View File

@ -290,25 +290,7 @@ class FlwtchWorker(context: Context, parameters: WorkerParameters) :
val conversation = a.getInt("conversation").toLong()
Cwtch.unblockContact(profile, conversation)
}
"SendMessage" -> {
val profile = (a.get("ProfileOnion") as? String) ?: ""
val conversation = a.getInt("conversation").toLong()
val message = (a.get("message") as? String) ?: ""
Log.i(TAG, "SendMessage: $message")
Cwtch.sendMessage(profile, conversation, message)
}
"SendInvitation" -> {
val profile = (a.get("ProfileOnion") as? String) ?: ""
val conversation = a.getInt("conversation").toLong()
val target = a.getInt("target").toLong()
Cwtch.sendInvitation(profile, conversation, target)
}
"ShareFile" -> {
val profile = (a.get("ProfileOnion") as? String) ?: ""
val conversation = a.getInt("conversation").toLong()
val filepath = (a.get("filepath") as? String) ?: ""
Cwtch.shareFile(profile, conversation, filepath)
}
"DownloadFile" -> {
val profile = (a.get("ProfileOnion") as? String) ?: ""
val conversation = a.getInt("conversation").toLong()

View File

@ -35,6 +35,9 @@ import android.os.Environment
import android.database.Cursor
import android.provider.MediaStore
import cwtch.Cwtch
class MainActivity: FlutterActivity() {
override fun provideSplashScreen(): SplashScreen? = SplashView()
@ -168,84 +171,117 @@ class MainActivity: FlutterActivity() {
// receives messages from the ForegroundService (which provides, ironically enough, the backend)
private fun handleCwtch(@NonNull call: MethodCall, @NonNull result: Result) {
var method = call.method
// todo change usage patern to match that in FlwtchWorker
// Unsafe for anything using int args, causes access time attempt to cast to string which will fail
val argmap: Map<String, String> = call.arguments as Map<String, String>
// the frontend calls Start every time it fires up, but we don't want to *actually* call Cwtch.Start()
// in case the ForegroundService is still running. in both cases, however, we *do* want to re-register
// the eventbus listener.
if (call.method == "Start") {
val uniqueTag = argmap["torPath"] ?: "nullEventBus"
when (call.method) {
"Start" -> {
val uniqueTag = argmap["torPath"] ?: "nullEventBus"
// note: because the ForegroundService is specified as UniquePeriodicWork, it can't actually get
// accidentally duplicated. however, we still need to manually check if it's running or not, so
// that we can divert this method call to ReconnectCwtchForeground instead if so.
val works = WorkManager.getInstance(this).getWorkInfosByTag(WORKER_TAG).get()
for (workInfo in works) {
WorkManager.getInstance(this).cancelWorkById(workInfo.id)
}
WorkManager.getInstance(this).pruneWork()
Log.i("MainActivity.kt", "Start() launching foregroundservice")
// this is where the eventbus ForegroundService gets launched. WorkManager should keep it alive after this
val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, call.method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build()
// 15 minutes is the shortest interval you can request
val workRequest = PeriodicWorkRequestBuilder<FlwtchWorker>(15, TimeUnit.MINUTES).setInputData(data).addTag(WORKER_TAG).addTag(uniqueTag).build()
WorkManager.getInstance(this).enqueueUniquePeriodicWork("req_$uniqueTag", ExistingPeriodicWorkPolicy.REPLACE, workRequest)
return
} else if (call.method == "CreateDownloadableFile") {
this.dlToProfile = argmap["ProfileOnion"] ?: ""
this.dlToHandle = argmap["handle"] ?: ""
val suggestedName = argmap["filename"] ?: "filename.ext"
this.dlToFileKey = argmap["filekey"] ?: ""
val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply {
addCategory(Intent.CATEGORY_OPENABLE)
type = "application/octet-stream"
putExtra(Intent.EXTRA_TITLE, suggestedName)
}
startActivityForResult(intent, FILEPICKER_REQUEST_CODE)
return
} else if (call.method == "ExportPreviewedFile") {
this.exportFromPath = argmap["Path"] ?: ""
val suggestion = argmap["FileName"] ?: "filename.ext"
var imgType = "jpeg"
if (suggestion.endsWith("png")) {
imgType = "png"
} else if (suggestion.endsWith("webp")) {
imgType = "webp"
} else if (suggestion.endsWith("bmp")) {
imgType = "bmp"
} else if (suggestion.endsWith("gif")) {
imgType = "gif"
}
val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply {
addCategory(Intent.CATEGORY_OPENABLE)
type = "image/" + imgType
putExtra(Intent.EXTRA_TITLE, suggestion)
}
startActivityForResult(intent, PREVIEW_EXPORT_REQUEST_CODE)
return
} else if (call.method == "ExportProfile") {
this.exportFromPath = argmap["file"] ?: ""
val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply {
addCategory(Intent.CATEGORY_OPENABLE)
type = "application/gzip"
putExtra(Intent.EXTRA_TITLE, argmap["file"])
}
startActivityForResult(intent, PROFILE_EXPORT_REQUEST_CODE)
}
// ...otherwise fallthru to a normal ffi method call (and return the result using the result callback)
val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build()
val workRequest = OneTimeWorkRequestBuilder<FlwtchWorker>().setInputData(data).build()
WorkManager.getInstance(this).enqueue(workRequest)
WorkManager.getInstance(applicationContext).getWorkInfoByIdLiveData(workRequest.id).observe(
this, Observer { workInfo ->
if (workInfo != null && workInfo.state == WorkInfo.State.SUCCEEDED) {
val res = workInfo.outputData.keyValueMap.toString()
result.success(workInfo.outputData.getString("result"))
// note: because the ForegroundService is specified as UniquePeriodicWork, it can't actually get
// accidentally duplicated. however, we still need to manually check if it's running or not, so
// that we can divert this method call to ReconnectCwtchForeground instead if so.
val works = WorkManager.getInstance(this).getWorkInfosByTag(WORKER_TAG).get()
for (workInfo in works) {
WorkManager.getInstance(this).cancelWorkById(workInfo.id)
}
WorkManager.getInstance(this).pruneWork()
Log.i("MainActivity.kt", "Start() launching foregroundservice")
// this is where the eventbus ForegroundService gets launched. WorkManager should keep it alive after this
val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, call.method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build()
// 15 minutes is the shortest interval you can request
val workRequest = PeriodicWorkRequestBuilder<FlwtchWorker>(15, TimeUnit.MINUTES).setInputData(data).addTag(WORKER_TAG).addTag(uniqueTag).build()
WorkManager.getInstance(this).enqueueUniquePeriodicWork("req_$uniqueTag", ExistingPeriodicWorkPolicy.REPLACE, workRequest)
}
)
"CreateDownloadableFile" -> {
this.dlToProfile = argmap["ProfileOnion"] ?: ""
this.dlToHandle = argmap["handle"] ?: ""
val suggestedName = argmap["filename"] ?: "filename.ext"
this.dlToFileKey = argmap["filekey"] ?: ""
val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply {
addCategory(Intent.CATEGORY_OPENABLE)
type = "application/octet-stream"
putExtra(Intent.EXTRA_TITLE, suggestedName)
}
startActivityForResult(intent, FILEPICKER_REQUEST_CODE)
}
"ExportPreviewedFile" -> {
this.exportFromPath = argmap["Path"] ?: ""
val suggestion = argmap["FileName"] ?: "filename.ext"
var imgType = "jpeg"
if (suggestion.endsWith("png")) {
imgType = "png"
} else if (suggestion.endsWith("webp")) {
imgType = "webp"
} else if (suggestion.endsWith("bmp")) {
imgType = "bmp"
} else if (suggestion.endsWith("gif")) {
imgType = "gif"
}
val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply {
addCategory(Intent.CATEGORY_OPENABLE)
type = "image/" + imgType
putExtra(Intent.EXTRA_TITLE, suggestion)
}
startActivityForResult(intent, PREVIEW_EXPORT_REQUEST_CODE)
}
"ExportProfile" -> {
this.exportFromPath = argmap["file"] ?: ""
val intent = Intent(Intent.ACTION_CREATE_DOCUMENT).apply {
addCategory(Intent.CATEGORY_OPENABLE)
type = "application/gzip"
putExtra(Intent.EXTRA_TITLE, argmap["file"])
}
startActivityForResult(intent, PROFILE_EXPORT_REQUEST_CODE)
}
"GetMessages" -> {
Log.d("MainActivity.kt", "Cwtch GetMessages")
val profile = argmap["ProfileOnion"] ?: ""
val conversation: Int = call.argument("conversation") ?: 0
val indexI: Int = call.argument("index") ?: 0
val count: Int = call.argument("count") ?: 1
result.success(Cwtch.getMessages(profile, conversation.toLong(), indexI.toLong(), count.toLong()))
}
"SendMessage" -> {
val profile: String = call.argument("ProfileOnion") ?: ""
val conversation: Int = call.argument("conversation") ?: 0
val message: String = call.argument("message") ?: ""
result.success(Cwtch.sendMessage(profile, conversation.toLong(), message))
}
"SendInvitation" -> {
val profile: String = call.argument("ProfileOnion") ?: ""
val conversation: Int = call.argument("conversation") ?: 0
val target: Int = call.argument("target") ?: 0
result.success(Cwtch.sendInvitation(profile, conversation.toLong(), target.toLong()))
}
"ShareFile" -> {
val profile: String = call.argument("ProfileOnion") ?: ""
val conversation: Int = call.argument("conversation") ?: 0
val filepath: String = call.argument("filepath") ?: ""
result.success(Cwtch.shareFile(profile, conversation.toLong(), filepath))
}
else -> {
// ...otherwise fallthru to a normal ffi method call (and return the result using the result callback)
val data: Data = Data.Builder().putString(FlwtchWorker.KEY_METHOD, method).putString(FlwtchWorker.KEY_ARGS, JSONObject(argmap).toString()).build()
val workRequest = OneTimeWorkRequestBuilder<FlwtchWorker>().setInputData(data).build()
WorkManager.getInstance(this).enqueue(workRequest)
WorkManager.getInstance(applicationContext).getWorkInfoByIdLiveData(workRequest.id).observe(
this, Observer { workInfo ->
if (workInfo != null && workInfo.state == WorkInfo.State.SUCCEEDED) {
val res = workInfo.outputData.keyValueMap.toString()
result.success(workInfo.outputData.getString("result"))
}
}
)
}
}
}
// using onresume/onstop for broadcastreceiver because of extended discussion on https://stackoverflow.com/questions/7439041/how-to-unregister-broadcastreceiver

View File

@ -48,12 +48,15 @@ abstract class Cwtch {
Future<dynamic> GetMessageByContentHash(String profile, int handle, String contentHash);
// ignore: non_constant_identifier_names
void SendMessage(String profile, int handle, String message);
// ignore: non_constant_identifier_names
void SendInvitation(String profile, int handle, int target);
Future<dynamic> GetMessages(String profile, int handle, int index, int count);
// ignore: non_constant_identifier_names
void ShareFile(String profile, int handle, String filepath);
Future<dynamic> SendMessage(String profile, int handle, String message);
// ignore: non_constant_identifier_names
Future<dynamic> SendInvitation(String profile, int handle, int target);
// ignore: non_constant_identifier_names
Future<dynamic> ShareFile(String profile, int handle, String filepath);
// ignore: non_constant_identifier_names
void DownloadFile(String profile, int handle, String filepath, String manifestpath, String filekey);
// android-only

View File

@ -159,7 +159,7 @@ class CwtchNotifier {
var senderHandle = data['RemotePeer'];
var senderImage = data['picture'];
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
String contenthash = data['ContentHash'];
var selectedProfile = appState.selectedProfile == data["ProfileOnion"];
var selectedConversation = selectedProfile && appState.selectedConversation == identifier;
var notification = data["notification"];
@ -211,7 +211,7 @@ class CwtchNotifier {
var contact = profileCN.getProfile(data["ProfileOnion"])?.contactList.getContact(identifier);
var currentTotal = contact!.totalMessages;
var isAuto = data['Auto'] == "true";
String? contenthash = data['ContentHash'];
String contenthash = data['ContentHash'];
var selectedProfile = appState.selectedProfile == data["ProfileOnion"];
var selectedConversation = selectedProfile && appState.selectedConversation == identifier;
var notification = data["notification"];

View File

@ -61,6 +61,9 @@ typedef VoidFromStringIntFn = void Function(Pointer<Utf8>, int, int);
typedef get_json_blob_string_function = Pointer<Utf8> Function(Pointer<Utf8> str, Int32 length);
typedef GetJsonBlobStringFn = Pointer<Utf8> Function(Pointer<Utf8> str, int len);
typedef get_json_blob_from_string_int_string_function = Pointer<Utf8> Function(Pointer<Utf8>, Int32 , Int32, Pointer<Utf8>, Int32);
typedef GetJsonBlobFromStrIntStrFn = Pointer<Utf8> Function(Pointer<Utf8>, int, int, Pointer<Utf8>, int);
//func GetMessage(profile_ptr *C.char, profile_len C.int, handle_ptr *C.char, handle_len C.int, message_index C.int) *C.char {
typedef get_json_blob_from_str_str_int_function = Pointer<Utf8> Function(Pointer<Utf8>, Int32, Pointer<Utf8>, Int32, Int32);
typedef GetJsonBlobFromStrStrIntFn = Pointer<Utf8> Function(Pointer<Utf8>, int, Pointer<Utf8>, int, int);
@ -68,6 +71,9 @@ typedef GetJsonBlobFromStrStrIntFn = Pointer<Utf8> Function(Pointer<Utf8>, int,
typedef get_json_blob_from_str_int_int_function = Pointer<Utf8> Function(Pointer<Utf8>, Int32, Int32, Int32);
typedef GetJsonBlobFromStrIntIntFn = Pointer<Utf8> Function(Pointer<Utf8>, int, int, int);
typedef get_json_blob_from_str_int_int_int_function = Pointer<Utf8> Function(Pointer<Utf8>, Int32, Int32, Int32, Int32);
typedef GetJsonBlobFromStrIntIntIntFn = Pointer<Utf8> Function(Pointer<Utf8>, int, int, int, int);
typedef get_json_blob_from_str_int_string_function = Pointer<Utf8> Function(Pointer<Utf8>, Int32, Int32, Pointer<Utf8>, Int32);
typedef GetJsonBlobFromStrIntStringFn = Pointer<Utf8> Function(
Pointer<Utf8>,
@ -300,6 +306,19 @@ class CwtchFfi implements Cwtch {
return jsonMessage;
}
// ignore: non_constant_identifier_names
Future<dynamic> GetMessages(String profile, int handle, int index, int count) async {
var getMessagesC = library.lookup<NativeFunction<get_json_blob_from_str_int_int_int_function>>("c_GetMessages");
// ignore: non_constant_identifier_names
final GetMessages = getMessagesC.asFunction<GetJsonBlobFromStrIntIntIntFn>();
final utf8profile = profile.toNativeUtf8();
Pointer<Utf8> jsonMessageBytes = GetMessages(utf8profile, utf8profile.length, handle, index, count);
String jsonMessage = jsonMessageBytes.toDartString();
_UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes);
malloc.free(utf8profile);
return jsonMessage;
}
@override
// ignore: non_constant_identifier_names
void SendProfileEvent(String onion, String json) {
@ -359,39 +378,48 @@ class CwtchFfi implements Cwtch {
@override
// ignore: non_constant_identifier_names
void SendMessage(String profileOnion, int contactHandle, String message) {
var sendMessage = library.lookup<NativeFunction<void_from_string_int_string_function>>("c_SendMessage");
Future<dynamic> SendMessage(String profileOnion, int contactHandle, String message) async {
var sendMessage = library.lookup<NativeFunction<get_json_blob_from_string_int_string_function>>("c_SendMessage");
// ignore: non_constant_identifier_names
final SendMessage = sendMessage.asFunction<VoidFromStringIntStringFn>();
final SendMessage = sendMessage.asFunction<GetJsonBlobFromStrIntStrFn>();
final u1 = profileOnion.toNativeUtf8();
final u3 = message.toNativeUtf8();
SendMessage(u1, u1.length, contactHandle, u3, u3.length);
Pointer<Utf8> jsonMessageBytes = SendMessage(u1, u1.length, contactHandle, u3, u3.length);
String jsonMessage = jsonMessageBytes.toDartString();
_UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes);
malloc.free(u1);
malloc.free(u3);
return jsonMessage;
}
@override
// ignore: non_constant_identifier_names
void SendInvitation(String profileOnion, int contactHandle, int target) {
var sendInvitation = library.lookup<NativeFunction<void_from_string_int_int_function>>("c_SendInvitation");
Future<dynamic> SendInvitation(String profileOnion, int contactHandle, int target) async {
var sendInvitation = library.lookup<NativeFunction<get_json_blob_from_str_int_int_function>>("c_SendInvitation");
// ignore: non_constant_identifier_names
final SendInvitation = sendInvitation.asFunction<VoidFromStringIntIntFn>();
final SendInvitation = sendInvitation.asFunction<GetJsonBlobFromStrIntIntFn>();
final u1 = profileOnion.toNativeUtf8();
SendInvitation(u1, u1.length, contactHandle, target);
Pointer<Utf8> jsonMessageBytes = SendInvitation(u1, u1.length, contactHandle, target);
String jsonMessage = jsonMessageBytes.toDartString();
_UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes);
malloc.free(u1);
return jsonMessage;
}
@override
// ignore: non_constant_identifier_names
void ShareFile(String profileOnion, int contactHandle, String filepath) {
var shareFile = library.lookup<NativeFunction<void_from_string_int_string_function>>("c_ShareFile");
Future<dynamic> ShareFile(String profileOnion, int contactHandle, String filepath) async {
var shareFile = library.lookup<NativeFunction<get_json_blob_from_string_int_string_function>>("c_ShareFile");
// ignore: non_constant_identifier_names
final ShareFile = shareFile.asFunction<VoidFromStringIntStringFn>();
final ShareFile = shareFile.asFunction<GetJsonBlobFromStrIntStrFn>();
final u1 = profileOnion.toNativeUtf8();
final u3 = filepath.toNativeUtf8();
ShareFile(u1, u1.length, contactHandle, u3, u3.length);
Pointer<Utf8> jsonMessageBytes = ShareFile(u1, u1.length, contactHandle, u3, u3.length);
String jsonMessage = jsonMessageBytes.toDartString();
_UnsafeFreePointerAnyUseOfThisFunctionMustBeDoubleApproved(jsonMessageBytes);
malloc.free(u1);
malloc.free(u3);
return jsonMessage;
}
@override

View File

@ -94,6 +94,11 @@ class CwtchGomobile implements Cwtch {
return cwtchPlatform.invokeMethod("GetMessageByID", {"ProfileOnion": profile, "conversation": conversation, "id": id});
}
// ignore: non_constant_identifier_names
Future<dynamic> GetMessages(String profile, int conversation, int index, int count) {
return cwtchPlatform.invokeMethod("GetMessages", {"ProfileOnion": profile, "conversation": conversation, "index": index, "count": count});
}
@override
// ignore: non_constant_identifier_names
void SendProfileEvent(String onion, String jsonEvent) {
@ -129,20 +134,20 @@ class CwtchGomobile implements Cwtch {
@override
// ignore: non_constant_identifier_names
void SendMessage(String profileOnion, int conversation, String message) {
cwtchPlatform.invokeMethod("SendMessage", {"ProfileOnion": profileOnion, "conversation": conversation, "message": message});
Future<dynamic> SendMessage(String profileOnion, int conversation, String message) {
return cwtchPlatform.invokeMethod("SendMessage", {"ProfileOnion": profileOnion, "conversation": conversation, "message": message});
}
@override
// ignore: non_constant_identifier_names
void SendInvitation(String profileOnion, int conversation, int target) {
cwtchPlatform.invokeMethod("SendInvitation", {"ProfileOnion": profileOnion, "conversation": conversation, "target": target});
Future<dynamic> SendInvitation(String profileOnion, int conversation, int target) {
return cwtchPlatform.invokeMethod("SendInvitation", {"ProfileOnion": profileOnion, "conversation": conversation, "target": target});
}
@override
// ignore: non_constant_identifier_names
void ShareFile(String profileOnion, int conversation, String filepath) {
cwtchPlatform.invokeMethod("ShareFile", {"ProfileOnion": profileOnion, "conversation": conversation, "filepath": filepath});
Future<dynamic> ShareFile(String profileOnion, int conversation, String filepath) {
return cwtchPlatform.invokeMethod("ShareFile", {"ProfileOnion": profileOnion, "conversation": conversation, "filepath": filepath});
}
@override

View File

@ -82,7 +82,7 @@ class ContactInfoState extends ChangeNotifier {
this._server = server;
this._archived = archived;
this._notificationPolicy = notificationPolicyFromString(notificationPolicy);
this.messageCache = new MessageCache();
this.messageCache = new MessageCache(_totalMessages);
keys = Map<String, GlobalKey<MessageRowState>>();
}
@ -183,6 +183,7 @@ class ContactInfoState extends ChangeNotifier {
set totalMessages(int newVal) {
this._totalMessages = newVal;
this.messageCache.storageMessageCount = newVal;
notifyListeners();
}
@ -251,7 +252,7 @@ class ContactInfoState extends ChangeNotifier {
return ret;
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash, bool selectedConversation) {
if (!selectedConversation) {
unreadMessages++;
} else {

View File

@ -123,7 +123,7 @@ class ContactListState extends ChangeNotifier {
return idx >= 0 ? _contacts[idx] : null;
}
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedConversation) {
void newMessage(int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash, bool selectedConversation) {
getContact(identifier)?.newMessage(identifier, messageID, timestamp, senderHandle, senderImage, isAuto, data, contenthash, selectedConversation);
updateLastMessageTime(identifier, DateTime.now());
}

View File

@ -63,9 +63,11 @@ Message compileOverlay(MessageMetadata metadata, String messageData) {
}
abstract class CacheHandler {
MessageInfo? lookup(MessageCache cache);
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier);
void add(MessageCache cache, MessageInfo messageInfo, String contenthash);
//Future<MessageInfo?> lookup(MessageCache cache);
//Future<MessageInfo?> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache);
Future<MessageInfo?> get(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache);
//void add(MessageCache cache, MessageInfo messageInfo);
}
class ByIndex implements CacheHandler {
@ -73,16 +75,44 @@ class ByIndex implements CacheHandler {
ByIndex(this.index);
MessageInfo? lookup(MessageCache cache) {
Future<MessageInfo?> lookup(MessageCache cache) async {
var msg = cache.getByIndex(index);
return msg;
}
Future<MessageInfo?> get( Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async {
var chunk = 40;
if (chunk > cache.storageMessageCount - index) {
chunk = cache.storageMessageCount - index;
}
if (index < cache.cacheByIndex.length) {
return cache.getByIndex(index);
}
cache.lockIndexs(index, index+chunk);
var msgs = await cwtch.GetMessages(profileOnion, conversationIdentifier, index, chunk);
int i = 0; // declared here for use in finally to unlock
try {
List<dynamic> messagesWrapper = jsonDecode(msgs);
for(; i < messagesWrapper.length; i++) {
var messageInfo = messageWrapperToInfo(profileOnion, conversationIdentifier, messagesWrapper[i]);
cache.addIndexed(messageInfo, index + i);
}
//messageWrapperToInfo
} catch (e, stacktrace) {
EnvironmentConfig.debugLog("Error: Getting indexed messages $index to ${index+chunk} failed parsing: " + e.toString() + " " + stacktrace.toString());
} finally {
// todo unlock remaining and mark malformed
if (i != chunk) {
cache.malformIndexes(index+i, index+chunk);
}
}
return cache.getByIndex(index);
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessage(profileOnion, conversationIdentifier, index);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.add(messageInfo, index, contenthash);
void add(MessageCache cache, MessageInfo messageInfo) {
cache.addIndexed(messageInfo, index);
}
}
@ -91,17 +121,28 @@ class ById implements CacheHandler {
ById(this.id);
MessageInfo? lookup(MessageCache cache) {
return cache.getById(id);
Future<MessageInfo?> lookup(MessageCache cache) {
return Future<MessageInfo?>.value(cache.getById(id));
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessageByID(profileOnion, conversationIdentifier, id);
Future<MessageInfo?> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async {
var rawMessageEnvelope = await cwtch.GetMessageByID(profileOnion, conversationIdentifier, id);
var messageInfo = messageJsonToInfo(profileOnion, conversationIdentifier, rawMessageEnvelope);
if (messageInfo == null) {
return Future.value(null);
}
cache.addUnindexed(messageInfo);
return Future.value(messageInfo);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.addUnindexed(messageInfo, contenthash);
Future<MessageInfo?> get(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async {
var messageInfo = await lookup(cache);
if (messageInfo != null) {
return Future.value(messageInfo);
}
return fetch(cwtch, profileOnion, conversationIdentifier, cache);
}
}
class ByContentHash implements CacheHandler {
@ -109,113 +150,91 @@ class ByContentHash implements CacheHandler {
ByContentHash(this.hash);
MessageInfo? lookup(MessageCache cache) {
return cache.getByContentHash(hash);
Future<MessageInfo?> lookup(MessageCache cache) {
return Future<MessageInfo?>.value(cache.getByContentHash(hash));
}
Future<dynamic> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier) {
return cwtch.GetMessageByContentHash(profileOnion, conversationIdentifier, hash);
}
void add(MessageCache cache, MessageInfo messageInfo, String contenthash) {
cache.addUnindexed(messageInfo, contenthash);
}
}
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", <String, String>{}, false, true, false);
// Hit cache
MessageInfo? messageInfo = getMessageInfoFromCache(context, profileOnion, conversationIdentifier, cacheHandler);
if (messageInfo != null) {
return Future.value(compileOverlay(messageInfo.metadata, messageInfo.wrapper));
}
// Fetch and Cache
var messageInfoFuture = fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler);
return messageInfoFuture.then((MessageInfo? messageInfo) {
if (messageInfo != null) {
return compileOverlay(messageInfo.metadata, messageInfo.wrapper);
} else {
return MalformedMessage(malformedMetadata);
Future<MessageInfo?> fetch(Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async {
var rawMessageEnvelope = await cwtch.GetMessageByContentHash(profileOnion, conversationIdentifier, hash);
var messageInfo = messageJsonToInfo(profileOnion, conversationIdentifier, rawMessageEnvelope);
if (messageInfo == null) {
return Future.value(null);
}
});
cache.addUnindexed(messageInfo);
return Future.value(messageInfo);
}
Future<MessageInfo?> get( Cwtch cwtch, String profileOnion, int conversationIdentifier, MessageCache cache) async {
var messageInfo = await lookup(cache);
if (messageInfo != null) {
return Future.value(messageInfo);
}
return fetch(cwtch, profileOnion, conversationIdentifier, cache);
}
}
MessageInfo? getMessageInfoFromCache(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
// Hit cache
Future<Message> messageHandler(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) async {
var malformedMetadata = MessageMetadata(profileOnion, conversationIdentifier, 0, DateTime.now(), "", "", "", <String, String>{}, false, true, false, "");
var cwtch = Provider.of<FlwtchState>(context, listen: false).cwtch;
MessageCache? cache;
try {
var cache = Provider.of<ProfileInfoState>(context, listen: false).contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null) {
MessageInfo? messageInfo = cacheHandler.lookup(cache);
if (messageInfo != null) {
return messageInfo;
}
cache = Provider
.of<ProfileInfoState>(context, listen: false)
.contactList
.getContact(conversationIdentifier)
?.messageCache;
if (cache == null) {
EnvironmentConfig.debugLog("error: cannot get message cache for profile: $profileOnion conversation: $conversationIdentifier");
return MalformedMessage(malformedMetadata);
}
} catch (e) {
EnvironmentConfig.debugLog("message handler exception on get from cache: $e");
// provider check failed...make an expensive call...
return MalformedMessage(malformedMetadata);
}
MessageInfo? messageInfo = await cacheHandler.get(cwtch, profileOnion, conversationIdentifier, cache);
if (messageInfo != null) {
return compileOverlay(messageInfo.metadata, messageInfo.wrapper);
} else {
return MalformedMessage(malformedMetadata);
}
return null;
}
Future<MessageInfo?> fetchAndCacheMessageInfo(BuildContext context, String profileOnion, int conversationIdentifier, CacheHandler cacheHandler) {
// Load and cache
var profileInfostate = Provider.of<ProfileInfoState>(context, listen: false);
MessageInfo? messageJsonToInfo(String profileOnion, int conversationIdentifier, dynamic messageJson) {
try {
Future<dynamic> rawMessageEnvelopeFuture;
dynamic messageWrapper = jsonDecode(messageJson);
rawMessageEnvelopeFuture = cacheHandler.fetch(Provider.of<FlwtchState>(context, listen: false).cwtch, profileOnion, conversationIdentifier);
if (messageWrapper == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') {
return null;
}
return rawMessageEnvelopeFuture.then((dynamic rawMessageEnvelope) {
try {
dynamic messageWrapper = jsonDecode(rawMessageEnvelope);
// There are 2 conditions in which this error condition can be met:
// 1. The application == nil, in which case this instance of the UI is already
// broken beyond repair, and will either be replaced by a new version, or requires a complete
// restart.
// 2. This index was incremented and we happened to fetch the timeline prior to the messages inclusion.
// This should be rare as Timeline addition/fetching is mutex protected and Dart itself will pipeline the
// calls to libCwtch-go - however because we use goroutines on the backend there is always a chance that one
// will find itself delayed.
// The second case is recoverable by tail-recursing this future.
if (messageWrapper['Message'] == null || messageWrapper['Message'] == '' || messageWrapper['Message'] == '{}') {
return Future.delayed(Duration(seconds: 2), () {
print("Tail recursive call to messageHandler called. This should be a rare event. If you see multiples of this log over a short period of time please log it as a bug.");
return fetchAndCacheMessageInfo(context, profileOnion, conversationIdentifier, cacheHandler);
});
}
// Construct the initial metadata
var messageID = messageWrapper['ID'];
var timestamp = DateTime.tryParse(messageWrapper['Timestamp'])!;
var senderHandle = messageWrapper['PeerID'];
var senderImage = messageWrapper['ContactImage'];
var attributes = messageWrapper['Attributes'];
var ackd = messageWrapper['Acknowledged'];
var error = messageWrapper['Error'] != null;
var signature = messageWrapper['Signature'];
var contenthash = messageWrapper['ContentHash'];
var localIndex = messageWrapper['LocalIndex'];
var metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false);
var messageInfo = new MessageInfo(metadata, messageWrapper['Message']);
var cache = profileInfostate.contactList.getContact(conversationIdentifier)?.messageCache;
if (cache != null) {
cacheHandler.add(cache, messageInfo, contenthash);
}
return messageInfo;
} catch (e, stacktrace) {
EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString() + " " + stacktrace.toString());
return null;
}
});
} catch (e) {
EnvironmentConfig.debugLog("message handler exeption on get message: $e");
return Future.value(null);
return messageWrapperToInfo(profileOnion, conversationIdentifier, messageWrapper);
} catch (e, stacktrace) {
EnvironmentConfig.debugLog("message handler exception on parse message and cache: " + e.toString() + " " + stacktrace.toString());
return null;
}
}
MessageInfo messageWrapperToInfo(String profileOnion, int conversationIdentifier, dynamic messageWrapper) {
// Construct the initial metadata
var messageID = messageWrapper['ID'];
var timestamp = DateTime.tryParse(messageWrapper['Timestamp'])!;
var senderHandle = messageWrapper['PeerID'];
var senderImage = messageWrapper['ContactImage'];
var attributes = messageWrapper['Attributes'];
var ackd = messageWrapper['Acknowledged'];
var error = messageWrapper['Error'] != null;
var signature = messageWrapper['Signature'];
var contenthash = messageWrapper['ContentHash'];
var metadata = MessageMetadata(profileOnion, conversationIdentifier, messageID, timestamp, senderHandle, senderImage, signature, attributes, ackd, error, false, contenthash);
var messageInfo = new MessageInfo(metadata, messageWrapper['Message']);
return messageInfo;
}
class MessageMetadata extends ChangeNotifier {
// meta-metadata
final String profileOnion;
@ -231,6 +250,7 @@ class MessageMetadata extends ChangeNotifier {
final bool isAuto;
final String? signature;
final String contenthash;
dynamic get attributes => this._attributes;
@ -249,5 +269,5 @@ class MessageMetadata extends ChangeNotifier {
}
MessageMetadata(
this.profileOnion, this.conversationIdentifier, this.messageID, this.timestamp, this.senderHandle, this.senderImage, this.signature, this._attributes, this._ackd, this._error, this.isAuto);
this.profileOnion, this.conversationIdentifier, this.messageID, this.timestamp, this.senderHandle, this.senderImage, this.signature, this._attributes, this._ackd, this._error, this.isAuto, this.contenthash);
}

View File

@ -1,58 +1,147 @@
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'message.dart';
class MessageInfo {
final MessageMetadata metadata;
final String wrapper;
late MessageMetadata metadata;
late String wrapper;
MessageInfo(this.metadata, this.wrapper);
}
class LocalIndexMessage {
late bool cacheOnly;
late bool isLoading;
late Future<void> loaded;
late Completer<void> loader;
late int? messageId;
LocalIndexMessage(int? messageId, {cacheOnly = false, isLoading = false}) {
this.messageId = messageId;
this.cacheOnly = cacheOnly;
this.isLoading = isLoading;
if (isLoading) {
loader = Completer<void>();
loaded = loader.future;
}
}
void finishLoad(int messageId) {
this.messageId = messageId;
isLoading = false;
loader.complete(true);
}
void failLoad() {
this.messageId = null;
isLoading = false;
loader.complete(true);
}
Future<void> waitForLoad() {
return loaded;
}
Future<int?> get() async {
if (isLoading) {
await waitForLoad();
}
return messageId;
}
}
// Message cache stores messages for use by the UI and uses MessageHandler and associated ByX loaders
// the cache stores messages in a cache indexed by their storage Id, and has two secondary indexes into it, content hash, and local index
// Index is the primary way to access the cache as it is a sequential ordered access and is used by the message pane
// contentHash is used for fetching replies
// by Id is used when composing a reply
// cacheByIndex supports additional features than just a direct index into the cache (byID)
// it allows locking of ranges in order to support bulk sequential loading (see ByIndex in message.dart)
// cacheByIndex allows allows inserting temporarily non storage backed messages so that Send Message can be respected instantly and then updated upon insertion into backend
// the message cache needs storageMessageCount maintained by the system so it can inform bulk loading when it's reaching the end of fetchable messages
class MessageCache extends ChangeNotifier {
// cache of MessageId to Message
late Map<int, MessageInfo> cache;
late List<int?> cacheByIndex;
// local index to MessageId
late List<LocalIndexMessage> cacheByIndex;
// map of content hash to MessageId
late Map<String, int> cacheByHash;
MessageCache() {
late int _storageMessageCount;
MessageCache(int storageMessageCount) {
cache = {};
cacheByIndex = List.empty(growable: true);
cacheByHash = {};
this._storageMessageCount = storageMessageCount;
}
int get indexedLength => cacheByIndex.length;
int get storageMessageCount => _storageMessageCount;
set storageMessageCount(int newval) {
this._storageMessageCount = newval;
}
MessageInfo? getById(int id) => cache[id];
MessageInfo? getByIndex(int index) {
Future<MessageInfo?> getByIndex(int index) async {
if (index >= cacheByIndex.length) {
return null;
}
return cache[cacheByIndex[index]];
var id = await cacheByIndex[index].get();
if (id == null) {
return Future<MessageInfo?>.value(null);
}
return cache[id];
}
MessageInfo? getByContentHash(String contenthash) => cache[cacheByHash[contenthash]];
void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash) {
this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto), data);
this.cacheByIndex.insert(0, messageID);
void addNew(String profileOnion, int conversation, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash) {
this.cache[messageID] = MessageInfo(MessageMetadata(profileOnion, conversation, messageID, timestamp, senderHandle, senderImage, "", {}, false, false, isAuto, contenthash), data);
this.cacheByIndex.insert(0, LocalIndexMessage(messageID));
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageID;
}
notifyListeners();
}
void add(MessageInfo messageInfo, int index, String? contenthash) {
void lockIndexs(int start, int end) {
for(var i = start; i < end; i++) {
this.cacheByIndex.insert(i, LocalIndexMessage(null, isLoading: true));
}
}
void malformIndexes(int start, int end) {
for(var i = start; i < end; i++) {
this.cacheByIndex[i].failLoad();
}
}
void addIndexed(MessageInfo messageInfo, int index) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
this.cacheByIndex.insert(index, messageInfo.metadata.messageID);
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
if (index < this.cacheByIndex.length ) {
this.cacheByIndex[index].finishLoad(messageInfo.metadata.messageID);
} else {
this.cacheByIndex.insert(index, LocalIndexMessage(messageInfo.metadata.messageID));
}
if (messageInfo.metadata.contenthash != "") {
this.cacheByHash[messageInfo.metadata.contenthash] = messageInfo.metadata.messageID;
}
notifyListeners();
}
void addUnindexed(MessageInfo messageInfo, String? contenthash) {
void addUnindexed(MessageInfo messageInfo) {
this.cache[messageInfo.metadata.messageID] = messageInfo;
if (contenthash != null && contenthash != "") {
this.cacheByHash[contenthash] = messageInfo.metadata.messageID;
if (messageInfo.metadata.contenthash != "") {
this.cacheByHash[messageInfo.metadata.contenthash] = messageInfo.metadata.messageID;
}
notifyListeners();
}

View File

@ -202,7 +202,7 @@ class ProfileInfoState extends ChangeNotifier {
}
void newMessage(
int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String? contenthash, bool selectedProfile, bool selectedConversation) {
int identifier, int messageID, DateTime timestamp, String senderHandle, String senderImage, bool isAuto, String data, String contenthash, bool selectedProfile, bool selectedConversation) {
if (!selectedProfile) {
unreadMessages++;
notifyListeners();

View File

@ -229,17 +229,15 @@ class _MessageViewState extends State<MessageView> {
ChatMessage cm = new ChatMessage(o: QuotedMessageOverlay, d: quotedMessage);
Provider.of<FlwtchState>(context, listen: false)
.cwtch
.SendMessage(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, jsonEncode(cm));
.SendMessage(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, jsonEncode(cm)).then(_sendMessageHandler);
} catch (e) {}
Provider.of<AppState>(context, listen: false).selectedIndex = null;
_sendMessageHelper();
});
} else {
ChatMessage cm = new ChatMessage(o: TextMessageOverlay, d: ctrlrCompose.value.text);
Provider.of<FlwtchState>(context, listen: false)
.cwtch
.SendMessage(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, jsonEncode(cm));
_sendMessageHelper();
.SendMessage(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, jsonEncode(cm)).then(_sendMessageHandler);
}
}
}
@ -247,29 +245,40 @@ class _MessageViewState extends State<MessageView> {
void _sendInvitation([String? ignoredParam]) {
Provider.of<FlwtchState>(context, listen: false)
.cwtch
.SendInvitation(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, this.selectedContact);
_sendMessageHelper();
.SendInvitation(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, this.selectedContact).then(_sendMessageHandler);
}
void _sendFile(String filePath) {
Provider.of<FlwtchState>(context, listen: false)
.cwtch
.ShareFile(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, filePath);
_sendMessageHelper();
.ShareFile(Provider.of<ContactInfoState>(context, listen: false).profileOnion, Provider.of<ContactInfoState>(context, listen: false).identifier, filePath).then(_sendMessageHandler);
}
void _sendMessageHelper() {
void _sendMessageHandler(dynamic messageJson) {
var cache = Provider.of<ContactInfoState>(context, listen: false).messageCache;
var profileOnion = Provider.of<ContactInfoState>(context, listen: false).profileOnion;
var identifier = Provider.of<ContactInfoState>(context, listen: false).identifier;
var profile = Provider.of<ProfileInfoState>(context, listen: false);
var messageInfo = messageJsonToInfo(profileOnion, identifier, messageJson);
if (messageInfo != null) {
profile.newMessage(
messageInfo.metadata.conversationIdentifier,
messageInfo.metadata.messageID,
messageInfo.metadata.timestamp,
messageInfo.metadata.senderHandle,
messageInfo.metadata.senderImage ?? "",
messageInfo.metadata.isAuto,
messageInfo.wrapper,
messageInfo.metadata.contenthash,
true,
true,
);
}
ctrlrCompose.clear();
focusNode.requestFocus();
Future.delayed(const Duration(milliseconds: 80), () {
var profile = Provider.of<ContactInfoState>(context, listen: false).profileOnion;
var identifier = Provider.of<ContactInfoState>(context, listen: false).identifier;
fetchAndCacheMessageInfo(context, profile, identifier, ByIndex(0));
Provider.of<ContactInfoState>(context, listen: false).newMarker++;
Provider.of<ContactInfoState>(context, listen: false).totalMessages += 1;
// Resort the contact list...
Provider.of<ProfileInfoState>(context, listen: false).contactList.updateLastMessageTime(Provider.of<ContactInfoState>(context, listen: false).identifier, DateTime.now());
});
}
Widget _buildComposeBox() {

View File

@ -223,10 +223,13 @@ class MessageRowState extends State<MessageRow> with SingleTickerProviderStateMi
mainAxisAlignment: MainAxisAlignment.center,
children: widgetRow,
)))));
// TODO calculate newMark ID in CIS so we dont have to get here
var mark = Provider.of<ContactInfoState>(context).newMarker;
//var mi = await Provider.of<ContactInfoState>(context).messageCache.getByIndex(mark - 1);
var markMatch = false; //mi?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID;
if (mark > 0 &&
Provider.of<ContactInfoState>(context).messageCache.indexedLength > mark &&
Provider.of<ContactInfoState>(context).messageCache.getByIndex(mark - 1)?.metadata.messageID == Provider.of<MessageMetadata>(context).messageID) {
Provider.of<ContactInfoState>(context).messageCache.indexedLength > mark && markMatch)
{
return Column(crossAxisAlignment: fromMe ? CrossAxisAlignment.end : CrossAxisAlignment.start, children: [Align(alignment: Alignment.center, child: _bubbleNew()), mr]);
} else {
return mr;