public void autoSync(final String token, Observer observer){
Map<Integer,Long> mapLocalVersion=new HashMap<>();
List<Phonebook> listNeedToSyncOrganizationId=new ArrayList<>();
getLocalPhonebookDataSource().getVersion().doOnNext(new Consumer<List<Phonebook>>() {
@Override
public void accept(List<Phonebook> phonebooks) throws Exception {
System.out.println("do on next");
}
}).
flatMap(new Function<List<Phonebook>, ObservableSource<PhonebookDataSource.PhonebookVersionResponse>>() {
@Override
public ObservableSource<PhonebookDataSource.PhonebookVersionResponse> apply(@NonNull List<Phonebook> phonebooks) throws Exception {
for(Phonebook phonebook:phonebooks){
mapLocalVersion.put(phonebook.getId(),phonebook.getVersion());
}
return getRemotePhonebookDataSource().getVersion(token);
}
}).flatMap(new Function<PhonebookDataSource.PhonebookVersionResponse, ObservableSource<PhonebookDataSource.PhonebookHierarchyResponse>>() {
@Override
public ObservableSource<PhonebookDataSource.PhonebookHierarchyResponse> apply(@NonNull PhonebookDataSource.PhonebookVersionResponse phonebookVersionResponse) throws Exception {
for(Phonebook phonebook:phonebookVersionResponse.getPhonebooks()){
long remoteVersion=phonebook.getVersion();
long localVersion=0;
if(mapLocalVersion.containsKey(phonebook.getId())){
localVersion=mapLocalVersion.get(phonebook.getId());
}
if(remoteVersion>localVersion){
listNeedToSyncOrganizationId.add(phonebook);
}
}
return getRemotePhonebookDataSource().getHierarchy(token);
}
}).flatMap(new Function<PhonebookDataSource.PhonebookHierarchyResponse, Observable<Boolean>>() {
@Override
public Observable<Boolean> apply(@NonNull PhonebookDataSource.PhonebookHierarchyResponse phonebookHierarchyResponse) throws Exception {
return getLocalPhonebookDataSource().saveOrganizations(phonebookHierarchyResponse.getOrganizations(),listNeedToSyncOrganizationId);
}
}).flatMap(new Function<Boolean, Observable<List<Organization>>>() {
@Override
public Observable<List<Organization>> apply(@NonNull Boolean aBoolean) throws Exception {
return getLocalPhonebookDataSource().getOrganizations(0);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
问题:订阅之后,这条 Observable 链会无限次地自动重复执行,控制台不停地输出 "do on next"。
回答:原因应该是 getLocalPhonebookDataSource().getVersion() 返回的 Observable 一直在发射数据,导致后面的 flatMap 链被反复触发;你可以试试在它后面加上 distinct() 或者 take(1)。