From 8d4b2e2a704ff6672f797b4408deeb0c8467f911 Mon Sep 17 00:00:00 2001 From: David Luhmer Date: Sun, 28 May 2017 23:31:01 +0200 Subject: Fix App Crash (UndeliverableException - RxJava2) --- .../reader/nextcloud/RssItemObservable.java | 202 +++++++++++---------- .../services/OwnCloudSyncService.java | 54 ++++-- 2 files changed, 146 insertions(+), 110 deletions(-) (limited to 'News-Android-App') diff --git a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java index d00800f6..f9d73602 100644 --- a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java +++ b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java @@ -7,6 +7,9 @@ import com.google.gson.JsonObject; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonToken; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -31,7 +34,7 @@ import okio.BufferedSource; /** * onNext returns the current amount of synced items */ -public class RssItemObservable extends Observable { +public class RssItemObservable implements Publisher { private DatabaseConnectionOrm mDbConn; private API mApi; @@ -46,127 +49,130 @@ public class RssItemObservable extends Observable { } @Override - protected void subscribeActual(Observer observer) { - + public void subscribe(Subscriber s) { try { - mDbConn.clearDatabaseOverSize(); + //throw new RuntimeException(""); + sync(s); + s.onComplete(); + } catch (Exception ex) { + s.onError(ex); + } + } - //String authKey = AuthenticationManager.getGoogleAuthKey(username, password); - //SharedPreferences mPrefs = PreferenceManager.getDefaultSharedPreferences(context); - //int maxItemsInDatabase = Integer.parseInt(mPrefs.getString(SettingsActivity.SP_MAX_ITEMS_SYNC, "200")); + public void sync(Subscriber subscriber) throws IOException { - long lastModified = mDbConn.getLastModified(); - //dbConn.clearDatabaseOverSize(); + mDbConn.clearDatabaseOverSize(); - int requestCount = 0; - int totalCount = 0; - int maxSyncSize = maxSizePerSync; + //String authKey = AuthenticationManager.getGoogleAuthKey(username, password); + //SharedPreferences mPrefs = PreferenceManager.getDefaultSharedPreferences(context); + //int maxItemsInDatabase = Integer.parseInt(mPrefs.getString(SettingsActivity.SP_MAX_ITEMS_SYNC, "200")); - if(lastModified == 0)//Only on first sync - { - long offset = 0; + long lastModified = mDbConn.getLastModified(); + //dbConn.clearDatabaseOverSize(); - Log.v(TAG, "First sync!!"); - int maxItemsInDatabase = Constants.maxItemsCount; + int requestCount = 0; + int totalCount = 0; + int maxSyncSize = maxSizePerSync; - do { - Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ""); - List buffer = (mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL.toString()), 0, false, true).execute().body()); + if(lastModified == 0)//Only on first sync + { + long offset = 0; - requestCount = 0; - if(buffer != null) { - requestCount = buffer.size(); - performDatabaseBatchInsert(mDbConn, buffer); - } + Log.v(TAG, "First sync!!"); + int maxItemsInDatabase = Constants.maxItemsCount; - if(requestCount > 0) - offset = mDbConn.getHighestItemId(); - totalCount += requestCount; + do { + Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ""); + List buffer = (mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL.toString()), 0, false, true).execute().body()); - observer.onNext(totalCount); - } while(requestCount == maxSyncSize); + requestCount = 0; + if(buffer != null) { + requestCount = buffer.size(); + performDatabaseBatchInsert(mDbConn, buffer); + } - Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ", maxSyncSize=" + maxSyncSize); + if(requestCount > 0) + offset = mDbConn.getHighestItemId(); + totalCount += requestCount; - Log.v(TAG, "Sync all items done - Starting starred now"); + subscriber.onNext(totalCount); + } while(requestCount == maxSyncSize); - mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalCount).apply(); + Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ", maxSyncSize=" + maxSyncSize); - do { - offset = mDbConn.getLowestItemId(true); - List buffer = mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL_STARRED.toString()), 0, false, true).execute().body(); + Log.v(TAG, "Sync all items done - Starting starred now"); - requestCount = 0; - if(buffer != null) { - requestCount = buffer.size(); - performDatabaseBatchInsert(mDbConn, buffer); - } - //if(requestCount > 0) - // offset = dbConn.getLowestItemId(true); - totalCount += requestCount; + mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalCount).apply(); - observer.onNext(totalCount); - } while(requestCount == maxSyncSize && totalCount < maxItemsInDatabase); - } - else - { - Log.v(TAG, "Incremental sync!!"); - //First reset the count of last updated items - mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, 0).apply(); - - long highestItemIdBeforeSync = mDbConn.getHighestItemId(); - - //Get all updated items - mApi.updatedItems(lastModified+1, Integer.valueOf(FeedItemTags.ALL.toString()), highestItemIdBeforeSync) - .flatMap(new Function>() { - @Override - public ObservableSource apply(@NonNull ResponseBody responseBody) throws Exception { - return events(responseBody.source()); - } - }) - .subscribe(new Observer() { - int totalUpdatedUnreadItemCount = 0; - final int bufferSize = 150; - List buffer = new ArrayList<>(bufferSize); //Buffer of size X - - @Override - public void onSubscribe(@NonNull Disposable d) { - Log.v(TAG, "onSubscribe() called with: d = [" + d + "]"); - } + do { + offset = mDbConn.getLowestItemId(true); + List buffer = mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL_STARRED.toString()), 0, false, true).execute().body(); - @Override - public void onNext(@NonNull RssItem rssItem) { - if(!rssItem.getRead()) { //If updates item is unread - totalUpdatedUnreadItemCount++; - } + requestCount = 0; + if(buffer != null) { + requestCount = buffer.size(); + performDatabaseBatchInsert(mDbConn, buffer); + } + //if(requestCount > 0) + // offset = dbConn.getLowestItemId(true); + totalCount += requestCount; - buffer.add(rssItem); - if (buffer.size() >= bufferSize) { - performDatabaseBatchInsert(mDbConn, buffer); - } - } + subscriber.onNext(totalCount); + } while(requestCount == maxSyncSize && totalCount < maxItemsInDatabase); + } + else + { + Log.v(TAG, "Incremental sync!!"); + //First reset the count of last updated items + mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, 0).apply(); + + long highestItemIdBeforeSync = mDbConn.getHighestItemId(); + + //Get all updated items + mApi.updatedItems(lastModified+1, Integer.valueOf(FeedItemTags.ALL.toString()), highestItemIdBeforeSync) + .flatMap(new Function>() { + @Override + public ObservableSource apply(@NonNull ResponseBody responseBody) throws Exception { + return events(responseBody.source()); + } + }) + .subscribe(new Observer() { + int totalUpdatedUnreadItemCount = 0; + final int bufferSize = 150; + List buffer = new ArrayList<>(bufferSize); //Buffer of size X + + @Override + public void onSubscribe(@NonNull Disposable d) { + Log.v(TAG, "onSubscribe() called with: d = [" + d + "]"); + } - @Override - public void onError(@NonNull Throwable e) { - Log.v(TAG, "onError() called with: e = [" + e + "]"); + @Override + public void onNext(@NonNull RssItem rssItem) { + if(!rssItem.getRead()) { //If updates item is unread + totalUpdatedUnreadItemCount++; } - @Override - public void onComplete() { - Log.v(TAG, "onComplete() called"); + buffer.add(rssItem); + if (buffer.size() >= bufferSize) { performDatabaseBatchInsert(mDbConn, buffer); - - //If no exception occurs, set the number of updated items - mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalUpdatedUnreadItemCount).apply(); } - }); - } - } catch (Exception ex) { - ex.printStackTrace(); + } - } + @Override + public void onError(@NonNull Throwable e) { + Log.v(TAG, "onError() called with: e = [" + e + "]"); + } + + @Override + public void onComplete() { + Log.v(TAG, "onComplete() called"); + performDatabaseBatchInsert(mDbConn, buffer); - observer.onComplete(); + //If no exception occurs, set the number of updated items + mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalUpdatedUnreadItemCount).apply(); + } + }); + } } public static boolean performDatabaseBatchInsert(DatabaseConnectionOrm dbConn, List buffer) { @@ -197,7 +203,6 @@ public class RssItemObservable extends Observable { } } - reader.beginArray(); while (reader.hasNext()) { JsonObject jsonObj = getJsonObjectFromReader(reader); @@ -275,5 +280,4 @@ public class RssItemObservable extends Observable { } return null; } - } diff --git a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java index 48d91141..4da97b13 100644 --- a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java +++ b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java @@ -36,9 +36,10 @@ import android.widget.Toast; import org.apache.commons.lang3.time.StopWatch; import org.greenrobot.eventbus.EventBus; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import java.util.List; -import java.util.concurrent.Callable; import javax.inject.Inject; @@ -58,6 +59,7 @@ import de.luhmer.owncloudnewsreader.reader.nextcloud.RssItemObservable; import de.luhmer.owncloudnewsreader.services.events.SyncFailedEvent; import de.luhmer.owncloudnewsreader.services.events.SyncFinishedEvent; import de.luhmer.owncloudnewsreader.services.events.SyncStartedEvent; +import de.luhmer.owncloudnewsreader.ssl.MemorizingTrustManager; import de.luhmer.owncloudnewsreader.ssl.OkHttpSSLClient; import de.luhmer.owncloudnewsreader.widget.WidgetProvider; import io.reactivex.Observable; @@ -94,6 +96,7 @@ public class OwnCloudSyncService extends Service { @Inject SharedPreferences mPrefs; @Inject ApiProvider mApi; + @Inject MemorizingTrustManager mMTM; public void startSync() { @@ -133,12 +136,14 @@ public class OwnCloudSyncService extends Service { } private class SyncResult { - SyncResult(List folders, List feeds) { + SyncResult(List folders, List feeds, Boolean stateSyncSuccessful) { this.folders = folders; this.feeds = feeds; + this.stateSyncSuccessful = stateSyncSuccessful; } List folders; List feeds; + boolean stateSyncSuccessful; } //Sync state of items e.g. read/unread/starred/unstarred @@ -146,15 +151,41 @@ public class OwnCloudSyncService extends Service { syncStopWatch = new StopWatch(); syncStopWatch.start(); - final DatabaseConnectionOrm dbConn = new DatabaseConnectionOrm(OwnCloudSyncService.this); - Observable rssStateSync = Observable.fromCallable(new Callable() { - @Override - public Boolean call() throws Exception { - ItemStateSync.PerformItemStateSync(mApi.getAPI(), dbConn); - return true; + + //Delete all pinned/stored SSL Certificates + /* + final ArrayList aliases = Collections.list(mMTM.getCertificates()); + for(int i = 0; i < aliases.size(); i++) { + try { + mMTM.deleteCertificate(aliases.get(i)); + } catch (KeyStoreException e) { + e.printStackTrace(); } - }).subscribeOn(Schedulers.newThread()); + }*/ + + + + + + final DatabaseConnectionOrm dbConn = new DatabaseConnectionOrm(OwnCloudSyncService.this); + + + + + Observable rssStateSync = Observable.fromPublisher( + new Publisher() { + @Override + public void subscribe(Subscriber s) { + try { + ItemStateSync.PerformItemStateSync(mApi.getAPI(), dbConn); + s.onNext(true); + s.onComplete(); + } catch(Exception ex) { + s.onError(ex); + } + } + }).subscribeOn(Schedulers.newThread()); // First sync Feeds and Folders and rss item states (in parallel) Observable> folderObservable = mApi @@ -171,7 +202,8 @@ public class OwnCloudSyncService extends Service { Observable combined = Observable.zip(folderObservable, feedsObservable, rssStateSync, new Function3, List, Boolean, SyncResult>() { @Override public SyncResult apply(@NonNull List folders, @NonNull List feeds, @NonNull Boolean mRes) throws Exception { - return new SyncResult(folders, feeds); + Log.v(TAG, "apply() called with: folders = [" + folders + "], feeds = [" + feeds + "], mRes = [" + mRes + "]"); + return new SyncResult(folders, feeds, mRes); } }); @@ -198,7 +230,7 @@ public class OwnCloudSyncService extends Service { private void syncRssItems(final DatabaseConnectionOrm dbConn) { - new RssItemObservable(dbConn, mApi.getAPI(), mPrefs) + Observable.fromPublisher(new RssItemObservable(dbConn, mApi.getAPI(), mPrefs)) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() { -- cgit v1.2.3