AsyncHeaderExchangeHandler.java
asyncheader=com.alibaba.dubbo.remoting.exchange.support.header.AsyncHeaderExchanger
package com.alibaba.dubbo.remoting.exchange.support.header;

import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.rpc.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * User: zhaohuiyu
 * Date: 12/15/13
 * Time: 2:10 AM
 */
public class AsyncHeaderExchangeHandler extends HeaderExchangeHandler {
    private static final Logger logger = LoggerFactory.getLogger(AsyncHeaderExchangeHandler.class);

    public AsyncHeaderExchangeHandler(ExchangeHandler handler) {
        super(handler);
    }

    public void received(final Channel channel, final Object message) throws RemotingException {
        // Only two-way requests get special treatment; everything else goes to the parent handler.
        if (!(message instanceof Request)) {
            super.received(channel, message);
            return;
        }
        Request request = (Request) message;
        if (!request.isTwoWay()) {
            super.received(channel, message);
            return;
        }

        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            final Response response = handleRequest(exchangeChannel, request);
            Object o = response.getResult();
            if (!(o instanceof Result)) {
                channel.send(response);
                return;
            }
            Result result = (Result) o;
            Object value = result.getValue();
            if (!(value instanceof FutureImpl)) {
                channel.send(response);
                return;
            }
            // If the value is a future, do not write the result back immediately.
            // Wait until the asynchronous flow has finished, so that sending the
            // response becomes the continuation of the async request.
            FutureImpl future = (FutureImpl) value;
            // It is better to run this directly on the business thread,
            // so there is no switch to another thread.
            future.onComplete(new Runnable() {
                @Override
                public void run() {
                    try {
                        channel.send(response);
                    } catch (RemotingException e) {
                        logger.error("send response to remote error", e);
                    }
                }
            });
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
}
AsyncHeaderExchanger.java
package com.alibaba.dubbo.remoting.exchange.support.header;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Transporters;
import com.alibaba.dubbo.remoting.exchange.ExchangeClient;
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Exchanger;
import com.alibaba.dubbo.remoting.transport.DecodeHandler;

/**
 * User: zhaohuiyu
 * Date: 12/15/13
 * Time: 2:07 AM
 */
public class AsyncHeaderExchanger implements Exchanger {

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(
                Transporters.bind(url, new DecodeHandler(new AsyncHeaderExchangeHandler(handler))));
    }

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(
                Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}
Future.java
package com.alibaba.dubbo.remoting.exchange.support.header;

import java.io.Serializable;

/**
 * User: zhaohuiyu
 * Date: 9/13/13
 * Time: 5:53 PM
 */
public interface Future<T extends Serializable> extends Serializable {
    T get();
}
FutureImpl.java
package com.alibaba.dubbo.remoting.exchange.support.header;

import java.io.Serializable;
import java.util.concurrent.Executor;

/**
 * User: zhaohuiyu
 * Date: 12/28/13
 * Time: 10:36 AM
 */
public class FutureImpl<T extends Serializable> implements Future<T>, Serializable {
    private static final long serialVersionUID = 8675742628361519458L;

    private transient Runnable runnable;
    private transient Executor executor;
    private boolean isDone = false;
    private T result;

    public static <U extends Serializable> FutureImpl<U> newFuture() {
        return new FutureImpl<U>();
    }

    public void onComplete(Runnable runnable) {
        onComplete(runnable, null);
    }

    public void onComplete(Runnable runnable, Executor executor) {
        if (runnable == null) throw new IllegalArgumentException("runnable must not be null");
        boolean fire;
        // Register the callback and read isDone under one lock, so that exactly one
        // of onComplete() and done() ends up running the callback when they race.
        synchronized (this) {
            this.runnable = runnable;
            this.executor = executor;
            fire = isDone;
        }
        if (fire) {
            trigger(runnable, executor);
        }
    }

    private static void trigger(Runnable runnable, Executor executor) {
        if (runnable == null) return;
        if (executor == null) {
            // run in the current thread
            runnable.run();
        } else {
            executor.execute(runnable);
        }
    }

    public void done(T result) {
        Runnable r;
        Executor e;
        synchronized (this) {
            this.result = result;
            this.isDone = true;
            r = this.runnable;
            e = this.executor;
        }
        // Run the callback outside the lock; r is null if none is registered yet.
        trigger(r, e);
    }

    @Override
    public synchronized T get() {
        return result;
    }
}
The code is above; the usage is below.
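package com.alibaba.dubbo.async;

import com.alibaba.dubbo.remoting.exchange.support.header.Future;

public interface Service {
    // The return type must be Future. This Future is only a placeholder;
    // it is not the Future from java.util.concurrent.
    Future<String> query();
}

package com.alibaba.dubbo.async;

import com.alibaba.dubbo.remoting.exchange.support.header.Future;
import com.alibaba.dubbo.remoting.exchange.support.header.FutureImpl;

public class ServiceImpl implements Service {
    @Override
    public Future<String> query() {
        final FutureImpl<String> future = FutureImpl.newFuture();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Pretend some asynchronous work happens here.
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Once the work is finished, hand the result back.
                future.done("done");
            }
        }).start();
        return future;
    }
}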
//provider
<dubbo:service interface="com.alibaba.dubbo.async.Service" ref="service" version="1.0.0" timeout="2000" protocol="async">
    <!-- Pay attention here: this parameter selects the asyncheader exchanger -->
    <dubbo:parameter key="exchanger" value="asyncheader" />
</dubbo:service>
//consumer
<dubbo:reference id="service" interface="com.alibaba.dubbo.async.Service" version="1.0.0" check="false">
</dubbo:reference>
Invocation:
// Important: query() does not return a j.u.c Future, so the call still blocks
// inside query() itself, not in get().
String result = service.query().get();
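To make the server-side flow easier to follow without a Dubbo dependency, here is a minimal standalone sketch of the same idea: a plain return value is written back at once, while a future-like placeholder only registers a callback, and the reply is written when the business code completes it. Every name in it (`DeferredReplyDemo`, `Deferred`, `handle`, `wire`) is hypothetical and is not part of Dubbo; `Deferred` only stands in for `FutureImpl`, and the `wire` list stands in for `channel.send(response)`.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the deferred-reply idea; none of these names are Dubbo APIs.
class DeferredReplyDemo {

    /** Hypothetical stand-in for FutureImpl: a one-shot callback placeholder. */
    static final class Deferred<T> {
        private Runnable callback;
        private boolean done;
        private T result;

        void onComplete(Runnable cb) {
            boolean fire;
            synchronized (this) {
                callback = cb;
                fire = done;          // already completed before registration?
            }
            if (fire) cb.run();
        }

        void done(T value) {
            Runnable cb;
            synchronized (this) {
                result = value;
                done = true;
                cb = callback;        // null if nobody has registered yet
            }
            if (cb != null) cb.run();
        }

        synchronized T get() {
            return result;
        }
    }

    /** Records what the "server" has written back, in order. */
    static final List<String> wire = new ArrayList<>();

    /** Mimics the handler: reply now for plain values, later for Deferred ones. */
    static void handle(Object returned) {
        if (!(returned instanceof Deferred)) {
            wire.add("reply:" + returned);
            return;
        }
        final Deferred<?> d = (Deferred<?>) returned;
        d.onComplete(() -> wire.add("reply:" + d.get()));
    }

    public static void main(String[] args) {
        handle("sync");
        Deferred<String> pending = new Deferred<>();
        handle(pending);
        System.out.println(wire);   // only the synchronous reply so far
        pending.done("async");
        System.out.println(wire);   // the deferred reply has now been added
    }
}
```

In this sketch the thread that calls `handle` returns immediately for a `Deferred` value, which is the point of the proposal: the provider's worker thread is not held while the business logic is still running.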
Since 2.7.x provides a new async invoke, I think this issue can be closed.
2.7.x is on the way. @yuyijq, you can also build from master to try the new async invoke.
&READY-TO-CLOSE&
Check #1957 for more information.
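For comparison with the placeholder `Future` proposed in this issue, here is a rough sketch of what a service contract could look like if the 2.7.x async invoke is used with methods that return `CompletableFuture`. That reading of the 2.7.x feature is an assumption, since the comments here do not spell it out. The sketch uses only the JDK, shows no Dubbo wiring or configuration, and `QueryService` and `QueryServiceImpl` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK sketch; the interface and class names are hypothetical, not Dubbo APIs.
class CompletableFutureServiceSketch {

    /** The asynchronous nature of the call is visible in the return type. */
    interface QueryService {
        CompletableFuture<String> query();
    }

    static final class QueryServiceImpl implements QueryService {
        @Override
        public CompletableFuture<String> query() {
            // supplyAsync runs the supplier on the common ForkJoinPool,
            // so the calling thread gets the future back immediately.
            return CompletableFuture.supplyAsync(() -> "done");
        }
    }

    public static void main(String[] args) {
        QueryService service = new QueryServiceImpl();
        CompletableFuture<String> future = service.query();
        // Non-blocking style: react when the value arrives.
        future.thenAccept(v -> System.out.println("callback got: " + v));
        // Blocking style: here the wait is in join(), not in query().
        System.out.println("blocking result: " + future.join());
    }
}
```

Unlike the placeholder `Future` above, with a `CompletableFuture` the caller decides whether to block (`join()` or `get()`) or to attach a callback.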
chickenlj