Fixes 4344 - Duplicate Uploads (#4442)

* Fixes 4344
- Update the retention policy of the Work Manager to ExistingWorkPolicy.APPEND_OR_REPLACE, which appends the new work to the end of the existing one. This helps remove the while loop in UploadWorker, which was meant to handle cases where a new worker would be created for retries. The while loop appeared to have race conditions that caused duplicate entries to be uploaded.

* Update states to IN_PROGRESS before uploads are processed
This commit is contained in:
Ashish 2021-06-14 17:27:15 +05:30 committed by GitHub
parent 09cacde670
commit 10ed6678b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 56 deletions

View file

@ -318,7 +318,7 @@ public class MainActivity extends BaseActivity
} else { } else {
WorkManager.getInstance(getApplicationContext()).enqueueUniqueWork( WorkManager.getInstance(getApplicationContext()).enqueueUniqueWork(
UploadWorker.class.getSimpleName(), UploadWorker.class.getSimpleName(),
ExistingWorkPolicy.KEEP, OneTimeWorkRequest.from(UploadWorker.class)); ExistingWorkPolicy.APPEND_OR_REPLACE, OneTimeWorkRequest.from(UploadWorker.class));
viewUtilWrapper viewUtilWrapper
.showShortToast(getBaseContext(), getString(R.string.limited_connection_disabled)); .showShortToast(getBaseContext(), getString(R.string.limited_connection_disabled));

View file

@ -289,7 +289,7 @@ public class UploadActivity extends BaseActivity implements UploadContract.View,
public void makeUploadRequest() { public void makeUploadRequest() {
WorkManager.getInstance(getApplicationContext()).enqueueUniqueWork( WorkManager.getInstance(getApplicationContext()).enqueueUniqueWork(
UploadWorker.class.getSimpleName(), UploadWorker.class.getSimpleName(),
ExistingWorkPolicy.KEEP, OneTimeWorkRequest.from(UploadWorker.class)); ExistingWorkPolicy.APPEND_OR_REPLACE, OneTimeWorkRequest.from(UploadWorker.class));
} }
@Override @Override

View file

@ -7,6 +7,7 @@ import androidx.core.app.NotificationCompat
import androidx.core.app.NotificationManagerCompat import androidx.core.app.NotificationManagerCompat
import androidx.work.CoroutineWorker import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.google.gson.Gson
import com.mapbox.mapboxsdk.plugins.localization.BuildConfig import com.mapbox.mapboxsdk.plugins.localization.BuildConfig
import dagger.android.ContributesAndroidInjector import dagger.android.ContributesAndroidInjector
import fr.free.nrw.commons.CommonsApplication import fr.free.nrw.commons.CommonsApplication
@ -146,15 +147,12 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
CommonsApplication.NOTIFICATION_CHANNEL_ID_ALL CommonsApplication.NOTIFICATION_CHANNEL_ID_ALL
)!! )!!
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
//Doing this so that retry requests do not create new work requests and while a work is
// already running, all the requests should go through this, so kind of a queue
while (contributionDao.getContribution(statesToProcess)
.blockingGet().isNotEmpty()
) {
val queuedContributions = contributionDao.getContribution(statesToProcess) val queuedContributions = contributionDao.getContribution(statesToProcess)
.blockingGet() .blockingGet()
//Showing initial notification for the number of uploads being processed //Showing initial notification for the number of uploads being processed
Timber.e("Queued Contributions: "+ queuedContributions.size)
processingUploads.setContentTitle(appContext.getString(R.string.starting_uploads)) processingUploads.setContentTitle(appContext.getString(R.string.starting_uploads))
processingUploads.setContentText( processingUploads.setContentText(
appContext.resources.getQuantityString( appContext.resources.getQuantityString(
@ -169,6 +167,15 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
processingUploads.build() processingUploads.build()
) )
/**
 * To avoid a race condition when multiple of these workers are running, assign this state
 * so that the next worker does not process these contributions again.
 */
queuedContributions.forEach {
it.state=Contribution.STATE_IN_PROGRESS
contributionDao.saveSynchronous(it)
}
queuedContributions.asFlow().map { contribution -> queuedContributions.asFlow().map { contribution ->
/** /**
* If the limited connection mode is on, lets iterate through the queued * If the limited connection mode is on, lets iterate through the queued
@ -179,12 +186,12 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
if (isLimitedConnectionModeEnabled()) { if (isLimitedConnectionModeEnabled()) {
if (contribution.state == Contribution.STATE_QUEUED) { if (contribution.state == Contribution.STATE_QUEUED) {
contribution.state = Contribution.STATE_QUEUED_LIMITED_CONNECTION_MODE contribution.state = Contribution.STATE_QUEUED_LIMITED_CONNECTION_MODE
contributionDao.save(contribution) contributionDao.saveSynchronous(contribution)
} }
} else { } else {
contribution.transferred = 0 contribution.transferred = 0
contribution.state = Contribution.STATE_IN_PROGRESS contribution.state = Contribution.STATE_IN_PROGRESS
contributionDao.save(contribution) contributionDao.saveSynchronous(contribution)
uploadContribution(contribution = contribution) uploadContribution(contribution = contribution)
} }
}.collect() }.collect()
@ -194,13 +201,6 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
PROCESSING_UPLOADS_NOTIFICATION_TAG, PROCESSING_UPLOADS_NOTIFICATION_TAG,
PROCESSING_UPLOADS_NOTIFICATION_ID PROCESSING_UPLOADS_NOTIFICATION_ID
) )
//No need to keep looking if the limited connection mode is on,
//If the user toggles it, the work manager will be started again
if(isLimitedConnectionModeEnabled()){
break;
}
}
} }
//TODO make this smart, think of handling retries in the future //TODO make this smart, think of handling retries in the future
return Result.success() return Result.success()
@ -307,6 +307,7 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
Timber.e(exception) Timber.e(exception)
Timber.e("Upload from stash failed for contribution : $filename") Timber.e("Upload from stash failed for contribution : $filename")
showFailedNotification(contribution) showFailedNotification(contribution)
contribution.state=Contribution.STATE_FAILED
if (STASH_ERROR_CODES.contains(exception.message)) { if (STASH_ERROR_CODES.contains(exception.message)) {
clearChunks(contribution) clearChunks(contribution)
} }
@ -315,26 +316,28 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
StashUploadState.PAUSED -> { StashUploadState.PAUSED -> {
showPausedNotification(contribution) showPausedNotification(contribution)
contribution.state = Contribution.STATE_PAUSED contribution.state = Contribution.STATE_PAUSED
contributionDao.save(contribution).blockingGet() contributionDao.saveSynchronous(contribution)
} }
else -> { else -> {
Timber.e("""upload file to stash failed with status: ${stashUploadResult.state}""") Timber.e("""upload file to stash failed with status: ${stashUploadResult.state}""")
showFailedNotification(contribution) showFailedNotification(contribution)
contribution.state = Contribution.STATE_FAILED contribution.state = Contribution.STATE_FAILED
contribution.chunkInfo = null contribution.chunkInfo = null
contributionDao.save(contribution).blockingAwait() contributionDao.saveSynchronous(contribution)
} }
} }
}catch (exception: Exception){ }catch (exception: Exception){
Timber.e(exception) Timber.e(exception)
Timber.e("Stash upload failed for contribution: $filename") Timber.e("Stash upload failed for contribution: $filename")
showFailedNotification(contribution) showFailedNotification(contribution)
contribution.state=Contribution.STATE_FAILED
clearChunks(contribution)
} }
} }
private fun clearChunks(contribution: Contribution) { private fun clearChunks(contribution: Contribution) {
contribution.chunkInfo=null contribution.chunkInfo=null
contributionDao.save(contribution).blockingAwait() contributionDao.saveSynchronous(contribution)
} }
/** /**