001package com.facebook.swift.service;
002
003import org.jboss.netty.channel.ChannelHandlerContext;
004import org.jboss.netty.channel.Channels;
005import org.jboss.netty.channel.DownstreamMessageEvent;
006import org.jboss.netty.channel.MessageEvent;
007import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
008import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
009import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
010import org.jboss.netty.handler.codec.http.HttpMethod;
011import org.jboss.netty.handler.codec.http.HttpRequest;
012import com.facebook.nifty.core.ThriftMessage;
013import com.google.common.base.MoreObjects;
014import com.google.common.base.Supplier;
015
016import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
017import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
018
019/**
020 * XHR(XML Http Request)编码器<br>
021 * 将{@link com.facebook.nifty.core.NiftyDispatcher}输出的
022 * {@link ThriftMessage}响应数据转为{@link DownstreamMessageEvent},
023 * 
024 * @author guyadong
025 *
026 */
027public class ThriftXHREncoder extends SimpleChannelDownstreamHandler {
028        private static final Supplier<HttpRequest> DEFAULT_SUPPLIER = new Supplier<HttpRequest>() {
029
030                @Override
031                public HttpRequest get() {
032                        // 如果没有获取 HttpRequest 则使用用HTTP 1.1为Response的版本号
033                        return new DefaultHttpRequest(HTTP_1_1,HttpMethod.GET, "/");
034                }
035        };
036        private final Supplier<HttpRequest> currentRequest;
037        /**
038         * @param currentRequest 返回当前HTTP请求的{@link Supplier}实例,为{@code null}则使用默认实例{@link #DEFAULT_SUPPLIER}
039         */
040        public ThriftXHREncoder(Supplier<HttpRequest> currentRequest) {
041                super();
042                this.currentRequest = MoreObjects.firstNonNull(currentRequest,DEFAULT_SUPPLIER);
043        }
044        public ThriftXHREncoder(){
045                this(null);
046        }
047        @Override
048        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
049                if(e.getMessage() instanceof ThriftMessage){
050                        ThriftMessage thriftMessage = (ThriftMessage)e.getMessage();
051                        if(thriftMessage.getBuffer().readable()){
052                                switch (thriftMessage.getTransportType()) {
053                                case UNFRAMED:
054                                        HttpRequest request = MoreObjects.firstNonNull(currentRequest.get(),DEFAULT_SUPPLIER.get());
055                    DefaultHttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), OK);
056                    // 将ThriftMessage中的数据装入HttpResponse
057                    response.setContent(thriftMessage.getBuffer());
058                                        ctx.sendDownstream(new DownstreamMessageEvent(ctx.getChannel(), 
059                                                        Channels.future(ctx.getChannel()), response, e.getRemoteAddress()));
060                                        return;
061                                default:
062                        throw new UnsupportedOperationException(
063                                        thriftMessage.getTransportType().name() + " transport is not supported");
064                                }
065                        }
066                }
067                super.writeRequested(ctx, e);
068        }
069
070}