001package com.facebook.swift.service; 002 003import org.jboss.netty.channel.ChannelHandlerContext; 004import org.jboss.netty.channel.Channels; 005import org.jboss.netty.channel.DownstreamMessageEvent; 006import org.jboss.netty.channel.MessageEvent; 007import org.jboss.netty.channel.SimpleChannelDownstreamHandler; 008import org.jboss.netty.handler.codec.http.DefaultHttpRequest; 009import org.jboss.netty.handler.codec.http.DefaultHttpResponse; 010import org.jboss.netty.handler.codec.http.HttpMethod; 011import org.jboss.netty.handler.codec.http.HttpRequest; 012import com.facebook.nifty.core.ThriftMessage; 013import com.google.common.base.MoreObjects; 014import com.google.common.base.Supplier; 015 016import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; 017import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; 018 019/** 020 * XHR(XML Http Request)ç¼–ç 器<br> 021 * å°†{@link com.facebook.nifty.core.NiftyDispatcher}输出的 022 * {@link ThriftMessage}å“应数æ®è½¬ä¸º{@link DownstreamMessageEvent}, 023 * 024 * @author guyadong 025 * 026 */ 027public class ThriftXHREncoder extends SimpleChannelDownstreamHandler { 028 private static final Supplier<HttpRequest> DEFAULT_SUPPLIER = new Supplier<HttpRequest>() { 029 030 @Override 031 public HttpRequest get() { 032 // å¦‚æžœæ²¡æœ‰èŽ·å– HttpRequest 则使用用HTTP 1.1为Responseçš„ç‰ˆæœ¬å· 033 return new DefaultHttpRequest(HTTP_1_1,HttpMethod.GET, "/"); 034 } 035 }; 036 private final Supplier<HttpRequest> currentRequest; 037 /** 038 * @param currentRequest 返回当å‰HTTP请求的{@link Supplier}实例,为{@code null}则使用默认实例{@link #DEFAULT_SUPPLIER} 039 */ 040 public ThriftXHREncoder(Supplier<HttpRequest> currentRequest) { 041 super(); 042 this.currentRequest = MoreObjects.firstNonNull(currentRequest,DEFAULT_SUPPLIER); 043 } 044 public ThriftXHREncoder(){ 045 this(null); 046 } 047 @Override 048 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 049 if(e.getMessage() instanceof ThriftMessage){ 050 ThriftMessage thriftMessage = (ThriftMessage)e.getMessage(); 051 if(thriftMessage.getBuffer().readable()){ 052 switch (thriftMessage.getTransportType()) { 053 case UNFRAMED: 054 HttpRequest request = MoreObjects.firstNonNull(currentRequest.get(),DEFAULT_SUPPLIER.get()); 055 DefaultHttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), OK); 056 // å°†ThriftMessageä¸çš„æ•°æ®è£…å…¥HttpResponse 057 response.setContent(thriftMessage.getBuffer()); 058 ctx.sendDownstream(new DownstreamMessageEvent(ctx.getChannel(), 059 Channels.future(ctx.getChannel()), response, e.getRemoteAddress())); 060 return; 061 default: 062 throw new UnsupportedOperationException( 063 thriftMessage.getTransportType().name() + " transport is not supported"); 064 } 065 } 066 } 067 super.writeRequested(ctx, e); 068 } 069 070}