Future/CompletableFuture与Lambda

Future和CompletableFuture

Runnable task = () -> System.out.println("Hello World"); ExecutorService service = Executors.newSingleThreadExecutor(); //直接通过ExecutorService执行 Future future = service.submit(task); //或者通过CompletableFuture调用ExecutorService执行 CompletableFuture completableFuture = CompletableFuture.runAsync(task, service); //还可以写为直接调用task,不通过ExecutorService CompletableFuture completableFuture = CompletableFuture.runAsync(task); //执行完毕后需要关闭,否则线程仍然卡着 service.shutdown(); //输出 Hello World //Callable方式 Supplier task = () -> { System.out.println("Hello World!"); return "Hello World!"; }; ExecutorService service = Executors.newSingleThreadExecutor(); CompletableFuture completableFuture = CompletableFuture.supplyAsync(task, service); //或者写为不通过ExecutorService的方式 CompletableFuture completableFuture = CompletableFuture.supplyAsync(task); //获取执行结果 String string = completableFuture.join(); System.out.println(string); service.shutdown(); //输出 Hello World Hello World

Future有五个方法
//阻塞等待获取结果 T get() //设定超时时间获取结果,否则返回excepiton T get(Long timeout,TimeUnit unit) //取消该线程 void cancel() //获取是否已经完成 boolean isDone() //获取是否被取消了 boolean isCancelled()

CompletableFuture也有五个方法
//同Future的get()方法 T join() //检测线程的执行状态,如果完成返回结果,未完成则返回默认的valueIfAbsent的值 T getNow(T valueIfAbsent) //强制返回值方法,如果线程完成了,则返回get或者join的结果,如果未完成,则强制中断线程,并将设置的value返回 boolean complete(V value) //强制返回值方法,如果线程完成了,则强制设置value为返回值,如果未完成,则强制完成线程,并将设置的value返回 void obtrudeValue(V value) //强制返回Exception,如果线程未完成,则强制完成 boolean completeExceptionally(Throwable t) //强制返回Exception,即使线程完成了,也强制完成,强制返回异常状态 void obtrudeException(V value)

complete方法,如果未完成强制完成
Supplier task = () -> { try { Thread.sleep(500); }catch(Exception e){} return "Hello World!"+Thread.currentThread().getName(); }; ExecutorService service = Executors.newSingleThreadExecutor(); CompletableFuture completableFuture = CompletableFuture.supplyAsync(task, service); completableFuture.complete("Too long"); String string = completableFuture.join(); System.out.println(string); service.shutdown(); //输出 Too long//但是如果另外一种写法 Supplier task = () -> { try { Thread.sleep(500); }catch(Exception e){} return "Hello World!"+Thread.currentThread().getName(); }; ExecutorService service = Executors.newSingleThreadExecutor(); CompletableFuture completableFuture = CompletableFuture.supplyAsync(task, service); try { Thread.sleep(800); }catch(Exception e){} completableFuture.complete("Too long"); String string = completableFuture.join(); System.out.println(string); service.shutdown(); //输出 Hello World!pool-1-thread-1

obtrudeValue方法,无论线程是否完成都设置value为返回值
Supplier task = () -> { try { Thread.sleep(500); }catch(Exception e){} return "Hello World!"+Thread.currentThread().getName(); }; ExecutorService service = Executors.newSingleThreadExecutor(); CompletableFuture completableFuture = CompletableFuture.supplyAsync(task, service); try { Thread.sleep(800); }catch(Exception e){} completableFuture.obtrudeValue("Too long"); String string = completableFuture.join(); System.out.println(string); service.shutdown(); //输出 Too long

first task: supplier supplyAsync(),runable runAsync()
seconde task:runable thenRun(),consumer thenAccept(),function thenApply()
//Runable()->System.out.println("Person read")void run()thenRun() CompletableFuture completableFuture = CompletableFuture.runAsync(()-> System.out.println("run first")) .thenRun(()-> System.out.println("then run")); //Consumern->System.out.println(n+"Person read")void accept()thenAccept() //Functionid -> readPersonFromDB(id)Person apply()thenApply()//具体执行方法 CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(()-> Arrays.asList(1,2,3)) .thenApply(list -> readPerson(list)); completableFuture.thenRun(()-> System.out.println("The list of Person had been read")); completableFuture.thenAccept(persons -> System.out.println(persons.size())); public static List readPerson(List list){ List personList = new ArrayList(); for(int i :list ){ personList.add(new Person("name",i)); } return personList; } //输出 The list of Person had been read 3

在哪个线程执行
CompletableFuture c2 = CompletableFuture.runAsync()//common FJ Pool c2.thenApplyAsync()//provided pool of threads c2.thenRun()//the same thread c2.thenAcceptAsync()//the same pool of thread

