Implemented flag for workers

This commit is contained in:
Kanahia 2024-07-22 15:54:42 +05:30
parent 5b57c25110
commit 7255c994a6
13 changed files with 153 additions and 204 deletions

View file

@ -142,11 +142,6 @@ public class CommonsApplication extends MultiDexApplication {
@Inject @Inject
ContributionDao contributionDao; ContributionDao contributionDao;
/**
* In-memory list of contributions whose uploads have been paused by the user
*/
public static Map<String, Boolean> pauseUploads = new HashMap<>();
/** /**
* Used to declare and initialize various components and dependencies * Used to declare and initialize various components and dependencies
*/ */

View file

@ -101,7 +101,6 @@ data class Contribution constructor(
const val STATE_QUEUED = 2 const val STATE_QUEUED = 2
const val STATE_IN_PROGRESS = 3 const val STATE_IN_PROGRESS = 3
const val STATE_PAUSED = 4 const val STATE_PAUSED = 4
const val STATE_QUEUED_LIMITED_CONNECTION_MODE=5
/** /**
* Formatting captions to the Wikibase format for sending labels * Formatting captions to the Wikibase format for sending labels
@ -129,18 +128,6 @@ data class Contribution constructor(
return chunkInfo != null && chunkInfo!!.totalChunks == chunkInfo!!.indexOfNextChunkToUpload return chunkInfo != null && chunkInfo!!.totalChunks == chunkInfo!!.indexOfNextChunkToUpload
} }
fun isPaused(): Boolean {
return CommonsApplication.pauseUploads[pageId] ?: false
}
fun unpause() {
CommonsApplication.pauseUploads[pageId] = false
}
fun dateModifiedInMillis(): Long {
return dateModified!!.time
}
fun dateUploadStartedInMillis(): Long { fun dateUploadStartedInMillis(): Long {
return dateUploadStarted!!.time return dateUploadStarted!!.time
} }

View file

@ -13,6 +13,7 @@ import io.reactivex.Completable;
import io.reactivex.Single; import io.reactivex.Single;
import java.util.Calendar; import java.util.Calendar;
import java.util.List; import java.util.List;
import timber.log.Timber;
@Dao @Dao
public abstract class ContributionDao { public abstract class ContributionDao {
@ -86,6 +87,9 @@ public abstract class ContributionDao {
@Update @Update
public abstract void updateSynchronous(Contribution contribution); public abstract void updateSynchronous(Contribution contribution);
@Query("UPDATE contribution SET state = :newState WHERE state IN (:states)")
public abstract void updateContributionsState(List<Integer> states, int newState);
public Completable update(final Contribution contribution) { public Completable update(final Contribution contribution) {
return Completable return Completable
.fromAction(() -> { .fromAction(() -> {
@ -93,4 +97,11 @@ public abstract class ContributionDao {
updateSynchronous(contribution); updateSynchronous(contribution);
}); });
} }
public Completable updateContributionsWithStates(List<Integer> states, int newState) {
return Completable
.fromAction(() -> {
updateContributionsState(states, newState);
});
}
} }

View file

@ -809,8 +809,7 @@ public class ContributionsFragment
*/ */
public void retryUpload(Contribution contribution) { public void retryUpload(Contribution contribution) {
if (NetworkUtils.isInternetConnectionEstablished(getContext())) { if (NetworkUtils.isInternetConnectionEstablished(getContext())) {
if (contribution.getState() == STATE_PAUSED if (contribution.getState() == STATE_PAUSED) {
|| contribution.getState() == Contribution.STATE_QUEUED_LIMITED_CONNECTION_MODE) {
restartUpload(contribution); restartUpload(contribution);
} else if (contribution.getState() == STATE_FAILED) { } else if (contribution.getState() == STATE_FAILED) {
int retries = contribution.getRetries(); int retries = contribution.getRetries();

View file

@ -105,4 +105,8 @@ class ContributionsLocalDataSource {
public Completable updateContribution(final Contribution contribution) { public Completable updateContribution(final Contribution contribution) {
return contributionDao.update(contribution); return contributionDao.update(contribution);
} }
public Completable updateContributionsWithStates(List<Integer> states, int newState) {
return contributionDao.updateContributionsWithStates(states, newState);
}
} }

View file

@ -76,4 +76,8 @@ public class ContributionsRepository {
public Completable updateContribution(Contribution contribution) { public Completable updateContribution(Contribution contribution) {
return localDataSource.updateContribution(contribution); return localDataSource.updateContribution(contribution);
} }
public Completable updateContributionWithStates(List<Integer> states, int newState) {
return localDataSource.updateContributionsWithStates(states, newState);
}
} }

View file

@ -197,8 +197,7 @@ class FailedUploadsFragment : CommonsDaggerSupportFragment(), PendingUploadsCont
ViewUtil.showShortToast(context, R.string.cancelling_upload) ViewUtil.showShortToast(context, R.string.cancelling_upload)
uploadProgressActivity.hidePendingIcons() uploadProgressActivity.hidePendingIcons()
pendingUploadsPresenter.deleteUploads( pendingUploadsPresenter.deleteUploads(
listOf(Contribution.STATE_FAILED), listOf(Contribution.STATE_FAILED)
this.requireContext().applicationContext
) )
}, },
{} {}

View file

@ -42,14 +42,6 @@ class PendingUploadsFragment : CommonsDaggerSupportFragment(), PendingUploadsCon
@Inject @Inject
lateinit var pendingUploadsPresenter: PendingUploadsPresenter lateinit var pendingUploadsPresenter: PendingUploadsPresenter
@Inject
lateinit var mediaClient: MediaClient
@Inject
lateinit var sessionManager: SessionManager
private var userName: String? = null
private lateinit var binding: FragmentPendingUploadsBinding private lateinit var binding: FragmentPendingUploadsBinding
private lateinit var uploadProgressActivity: UploadProgressActivity private lateinit var uploadProgressActivity: UploadProgressActivity
@ -65,16 +57,6 @@ class PendingUploadsFragment : CommonsDaggerSupportFragment(), PendingUploadsCon
param1 = it.getString(ARG_PARAM1) param1 = it.getString(ARG_PARAM1)
param2 = it.getString(ARG_PARAM2) param2 = it.getString(ARG_PARAM2)
} }
//Now that we are allowing this fragment to be started for
// any userName- we expect it to be passed as an argument
if (arguments != null) {
userName = requireArguments().getString(ProfileActivity.KEY_USERNAME)
}
if (StringUtils.isEmpty(userName)) {
userName = sessionManager!!.getUserName()
}
} }
override fun onAttach(context: Context) { override fun onAttach(context: Context) {
@ -193,39 +175,39 @@ class PendingUploadsFragment : CommonsDaggerSupportFragment(), PendingUploadsCon
} }
fun pauseUploads() { fun pauseUploads() {
if (contributionsList != null) { pendingUploadsPresenter.pauseUploads(
pendingUploadsPresenter.pauseUploads( listOf(Contribution.STATE_QUEUED, Contribution.STATE_IN_PROGRESS),
contributionsList, Contribution.STATE_PAUSED
0, )
this.requireContext().applicationContext
)
}
} }
fun deleteUploads() { fun deleteUploads() {
if (contributionsList != null) { showAlertDialog(
showAlertDialog( requireActivity(),
requireActivity(), String.format(
String.format( Locale.getDefault(),
Locale.getDefault(), getString(R.string.cancelling_all_the_uploads)
getString(R.string.cancelling_all_the_uploads) ),
), String.format(
String.format( Locale.getDefault(),
Locale.getDefault(), getString(R.string.are_you_sure_that_you_want_cancel_all_the_uploads)
getString(R.string.are_you_sure_that_you_want_cancel_all_the_uploads) ),
), String.format(Locale.getDefault(), getString(R.string.yes)),
String.format(Locale.getDefault(), getString(R.string.yes)), String.format(Locale.getDefault(), getString(R.string.no)),
String.format(Locale.getDefault(), getString(R.string.no)), {
{ ViewUtil.showShortToast(context, R.string.cancelling_upload)
ViewUtil.showShortToast(context, R.string.cancelling_upload) uploadProgressActivity.hidePendingIcons()
uploadProgressActivity.hidePendingIcons() pendingUploadsPresenter.deleteUploads(
pendingUploadsPresenter.deleteUploads( listOf(
listOf(Contribution.STATE_QUEUED, Contribution.STATE_IN_PROGRESS, Contribution.STATE_PAUSED), Contribution.STATE_QUEUED,
this.requireContext().applicationContext Contribution.STATE_IN_PROGRESS,
Contribution.STATE_PAUSED
) )
}, )
{} },
) {}
} )
} }
} }

