@@ -24,11 +24,13 @@ private struct FileMessage {
2424 var files : [ FileUpload ]
2525 var message : String ?
2626 var txId : String ?
27+ var adamantMessage : AdamantMessage ?
2728}
2829
2930@MainActor
3031final class ChatFileService : ChatFileProtocol , Sendable {
3132 typealias UploadResult = ( decodedData: Data , encodedData: Data , nonce: String , cid: String )
33+ typealias UploadFileResult = ( file: UploadResult , preview: UploadResult ? )
3234
3335 // MARK: Dependencies
3436
@@ -47,6 +49,7 @@ final class ChatFileService: ChatFileProtocol, Sendable {
4749 private var fileDownloadAttemptsCount : [ String : Int ] = [ : ]
4850 private var uploadingFilesDictionary : [ String : FileMessage ] = [ : ]
4951 private var previewDownloadsAttemps : [ String : Int ] = [ : ]
52+ private var uploadTasks : [ String : Task < UploadFileResult , Error > ] = [ : ]
5053 private let synchronizer = AsyncStreamSender < @MainActor ( ) -> Void > ( )
5154 private let _updateFileFields = ObservableSender < FileUpdateProperties > ( )
5255
@@ -199,6 +202,19 @@ final class ChatFileService: ChatFileProtocol, Sendable {
199202 return decodedData
200203 }
201204
205+ func cancelUpload( messageId: String , fileId: String ) async {
206+ if let task = uploadTasks [ fileId] {
207+ task. cancel ( )
208+ uploadTasks [ fileId] = nil
209+ uploadingFiles. removeAll { $0 == fileId }
210+ } else {
211+ await removeFromRichFile (
212+ oldId: fileId,
213+ txId: messageId
214+ )
215+ }
216+ }
217+
202218 func isDownloadPreviewLimitReached( for fileId: String ) -> Bool {
203219 let count = previewDownloadsAttemps [ fileId] ?? . zero
204220 guard count < maxDownloadPreivewAttemptsCount else { return true }
@@ -725,6 +741,7 @@ private extension ChatFileService {
725741 replyMessage: replyMessage,
726742 storageProtocol: storageProtocol
727743 )
744+ fileMessage. adamantMessage = messageLocally
728745
729746 cachePreviewFiles ( files)
730747
@@ -743,27 +760,25 @@ private extension ChatFileService {
743760
744761 do {
745762 try await processFilesUpload (
746- fileMessage: & fileMessage,
763+ fileMessage: fileMessage,
747764 chatroom: chatroom,
748765 keyPair: keyPair,
749766 storageProtocol: storageProtocol,
750767 ownerId: ownerId,
751768 partnerAddress: partnerAddress,
752769 saveEncrypted: saveEncrypted,
753- txId: txId,
754- richFiles: & richFiles,
755- messageLocally: messageLocally
770+ txId: txId
756771 )
757772
758- let message = createAdamantMessage (
759- with : richFiles ,
760- text : text ,
761- replyMessage : replyMessage ,
762- storageProtocol : storageProtocol
763- )
773+ guard let fileMessage = uploadingFilesDictionary [ txId ] ,
774+ let adamantMessage = fileMessage . adamantMessage ,
775+ !fileMessage . files . isEmpty
776+ else {
777+ return await chatsProvider . removeMessage ( with : txId )
778+ }
764779
765780 _ = try await chatsProvider. sendFileMessage (
766- message ,
781+ adamantMessage ,
767782 recipientId: partnerAddress,
768783 transactionLocalyId: txId,
769784 from: chatroom
@@ -886,23 +901,14 @@ private extension ChatFileService {
886901 )
887902 }
888903
889- func processFilesUpload (
890- fileMessage : inout FileMessage ,
904+ func createFileUploadTask (
905+ file : FileResult ,
891906 chatroom: Chatroom ? ,
892907 keyPair: Keypair ,
893908 storageProtocol: NetworkFileProtocolType ,
894- ownerId: String ,
895- partnerAddress: String ,
896- saveEncrypted: Bool ,
897- txId: String ,
898- richFiles: inout [ RichMessageFile . File ] ,
899- messageLocally: AdamantMessage
900- ) async throws {
901- let files = fileMessage. files
902-
903- for i in files. indices where !files[ i] . isUploaded {
904- let file = files [ i] . file
905-
909+ saveEncrypted: Bool
910+ ) -> Task < UploadFileResult , Error > {
911+ return Task {
906912 let uploadProgress : @Sendable ( Int ) -> Void = { [ synchronizer, file] value in
907913 synchronizer. send { [ weak self] in
908914 self ? . sendProgress (
@@ -916,34 +922,76 @@ private extension ChatFileService {
916922 file: file,
917923 recipientPublicKey: chatroom? . partner? . publicKey ?? . empty,
918924 senderPrivateKey: keyPair. privateKey,
919- storageProtocol: storageProtocol,
925+ storageProtocol: storageProtocol,
920926 progress: uploadProgress
921927 )
922928
923- sendProgress (
924- for: result. file. cid,
925- progress: 100
926- )
929+ return result
930+ }
931+ }
932+
933+ func processFilesUpload(
934+ fileMessage: FileMessage ,
935+ chatroom: Chatroom ? ,
936+ keyPair: Keypair ,
937+ storageProtocol: NetworkFileProtocolType ,
938+ ownerId: String ,
939+ partnerAddress: String ,
940+ saveEncrypted: Bool ,
941+ txId: String
942+ ) async throws {
943+ let files = fileMessage. files
944+
945+ for i in files. indices where !files[ i] . isUploaded {
946+ let file = files [ i] . file
947+
948+ // We possible already cancelled uploading this file, but effectively we didn't start uploading it
949+ guard uploadingFiles. contains ( file. url. absoluteString) else { continue }
927950
928- try cacheUploadedFile (
929- fileResult: result. file,
930- previewResult: result. preview,
951+ let uploadTask = createFileUploadTask (
931952 file: file,
932- ownerId: ownerId,
933- partnerAddress: partnerAddress,
953+ chatroom: chatroom,
954+ keyPair: keyPair,
955+ storageProtocol: storageProtocol,
934956 saveEncrypted: saveEncrypted
935957 )
936958
937- await updateRichFile (
938- oldId: file. url. absoluteString,
939- fileResult: result. file,
940- previewResult: result. preview,
941- fileMessage: & fileMessage,
942- richFiles: & richFiles,
943- file: file,
944- txId: txId,
945- messageLocally: messageLocally
946- )
959+ uploadTasks [ file. url. absoluteString] = uploadTask
960+
961+ defer {
962+ uploadTasks [ file. url. absoluteString] = nil
963+ }
964+
965+ do {
966+ let result = try await uploadTask. value
967+
968+ sendProgress (
969+ for: result. file. cid,
970+ progress: 100
971+ )
972+
973+ try cacheUploadedFile (
974+ fileResult: result. file,
975+ previewResult: result. preview,
976+ file: file,
977+ ownerId: ownerId,
978+ partnerAddress: partnerAddress,
979+ saveEncrypted: saveEncrypted
980+ )
981+
982+ await updateRichFile (
983+ oldId: file. url. absoluteString,
984+ fileResult: result. file,
985+ previewResult: result. preview,
986+ file: file,
987+ txId: txId
988+ )
989+ } catch is CancellationError {
990+ await removeFromRichFile (
991+ oldId: file. url. absoluteString,
992+ txId: txId
993+ )
994+ }
947995 }
948996 }
949997
@@ -989,11 +1037,8 @@ private extension ChatFileService {
9891037 oldId: String ,
9901038 fileResult: UploadResult ,
9911039 previewResult: UploadResult ? ,
992- fileMessage: inout FileMessage ,
993- richFiles: inout [ RichMessageFile . File ] ,
9941040 file: FileResult ,
995- txId: String ,
996- messageLocally: AdamantMessage
1041+ txId: String
9971042 ) async {
9981043 let cached = filesStorage. isCachedLocally ( fileResult. cid)
9991044 uploadingFiles. removeAll { $0 == oldId }
@@ -1019,6 +1064,11 @@ private extension ChatFileService {
10191064 )
10201065 }
10211066
1067+ guard var ( fileMessage, richMessage) = uploadingFilesDictionary [ richMessageId: txId]
1068+ else { return }
1069+
1070+ var richFiles = richMessage. files
1071+
10221072 if let index = richFiles. firstIndex ( where: { $0. id == oldId } ) {
10231073 richFiles [ index] . id = fileResult. cid
10241074 richFiles [ index] . nonce = fileResult. nonce
@@ -1035,10 +1085,6 @@ private extension ChatFileService {
10351085 uploadingFilesDictionary [ txId] = fileMessage
10361086 }
10371087
1038- guard case let . richMessage( payload) = messageLocally,
1039- var richMessage = payload as? RichMessageFile
1040- else { return }
1041-
10421088 richMessage. files = richFiles
10431089
10441090 try ? await chatsProvider. updateTxMessageContent (
@@ -1047,6 +1093,34 @@ private extension ChatFileService {
10471093 )
10481094 }
10491095
1096+ func removeFromRichFile(
1097+ oldId: String ,
1098+ txId: String
1099+ ) async {
1100+ uploadingFiles. removeAll { $0 == oldId }
1101+
1102+ guard var ( fileMessage, richMessage) = uploadingFilesDictionary [ richMessageId: txId]
1103+ else { return }
1104+
1105+ richMessage. files = richMessage. files. filter { $0. id != oldId }
1106+ fileMessage. adamantMessage = . richMessage( payload: richMessage)
1107+
1108+ let updatedFiles = fileMessage. files. filter { $0. file. url. absoluteString != oldId }
1109+
1110+ fileMessage. files = updatedFiles
1111+ uploadingFilesDictionary [ txId] = fileMessage
1112+
1113+ if !updatedFiles. isEmpty {
1114+ // skip double update which causes bugs
1115+ // first update: here
1116+ // second update: if fileMessages is empty in sendFile method
1117+ try ? await chatsProvider. updateTxMessageContent (
1118+ txId: txId,
1119+ richMessage: richMessage
1120+ )
1121+ }
1122+ }
1123+
10501124 func handleUploadError(
10511125 for richFiles: [ RichMessageFile . File ] ,
10521126 txId: String
@@ -1136,12 +1210,35 @@ private extension ChatFileService {
11361210 throw FileManagerError . cantEncryptFile
11371211 }
11381212
1139- let cid = try await filesNetworkManager. uploadFiles (
1213+ try Task . checkCancellation ( )
1214+
1215+ let result = await filesNetworkManager. uploadFiles (
11401216 encodedData,
11411217 type: storageProtocol,
11421218 uploadProgress: uploadProgress
1143- ) . get ( )
1219+ )
11441220
1145- return ( data, encodedData, nonce, cid)
1221+ switch result {
1222+ case let . success( cid) :
1223+ return ( data, encodedData, nonce, cid)
1224+ case let . failure( error) :
1225+ if case . apiError( let apiError) = error,
1226+ apiError == . requestCancelled {
1227+ try Task . checkCancellation ( )
1228+ }
1229+
1230+ throw error
1231+ }
1232+ }
1233+ }
1234+
1235+ private extension Dictionary where Key == String , Value == FileMessage {
1236+ subscript( richMessageId txId: String ) -> ( FileMessage , RichMessageFile ) ? {
1237+ guard let fileMessage = self [ txId] ,
1238+ case let . richMessage( payload) = fileMessage. adamantMessage,
1239+ let richMessage = payload as? RichMessageFile
1240+ else { return nil }
1241+
1242+ return ( fileMessage, richMessage)
11461243 }
11471244}
0 commit comments