From 80cf70eeba03d954e346a1bedeec7a490579309e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 23 Jul 2010 23:10:42 -0700 Subject: [PATCH] skip uploading blobs that server already has. --- .../src/com/danga/camli/UploadService.java | 15 ++--- .../src/com/danga/camli/UploadThread.java | 57 +++++++++++++++++-- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/clients/android/uploader/src/com/danga/camli/UploadService.java b/clients/android/uploader/src/com/danga/camli/UploadService.java index 0f3eda64b..ff06a9692 100644 --- a/clients/android/uploader/src/com/danga/camli/UploadService.java +++ b/clients/android/uploader/src/com/danga/camli/UploadService.java @@ -41,8 +41,9 @@ public class UploadService extends Service { } } - void onUploadThreadEnding() { + private void onUploadThreadEnded() { synchronized (this) { + Log.d(TAG, "UploadThread ended."); mUploadThread = null; mUploading = false; } @@ -50,11 +51,8 @@ public class UploadService extends Service { void onUploadComplete(QueuedFile qf) { synchronized (this) { - boolean removedSet = mQueueSet.remove(qf); - boolean removedList = mQueueList.remove(qf); // TODO: ghetto, linear - // scan - Log.d(TAG, "removing of " + qf + "; removedSet=" + removedSet + "; removedList=" - + removedList); + mQueueSet.remove(qf); + mQueueList.remove(qf); // TODO: ghetto, linear scan } } @@ -113,7 +111,6 @@ public class UploadService extends Service { if (mUploadThread != null) { return false; } - mUploading = true; SharedPreferences sp = getSharedPreferences(Preferences.NAME, 0); HostPort hp = new HostPort(sp.getString(Preferences.HOST, "")); @@ -122,9 +119,12 @@ public class UploadService extends Service { } String password = sp.getString(Preferences.PASSWORD, ""); + PowerManager pm = (PowerManager) getSystemService(Context.POWER_SERVICE); final PowerManager.WakeLock wl = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "Camli Upload"); wl.acquire(); + + mUploading = true; mUploadThread = new UploadThread(UploadService.this, hp, password); @@ -138,6 +138,7 @@ public class UploadService extends Service { } Log.d(TAG, "UploadThread done; releasing the wakelock"); wl.release(); + onUploadThreadEnded(); } }.start(); mUploadThread.start(); diff --git a/clients/android/uploader/src/com/danga/camli/UploadThread.java b/clients/android/uploader/src/com/danga/camli/UploadThread.java index 5cc09181b..a2080d4f0 100644 --- a/clients/android/uploader/src/com/danga/camli/UploadThread.java +++ b/clients/android/uploader/src/com/danga/camli/UploadThread.java @@ -10,6 +10,7 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.http.Header; @@ -26,6 +27,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicNameValuePair; +import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -70,22 +72,17 @@ public class UploadThread extends Thread { JSONObject preUpload = doPreUpload(); if (preUpload == null) { Log.w(TAG, "Preupload failed, ending UploadThread."); - mService.onUploadThreadEnding(); return; } Log.d(TAG, "Starting upload of " + mQueue.size() + " files."); if (!doUpload(preUpload)) { Log.w(TAG, "Upload failed, ending UploadThread."); - mService.onUploadThreadEnding(); return; } - - Log.d(TAG, "Did upload. Queue size is now " + mQueue.size() + " files."); } Log.d(TAG, "Queue empty; done."); - mService.onUploadThreadEnding(); } private JSONObject doPreUpload() { @@ -136,6 +133,12 @@ public class UploadThread extends Thread { .optString("uploadUrl", "http://" + mHostPort + "/camli/upload"); Log.d(TAG, "uploadURL is: " + uploadUrl); + // Which ones do we already have, so don't have to upload again? + filterOutAlreadyUploadedBlobs(preUpload.optJSONArray("alreadyHave")); + if (mQueue.isEmpty()) { + return true; + } + HttpPost uploadReq = new HttpPost(uploadUrl); MultipartEntity entity = new MultipartEntity(); uploadReq.setEntity(entity); @@ -168,6 +171,39 @@ public class UploadThread extends Thread { return true; } + private void filterOutAlreadyUploadedBlobs(JSONArray alreadyHave) { + if (alreadyHave == null) { + return; + } + for (int i = 0; i < alreadyHave.length(); ++i) { + JSONObject o = alreadyHave.optJSONObject(i); + if (o == null) { + // Malformed response; ignore. + continue; + } + String blobRef = o.optString("blobRef"); + if (blobRef == null) { + // Malformed response; ignore + continue; + } + filterOutBlobRef(blobRef); + } + } + + private void filterOutBlobRef(String blobRef) { + // TODO: kinda lame, iterating over whole list. + ListIterator iter = mQueue.listIterator(); + while (iter.hasNext()) { + QueuedFile qf = iter.next(); + if (qf.getContentName().equals(blobRef)) { + iter.remove(); + // TODO: signal back to service that this wasn't _actually_ + // uploaded, but rather just skipped? Good enough for now... + mService.onUploadComplete(qf); + } + } + } + private class MultipartEntity implements HttpEntity { private boolean mDone = false; @@ -234,10 +270,13 @@ public class UploadThread extends Thread { int bytesWritten = 0; long timeStarted = SystemClock.uptimeMillis(); + long lastLogUpdate = 0; for (QueuedFile qf : mQueue) { Log.d(TAG, "begin writeTo of " + qf); ParcelFileDescriptor pfd = mService.getFileDescriptor(qf.getUri()); + long totalFileBytes = pfd.getStatSize(); + long uploadedFileBytes = 0; if (pfd == null) { // TODO: report some error up to user? mQueue.removeFirst(); @@ -254,7 +293,13 @@ public class UploadThread extends Thread { int n; while ((n = fis.read(buf)) != -1) { bytesWritten += n; - Log.d(TAG, "wrote " + n + " bytes to " + qf + ", total=" + bytesWritten); + uploadedFileBytes += n; + long now = SystemClock.uptimeMillis(); + if (now - lastLogUpdate > 1000) { + Log.d(TAG, "wrote " + uploadedFileBytes + "/" + totalFileBytes + " of " + + qf); + lastLogUpdate = now; + } bos.write(buf, 0, n); if (mStopRequested.get()) { Log.d(TAG, "Stopping upload pre-maturely.");