View file

@ -101,49 +101,26 @@ public class PendingUploadsPresenter implements UserActionListener {
contributionBoundaryCallback.dispose(); contributionBoundaryCallback.dispose();
} }
/**
* Delete a failed contribution from the local db
*/
@Override @Override
public void deleteUpload(final Contribution contribution, Context context) { public void deleteUpload(final Contribution contribution, Context context) {
compositeDisposable.add(repository compositeDisposable.add(repository
.deleteContributionFromDB(contribution) .deleteContributionFromDB(contribution)
.subscribeOn(ioThreadScheduler) .subscribeOn(ioThreadScheduler)
.subscribe(() -> .subscribe());
WorkRequestHelper.Companion.makeOneTimeWorkRequest(
context, ExistingWorkPolicy.KEEP)
));
} }
public void pauseUploads(List<Contribution> contributionList, int index, Context context) { public void pauseUploads(List<Integer> states, int newState) {
if (index >= contributionList.size()) {
return;
}
Contribution it = contributionList.get(index);
CommonsApplication.pauseUploads.put(it.getPageId().toString(), true);
//Retain the paused state in DB
it.setState(Contribution.STATE_PAUSED);
compositeDisposable.add(repository compositeDisposable.add(repository
.save(it) .updateContributionWithStates(states, newState)
.subscribeOn(ioThreadScheduler) .subscribeOn(ioThreadScheduler)
.doOnComplete(() -> { .subscribe());
pauseUploads(contributionList, index + 1, context);
}
)
.subscribe(() ->
WorkRequestHelper.Companion.makeOneTimeWorkRequest(
context, ExistingWorkPolicy.KEEP)
));
} }
public void deleteUploads(List<Integer> states, Context context) { public void deleteUploads(List<Integer> states) {
compositeDisposable.add(repository compositeDisposable.add(repository
.deleteContributionsFromDBWithStates(states) .deleteContributionsFromDBWithStates(states)
.subscribeOn(ioThreadScheduler) .subscribeOn(ioThreadScheduler)
.subscribe(() -> .subscribe());
WorkRequestHelper.Companion.makeOneTimeWorkRequest(
context, ExistingWorkPolicy.KEEP)
));
} }
public void restartUploads(List<Contribution> contributionList, int index, Context context) { public void restartUploads(List<Contribution> contributionList, int index, Context context) {
@ -163,7 +140,6 @@ public class PendingUploadsPresenter implements UserActionListener {
.save(it) .save(it)
.subscribeOn(ioThreadScheduler) .subscribeOn(ioThreadScheduler)
.doOnComplete(() -> { .doOnComplete(() -> {
CommonsApplication.pauseUploads.put(it.getPageId().toString(), false);
restartUploads(contributionList, index + 1, context); restartUploads(contributionList, index + 1, context);
} }
) )
@ -202,18 +178,4 @@ public class PendingUploadsPresenter implements UserActionListener {
)); ));
} }
/**
* Update the contribution's state in the databse, upon completion, trigger the workmanager to
* process this contribution
*
* @param contribution
*/
public void saveContribution(Contribution contribution, Context context) {
compositeDisposable.add(repository
.save(contribution)
.subscribeOn(ioThreadScheduler)
.subscribe(() -> WorkRequestHelper.Companion.makeOneTimeWorkRequest(
context, ExistingWorkPolicy.KEEP)));
}
} }

