RxJava2: chain keeps being invoked over and over, looking for help

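public void autoSync(final String token, Observer<List<Organization>> observer){
        // Both collections are captured by the anonymous classes below, so they are declared final.
        final Map<Integer,Long> mapLocalVersion=new HashMap<>();
        final List<Phonebook> listNeedToSyncOrganizationId=new ArrayList<>();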
        getLocalPhonebookDataSource().getVersion().doOnNext(new Consumer<List<Phonebook>>() {
            @Override
            public void accept(List<Phonebook> phonebooks) throws Exception {
                System.out.println("do on next");
            }
        }).flatMap(new Function<List<Phonebook>, ObservableSource<PhonebookDataSource.PhonebookVersionResponse>>() {
            @Override
            public ObservableSource<PhonebookDataSource.PhonebookVersionResponse> apply(@NonNull List<Phonebook> phonebooks) throws Exception {
                for(Phonebook phonebook:phonebooks){
                    mapLocalVersion.put(phonebook.getId(),phonebook.getVersion());
                }
                return getRemotePhonebookDataSource().getVersion(token);
            }
        }).flatMap(new Function<PhonebookDataSource.PhonebookVersionResponse, ObservableSource<PhonebookDataSource.PhonebookHierarchyResponse>>() {
            @Override
            public ObservableSource<PhonebookDataSource.PhonebookHierarchyResponse> apply(@NonNull PhonebookDataSource.PhonebookVersionResponse phonebookVersionResponse) throws Exception {
                for(Phonebook phonebook:phonebookVersionResponse.getPhonebooks()){
                    long remoteVersion=phonebook.getVersion();
                    long localVersion=0;
                    if(mapLocalVersion.containsKey(phonebook.getId())){
                        localVersion=mapLocalVersion.get(phonebook.getId());
                    }
                    if(remoteVersion>localVersion){
                        listNeedToSyncOrganizationId.add(phonebook);
                    }
                }
                return getRemotePhonebookDataSource().getHierarchy(token);
            }
        }).flatMap(new Function<PhonebookDataSource.PhonebookHierarchyResponse, Observable<Boolean>>() {
            @Override
            public Observable<Boolean> apply(@NonNull PhonebookDataSource.PhonebookHierarchyResponse phonebookHierarchyResponse) throws Exception {

                return getLocalPhonebookDataSource().saveOrganizations(phonebookHierarchyResponse.getOrganizations(),listNeedToSyncOrganizationId);
            }
        }).flatMap(new Function<Boolean, Observable<List<Organization>>>() {
            @Override
            public Observable<List<Organization>> apply(@NonNull Boolean aBoolean) throws Exception {
                return getLocalPhonebookDataSource().getOrganizations(0);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
}

After subscribing, these observables keep re-executing on their own without end, and "do on next" is printed over and over.

The source behind getLocalPhonebookDataSource() is probably still emitting, so the whole chain runs again on every emission. Try distinct or take.
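
To illustrate what take and distinct would change, here is a minimal sketch. It assumes RxJava 2 (package io.reactivex) is on the classpath. localVersions() is a hypothetical stand-in for getLocalPhonebookDataSource().getVersion(), modelled as a source that emits several times instead of once. The real data source may behave differently, so treat this as a demonstration of the operators rather than a diagnosis.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class RepeatedEmissionDemo {

    // Hypothetical stand-in for getLocalPhonebookDataSource().getVersion():
    // a source that emits five items, some of them duplicates.
    static Observable<Integer> localVersions() {
        return Observable.just(1, 1, 2, 2, 3);
    }

    // Subscribes to the source and counts how often the flatMap step runs.
    // flatMap is invoked once for every item the upstream emits.
    static int countSyncRuns(Observable<Integer> source) {
        final AtomicInteger runs = new AtomicInteger();
        source.flatMap(version -> {
                    runs.incrementAndGet(); // one "sync" per upstream emission
                    return Observable.just("synced " + version);
                })
                .blockingSubscribe(); // wait until the chain completes
        return runs.get();
    }

    public static void main(String[] args) {
        // No limiting operator: the step runs for every emission.
        System.out.println(countSyncRuns(localVersions()));            // 5
        // take(1): only the first emission reaches flatMap.
        System.out.println(countSyncRuns(localVersions().take(1)));    // 1
        // distinct(): repeated values are dropped, new values still pass.
        System.out.println(countSyncRuns(localVersions().distinct())); // 3
    }
}
```

In the chain from the question, the equivalent change would be to put take(1) directly after getVersion(), before doOnNext. distinct only helps if the repeated emissions compare equal, which for a List<Phonebook> depends on how Phonebook implements equals and hashCode.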