I'm using the ParSeq framework for asynchronous computation.
Consider the following code. It first queries the content of google.com and then maps the content to its length. Finally, the length is printed.
The problem is that only the first task is run. Why?
public class Main {

    public static void main(String[] args) throws Exception {
        OkHttpClient okHttpClient = new OkHttpClient();

        final int numCores = Runtime.getRuntime().availableProcessors();
        final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
        final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(numCores + 1);

        final Engine engine = new EngineBuilder()
                .setTaskExecutor(taskScheduler)
                .setTimerScheduler(timerScheduler)
                .build();

        Task<Integer> task = Task.async(() -> {
            SettablePromise<String> promise = Promises.settable();

            Request request = new Request.Builder()
                    .url("http://google.com")
                    .build();

            okHttpClient.newCall(request).enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    System.out.println("error");
                }

                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    promise.done(response.body().string());
                }
            });

            return promise;
        }).map("map content to length", content -> content.length())
                .andThen(System.out::println);

        engine.blockingRun(task);
        engine.blockingRun(task);
    }
}
Answer
I was able to solve your problem by using ParSeq's HttpClient instead of OkHttp.
Below are the Maven dependencies I used for this code:
<dependency>
    <groupId>com.linkedin.parseq</groupId>
    <artifactId>parseq</artifactId>
    <version>3.0.11</version>
</dependency>
<dependency>
    <groupId>com.linkedin.parseq</groupId>
    <artifactId>parseq-http-client</artifactId>
    <version>3.0.11</version>
</dependency>
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.httpclient.HttpClient;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class Main {

    // Builds a new task on every call: fetch the URL, then map the response body to its length.
    private static Task<Integer> fetchBody(String url) {
        Task<Integer> map = HttpClient.get(url).task()
                .map("map content to length", content -> content.getResponseBody().length());
        return map;
    }

    public static void main(String[] args) {
        final int numCores = Runtime.getRuntime().availableProcessors();
        final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
        final ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(numCores + 1);

        final Engine engine = new EngineBuilder()
                .setTaskExecutor(taskScheduler)
                .setTimerScheduler(timerScheduler)
                .build();

        final Task<Integer> stackOverFlow = fetchBody("http://www.stackoverflow.com");
        final Task<Integer> google = fetchBody("http://www.google.com");
        final Task<Integer> ethereum = fetchBody("http://ethereum.stackexchange.com");

        // Run the three fetches in parallel, then combine the lengths into one report string.
        final Task<String> plan = Task.par(stackOverFlow, google, ethereum)
                .map((s, g, e) -> "StackOverFlow Page: " + s + " \n" +
                        "Google Page: " + g + "\n" +
                        "Ethereum Page: " + e + "\n")
                .andThen(System.out::println);

        engine.run(plan);
    }
}
Output:
StackOverFlow Page: 149
Google Page: 13097
Ethereum Page: 152
This example is fully asynchronous. The home pages for StackOverflow, Google, and Ethereum are all fetched in parallel while the original thread has returned to the calling code. We used Task.par to tell the engine to parallelize these HTTP requests. Once all of the responses have been retrieved, each one is mapped to an int (the length of the response body), and the combined result is finally printed out.
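For readers without the ParSeq dependencies at hand, the same fan-out-and-join shape can be sketched with only the JDK's CompletableFuture. This is a stand-in for Task.par, not ParSeq itself, and it makes no network calls: the class ParallelLengthsSketch and the methods fakeFetch, fetchLength, and runOnce are hypothetical names introduced for illustration, and fakeFetch returns canned text instead of a real page. As with fetchBody above, every call builds new work rather than reusing an object that has already run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelLengthsSketch {

    // Hypothetical stand-in for an HTTP GET: returns canned content so the sketch runs offline.
    static String fakeFetch(String url) {
        return "<html>" + url + "</html>";
    }

    // Creates a NEW future per call: "fetch" on the pool, then map the content to its length.
    static CompletableFuture<Integer> fetchLength(String url, ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> fakeFetch(url), pool)
                .thenApply(String::length);
    }

    // Starts three fetches in parallel, waits for all of them, and builds the report string.
    static String runOnce() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<Integer> s = fetchLength("http://www.stackoverflow.com", pool);
            CompletableFuture<Integer> g = fetchLength("http://www.google.com", pool);
            CompletableFuture<Integer> e = fetchLength("http://ethereum.stackexchange.com", pool);

            // allOf completes once every future has completed; join() blocks until then.
            CompletableFuture.allOf(s, g, e).join();

            return "StackOverFlow Page: " + s.join() + "\n"
                    + "Google Page: " + g.join() + "\n"
                    + "Ethereum Page: " + e.join() + "\n";
        } finally {
            pool.shutdown(); // let the JVM exit once the work is done
        }
    }

    public static void main(String[] args) {
        // Calling runOnce() twice works because each call builds fresh futures.
        System.out.print(runOnce());
        System.out.print(runOnce());
    }
}
```

Unlike the ParSeq version, runOnce blocks the calling thread with join() so that the result can be returned directly; the ParSeq plan above hands the work to the engine and returns immediately.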
Gist: https://gist.github.com/vishwaratna/26417f7467a4e827eadeee6923ddf3ae