View file

@ -79,7 +79,7 @@ class UploadClient @Inject constructor(
val errorMessage = AtomicReference<String>() val errorMessage = AtomicReference<String>()
compositeDisposable.add( compositeDisposable.add(
Observable.fromIterable(fileChunks).forEach { chunkFile: File -> Observable.fromIterable(fileChunks).forEach { chunkFile: File ->
if (canProcess(contribution, failures)) { if (canProcess(contributionDao, contribution, failures)) {
if (contributionDao.getContribution(contribution.pageId) == null) { if (contributionDao.getContribution(contribution.pageId) == null) {
compositeDisposable.clear() compositeDisposable.clear()
return@forEach return@forEach
@ -106,8 +106,8 @@ class UploadClient @Inject constructor(
contributionDao.getContribution(contribution.pageId) == null -> { contributionDao.getContribution(contribution.pageId) == null -> {
return Observable.just(StashUploadResult(StashUploadState.CANCELLED, null, "Upload cancelled")) return Observable.just(StashUploadResult(StashUploadState.CANCELLED, null, "Upload cancelled"))
} }
contribution.isPaused() -> { contributionDao.getContribution(contribution.pageId).state == Contribution.STATE_PAUSED -> {
Timber.d("Upload stash paused %s", contribution.pageId) Timber.tag("PRINT").d("Upload stash paused %s", contribution.pageId)
Observable.just(StashUploadResult(StashUploadState.PAUSED, null, null)) Observable.just(StashUploadResult(StashUploadState.PAUSED, null, null))
} }
failures.get() -> { failures.get() -> {
@ -265,10 +265,16 @@ class UploadClient @Inject constructor(
} }
} }
private fun canProcess(contribution: Contribution, failures: AtomicBoolean): Boolean { private fun canProcess(
contributionDao: ContributionDao,
contribution: Contribution,
failures: AtomicBoolean
): Boolean {
// As long as the contribution hasn't been paused and there are no errors, // As long as the contribution hasn't been paused and there are no errors,
// we can process the current chunk. // we can process the current chunk.
return !(contribution.isPaused() || failures.get()) Timber.tag("PRINT").e("oyee" + contributionDao.getContribution(contribution.pageId).state)
return !(contributionDao.getContribution(contribution.pageId).state == Contribution.STATE_PAUSED
|| failures.get())
} }
private fun shouldSkip( private fun shouldSkip(

View file

@ -109,7 +109,6 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
getNotificationBuilder(CommonsApplication.NOTIFICATION_CHANNEL_ID_ALL)!! getNotificationBuilder(CommonsApplication.NOTIFICATION_CHANNEL_ID_ALL)!!
statesToProcess.add(Contribution.STATE_QUEUED) statesToProcess.add(Contribution.STATE_QUEUED)
statesToProcess.add(Contribution.STATE_QUEUED_LIMITED_CONNECTION_MODE)
} }
@dagger.Module @dagger.Module
@ -169,85 +168,81 @@ class UploadWorker(var appContext: Context, workerParams: WorkerParameters) :
} }
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
var countUpload = 0 try {
// Start a foreground service var countUpload = 0
setForeground(createForegroundInfo()) // Start a foreground service
notificationManager = NotificationManagerCompat.from(appContext) setForeground(createForegroundInfo())
val processingUploads = getNotificationBuilder( notificationManager = NotificationManagerCompat.from(appContext)
CommonsApplication.NOTIFICATION_CHANNEL_ID_ALL val processingUploads = getNotificationBuilder(
)!! CommonsApplication.NOTIFICATION_CHANNEL_ID_ALL
withContext(Dispatchers.IO) { )!!
withContext(Dispatchers.IO) {
while (contributionDao.getContribution(statesToProcess)
.blockingGet().size > 0
) {
/*
queuedContributions receives the results from a one-shot query.
This means that once the list has been fetched from the database,
it does not get updated even if some changes (insertions, deletions, etc.)
are made to the contribution table afterwards.
//TODO: Implement Worker Flags Related issues (fixed):
/* https://github.com/commons-app/apps-android-commons/issues/5136
queuedContributions receives the results from a one-shot query. https://github.com/commons-app/apps-android-commons/issues/5346
This means that once the list has been fetched from the database,
it does not get updated even if some changes (insertions, deletions, etc.)
are made to the contribution table afterwards.
Related issues (fixed):
https://github.com/commons-app/apps-android-commons/issues/5136
https://github.com/commons-app/apps-android-commons/issues/5346
*/
while (contributionDao.getContribution(statesToProcess)
.blockingGet().size > 0
) {
val queuedContributions = contributionDao.getContribution(statesToProcess)
.blockingGet()
//Showing initial notification for the number of uploads being processed
processingUploads.setContentTitle(appContext.getString(R.string.starting_uploads))
processingUploads.setContentText(
appContext.resources.getQuantityString(
R.plurals.starting_multiple_uploads,
queuedContributions.size,
queuedContributions.size
)
)
notificationManager?.notify(
PROCESSING_UPLOADS_NOTIFICATION_TAG,
PROCESSING_UPLOADS_NOTIFICATION_ID,
processingUploads.build()
)
val sortedQueuedContributionsList: List<Contribution> =
queuedContributions.sortedBy { it.dateUploadStartedInMillis() }
/**
* To avoid race condition when multiple of these workers are working, assign this state
so that the next one does not process these contribution again
*/ */
// sortedQueuedContributionsList.forEach { val queuedContributions = contributionDao.getContribution(statesToProcess)
// it.state = Contribution.STATE_IN_PROGRESS .blockingGet()
// contributionDao.saveSynchronous(it) //Showing initial notification for the number of uploads being processed
// }
var contribution = sortedQueuedContributionsList.first() processingUploads.setContentTitle(appContext.getString(R.string.starting_uploads))
processingUploads.setContentText(
appContext.resources.getQuantityString(
R.plurals.starting_multiple_uploads,
queuedContributions.size,
queuedContributions.size
)
)
notificationManager?.notify(
PROCESSING_UPLOADS_NOTIFICATION_TAG,
PROCESSING_UPLOADS_NOTIFICATION_ID,
processingUploads.build()
)
if (contributionDao.getContribution(contribution.pageId) != null) { val sortedQueuedContributionsList: List<Contribution> =
contribution.transferred = 0 queuedContributions.sortedBy { it.dateUploadStartedInMillis() }
contribution.state = Contribution.STATE_IN_PROGRESS
contributionDao.saveSynchronous(contribution) var contribution = sortedQueuedContributionsList.first()
setProgressAsync(Data.Builder().putInt("progress", countUpload).build())
countUpload++ if (contributionDao.getContribution(contribution.pageId) != null) {
uploadContribution(contribution = contribution) contribution.transferred = 0
contribution.state = Contribution.STATE_IN_PROGRESS
contributionDao.saveSynchronous(contribution)
setProgressAsync(Data.Builder().putInt("progress", countUpload).build())
countUpload++
uploadContribution(contribution = contribution)
}
} }
//Dismiss the global notification
notificationManager?.cancel(
PROCESSING_UPLOADS_NOTIFICATION_TAG,
PROCESSING_UPLOADS_NOTIFICATION_ID
)
}
// Trigger WorkManager to process any new contributions that may have been added to the queue
val updatedContributionQueue = withContext(Dispatchers.IO) {
contributionDao.getContribution(statesToProcess).blockingGet()
}
if (updatedContributionQueue.isNotEmpty()) {
return Result.retry()
} }
//Dismiss the global notification
notificationManager?.cancel(
PROCESSING_UPLOADS_NOTIFICATION_TAG,
PROCESSING_UPLOADS_NOTIFICATION_ID
)
}
// Trigger WorkManager to process any new contributions that may have been added to the queue
val updatedContributionQueue = withContext(Dispatchers.IO) {
contributionDao.getContribution(statesToProcess).blockingGet()
}
if (updatedContributionQueue.isNotEmpty()) {
return Result.retry()
}
return Result.success() return Result.success()
} catch (e: Exception) {
Timber.e(e, "UploadWorker encountered an error.")
return Result.failure()
} finally {
WorkRequestHelper.markUploadWorkerAsStopped()
}
} }
/** /**

View file

@ -3,6 +3,7 @@ package fr.free.nrw.commons.upload.worker
import android.content.Context import android.content.Context
import androidx.work.* import androidx.work.*
import androidx.work.WorkRequest.Companion.MIN_BACKOFF_MILLIS import androidx.work.WorkRequest.Companion.MIN_BACKOFF_MILLIS
import timber.log.Timber
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
/** /**
@ -11,7 +12,16 @@ import java.util.concurrent.TimeUnit
class WorkRequestHelper { class WorkRequestHelper {
companion object { companion object {
@Volatile
private var isUploadWorkerRunning = false
fun makeOneTimeWorkRequest(context: Context, existingWorkPolicy: ExistingWorkPolicy) { fun makeOneTimeWorkRequest(context: Context, existingWorkPolicy: ExistingWorkPolicy) {
if (isUploadWorkerRunning) {
Timber.e("UploadWorker is already running. Cannot start another instance.")
return
}
/* Set backoff criteria for the work request /* Set backoff criteria for the work request
The default backoff policy is EXPONENTIAL, but while testing we found that it The default backoff policy is EXPONENTIAL, but while testing we found that it
too long for the uploads to finish. So, set the backoff policy as LINEAR with the too long for the uploads to finish. So, set the backoff policy as LINEAR with the
@ -35,7 +45,13 @@ class WorkRequestHelper {
WorkManager.getInstance(context).enqueueUniqueWork( WorkManager.getInstance(context).enqueueUniqueWork(
UploadWorker::class.java.simpleName, existingWorkPolicy, uploadRequest UploadWorker::class.java.simpleName, existingWorkPolicy, uploadRequest
) )
isUploadWorkerRunning = true
}
fun markUploadWorkerAsStopped() {
isUploadWorkerRunning = false
} }
} }
}
}

View file

@ -171,17 +171,6 @@ class ContributionViewHolderUnitTests {
contributionViewHolder.init(0, contribution) contributionViewHolder.init(0, contribution)
} }
@Test
@Throws(Exception::class)
fun testInitCaseNonNull_STATE_QUEUED_LIMITED_CONNECTION_MODE() {
Shadows.shadowOf(Looper.getMainLooper()).idle()
`when`(contribution.state).thenReturn(Contribution.STATE_QUEUED_LIMITED_CONNECTION_MODE)
`when`(contribution.media).thenReturn(media)
`when`(media.mostRelevantCaption).thenReturn("")
`when`(media.author).thenReturn("")
contributionViewHolder.init(0, contribution)
}
@Test @Test
@Throws(Exception::class) @Throws(Exception::class)
fun testInitCaseNonNull_STATE_IN_PROGRESS() { fun testInitCaseNonNull_STATE_IN_PROGRESS() {