diff options
author | David Luhmer <david-dev@live.de> | 2017-05-29 00:31:01 +0300 |
---|---|---|
committer | David Luhmer <david-dev@live.de> | 2017-05-29 00:31:01 +0300 |
commit | 8d4b2e2a704ff6672f797b4408deeb0c8467f911 (patch) | |
tree | 3b5ae606ee7ede17f3d870a9e6f2e26b5b53d3ea | |
parent | d3c3b709a972534797be484156778ba4eb729d96 (diff) |
Fix App Crash (UndeliverableException - RxJava2)
2 files changed, 146 insertions, 110 deletions
diff --git a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java index d00800f6..f9d73602 100644 --- a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java +++ b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/reader/nextcloud/RssItemObservable.java @@ -7,6 +7,9 @@ import com.google.gson.JsonObject; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonToken; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -31,7 +34,7 @@ import okio.BufferedSource; /** * onNext returns the current amount of synced items */ -public class RssItemObservable extends Observable<Integer> { +public class RssItemObservable implements Publisher<Integer> { private DatabaseConnectionOrm mDbConn; private API mApi; @@ -46,127 +49,130 @@ public class RssItemObservable extends Observable<Integer> { } @Override - protected void subscribeActual(Observer<? super Integer> observer) { - + public void subscribe(Subscriber<? super Integer> s) { try { - mDbConn.clearDatabaseOverSize(); + //throw new RuntimeException(""); + sync(s); + s.onComplete(); + } catch (Exception ex) { + s.onError(ex); + } + } - //String authKey = AuthenticationManager.getGoogleAuthKey(username, password); - //SharedPreferences mPrefs = PreferenceManager.getDefaultSharedPreferences(context); - //int maxItemsInDatabase = Integer.parseInt(mPrefs.getString(SettingsActivity.SP_MAX_ITEMS_SYNC, "200")); + public void sync(Subscriber<? super Integer> subscriber) throws IOException { - long lastModified = mDbConn.getLastModified(); - //dbConn.clearDatabaseOverSize(); + mDbConn.clearDatabaseOverSize(); - int requestCount = 0; - int totalCount = 0; - int maxSyncSize = maxSizePerSync; + //String authKey = AuthenticationManager.getGoogleAuthKey(username, password); + //SharedPreferences mPrefs = PreferenceManager.getDefaultSharedPreferences(context); + //int maxItemsInDatabase = Integer.parseInt(mPrefs.getString(SettingsActivity.SP_MAX_ITEMS_SYNC, "200")); - if(lastModified == 0)//Only on first sync - { - long offset = 0; + long lastModified = mDbConn.getLastModified(); + //dbConn.clearDatabaseOverSize(); - Log.v(TAG, "First sync!!"); - int maxItemsInDatabase = Constants.maxItemsCount; + int requestCount = 0; + int totalCount = 0; + int maxSyncSize = maxSizePerSync; - do { - Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ""); - List<RssItem> buffer = (mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL.toString()), 0, false, true).execute().body()); + if(lastModified == 0)//Only on first sync + { + long offset = 0; - requestCount = 0; - if(buffer != null) { - requestCount = buffer.size(); - performDatabaseBatchInsert(mDbConn, buffer); - } + Log.v(TAG, "First sync!!"); + int maxItemsInDatabase = Constants.maxItemsCount; - if(requestCount > 0) - offset = mDbConn.getHighestItemId(); - totalCount += requestCount; + do { + Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ""); + List<RssItem> buffer = (mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL.toString()), 0, false, true).execute().body()); - observer.onNext(totalCount); - } while(requestCount == maxSyncSize); + requestCount = 0; + if(buffer != null) { + requestCount = buffer.size(); + performDatabaseBatchInsert(mDbConn, buffer); + } - Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ", maxSyncSize=" + maxSyncSize); + if(requestCount > 0) + offset = mDbConn.getHighestItemId(); + totalCount += requestCount; - Log.v(TAG, "Sync all items done - Starting starred now"); + subscriber.onNext(totalCount); + } while(requestCount == maxSyncSize); - mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalCount).apply(); + Log.v(TAG, "offset=" + offset + ", requestCount=" + requestCount + ", maxSyncSize=" + maxSyncSize); - do { - offset = mDbConn.getLowestItemId(true); - List<RssItem> buffer = mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL_STARRED.toString()), 0, false, true).execute().body(); + Log.v(TAG, "Sync all items done - Starting starred now"); - requestCount = 0; - if(buffer != null) { - requestCount = buffer.size(); - performDatabaseBatchInsert(mDbConn, buffer); - } - //if(requestCount > 0) - // offset = dbConn.getLowestItemId(true); - totalCount += requestCount; + mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalCount).apply(); - observer.onNext(totalCount); - } while(requestCount == maxSyncSize && totalCount < maxItemsInDatabase); - } - else - { - Log.v(TAG, "Incremental sync!!"); - //First reset the count of last updated items - mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, 0).apply(); - - long highestItemIdBeforeSync = mDbConn.getHighestItemId(); - - //Get all updated items - mApi.updatedItems(lastModified+1, Integer.valueOf(FeedItemTags.ALL.toString()), highestItemIdBeforeSync) - .flatMap(new Function<ResponseBody, ObservableSource<RssItem>>() { - @Override - public ObservableSource<RssItem> apply(@NonNull ResponseBody responseBody) throws Exception { - return events(responseBody.source()); - } - }) - .subscribe(new Observer<RssItem>() { - int totalUpdatedUnreadItemCount = 0; - final int bufferSize = 150; - List<RssItem> buffer = new ArrayList<>(bufferSize); //Buffer of size X - - @Override - public void onSubscribe(@NonNull Disposable d) { - Log.v(TAG, "onSubscribe() called with: d = [" + d + "]"); - } + do { + offset = mDbConn.getLowestItemId(true); + List<RssItem> buffer = mApi.items(maxSyncSize, offset, Integer.valueOf(FeedItemTags.ALL_STARRED.toString()), 0, false, true).execute().body(); - @Override - public void onNext(@NonNull RssItem rssItem) { - if(!rssItem.getRead()) { //If updates item is unread - totalUpdatedUnreadItemCount++; - } + requestCount = 0; + if(buffer != null) { + requestCount = buffer.size(); + performDatabaseBatchInsert(mDbConn, buffer); + } + //if(requestCount > 0) + // offset = dbConn.getLowestItemId(true); + totalCount += requestCount; - buffer.add(rssItem); - if (buffer.size() >= bufferSize) { - performDatabaseBatchInsert(mDbConn, buffer); - } - } + subscriber.onNext(totalCount); + } while(requestCount == maxSyncSize && totalCount < maxItemsInDatabase); + } + else + { + Log.v(TAG, "Incremental sync!!"); + //First reset the count of last updated items + mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, 0).apply(); + + long highestItemIdBeforeSync = mDbConn.getHighestItemId(); + + //Get all updated items + mApi.updatedItems(lastModified+1, Integer.valueOf(FeedItemTags.ALL.toString()), highestItemIdBeforeSync) + .flatMap(new Function<ResponseBody, ObservableSource<RssItem>>() { + @Override + public ObservableSource<RssItem> apply(@NonNull ResponseBody responseBody) throws Exception { + return events(responseBody.source()); + } + }) + .subscribe(new Observer<RssItem>() { + int totalUpdatedUnreadItemCount = 0; + final int bufferSize = 150; + List<RssItem> buffer = new ArrayList<>(bufferSize); //Buffer of size X + + @Override + public void onSubscribe(@NonNull Disposable d) { + Log.v(TAG, "onSubscribe() called with: d = [" + d + "]"); + } - @Override - public void onError(@NonNull Throwable e) { - Log.v(TAG, "onError() called with: e = [" + e + "]"); + @Override + public void onNext(@NonNull RssItem rssItem) { + if(!rssItem.getRead()) { //If updates item is unread + totalUpdatedUnreadItemCount++; } - @Override - public void onComplete() { - Log.v(TAG, "onComplete() called"); + buffer.add(rssItem); + if (buffer.size() >= bufferSize) { performDatabaseBatchInsert(mDbConn, buffer); - - //If no exception occurs, set the number of updated items - mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalUpdatedUnreadItemCount).apply(); } - }); - } - } catch (Exception ex) { - ex.printStackTrace(); + } - } + @Override + public void onError(@NonNull Throwable e) { + Log.v(TAG, "onError() called with: e = [" + e + "]"); + } + + @Override + public void onComplete() { + Log.v(TAG, "onComplete() called"); + performDatabaseBatchInsert(mDbConn, buffer); - observer.onComplete(); + //If no exception occurs, set the number of updated items + mPrefs.edit().putInt(Constants.LAST_UPDATE_NEW_ITEMS_COUNT_STRING, totalUpdatedUnreadItemCount).apply(); + } + }); + } } public static boolean performDatabaseBatchInsert(DatabaseConnectionOrm dbConn, List<RssItem> buffer) { @@ -197,7 +203,6 @@ public class RssItemObservable extends Observable<Integer> { } } - reader.beginArray(); while (reader.hasNext()) { JsonObject jsonObj = getJsonObjectFromReader(reader); @@ -275,5 +280,4 @@ public class RssItemObservable extends Observable<Integer> { } return null; } - } diff --git a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java index 48d91141..4da97b13 100644 --- a/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java +++ b/News-Android-App/src/main/java/de/luhmer/owncloudnewsreader/services/OwnCloudSyncService.java @@ -36,9 +36,10 @@ import android.widget.Toast; import org.apache.commons.lang3.time.StopWatch; import org.greenrobot.eventbus.EventBus; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import java.util.List; -import java.util.concurrent.Callable; import javax.inject.Inject; @@ -58,6 +59,7 @@ import de.luhmer.owncloudnewsreader.reader.nextcloud.RssItemObservable; import de.luhmer.owncloudnewsreader.services.events.SyncFailedEvent; import de.luhmer.owncloudnewsreader.services.events.SyncFinishedEvent; import de.luhmer.owncloudnewsreader.services.events.SyncStartedEvent; +import de.luhmer.owncloudnewsreader.ssl.MemorizingTrustManager; import de.luhmer.owncloudnewsreader.ssl.OkHttpSSLClient; import de.luhmer.owncloudnewsreader.widget.WidgetProvider; import io.reactivex.Observable; @@ -94,6 +96,7 @@ public class OwnCloudSyncService extends Service { @Inject SharedPreferences mPrefs; @Inject ApiProvider mApi; + @Inject MemorizingTrustManager mMTM; public void startSync() { @@ -133,12 +136,14 @@ public class OwnCloudSyncService extends Service { } private class SyncResult { - SyncResult(List<Folder> folders, List<Feed> feeds) { + SyncResult(List<Folder> folders, List<Feed> feeds, Boolean stateSyncSuccessful) { this.folders = folders; this.feeds = feeds; + this.stateSyncSuccessful = stateSyncSuccessful; } List<Folder> folders; List<Feed> feeds; + boolean stateSyncSuccessful; } //Sync state of items e.g. read/unread/starred/unstarred @@ -146,15 +151,41 @@ public class OwnCloudSyncService extends Service { syncStopWatch = new StopWatch(); syncStopWatch.start(); - final DatabaseConnectionOrm dbConn = new DatabaseConnectionOrm(OwnCloudSyncService.this); - Observable rssStateSync = Observable.fromCallable(new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - ItemStateSync.PerformItemStateSync(mApi.getAPI(), dbConn); - return true; + + //Delete all pinned/stored SSL Certificates + /* + final ArrayList<String> aliases = Collections.list(mMTM.getCertificates()); + for(int i = 0; i < aliases.size(); i++) { + try { + mMTM.deleteCertificate(aliases.get(i)); + } catch (KeyStoreException e) { + e.printStackTrace(); } - }).subscribeOn(Schedulers.newThread()); + }*/ + + + + + + final DatabaseConnectionOrm dbConn = new DatabaseConnectionOrm(OwnCloudSyncService.this); + + + + + Observable rssStateSync = Observable.fromPublisher( + new Publisher() { + @Override + public void subscribe(Subscriber s) { + try { + ItemStateSync.PerformItemStateSync(mApi.getAPI(), dbConn); + s.onNext(true); + s.onComplete(); + } catch(Exception ex) { + s.onError(ex); + } + } + }).subscribeOn(Schedulers.newThread()); // First sync Feeds and Folders and rss item states (in parallel) Observable<List<Folder>> folderObservable = mApi @@ -171,7 +202,8 @@ public class OwnCloudSyncService extends Service { Observable<SyncResult> combined = Observable.zip(folderObservable, feedsObservable, rssStateSync, new Function3<List<Folder>, List<Feed>, Boolean, SyncResult>() { @Override public SyncResult apply(@NonNull List<Folder> folders, @NonNull List<Feed> feeds, @NonNull Boolean mRes) throws Exception { - return new SyncResult(folders, feeds); + Log.v(TAG, "apply() called with: folders = [" + folders + "], feeds = [" + feeds + "], mRes = [" + mRes + "]"); + return new SyncResult(folders, feeds, mRes); } }); @@ -198,7 +230,7 @@ public class OwnCloudSyncService extends Service { private void syncRssItems(final DatabaseConnectionOrm dbConn) { - new RssItemObservable(dbConn, mApi.getAPI(), mPrefs) + Observable.fromPublisher(new RssItemObservable(dbConn, mApi.getAPI(), mPrefs)) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Integer>() { |