CompletableFuture
public class TestCompletableFuture { public static void main(String[] args) throws Exception{ Supplier> supplyIDs = () -> { sleep(200); return Arrays.asList(1L,2L,3L); }; Function, List> fetchUsers = ids -> { sleep(300); return ids.stream().map(User::new).collect(Collectors.toList()); }; Consumer> displayer = users -> users.forEach(System.out::println); CompletableFuture> completableFuture = CompletableFuture.supplyAsync(supplyIDs); completableFuture.thenApply(fetchUsers).thenAccept(displayer); sleep(10000); } private static void sleep(int time){ try{ Thread.sleep(time); }catch (Exception e){ } } } //输出 User{id=1} User{id=2} User{id=3} //还可增加ExecutorService保持在单独的线程内执行 ExecutorService executorService = Executors.newSingleThreadExecutor(); CompletableFuture> completableFuture = CompletableFuture.supplyAsync(supplyIDs,executorService); completableFuture.thenApply(fetchUsers).thenAcceptAsync(displayer,executorService); Function, CompletableFuture>>内置CompletableFuture方法 public class TestCompletableFuture { public static void main(String[] args) throws Exception{ ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorService executorService2 = Executors.newSingleThreadExecutor(); Supplier> supplyIDs = () -> { sleep(200); System.out.println("supplyIDs=="+Thread.currentThread().getName()); return Arrays.asList(1L,2L,3L); }; Function, CompletableFuture>> fetchUsers = ids -> { System.out.println("fetchUsers=="+Thread.currentThread().getName()); sleep(300); Supplier> userSuplier = () -> { System.out.println("userSuplier=="+Thread.currentThread().getName()); return ids.stream().map(User::new).collect(Collectors.toList()); }; return CompletableFuture.supplyAsync(userSuplier,executorService2); }; Consumer> displayer = users -> users.forEach(System.out::println); CompletableFuture> completableFuture = CompletableFuture.supplyAsync(supplyIDs,executorService); completableFuture.thenComposeAsync(fetchUsers,executorService2).thenAcceptAsync(displayer,executorService); sleep(10000); executorService.shutdown(); } private static void sleep(int time){ try{ Thread.sleep(time); }catch (Exception e){ } } }

thenAcceptBoth可以同时完成两个不同的CompletableFuture
ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorService executorService2 = Executors.newSingleThreadExecutor(); Supplier> supplyIDs = () -> { sleep(200); System.out.println("supplyIDs=="+Thread.currentThread().getName()); return Arrays.asList(1L,2L,3L); }; Function, CompletableFuture>> fetchUsers = ids -> { System.out.println("fetchUsers=="+Thread.currentThread().getName()); sleep(300); Supplier> userSuplier = () -> { System.out.println("userSuplier=="+Thread.currentThread().getName()); return ids.stream().map(User::new).collect(Collectors.toList()); }; return CompletableFuture.supplyAsync(userSuplier,executorService2); }; Function, CompletableFuture>> fetchEmails = ids -> { System.out.println("fetchUsers=="+Thread.currentThread().getName()); sleep(300); Supplier> emailSuplier = () -> { System.out.println("userSuplier=="+Thread.currentThread().getName()); return ids.stream().map(Email::new).collect(Collectors.toList()); }; return CompletableFuture.supplyAsync(emailSuplier,executorService2); }; Consumer> displayer = users -> users.forEach(System.out::println); CompletableFuture> completableFuture = CompletableFuture.supplyAsync(supplyIDs,executorService); CompletableFuture> userComp= completableFuture.thenComposeAsync(fetchUsers,executorService2); CompletableFuture> emailComp= completableFuture.thenComposeAsync(fetchEmails,executorService2); userComp.thenAcceptBoth(emailComp, (user,email) ->{ System.out.println(user.size()+""+email.size()); }); sleep(1000); executorService.shutdown(); executorService2.shutdown();

acceptEither执行完成任何一个后再执行其他操作
public static void main(String[] args) throws Exception{ Supplier> supplyIDs = () -> { sleep(200); return Arrays.asList(1L,2L,3L); }; Function, CompletableFuture>> fetchUsers1 = ids -> { sleep(300); Supplier> userSuplier = () -> { return ids.stream().map(User::new).collect(Collectors.toList()); }; return CompletableFuture.supplyAsync(userSuplier); }; Function, CompletableFuture>> fetchUsers2 = ids -> { sleep(3000); Supplier> userSuplier = () -> { return ids.stream().map(User::new).collect(Collectors.toList()); }; return CompletableFuture.supplyAsync(userSuplier); }; Consumer> displayer = users -> users.forEach(System.out::println); CompletableFuture> completableFuture = CompletableFuture.supplyAsync(supplyIDs); CompletableFuture> user1= completableFuture.thenComposeAsync(fetchUsers1); CompletableFuture> user2= completableFuture.thenComposeAsync(fetchUsers2); user1.thenRun(()-> System.out.println("User1 done")); user2.thenRun(()-> System.out.println("User2 done")); user1.acceptEither(user2, displayer); sleep(100000); }

【Future/CompletableFuture与Lambda】CompletableFuture异常处理
exceptionally() whenComplete() handle()CompletableFuture> user3 = CompletableFuture.supplyAsync(supplyIDs) .thenApply(list -> readUser(list)) .exceptionally(excetion -> new ArrayList()); user3.thenRun(()-> System.out.println("user3 is run")); //whenComplete CompletableFuture> user3 = CompletableFuture.supplyAsync(supplyIDs) .thenApply(list -> readUser(list)) .whenComplete( (list, exception) -> { if (list !=null) System.out.println("run successfuly"); else System.out.println("run exception"); } ); //handle可以吞下异常,返回新的异常或者返回默认结果 CompletableFuture> user3 = CompletableFuture.supplyAsync(supplyIDs) .thenApply(list -> readUser(list)) .handle( (list, exception) -> { if (list !=null) return list; else return new ArrayList(); } );

    推荐阅读