Handle failures in chunk uploads (#3916)

* Handle failures in chunk uploads

* Fix failures

* Upload fixed

* Handle multiple file upload

* Increase request timeout
Commit 6c55525a43 (parent 0d5fa048a5), authored by Vivek Maskara on 2020-10-24 23:56:48 -07:00, committed by GitHub.
8 changed files with 69 additions and 38 deletions

OkHttpConnectionFactory.java

@@ -6,6 +6,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import okhttp3.Cache;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
@@ -36,6 +37,9 @@ public final class OkHttpConnectionFactory {
return new OkHttpClient.Builder()
.cookieJar(SharedPreferenceCookieManager.getInstance())
.cache(NET_CACHE)
.connectTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.addInterceptor(getLoggingInterceptor())
.addInterceptor(new UnsuccessfulResponseInterceptor())
.addInterceptor(new CommonHeaderRequestInterceptor())
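
The hunk above raises OkHttp's 10-second default connect, write, and read timeouts to 60 seconds, giving each chunk request more time to complete on a slow connection. A minimal sketch of a client built the same way (interceptors and cache from the real factory are omitted; the class name is illustrative):

import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;

// Sketch only: the same 60-second timeouts as the factory above, nothing else.
public final class LongTimeoutClientSketch {

    static OkHttpClient build() {
        return new OkHttpClient.Builder()
            .connectTimeout(60, TimeUnit.SECONDS) // establishing the connection
            .writeTimeout(60, TimeUnit.SECONDS)   // sending the request body (one chunk)
            .readTimeout(60, TimeUnit.SECONDS)    // waiting for the server response
            .build();
    }
}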

ChunkInfo.kt

@@ -6,20 +6,20 @@ import fr.free.nrw.commons.upload.UploadResult
data class ChunkInfo(
val uploadResult: UploadResult,
val lastChunkIndex: Int,
var isLastChunkUploaded: Boolean
val indexOfNextChunkToUpload: Int,
val totalChunks: Int
) : Parcelable {
constructor(parcel: Parcel) : this(
parcel.readParcelable(UploadResult::class.java.classLoader),
parcel.readInt(),
parcel.readByte() != 0.toByte()
parcel.readInt()
) {
}
override fun writeToParcel(parcel: Parcel, flags: Int) {
parcel.writeParcelable(uploadResult, flags)
parcel.writeInt(lastChunkIndex)
parcel.writeByte(if (isLastChunkUploaded) 1 else 0)
parcel.writeInt(indexOfNextChunkToUpload)
parcel.writeInt(totalChunks)
}
override fun describeContents(): Int {
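
ChunkInfo now replaces the lastChunkIndex / isLastChunkUploaded pair with the index of the next chunk to upload and the total chunk count, so "fully uploaded" is derived by comparing the two counters, as the UploadClient hunk further down does. A hedged Java sketch of that check; the helper name is hypothetical, and the getters are the ones Kotlin generates for the val properties above:

// Sketch: hypothetical helper mirroring the completion check performed in
// UploadClient.uploadFileToStash() in this commit.
static boolean isStashFullyUploaded(final ChunkInfo chunkInfo) {
    return chunkInfo != null
        && chunkInfo.getIndexOfNextChunkToUpload() == chunkInfo.getTotalChunks();
}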

AppDatabase.kt

@@ -10,7 +10,7 @@ import fr.free.nrw.commons.contributions.ContributionDao
* The database for accessing the respective DAOs
*
*/
@Database(entities = [Contribution::class], version = 5, exportSchema = false)
@Database(entities = [Contribution::class], version = 6, exportSchema = false)
@TypeConverters(Converters::class)
abstract class AppDatabase : RoomDatabase() {
abstract fun contributionDao(): ContributionDao
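
The schema version is bumped from 5 to 6 because the ChunkInfo payload stored with a Contribution changed shape; Room refuses to open an existing database whose stored version no longer matches the declared one unless a Migration or a destructive fallback is registered when the database is built. A hedged sketch of the destructive-fallback variant, assuming the builder lives in a DI module; the database file name and class name here are illustrative placeholders, not the app's real ones:

import android.content.Context;
import androidx.room.Room;

// Sketch, not the app's actual module code: the file name and the choice of
// fallbackToDestructiveMigration() are assumptions for illustration only.
final class AppDatabaseProviderSketch {

    static AppDatabase provide(final Context context) {
        return Room.databaseBuilder(context, AppDatabase.class, "commons_db_sketch")
            .fallbackToDestructiveMigration() // recreate the DB when no Migration covers 5 -> 6
            .build();
    }
}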

NetworkingModule.java

@@ -62,7 +62,8 @@ public class NetworkingModule {
public OkHttpClient provideOkHttpClient(Context context,
HttpLoggingInterceptor httpLoggingInterceptor) {
File dir = new File(context.getCacheDir(), "okHttpCache");
return new OkHttpClient.Builder().connectTimeout(60, TimeUnit.SECONDS)
return new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.writeTimeout(60, TimeUnit.SECONDS)
.addInterceptor(httpLoggingInterceptor)
.readTimeout(60, TimeUnit.SECONDS)

FileUtilsWrapper.java

@@ -45,7 +45,7 @@ public class FileUtilsWrapper {
/**
* Takes a file as input and returns an Observable of files with the specified chunk size
*/
public Observable<File> getFileChunks(Context context, File file, final int chunkSize)
public List<File> getFileChunks(Context context, File file, final int chunkSize)
throws IOException {
final byte[] buffer = new byte[chunkSize];
@@ -58,7 +58,7 @@ public class FileUtilsWrapper {
buffers.add(writeToFile(context, Arrays.copyOf(buffer, size), file.getName(),
getFileExt(file.getName())));
}
return Observable.fromIterable(buffers);
return buffers;
}
}
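
Returning a plain List<File> lets the caller read the chunk count up front, which ChunkInfo.totalChunks now needs, and still stream the chunks by wrapping the list in Observable.fromIterable, the pattern the UploadClient hunk below follows. A short sketch of that call shape; the 512 KB chunk size and the wrapper class are illustrative, not the app's CHUNK_SIZE constant:

import android.content.Context;
import io.reactivex.Observable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import timber.log.Timber;

// Sketch of the call pattern used by UploadClient after this change.
final class ChunkListSketch {

    static int countAndLogChunks(final FileUtilsWrapper fileUtilsWrapper,
        final Context context, final File file) throws IOException {
        final List<File> fileChunks = fileUtilsWrapper.getFileChunks(context, file, 512 * 1024);
        final int totalChunks = fileChunks.size();  // known before any chunk is sent
        Observable.fromIterable(fileChunks)         // stream the chunks one by one
            .forEach(chunk -> Timber.d("Chunk file: %s", chunk.getName()));
        return totalChunks;
    }
}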

UploadClient.java

@@ -15,6 +15,10 @@ import io.reactivex.disposables.CompositeDisposable;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
@@ -41,7 +45,8 @@ public class UploadClient {
private final PageContentsCreator pageContentsCreator;
private final FileUtilsWrapper fileUtilsWrapper;
private final Gson gson;
private boolean pauseUploads = false;
private Map<String, Boolean> pauseUploads;
private final CompositeDisposable compositeDisposable = new CompositeDisposable();
@@ -55,6 +60,7 @@ public class UploadClient {
this.pageContentsCreator = pageContentsCreator;
this.fileUtilsWrapper = fileUtilsWrapper;
this.gson = gson;
this.pauseUploads = new HashMap<>();
}
/**
@@ -64,32 +70,50 @@ public class UploadClient {
Observable<StashUploadResult> uploadFileToStash(
final Context context, final String filename, final Contribution contribution,
final NotificationUpdateProgressListener notificationUpdater) throws IOException {
if (contribution.getChunkInfo() != null && contribution.getChunkInfo().isLastChunkUploaded()) {
if (contribution.getChunkInfo() != null
&& contribution.getChunkInfo().getTotalChunks() == contribution.getChunkInfo()
.getIndexOfNextChunkToUpload()) {
return Observable.just(new StashUploadResult(StashUploadState.SUCCESS,
contribution.getChunkInfo().getUploadResult().getFilekey()));
}
pauseUploads = false;
File file = new File(contribution.getLocalUri().getPath());
final Observable<File> fileChunks = fileUtilsWrapper.getFileChunks(context, file, CHUNK_SIZE);
pauseUploads.put(contribution.getPageId(), false);
final File file = new File(contribution.getLocalUri().getPath());
final List<File> fileChunks = fileUtilsWrapper.getFileChunks(context, file, CHUNK_SIZE);
final int totalChunks = fileChunks.size();
final MediaType mediaType = MediaType
.parse(FileUtils.getMimeType(context, Uri.parse(file.getPath())));
final AtomicInteger index = new AtomicInteger();
final AtomicReference<ChunkInfo> chunkInfo = new AtomicReference<>();
Timber.d("Chunk info");
if (contribution.getChunkInfo() != null && isStashValid(contribution)) {
if (isStashValid(contribution)) {
chunkInfo.set(contribution.getChunkInfo());
Timber.d("Chunk: Next Chunk: %s, Total Chunks: %s",
contribution.getChunkInfo().getIndexOfNextChunkToUpload(),
contribution.getChunkInfo().getTotalChunks());
}
compositeDisposable.add(fileChunks.forEach(chunkFile -> {
if (pauseUploads) {
final AtomicInteger index = new AtomicInteger();
final AtomicBoolean failures = new AtomicBoolean();
compositeDisposable.add(Observable.fromIterable(fileChunks).forEach(chunkFile -> {
if (pauseUploads.get(contribution.getPageId()) || failures.get()) {
return;
}
if (chunkInfo.get() != null && index.get() < chunkInfo.get().getLastChunkIndex()) {
index.getAndIncrement();
if (chunkInfo.get() != null && index.get() < chunkInfo.get().getIndexOfNextChunkToUpload()) {
index.incrementAndGet();
Timber.d("Chunk: Increment and return: %s", index.get());
return;
}
index.getAndIncrement();
final int offset =
chunkInfo.get() != null ? chunkInfo.get().getUploadResult().getOffset() : 0;
Timber.d("Chunk: Sending Chunk number: %s, offset: %s", index.get(), offset);
final String filekey =
chunkInfo.get() != null ? chunkInfo.get().getUploadResult().getFilekey() : null;
@@ -104,17 +128,20 @@ public class UploadClient {
offset,
filekey,
countingRequestBody).subscribe(uploadResult -> {
chunkInfo.set(new ChunkInfo(uploadResult, index.incrementAndGet(), false));
Timber.d("Chunk: Received Chunk number: %s, offset: %s", index.get(),
uploadResult.getOffset());
chunkInfo.set(
new ChunkInfo(uploadResult, index.get(), totalChunks));
notificationUpdater.onChunkUploaded(contribution, chunkInfo.get());
}, throwable -> {
Timber.e(throwable, "Error occurred in uploading chunk");
failures.set(true);
}));
}));
chunkInfo.get().setLastChunkUploaded(true);
notificationUpdater.onChunkUploaded(contribution, chunkInfo.get());
if (pauseUploads) {
if (pauseUploads.get(contribution.getPageId())) {
return Observable.just(new StashUploadResult(StashUploadState.PAUSED, null));
} else if (failures.get()) {
return Observable.just(new StashUploadResult(StashUploadState.FAILED, null));
} else if (chunkInfo.get() != null) {
return Observable.just(new StashUploadResult(StashUploadState.SUCCESS,
chunkInfo.get().getUploadResult().getFilekey()));
@@ -129,8 +156,9 @@ public class UploadClient {
* @return
*/
private boolean isStashValid(Contribution contribution) {
return contribution.getDateModified()
.after(new Date(System.currentTimeMillis() - MAX_CHUNK_AGE));
return contribution.getChunkInfo() != null &&
contribution.getDateModified()
.after(new Date(System.currentTimeMillis() - MAX_CHUNK_AGE));
}
/**
@@ -166,9 +194,10 @@ public class UploadClient {
/**
* Dispose the active disposable and sets the pause variable
* @param pageId
*/
public void pauseUpload() {
pauseUploads = true;
public void pauseUpload(String pageId) {
pauseUploads.put(pageId, true);
if (!compositeDisposable.isDisposed()) {
compositeDisposable.dispose();
}
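
Pausing is now tracked per contribution: pauseUpload(pageId) flips only that upload's flag and disposes the in-flight chunk subscriptions, while other concurrent uploads keep their own flags, which is what lets multiple file uploads pause independently. An illustrative wrapper around that bookkeeping, with the map and flag semantics taken from the hunk above and the class name invented for the sketch:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the per-pageId pause flags used by UploadClient;
// only the map bookkeeping is shown here.
final class PauseBookkeepingSketch {

    private final Map<String, Boolean> pauseUploads = new HashMap<>();

    // Called when the user pauses one contribution; other uploads are unaffected.
    void pauseUpload(final String pageId) {
        pauseUploads.put(pageId, true);
    }

    // Checked before each chunk is sent; this sketch defaults to "not paused"
    // for uploads that never registered a flag (a defensive choice here).
    boolean isPaused(final String pageId) {
        return Boolean.TRUE.equals(pauseUploads.get(pageId));
    }
}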

UploadService.java

@@ -148,7 +148,7 @@ public class UploadService extends CommonsDaggerService {
* @param contribution
*/
public void pauseUpload(Contribution contribution) {
uploadClient.pauseUpload();
uploadClient.pauseUpload(contribution.getPageId());
contribution.setState(Contribution.STATE_PAUSED);
compositeDisposable.add(contributionDao.update(contribution)
.subscribeOn(ioThreadScheduler)
@@ -312,8 +312,6 @@ public class UploadService extends CommonsDaggerService {
.flatMap(uploadStash -> {
notificationManager.cancel(notificationTag, NOTIFICATION_UPLOAD_IN_PROGRESS);
Timber.d("Stash upload response 1 is %s", uploadStash.toString());
if (uploadStash.getState() == StashUploadState.SUCCESS) {
Timber.d("making sure of uniqueness of name: %s", filename);
String uniqueFilename = findUniqueFilename(filename);
@@ -332,7 +330,6 @@
}
});
} else if (uploadStash.getState() == StashUploadState.PAUSED) {
Timber.d("Contribution upload paused");
showPausedNotification(contribution);
return Observable.never();
} else {
@@ -359,7 +356,6 @@
private void onUpload(Contribution contribution, String notificationTag,
UploadResult uploadResult) {
Timber.d("Stash upload response 2 is %s", uploadResult.toString());
notificationManager.cancel(notificationTag, NOTIFICATION_UPLOAD_IN_PROGRESS);
@@ -401,7 +397,7 @@
@SuppressLint("StringFormatInvalid")
@SuppressWarnings("deprecation")
private void showFailedNotification(Contribution contribution) {
private void showFailedNotification(final Contribution contribution) {
final String displayTitle = contribution.getMedia().getDisplayTitle();
curNotification.setTicker(getString(R.string.upload_failed_notification_title, displayTitle))
.setContentTitle(getString(R.string.upload_failed_notification_title, displayTitle))
@@ -412,6 +408,7 @@
curNotification.build());
contribution.setState(Contribution.STATE_FAILED);
contribution.setChunkInfo(null);
compositeDisposable.add(contributionDao
.update(contribution)
@@ -419,7 +416,7 @@
.subscribe());
}
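
Because the stash upload can now end in StashUploadState.FAILED, the failure notification above also clears the contribution's chunk info, so a later retry starts again from the first chunk instead of resuming against a stale stash. A hedged sketch of that reset; the statements mirror the showFailedNotification() changes above, but the helper method name is invented and the service's existing fields (compositeDisposable, contributionDao, ioThreadScheduler) are assumed:

// Hypothetical helper: mirrors the reset performed in showFailedNotification().
private void markFailedAndForgetChunks(final Contribution contribution) {
    contribution.setState(Contribution.STATE_FAILED);
    contribution.setChunkInfo(null); // drop stale progress so a retry starts at chunk 0
    compositeDisposable.add(contributionDao
        .update(contribution)
        .subscribeOn(ioThreadScheduler)
        .subscribe());
}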
private void showPausedNotification(Contribution contribution) {
private void showPausedNotification(final Contribution contribution) {
final String displayTitle = contribution.getMedia().getDisplayTitle();
curNotification.setTicker(getString(R.string.upload_paused_notification_title, displayTitle))
.setContentTitle(getString(R.string.upload_paused_notification_title, displayTitle))