server async without callback #85

Closed
yuyijq opened this issue Sep 12, 2014 · 2 comments
yuyijq commented Sep 12, 2014

AsyncHeaderExchangeHandler.java

asyncheader=com.alibaba.dubbo.remoting.exchange.support.header.AsyncHeaderExchanger

package com.alibaba.dubbo.remoting.exchange.support.header;

import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.rpc.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * User: zhaohuiyu
 * Date: 12/15/13
 * Time: 2:10 AM
 */
public class AsyncHeaderExchangeHandler extends HeaderExchangeHandler {
    private static final Logger logger = LoggerFactory.getLogger(AsyncHeaderExchangeHandler.class);

    public AsyncHeaderExchangeHandler(ExchangeHandler handler) {
        super(handler);
    }

    public void received(final Channel channel, final Object message) throws RemotingException {
        if (!(message instanceof Request)) {
            super.received(channel, message);
            return;
        }

        Request request = (Request) message;
        if (!request.isTwoWay()) {
            super.received(channel, message);
            return;
        }

        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            final Response response = handleRequest(exchangeChannel, request);
            Object o = response.getResult();
            if (!(o instanceof Result)) {
                channel.send(response);
                return;
            }
            Result result = (Result) o;
            Object value = result.getValue();
            if (!(value instanceof FutureImpl)) {
                channel.send(response);
                return;
            }
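            // If the value is a future, do not write the result back immediately.
            // Wait until the asynchronous flow has finished, so that sending the response becomes the continuation of the async request.
            FutureImpl future = (FutureImpl) value;

            // It is better to send directly on the business thread; there is no need to switch to another thread.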
            future.onComplete(new Runnable() {
                @Override
                public void run() {
                    try {
                        channel.send(response);
                    } catch (RemotingException e) {
                        logger.error("send response to remote error", e);
                    }
                }
            });
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
}

AsyncHeaderExchanger.java

package com.alibaba.dubbo.remoting.exchange.support.header;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Transporters;
import com.alibaba.dubbo.remoting.exchange.ExchangeClient;
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Exchanger;
import com.alibaba.dubbo.remoting.transport.DecodeHandler;

/**
 * User: zhaohuiyu
 * Date: 12/15/13
 * Time: 2:07 AM
 */
public class AsyncHeaderExchanger implements Exchanger {
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new AsyncHeaderExchangeHandler(handler))));
    }

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

Future.java

package com.alibaba.dubbo.remoting.exchange.support.header;

import java.io.Serializable;

/**
 * User: zhaohuiyu
 * Date: 9/13/13
 * Time: 5:53 PM
 */
public interface Future<T extends Serializable> extends Serializable {
    T get();
}

FutureImpl.java

package com.alibaba.dubbo.remoting.exchange.support.header;

import java.io.Serializable;
import java.util.concurrent.Executor;

/**
 * User: zhaohuiyu
 * Date: 12/28/13
 * Time: 10:36 AM
 */
public class FutureImpl<T extends Serializable> implements Future<T>, Serializable {
    private static final long serialVersionUID = 8675742628361519458L;

    private transient Runnable runnable;

    private transient Executor executor;

    private volatile boolean isDone = false;

    private T result;

    public static <U extends Serializable> FutureImpl<U> newFuture() {
        return new FutureImpl<U>();
    }

    public void onComplete(Runnable runnable) {
        onComplete(runnable, null);
    }

    public void onComplete(Runnable runnable, Executor executor) {
        if (runnable == null) throw new IllegalArgumentException("runnable must not be null");
        this.runnable = runnable;
        this.executor = executor;
        if (isDone) {
            trigger();
        }
    }

    private void trigger() {
        if (runnable == null) return;
        if (executor == null) {
            //run in current thread
            runnable.run();
        } else {
            executor.execute(runnable);
        }
    }

    public void done(T result) {
        this.result = result;
        this.isDone = true;
        trigger();
    }

    @Override
    public T get() {
        return result;
    }
}
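
One thing to watch in FutureImpl: onComplete and done can run on different threads, and since the runnable field is not volatile and nothing guards the check-then-trigger steps, the callback could in principle fire twice or not at all. Below is a minimal sketch of one way to close that gap, assuming at-most-once callback semantics are acceptable. SafeFutureSketch and SafeFuture are hypothetical names used for illustration only; they are not part of Dubbo or of the code above.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class SafeFutureSketch {

    /** Hypothetical variant of FutureImpl that fires its callback at most once. */
    public static final class SafeFuture<T> {
        private final Object lock = new Object();
        private Runnable callback;
        private Executor executor;
        private boolean done;
        private boolean fired;
        private T result;

        public void onComplete(Runnable runnable, Executor executor) {
            if (runnable == null) throw new IllegalArgumentException("runnable must not be null");
            boolean fireNow;
            synchronized (lock) {
                this.callback = runnable;
                this.executor = executor;
                // Fire immediately only if the result already arrived and nothing fired yet.
                fireNow = done && !fired;
                if (fireNow) fired = true;
            }
            if (fireNow) run(runnable, executor);
        }

        public void done(T value) {
            Runnable toRun;
            Executor toUse;
            synchronized (lock) {
                if (done) return;            // ignore repeated completion
                result = value;
                done = true;
                if (callback == null || fired) return;
                fired = true;
                toRun = callback;
                toUse = executor;
            }
            run(toRun, toUse);               // run outside the lock
        }

        public T get() {
            synchronized (lock) {
                return result;
            }
        }

        private static void run(Runnable r, Executor e) {
            if (e == null) {
                r.run();                     // run in the calling thread
            } else {
                e.execute(r);
            }
        }
    }

    public static void main(String[] args) {
        final AtomicInteger fired = new AtomicInteger();

        // Callback registered first, result arrives later.
        SafeFuture<String> first = new SafeFuture<String>();
        first.onComplete(fired::incrementAndGet, null);
        first.done("a");

        // Result arrives first, callback registered later.
        SafeFuture<String> second = new SafeFuture<String>();
        second.done("b");
        second.onComplete(fired::incrementAndGet, null);

        System.out.println("callbacks fired: " + fired.get());
    }
}
```

The callback is invoked outside the synchronized block so that a slow channel.send does not hold the lock. Unlike the original, a second onComplete registered after the first callback has fired is not run.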

The code is above.
Usage is shown below.

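package com.alibaba.dubbo.async;

import com.alibaba.dubbo.remoting.exchange.support.header.Future;
import com.alibaba.dubbo.remoting.exchange.support.header.FutureImpl;

public interface Service {
    // The return type must be Future. This Future is only a placeholder, not the one in java.util.concurrent.
    Future<String> query();
}

public class ServiceImpl implements Service {
    @Override
    public Future<String> query() {
        final FutureImpl<String> future = FutureImpl.newFuture();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // Simulate some asynchronous work.
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // When the work is finished, hand the result back.
                future.done("done");
            }
        }).start();
        return future;
    }
}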

//provider

    <dubbo:service interface="com.alibaba.dubbo.async.Service"
                   ref="service"
                   version="1.0.0"
                   timeout="2000"
                   protocol="async"
            >
        <!-- Note this setting: it selects the async exchanger -->
        <dubbo:parameter key="exchanger" value="asyncheader" />
    </dubbo:service>

//consumer

    <dubbo:reference id="service" interface="com.alibaba.dubbo.async.Service" version="1.0.0" check="false">
    </dubbo:reference>

Invocation:

// Important: query() does not return a j.u.c Future, so the call still blocks in query(), not in get()
String result = service.query().get();
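
Because the call blocks inside query(), a consumer that must not block its own thread could hand the whole call to an executor. The sketch below shows that idea with java.util.concurrent.CompletableFuture (Java 8+). ConsumerOffloadSketch, PlaceholderFuture and the local Service interface are hypothetical stand-ins so the example compiles on its own; they are not Dubbo types, and this is only one possible workaround on the consumer side.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerOffloadSketch {

    /** Hypothetical stand-in for the placeholder Future used in this issue. */
    public interface PlaceholderFuture<T> {
        T get();
    }

    /** Hypothetical stand-in for the remote service proxy. */
    public interface Service {
        PlaceholderFuture<String> query();
    }

    /** Runs the blocking remote call on the given pool and exposes it as a j.u.c future. */
    public static CompletableFuture<String> queryAsync(final Service service, ExecutorService pool) {
        // service.query() is where the blocking happens; get() only reads the value.
        return CompletableFuture.supplyAsync(() -> service.query().get(), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // A fake service that answers immediately, standing in for the remote proxy.
            Service fake = () -> () -> "done";
            CompletableFuture<String> pending = queryAsync(fake, pool);
            System.out.println(pending.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller's thread stays free, but one pool thread is still occupied for the duration of each remote call, so the pool size bounds the number of concurrent requests.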
@diecui1202

Since we provide a new async invocation mechanism in 2.7.x, I think this issue can be closed.

2.7.x is on the way. @yuyijq you can also build from master to try the new async invocation.

&READY-TO-CLOSE&

@diecui1202

See #1957 for more information.
