001package net.gdface.thrift;
002
003import java.util.ArrayList;
004import java.util.List;
005import java.util.concurrent.Callable;
006import java.util.concurrent.ExecutionException;
007import java.util.concurrent.Executor;
008import java.util.concurrent.TimeUnit;
009import java.util.concurrent.TimeoutException;
010import java.util.concurrent.atomic.AtomicBoolean;
011
012import org.apache.commons.pool2.PooledObject;
013import org.apache.commons.pool2.PooledObjectFactory;
014import org.apache.commons.pool2.impl.DefaultPooledObject;
015import org.apache.commons.pool2.impl.GenericObjectPool;
016import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
017
018import com.facebook.nifty.client.FramedClientConnector;
019import com.facebook.nifty.client.NiftyClientChannel;
020import com.facebook.nifty.client.NiftyClientConnector;
021import com.facebook.swift.service.ThriftClient;
022import com.facebook.swift.service.ThriftClientConfig;
023import com.facebook.swift.service.ThriftClientManager;
024import com.google.common.base.Function;
025import com.google.common.base.Throwables;
026import com.google.common.cache.Cache;
027import com.google.common.cache.CacheBuilder;
028import com.google.common.net.HostAndPort;
029import com.google.common.util.concurrent.FutureCallback;
030import com.google.common.util.concurrent.ListenableFuture;
031import com.google.common.util.concurrent.MoreExecutors;
032
033import static com.google.common.net.HostAndPort.fromParts;
034import static com.google.common.net.HostAndPort.fromString;
035import static com.google.common.base.Preconditions.*;
036
037import io.airlift.units.Duration;
038/**
 * Factory class for creating Thrift client instances.<br>
040 * Example:<br>
041 * <pre>
042 * // get a FaceApi synchronized instance
043 * FaceApi client = ClientFactory.builder()
044 * .setHostAndPort("127.0.0.1",9090)
045 * .setTimeout(10,TimeUnit.SECONDS)
046 * .build(FaceApi.class,FaceApiThriftClient.class);
047 * </pre>
048 * @author guyadong
049 *
050 */
051public class ClientFactory {
052
053    private static class Singleton{
054        private static final ThriftClientManager CLIENT_MANAGER = new ThriftClientManager();    
055        static{
056            Runtime.getRuntime().addShutdownHook(new Thread(){
057                @Override
058                public void run() {
059                    CLIENT_MANAGER.close();
060                }});
061        }
062    }    
063    private static final Cache<Class<?>, ThriftClient<?>> THRIFT_CLIENT_CACHE = CacheBuilder.newBuilder().softValues().build();
064    private static final Cache<Class<?>, Object> CLIENT_CACHE = CacheBuilder.newBuilder().softValues().build();
065    /** 接口类 -- 实例资源池缓存 */
066    private static final Cache<Class<?>,GenericObjectPool<?>> INSTANCE_POOL_CACHE = CacheBuilder.newBuilder().softValues().build();;
067        private ThriftClientManager clientManager; 
068    private ThriftClientConfig thriftClientConfig = new ThriftClientConfig();
069    private HostAndPort hostAndPort;
070    private volatile NiftyClientConnector<? extends NiftyClientChannel> connector;
071    private String clientName = ThriftClientManager.DEFAULT_NAME;
072    private GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
073    private Executor executor = MoreExecutors.directExecutor();
074    /**
075     * 接口实例代理函数对象,
076     * 如果提供了该函数对象(不为null),则在创建接口实例时调用该函数,
077     * 将thrift/swift创建的接口实例转为代理实例,
078     */
079    private Function<Object, Object> decorator = null;
/**
 * Creates a factory with default settings.
 * Not public: callers normally obtain instances through {@link #builder()}.
 */
protected ClientFactory() {
    super();
}

/**
 * Sets the {@link ThriftClientManager} used by this factory.
 * While the value is {@code null}, the shared default manager is substituted on first use.
 * @param clientManager manager to use
 * @return this factory, for call chaining
 */
public ClientFactory setManager(ThriftClientManager clientManager){
    this.clientManager = clientManager;
    return this;
}
087    public ClientFactory setThriftClientConfig(ThriftClientConfig thriftClientConfig) {
088        this.thriftClientConfig = thriftClientConfig;
089        return this;
090    }
091    /**
092     * set all timeout arguments
093     * @param time
094     * @param unit
095     * @return
096     * @see #setConnectTimeout(Duration)
097     * @see #setReceiveTimeout(Duration)
098     * @see #setReadTimeout(Duration)
099     * @see #setWriteTimeout(Duration)
100     */
101    public ClientFactory setTimeout(Duration timeout){
102        setConnectTimeout(timeout);
103        setReceiveTimeout(timeout);
104        setReadTimeout(timeout);
105        setWriteTimeout(timeout);
106        return this;
107    }
108    public ClientFactory setTimeout(long time,TimeUnit unit){
109        return setTimeout(new Duration(time,unit));
110    }
111    public ClientFactory setConnectTimeout(Duration connectTimeout) {
112        thriftClientConfig.setConnectTimeout(connectTimeout);
113        return this;
114    }
115    public ClientFactory setReceiveTimeout(Duration receiveTimeout) {
116        thriftClientConfig.setReceiveTimeout(receiveTimeout);
117        return this;
118    }
119    public ClientFactory setReadTimeout(Duration readTimeout) {
120        thriftClientConfig.setReadTimeout(readTimeout);
121        return this;
122    }
123    public ClientFactory setWriteTimeout(Duration writeTimeout) {
124        thriftClientConfig.setWriteTimeout(writeTimeout);
125        return this;
126    }
127    public ClientFactory setSocksProxy(HostAndPort socksProxy) {
128        thriftClientConfig.setSocksProxy(socksProxy);
129        return this;
130    }
131    public ClientFactory setMaxFrameSize(int maxFrameSize) {
132        thriftClientConfig.setMaxFrameSize(maxFrameSize);
133        return this;
134    }
135    public ClientFactory setHostAndPort(HostAndPort hostAndPort) {
136        if(null == this.hostAndPort){
137                synchronized(this){
138                        if(null == this.hostAndPort){
139                                this.hostAndPort = checkNotNull(hostAndPort,"hostAndPort must not be null");
140                        }
141                }
142        }else{
143                throw new IllegalStateException("the memeber hostAndPort be initialized always");
144        }
145        return this;
146    }
147    public ClientFactory setHostAndPort(String host,int port) {
148        return setHostAndPort(fromParts(host, port));
149    }
150    public ClientFactory setHostAndPort(String host) {
151        return setHostAndPort(fromString(host));
152    }
153    public ClientFactory setConnector(NiftyClientConnector<? extends NiftyClientChannel> connector) {
154        this.connector = connector;
155        return this;
156    }
157    public ClientFactory setClientName(String clientName) {
158        this.clientName = clientName;
159        return this;
160    }
161    /**
162     * 设置资源池配置参数
163     * @param poolConfig
164     * @return
165     */
166    public synchronized ClientFactory setPoolConfig(GenericObjectPoolConfig poolConfig) {
167                this.poolConfig = checkNotNull(poolConfig,"poolConfig is null");
168                return this;
169        }
170
171        public synchronized ClientFactory setExecutor(Executor executor) {
172                this.executor = checkNotNull(executor,"executor is null");
173                return this;
174        }
175
176        public Executor getExecutor() {
177                return executor;
178        }
179
180        @SuppressWarnings("unchecked")
181        public <T> ClientFactory setDecorator(Function<T, T> decorator) {
182                this.decorator = (Function<Object, Object>) decorator;
183                return this;
184        }
185        public HostAndPort getHostAndPort(){
186        return checkNotNull(this.hostAndPort,"hostAndPort is null");
187    }
188    private NiftyClientConnector<? extends NiftyClientChannel> getConnector(){
189        if(null == this.connector){
190                synchronized(this){
191                        if(null == this.connector){
192                                this.connector = new FramedClientConnector(this.getHostAndPort());
193                        }                       
194                }
195        }
196        return this.connector;
197    }
198    private ThriftClientManager getClientManager(){
199        if(null == this.clientManager){
200                synchronized(this){
201                        if(null == this.clientManager){
202                                this.clientManager = Singleton.CLIENT_MANAGER;
203                        }
204                }
205        }
206        return this.clientManager;
207    }
208
209    @SuppressWarnings("unchecked")
210        private <T>ThriftClient<T> getThriftClient(final Class<T> interfaceClass) {
211        try {
212                        return (ThriftClient<T>) THRIFT_CLIENT_CACHE.get(checkNotNull(interfaceClass,"interfaceClass is null"), new Callable<ThriftClient<?>>(){
213                                        @Override
214                                        public ThriftClient<?> call() throws Exception {
215                                                return new ThriftClient<T>(
216                                                getClientManager(),
217                                                interfaceClass,
218                                                thriftClientConfig,
219                                                clientName);
220                                        }});
221                } catch (Exception e) {
222                Throwables.throwIfUnchecked(e);
223            throw new RuntimeException(e);
224        }
225    }
226        @SuppressWarnings("unchecked")
227        private <T> GenericObjectPool<T> getObjectPool(final Class<T> interfaceClass){
228                try{
229                        return (GenericObjectPool<T>) INSTANCE_POOL_CACHE.get(checkNotNull(interfaceClass,"interfaceClass is null"), 
230                                        new Callable<GenericObjectPool<T>>(){
231                                @Override
232                                public GenericObjectPool<T> call() throws Exception {
233                                        return new GenericObjectPool<T>(new ClientInstanceFactory<T>(interfaceClass),poolConfig);
234                                }});
235                } catch (Exception e) {
236                Throwables.throwIfUnchecked(e);
237            throw new RuntimeException(e);
238        }
239        }
240    /**
241         * 返回{@code instance}对应的资源池对象
242         * @param instance
243         * @return
244         */
245        @SuppressWarnings("unchecked")
246        private <T> GenericObjectPool<T> getObjectPoolByInstance(T instance){
247                checkArgument(null != instance,"intance is null");
248                List<GenericObjectPool<?>> found = new ArrayList<GenericObjectPool<?>>(1);
249                for(Class<?> clazz : instance.getClass().getInterfaces()){
250                        GenericObjectPool<?> pool = INSTANCE_POOL_CACHE.getIfPresent(clazz) ;
251                        if(null !=pool){
252                                found.add(pool);
253                        }
254                }
255                checkState(found.size() ==1,"%s is not valid instance of thrift client",instance.getClass().getName());
256                return (GenericObjectPool<T>) found.get(0);
257        }
258
259        public <T>void closeObjectPool(Class<T> interfaceClass){
260                if(null != interfaceClass){
261                        GenericObjectPool<?> pool = INSTANCE_POOL_CACHE.getIfPresent(interfaceClass);
262                        if(null != pool){
263                                pool.close();
264                        }
265                }
266        }
267
268        /**
269         * thrift client 实例不是线程安全的,只可单线程独占使用,所以每次调用实例时要向资源池{@link GenericObjectPool}
270     * 申请一个{@code interfaceClass}的实例,用完后调用{@link #releaseInstance(Object)}归还,其他线程才可重复使用。
271     * @param interfaceClass 接口类,不可为{@code null}
272     * @return
273     */
274    public <T>T applyInstance(Class<T> interfaceClass) {
275        try {
276                return getObjectPool(interfaceClass).borrowObject();
277                } catch (Exception e) {
278                Throwables.throwIfUnchecked(e);
279            throw new RuntimeException(e);
280        }
281    }
282    /**
283     * 释放{@code instance}实例使用权,必须和{@link #applyInstance(Class)}配对使用
284     * @param instance 接口实例
285     */
286    public <T>void releaseInstance(T instance){
287        if(null != instance){
288                getObjectPoolByInstance(instance).returnObject(instance);
289        }
290    }
291    private class ClientInstanceFactory<T> implements PooledObjectFactory<T>{
292        private final Class<T> interfaceClass;
293
294                private ClientInstanceFactory(Class<T> interfaceClass) {
295                        checkArgument(null != interfaceClass && interfaceClass.isInterface());
296                        this.interfaceClass = interfaceClass;
297                }
298                private NiftyClientChannel getChannel(PooledObject<T>p){
299                return (NiftyClientChannel) getClientManager().getRequestChannel(p.getObject());
300                }
301                @Override
302                public PooledObject<T> makeObject() throws Exception {
303                        T obj = getThriftClient(interfaceClass).open(getClientManager().createChannel(getConnector()).get());
304                        return new DefaultPooledObject<T>(obj);
305                }
306
307                @Override
308                public void destroyObject(PooledObject<T> p) throws Exception {
309                getChannel(p).close();
310                }
311
312                @Override
313                public boolean validateObject(PooledObject<T> p) {
314                        return getChannel(p).getNettyChannel().isConnected();
315                }
316
317                @Override
318                public void activateObject(PooledObject<T> p) throws Exception {
319                        // socket连接长时间空闲会被自动关闭,
320                        // 为确保borrowObject方法返回的实例有效,在这里要检查对象是否被关闭
321                        // 否则返回的对象可能因为长时间空闲连接被关闭而在使用时导致连接关闭异常
322                        if(!validateObject(p)){
323                                throw new IllegalStateException();
324                        }
325                }
326
327                @Override
328                public void passivateObject(PooledObject<T> p) throws Exception {
329                }
330        
331    }
332        public static ClientFactory builder() {
333                return new ClientFactory();
334        }
335    /**
336     * 构造{@code interfaceClass}实例<br>
337     * thriftyImplClass为null时,假设destClass为thrifty 异步实例类型
338     * @param <I> 接口类
339     * @param <O> 返回实例类型
340     * @param <T> 基于thrifty实现接口类的实例类型
341     * @param interfaceClass
342     * @param thriftyImplClass
343     * @param destClass 返回的实例类型,如果interfaceClass和thriftyImplClass为null,必须有参数为{@link ClientFactory}的构造函数
344     * 否则必须有参数类型为interfaceClass的构造函数
345     * @return 返回 {@code destClass }实例
346     */
347    @SuppressWarnings("unchecked")
348    public<I,T extends I,O> O  build(final Class<I> interfaceClass,final Class<T> thriftyImplClass,final Class<O> destClass){
349        try {
350            return (O) CLIENT_CACHE.get(interfaceClass, new Callable<Object>(){
351                @Override
352                public Object call() throws Exception {
353                        if(thriftyImplClass == null){
354                                // destClass 为异步模式实例
355                        return destClass.getDeclaredConstructor(ClientFactory.class).newInstance(ClientFactory.this); 
356                        }
357                                T instance =thriftyImplClass.getDeclaredConstructor(ClientFactory.class).newInstance(ClientFactory.this);
358                        if(decorator !=null){
359                                instance = (T) decorator.apply(instance);
360                        }
361                    return destClass.getDeclaredConstructor(interfaceClass).newInstance(instance);
362                }});
363        } catch (ExecutionException e) {
364                Throwables.throwIfUnchecked(e.getCause());
365            throw new RuntimeException(e.getCause());
366                }
367    }
368        /**
369         * 构造{@code interfaceClass}实例<br>
370         * {@link #build(Class, Class, Class)}的简化版本,当thriftImplClass只实现了一个接口时,自动推断接口类型
371         * @param thriftyImplClass
372         * @param destClass
373         * @return
374         * @see #build(Class, Class, Class)
375         */
376        @SuppressWarnings("unchecked")
377        public<I,O,T extends I> O  build(Class<T> thriftyImplClass,Class<O> destClass){
378                checkArgument(thriftyImplClass != null);
379                checkArgument(thriftyImplClass.getInterfaces().length ==1,
380                                "can't determines interface class from %s",thriftyImplClass.getName());
381                Class<I> interfaceClass = (Class<I>) thriftyImplClass.getInterfaces()[0];
382                return build(interfaceClass,thriftyImplClass,destClass);
383    }
384        
385        /**
386         * 测试当前连接是否有效
387         * @return 连接有效返回{@code true},否则返回{@code false}
388         */
389        public boolean testConnect(){
390                try {
391                        NiftyClientChannel channel = getClientManager().createChannel(getConnector()).get();
392                        channel.close();
393                        return true;
394                } catch (Exception e) {                 
395                }
396                return false;
397        }
398    public <V> void addCallback(
399            final ListenableFuture<V> future,
400            final FutureCallback<? super V> callback) {
401        ThriftUtils.addCallback(future, callback, getExecutor());
402    }
403        @Override
404        public String toString() {
405                StringBuilder builder = new StringBuilder();
406                builder.append("ClientFactory [hostAndPort=");
407                builder.append(hostAndPort);
408                builder.append(", clientName=");
409                builder.append(clientName);
410                builder.append("]");
411                return builder.toString();
412        }
413        /**
414         * 将{@code future} 封装为{@link ListenableFutureDecorator}实例
415         * @param async thrift 异步接口实例
416         * @param future 异步返回结果实例
417         * @return {@link ListenableFutureDecorator}实例
418         */
419        @SuppressWarnings("unchecked")
420        public <A,V>ListenableFutureDecorator<A,V>wrap(A async,ListenableFuture<V> future){
421                if(future instanceof ListenableFutureDecorator){
422                        return (ListenableFutureDecorator<A,V>)future;
423                }
424                return new ListenableFutureDecorator<A, V>(async,future);       
425        }
426    /**
427     * {@link ListenableFuture}接口的装饰类,
428     * 用于确保异步调用结束时释放异步接口实例,参见{@link ClientFactory#releaseInstance(Object)}
429     * @author guyadong
430     *
431     * @param <A> thrift 异步接口类型
432     * @param <V> 方法返回值类型
433     */
434    public class ListenableFutureDecorator<A,V> implements ListenableFuture<V>{
435        private final A async;
436        private final ListenableFuture<V> future;
437        /** 确保 {@link #releaseAsync()}方法只被调用一次的标志字段 */
438        private final AtomicBoolean released = new AtomicBoolean(false);
439                public ListenableFutureDecorator(A async, ListenableFuture<V> future) {
440                        this.async = checkNotNull(async,"async is null");
441                        this.future = checkNotNull(future,"future is null");
442                }
443                private void releaseAsync(){
444                        if(released.compareAndSet(false, true)){
445                                releaseInstance(async); 
446                        }
447                }
448                @Override
449                public boolean cancel(boolean mayInterruptIfRunning) {
450                        return future.cancel(mayInterruptIfRunning);
451                }
452
453                @Override
454                public V get() throws InterruptedException, ExecutionException {
455                        try{
456                                return future.get();
457                        }finally{
458                                releaseAsync();                         
459                        }
460                }
461
462                @Override
463                public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
464                        try{
465                                return future.get(timeout, unit);
466                        }finally{
467                                releaseAsync();
468                        }
469                }
470
471                @Override
472                public boolean isCancelled() {
473                        return future.isCancelled();
474                }
475
476                @Override
477                public boolean isDone() {
478                        return future.isDone();
479                }
480
481                @Override
482                public void addListener(Runnable listener, Executor executor) {
483                        future.addListener(listener, executor);                 
484                }       
485    }
static {
    // When the JVM exits, close every instance pool still present in the cache.
    Runnable closeAllPools = new Runnable() {
        @Override
        public void run() {
            for (GenericObjectPool<?> pool : INSTANCE_POOL_CACHE.asMap().values()) {
                pool.close();
            }
        }
    };
    Runtime.getRuntime().addShutdownHook(new Thread(closeAllPools));
}
498}