zip does not seem to wait for the Observables to emit data

I am trying to run two requests in parallel with Volley and use the results to update the database (using DBFlow). One table in the database can be filled only after both requests have completed and their data (which the table references through foreign keys) has been saved.

In the example below, I want sectors and employees to be fetched and inserted in parallel, and the contracts to be saved only after both inserts have completed.

/**
 * Update the sectors, employees and their contracts
 * @return An Observable to watch for the process to complete
 *
 *  sectorsFetch______sectorsInsert________contractsInsert
 *  employeesFetch____employeesInsert___/
 */
public static Observable<Void> updateEverything() {
    try {
        Log.d(TAG, "Starting update...");
        Observable<JSONArray> employeesFetch = Observable.from(ForumAPI.getInstance().getEmployees());
        Observable<List<Contract>> employeesInsert = employeesFetch.flatMap(new Func1<JSONArray, Observable<List<Contract>>>() {
            @Override
            public Observable<List<Contract>> call(JSONArray employees) {
                Log.d(TAG, "Employee list fetched");
                return saveEmployees(employees);
            }
        });

        Observable<JSONArray> sectorsFetch = Observable.from(ForumAPI.getInstance().getSectors());
        Observable<Void> sectorsInsert = sectorsFetch.flatMap(new Func1<JSONArray, Observable<Void>>() {
            @Override
            public Observable<Void> call(JSONArray sectors) {
                Log.d(TAG, "Sector list fetched");
                return saveSectors(sectors);
            }
        });

        return Observable.zip(sectorsInsert, employeesInsert, new Func2<Void, List<Contract>, Void>() {
            @Override
            public Void call(Void aVoid, List<Contract> contracts) {
                Log.d(TAG, "Sectors and employees saved. Saving contracts");
                return saveContracts(contracts);
            }
        });

    } catch (InterruptedException | ExecutionException e) {
        Log.e(TAG, e.getMessage());
        return Observable.error(e);
    }
}

Note: ForumAPI's getEmployees() and getSectors() each return a Future.

Below are the save methods.

/**
 * Parse and save an array of sectors
 * @param jsonSectors The array of sector to save
 * @return An Observable to watch for the process to complete.
 */
private static Observable<Void> saveSectors(JSONArray jsonSectors) {
    Log.d(TAG, "Transforming JSON sectors to object");
    List<Sector> sectList = new ArrayList<>();
    try {
        for (int i = 0; i < jsonSectors.length(); i++) {
            JSONObject jsonSect = jsonSectors.getJSONObject(i);
            Sector sect = Sector.build(jsonSect);
            sectList.add(sect);
        }
        Log.d(TAG, sectList.size() + " sectors fetched. Saving...");
        ForumDB.getDB().executeTransaction(
                FastStoreModelTransaction.insertBuilder(
                        FlowManager.getModelAdapter(Sector.class)
                ).addAll(sectList).build());

        Log.d(TAG, "Sector list saved");
    } catch (JSONException e) {
        Log.e(TAG, "Unable to parse sector list. " + e.getMessage());
        return Observable.error(e);
    }
    return Observable.empty();
}

/**
 * Parse and save an array of employees
 * @param jsonEmployees The array of employee to save
 * @return An Observable to watch for the process to complete.
 */
private static Observable<List<Contract>> saveEmployees(JSONArray jsonEmployees) {
    Log.d(TAG, "Transforming JSON employees to object");
    List<Person> empList = new ArrayList<>();
    List<Contract> contractList = new ArrayList<>();
    try {
        for (int i = 0; i < jsonEmployees.length(); i++) {
            JSONObject jsonEmp = jsonEmployees.getJSONObject(i);
            Person emp = Person.build(jsonEmp);
            empList.add(emp);
            JSONArray jsonContracts = jsonEmp.getJSONArray("sectors");
            for (int j = 0; j <  jsonContracts.length(); j++) {
                Contract contract = new Contract();
                contract.setSectorId(jsonContracts.getJSONObject(j).getInt("id"));
                contract.setPersonForumId(emp.getForumId());
                contractList.add(contract);
            }
        }
        Log.d(TAG, empList.size() + " employees fetched. Saving...");
        ForumDB.getDB().executeTransaction(
                FastStoreModelTransaction.insertBuilder(
                        FlowManager.getModelAdapter(Person.class)
                ).addAll(empList).build());
        Log.d(TAG, "Employee list saved");
    } catch (JSONException e) {
        Log.e(TAG, "Unable to parse employee list. " + e.getMessage());
        return Observable.error(e);
    }
    return Observable.just(contractList);
}

/**
 * Save a list of contracts
 * @param contracts The list of contracts to save
 * @return Always null; Void is only a placeholder type.
 */
private static Void saveContracts(List<Contract> contracts) {
    ForumDB.getDB().executeTransaction(
            FastStoreModelTransaction.insertBuilder(
                    FlowManager.getModelAdapter(Contract.class)
            ).addAll(contracts).build());
    Log.d(TAG, "Contract list saved");

    return null;
}

The problem is that when I subscribe to this global observable from my Android activity, my observer's onCompleted is called right after the sectors have been fetched and saved. The employee list is never saved and my zip function is never called.

The log is shown below.

D/com.xx.observable.DataUpdater: Starting update...
D/com.xx.helper.ForumAPI: Requesting employee list
D/com.xx.helper.ForumAPI: Request added to queue...
D/com.xx.helper.ForumAPI: Requesting sector list
D/com.xx.helper.ForumAPI: Request added to queue
D/com.xx.observable.DataUpdater: Sector list fetched
D/com.xx.observable.DataUpdater: Transforming JSON sectors to object
D/com.xx.observable.DataUpdater: 8 sectors fetched. Saving...
D/com.xx.observable.DataUpdater: Sector list saved
D/com.xx.activity.Startup: onCompleted reached

Why does zip not seem to wait for both Observables here?


From the zip documentation in RxJava 1.1.6:

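The operator subscribes to its sources in the order they are specified and completes eagerly if one of the sources is shorter than the rest, unsubscribing from the other sources. Therefore, it is possible that those other sources will never be able to run to completion (and thus will not call doOnCompleted()). This can also happen if the sources are exactly the same length: if source A completes and B has been consumed and is about to complete, the operator detects that A won't be sending further values and unsubscribes from B immediately. For example: zip(Arrays.asList(range(1, 5).doOnCompleted(action1), range(6, 5).doOnCompleted(action2)), (a) -> a) action1 will be called but action2 won't.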

In other words, don't zip with empty(). For zip to work, use Observable.<Void>just(null) instead.
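To make the difference concrete, here is a minimal sketch in plain Java that models only the pairing rule of zip (one output per pair of inputs, stopping when the shorter source runs out). It is not RxJava and does not reproduce its implementation; the class name ZipModel and the list-based zip helper are hypothetical, introduced just for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of zip's pairing rule using lists instead of Observables.
class ZipModel {

    // Combines items pairwise by index and stops when the shorter source is exhausted.
    public static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                        BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        int pairs = Math.min(first.size(), second.size());
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // Stands in for the single List<Contract> emitted by the employee insert.
        List<String> contracts = Collections.singletonList("contracts");

        // Stands in for a source that completes without emitting, like empty().
        List<Void> emptySource = Collections.emptyList();
        // Stands in for a source that emits one null item, like just(null).
        List<Void> placeholder = Collections.singletonList(null);

        // No pair can be formed, so the combiner never runs.
        System.out.println(zip(emptySource, contracts, (v, c) -> "saved " + c).size());
        // prints 0

        // One pair is formed, so the combiner runs once.
        System.out.println(zip(placeholder, contracts, (v, c) -> "saved " + c));
        // prints [saved contracts]
    }
}
```

Applied to the code in the question, this corresponds to changing the final `return Observable.empty();` in saveSectors to `return Observable.<Void>just(null);`, so that the sectors source emits one item for zip to pair with the contract list.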
