With chunked uploads (#3855)

This commit is contained in:
Vivek Maskara 2020-06-30 09:31:25 -07:00 committed by GitHub
parent f26784e9c3
commit 3361155fad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 193 additions and 75 deletions

View file

@ -12,7 +12,12 @@ import java.io.IOException
*
* @author Ashish Kumar
*/
class CountingRequestBody(protected var delegate: RequestBody, protected var listener: Listener) : RequestBody() {
class CountingRequestBody(
protected var delegate: RequestBody,
protected var listener: Listener,
var offset: Long,
var totalContentLength: Long
) : RequestBody() {
protected var countingSink: CountingSink? = null
override fun contentType(): MediaType? {
return delegate.contentType()
@ -37,11 +42,12 @@ class CountingRequestBody(protected var delegate: RequestBody, protected var lis
protected inner class CountingSink(delegate: Sink?) : ForwardingSink(delegate!!) {
private var bytesWritten: Long = 0
@Throws(IOException::class)
override fun write(source: Buffer, byteCount: Long) {
super.write(source, byteCount)
bytesWritten += byteCount
listener.onRequestProgress(bytesWritten, contentLength())
listener.onRequestProgress(offset + bytesWritten, totalContentLength)
}
}

View file

@ -1,33 +1,84 @@
package fr.free.nrw.commons.upload;
import android.content.Context;
import io.reactivex.Observable;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import timber.log.Timber;
@Singleton
public class FileUtilsWrapper {
@Inject
public FileUtilsWrapper() {
@Inject
public FileUtilsWrapper() {
}
}
public String getFileExt(String fileName) {
return FileUtils.getFileExt(fileName);
}
public String getFileExt(String fileName) {
return FileUtils.getFileExt(fileName);
}
public String getSHA1(InputStream is) {
return FileUtils.getSHA1(is);
}
public String getSHA1(InputStream is) {
return FileUtils.getSHA1(is);
}
public FileInputStream getFileInputStream(String filePath) throws FileNotFoundException {
return FileUtils.getFileInputStream(filePath);
}
public FileInputStream getFileInputStream(String filePath) throws FileNotFoundException {
return FileUtils.getFileInputStream(filePath);
}
public String getGeolocationOfFile(String filePath) {
return FileUtils.getGeolocationOfFile(filePath);
public String getGeolocationOfFile(String filePath) {
return FileUtils.getGeolocationOfFile(filePath);
}
/**
* Takes a file as input and returns an Observable of files with the specified chunk size
*/
public Observable<File> getFileChunks(Context context, File file, final int chunkSize)
throws IOException {
final byte[] buffer = new byte[chunkSize];
//try-with-resources to ensure closing stream
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
final List<File> buffers = new ArrayList<>();
int size;
while ((size = bis.read(buffer)) > 0) {
buffers.add(writeToFile(context, Arrays.copyOf(buffer, size), file.getName(),
getFileExt(file.getName())));
}
return Observable.fromIterable(buffers);
}
}
/**
* Create a temp file containing the passed byte data.
*/
private File writeToFile(Context context, final byte[] data, final String fileName,
String fileExtension)
throws IOException {
final File file = File.createTempFile(fileName, fileExtension, context.getCacheDir());
try {
if (!file.exists()) {
file.createNewFile();
}
final FileOutputStream fos = new FileOutputStream(file);
fos.write(data);
fos.close();
} catch (final Exception throwable) {
Timber.e(throwable, "Failed to create file");
}
return file;
}
}

View file

@ -4,11 +4,15 @@ import static fr.free.nrw.commons.di.NetworkingModule.NAMED_COMMONS_CSRF;
import android.content.Context;
import android.net.Uri;
import androidx.annotation.Nullable;
import fr.free.nrw.commons.CommonsApplication;
import fr.free.nrw.commons.contributions.Contribution;
import fr.free.nrw.commons.upload.UploadService.NotificationUpdateProgressListener;
import io.reactivex.Observable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
@ -16,59 +20,112 @@ import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.RequestBody;
import org.wikipedia.csrf.CsrfTokenClient;
import timber.log.Timber;
@Singleton
public class UploadClient {
private final UploadInterface uploadInterface;
private final CsrfTokenClient csrfTokenClient;
private final PageContentsCreator pageContentsCreator;
private final int CHUNK_SIZE = 256 * 1024; // 256 KB
@Inject
public UploadClient(UploadInterface uploadInterface,
@Named(NAMED_COMMONS_CSRF) CsrfTokenClient csrfTokenClient,
PageContentsCreator pageContentsCreator) {
this.uploadInterface = uploadInterface;
this.csrfTokenClient = csrfTokenClient;
this.pageContentsCreator = pageContentsCreator;
private final UploadInterface uploadInterface;
private final CsrfTokenClient csrfTokenClient;
private final PageContentsCreator pageContentsCreator;
private final FileUtilsWrapper fileUtilsWrapper;
@Inject
public UploadClient(final UploadInterface uploadInterface,
@Named(NAMED_COMMONS_CSRF) final CsrfTokenClient csrfTokenClient,
final PageContentsCreator pageContentsCreator,
final FileUtilsWrapper fileUtilsWrapper) {
this.uploadInterface = uploadInterface;
this.csrfTokenClient = csrfTokenClient;
this.pageContentsCreator = pageContentsCreator;
this.fileUtilsWrapper = fileUtilsWrapper;
}
/**
* Upload file to stash in chunks of specified size. Uploading files in chunks will make handling
* of large files easier. Also, it will be useful in supporting pause/resume of uploads
*/
Observable<UploadResult> uploadFileToStash(
final Context context, final String filename, final File file,
final NotificationUpdateProgressListener notificationUpdater) throws IOException {
final Observable<File> fileChunks = fileUtilsWrapper.getFileChunks(context, file, CHUNK_SIZE);
final MediaType mediaType = MediaType
.parse(FileUtils.getMimeType(context, Uri.parse(file.getPath())));
final long[] offset = {0};
final String[] fileKey = {null};
final AtomicReference<UploadResult> result = new AtomicReference<>();
fileChunks.blockingForEach(chunkFile -> {
final RequestBody requestBody = RequestBody
.create(mediaType, chunkFile);
final CountingRequestBody countingRequestBody = new CountingRequestBody(requestBody,
notificationUpdater::onProgress, offset[0], file.length());
uploadChunkToStash(filename,
file.length(),
offset[0],
fileKey[0],
countingRequestBody).blockingSubscribe(uploadResult -> {
result.set(uploadResult);
offset[0] = uploadResult.getOffset();
fileKey[0] = uploadResult.getFilekey();
});
});
return Observable.just(result.get());
}
/**
* Uploads a file chunk to stash
*
* @param filename The name of the file being uploaded
* @param fileSize The total size of the file
* @param offset The offset returned by the previous chunk upload
* @param fileKey The filekey returned by the previous chunk upload
* @param countingRequestBody Request body with chunk file
* @return
*/
Observable<UploadResult> uploadChunkToStash(final String filename,
final long fileSize,
final long offset,
final String fileKey,
final CountingRequestBody countingRequestBody) {
final MultipartBody.Part filePart = MultipartBody.Part
.createFormData("chunk", filename, countingRequestBody);
try {
return uploadInterface.uploadFileToStash(toRequestBody(filename),
toRequestBody(String.valueOf(fileSize)),
toRequestBody(String.valueOf(offset)),
toRequestBody(fileKey),
toRequestBody(csrfTokenClient.getTokenBlocking()),
filePart)
.map(UploadResponse::getUpload);
} catch (final Throwable throwable) {
Timber.e(throwable, "Failed to upload chunk to stash");
return Observable.error(throwable);
}
}
Observable<UploadResult> uploadFileToStash(Context context, String filename, File file,
NotificationUpdateProgressListener notificationUpdater) {
RequestBody requestBody = RequestBody
.create(MediaType.parse(FileUtils.getMimeType(context, Uri.parse(file.getPath()))), file);
@Nullable
private RequestBody toRequestBody(@Nullable final String value) {
return value == null ? null : RequestBody.create(okhttp3.MultipartBody.FORM, value);
}
CountingRequestBody countingRequestBody = new CountingRequestBody(requestBody,
(bytesWritten, contentLength) -> notificationUpdater
.onProgress(bytesWritten, contentLength));
MultipartBody.Part filePart = MultipartBody.Part.createFormData("file", filename, countingRequestBody);
RequestBody fileNameRequestBody = RequestBody.create(okhttp3.MultipartBody.FORM, filename);
RequestBody tokenRequestBody;
try {
tokenRequestBody = RequestBody.create(MultipartBody.FORM, csrfTokenClient.getTokenBlocking());
return uploadInterface.uploadFileToStash(fileNameRequestBody, tokenRequestBody, filePart)
.map(stashUploadResponse -> stashUploadResponse.getUpload());
} catch (Throwable throwable) {
throwable.printStackTrace();
return Observable.error(throwable);
}
}
Observable<UploadResult> uploadFileFromStash(Context context,
Contribution contribution,
String uniqueFileName,
String fileKey) {
try {
return uploadInterface
.uploadFileFromStash(csrfTokenClient.getTokenBlocking(),
pageContentsCreator.createFrom(contribution),
CommonsApplication.DEFAULT_EDIT_SUMMARY,
uniqueFileName,
fileKey).map(uploadResponse -> uploadResponse.getUpload());
} catch (Throwable throwable) {
throwable.printStackTrace();
return Observable.error(throwable);
}
Observable<UploadResult> uploadFileFromStash(final Context context,
final Contribution contribution,
final String uniqueFileName,
final String fileKey) {
try {
return uploadInterface
.uploadFileFromStash(csrfTokenClient.getTokenBlocking(),
pageContentsCreator.createFrom(contribution),
CommonsApplication.DEFAULT_EDIT_SUMMARY,
uniqueFileName,
fileKey).map(UploadResponse::getUpload);
} catch (final Throwable throwable) {
throwable.printStackTrace();
return Observable.error(throwable);
}
}
}

View file

@ -16,19 +16,22 @@ import static org.wikipedia.dataclient.Service.MW_API_PREFIX;
public interface UploadInterface {
@Multipart
@POST(MW_API_PREFIX + "action=upload&stash=1&ignorewarnings=1")
Observable<UploadResponse> uploadFileToStash(@Part("filename") RequestBody filename,
@Part("token") RequestBody token,
@Part MultipartBody.Part filePart);
@Multipart
@POST(MW_API_PREFIX + "action=upload&stash=1&ignorewarnings=1")
Observable<UploadResponse> uploadFileToStash(@Part("filename") RequestBody filename,
@Part("filesize") RequestBody totalFileSize,
@Part("offset") RequestBody offset,
@Part("filekey") RequestBody fileKey,
@Part("token") RequestBody token,
@Part MultipartBody.Part filePart);
@Headers("Cache-Control: no-cache")
@POST(MW_API_PREFIX + "action=upload&ignorewarnings=1")
@FormUrlEncoded
@NonNull
Observable<UploadResponse> uploadFileFromStash(@NonNull @Field("token") String token,
@NonNull @Field("text") String text,
@NonNull @Field("comment") String comment,
@NonNull @Field("filename") String filename,
@NonNull @Field("filekey") String filekey);
@Headers("Cache-Control: no-cache")
@POST(MW_API_PREFIX + "action=upload&ignorewarnings=1")
@FormUrlEncoded
@NonNull
Observable<UploadResponse> uploadFileFromStash(@NonNull @Field("token") String token,
@NonNull @Field("text") String text,
@NonNull @Field("comment") String comment,
@NonNull @Field("filename") String filename,
@NonNull @Field("filekey") String filekey);
}

View file

@ -7,6 +7,7 @@ private const val RESULT_SUCCESS = "Success"
data class UploadResult(
val result: String,
val filekey: String,
val offset: Int,
val filename: String,
val sessionkey: String,
val imageinfo: ImageInfo