001package net.gdface.thrift; 002 003import java.util.ArrayList; 004import java.util.List; 005import java.util.concurrent.Callable; 006import java.util.concurrent.ExecutionException; 007import java.util.concurrent.Executor; 008import java.util.concurrent.TimeUnit; 009import java.util.concurrent.TimeoutException; 010import java.util.concurrent.atomic.AtomicBoolean; 011 012import org.apache.commons.pool2.PooledObject; 013import org.apache.commons.pool2.PooledObjectFactory; 014import org.apache.commons.pool2.impl.DefaultPooledObject; 015import org.apache.commons.pool2.impl.GenericObjectPool; 016import org.apache.commons.pool2.impl.GenericObjectPoolConfig; 017 018import com.facebook.nifty.client.FramedClientConnector; 019import com.facebook.nifty.client.NiftyClientChannel; 020import com.facebook.nifty.client.NiftyClientConnector; 021import com.facebook.swift.service.ThriftClient; 022import com.facebook.swift.service.ThriftClientConfig; 023import com.facebook.swift.service.ThriftClientManager; 024import com.google.common.base.Function; 025import com.google.common.base.Throwables; 026import com.google.common.cache.Cache; 027import com.google.common.cache.CacheBuilder; 028import com.google.common.net.HostAndPort; 029import com.google.common.util.concurrent.FutureCallback; 030import com.google.common.util.concurrent.ListenableFuture; 031import com.google.common.util.concurrent.MoreExecutors; 032 033import static com.google.common.net.HostAndPort.fromParts; 034import static com.google.common.net.HostAndPort.fromString; 035import static com.google.common.base.Preconditions.*; 036 037import io.airlift.units.Duration; 038/** 039 * Factory class for creating client instance <br> 040 * Example:<br> 041 * <pre> 042 * // get a FaceApi synchronized instance 043 * FaceApi client = ClientFactory.builder() 044 * .setHostAndPort("127.0.0.1",9090) 045 * .setTimeout(10,TimeUnit.SECONDS) 046 * .build(FaceApi.class,FaceApiThriftClient.class); 047 * </pre> 048 * @author guyadong 049 * 050 */ 051public class ClientFactory { 052 053 private static class Singleton{ 054 private static final ThriftClientManager CLIENT_MANAGER = new ThriftClientManager(); 055 static{ 056 Runtime.getRuntime().addShutdownHook(new Thread(){ 057 @Override 058 public void run() { 059 CLIENT_MANAGER.close(); 060 }}); 061 } 062 } 063 private static final Cache<Class<?>, ThriftClient<?>> THRIFT_CLIENT_CACHE = CacheBuilder.newBuilder().softValues().build(); 064 private static final Cache<Class<?>, Object> CLIENT_CACHE = CacheBuilder.newBuilder().softValues().build(); 065 /** 接å£ç±» -- å®žä¾‹èµ„æºæ± ç¼“å˜ */ 066 private static final Cache<Class<?>,GenericObjectPool<?>> INSTANCE_POOL_CACHE = CacheBuilder.newBuilder().softValues().build();; 067 private ThriftClientManager clientManager; 068 private ThriftClientConfig thriftClientConfig = new ThriftClientConfig(); 069 private HostAndPort hostAndPort; 070 private volatile NiftyClientConnector<? extends NiftyClientChannel> connector; 071 private String clientName = ThriftClientManager.DEFAULT_NAME; 072 private GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); 073 private Executor executor = MoreExecutors.directExecutor(); 074 /** 075 * 接å£å®žä¾‹ä»£ç†å‡½æ•°å¯¹è±¡, 076 * 如果æä¾›äº†è¯¥å‡½æ•°å¯¹è±¡(ä¸ä¸ºnull),则在创建接å£å®žä¾‹æ—¶è°ƒç”¨è¯¥å‡½æ•°ï¼Œ 077 * å°†thrift/swift创建的接å£å®žä¾‹è½¬ä¸ºä»£ç†å®žä¾‹, 078 */ 079 private Function<Object, Object> decorator = null; 080 protected ClientFactory() { 081 } 082 083 public ClientFactory setManager(ThriftClientManager clientManager){ 084 this.clientManager = clientManager; 085 return this; 086 } 087 public ClientFactory setThriftClientConfig(ThriftClientConfig thriftClientConfig) { 088 this.thriftClientConfig = thriftClientConfig; 089 return this; 090 } 091 /** 092 * set all timeout arguments 093 * @param time 094 * @param unit 095 * @return 096 * @see #setConnectTimeout(Duration) 097 * @see #setReceiveTimeout(Duration) 098 * @see #setReadTimeout(Duration) 099 * @see #setWriteTimeout(Duration) 100 */ 101 public ClientFactory setTimeout(Duration timeout){ 102 setConnectTimeout(timeout); 103 setReceiveTimeout(timeout); 104 setReadTimeout(timeout); 105 setWriteTimeout(timeout); 106 return this; 107 } 108 public ClientFactory setTimeout(long time,TimeUnit unit){ 109 return setTimeout(new Duration(time,unit)); 110 } 111 public ClientFactory setConnectTimeout(Duration connectTimeout) { 112 thriftClientConfig.setConnectTimeout(connectTimeout); 113 return this; 114 } 115 public ClientFactory setReceiveTimeout(Duration receiveTimeout) { 116 thriftClientConfig.setReceiveTimeout(receiveTimeout); 117 return this; 118 } 119 public ClientFactory setReadTimeout(Duration readTimeout) { 120 thriftClientConfig.setReadTimeout(readTimeout); 121 return this; 122 } 123 public ClientFactory setWriteTimeout(Duration writeTimeout) { 124 thriftClientConfig.setWriteTimeout(writeTimeout); 125 return this; 126 } 127 public ClientFactory setSocksProxy(HostAndPort socksProxy) { 128 thriftClientConfig.setSocksProxy(socksProxy); 129 return this; 130 } 131 public ClientFactory setMaxFrameSize(int maxFrameSize) { 132 thriftClientConfig.setMaxFrameSize(maxFrameSize); 133 return this; 134 } 135 public ClientFactory setHostAndPort(HostAndPort hostAndPort) { 136 if(null == this.hostAndPort){ 137 synchronized(this){ 138 if(null == this.hostAndPort){ 139 this.hostAndPort = checkNotNull(hostAndPort,"hostAndPort must not be null"); 140 } 141 } 142 }else{ 143 throw new IllegalStateException("the memeber hostAndPort be initialized always"); 144 } 145 return this; 146 } 147 public ClientFactory setHostAndPort(String host,int port) { 148 return setHostAndPort(fromParts(host, port)); 149 } 150 public ClientFactory setHostAndPort(String host) { 151 return setHostAndPort(fromString(host)); 152 } 153 public ClientFactory setConnector(NiftyClientConnector<? extends NiftyClientChannel> connector) { 154 this.connector = connector; 155 return this; 156 } 157 public ClientFactory setClientName(String clientName) { 158 this.clientName = clientName; 159 return this; 160 } 161 /** 162 * è®¾ç½®èµ„æºæ± é…ç½®å‚æ•° 163 * @param poolConfig 164 * @return 165 */ 166 public synchronized ClientFactory setPoolConfig(GenericObjectPoolConfig poolConfig) { 167 this.poolConfig = checkNotNull(poolConfig,"poolConfig is null"); 168 return this; 169 } 170 171 public synchronized ClientFactory setExecutor(Executor executor) { 172 this.executor = checkNotNull(executor,"executor is null"); 173 return this; 174 } 175 176 public Executor getExecutor() { 177 return executor; 178 } 179 180 @SuppressWarnings("unchecked") 181 public <T> ClientFactory setDecorator(Function<T, T> decorator) { 182 this.decorator = (Function<Object, Object>) decorator; 183 return this; 184 } 185 public HostAndPort getHostAndPort(){ 186 return checkNotNull(this.hostAndPort,"hostAndPort is null"); 187 } 188 private NiftyClientConnector<? extends NiftyClientChannel> getConnector(){ 189 if(null == this.connector){ 190 synchronized(this){ 191 if(null == this.connector){ 192 this.connector = new FramedClientConnector(this.getHostAndPort()); 193 } 194 } 195 } 196 return this.connector; 197 } 198 private ThriftClientManager getClientManager(){ 199 if(null == this.clientManager){ 200 synchronized(this){ 201 if(null == this.clientManager){ 202 this.clientManager = Singleton.CLIENT_MANAGER; 203 } 204 } 205 } 206 return this.clientManager; 207 } 208 209 @SuppressWarnings("unchecked") 210 private <T>ThriftClient<T> getThriftClient(final Class<T> interfaceClass) { 211 try { 212 return (ThriftClient<T>) THRIFT_CLIENT_CACHE.get(checkNotNull(interfaceClass,"interfaceClass is null"), new Callable<ThriftClient<?>>(){ 213 @Override 214 public ThriftClient<?> call() throws Exception { 215 return new ThriftClient<T>( 216 getClientManager(), 217 interfaceClass, 218 thriftClientConfig, 219 clientName); 220 }}); 221 } catch (Exception e) { 222 Throwables.throwIfUnchecked(e); 223 throw new RuntimeException(e); 224 } 225 } 226 @SuppressWarnings("unchecked") 227 private <T> GenericObjectPool<T> getObjectPool(final Class<T> interfaceClass){ 228 try{ 229 return (GenericObjectPool<T>) INSTANCE_POOL_CACHE.get(checkNotNull(interfaceClass,"interfaceClass is null"), 230 new Callable<GenericObjectPool<T>>(){ 231 @Override 232 public GenericObjectPool<T> call() throws Exception { 233 return new GenericObjectPool<T>(new ClientInstanceFactory<T>(interfaceClass),poolConfig); 234 }}); 235 } catch (Exception e) { 236 Throwables.throwIfUnchecked(e); 237 throw new RuntimeException(e); 238 } 239 } 240 /** 241 * 返回{@code instance}å¯¹åº”çš„èµ„æºæ± 对象 242 * @param instance 243 * @return 244 */ 245 @SuppressWarnings("unchecked") 246 private <T> GenericObjectPool<T> getObjectPoolByInstance(T instance){ 247 checkArgument(null != instance,"intance is null"); 248 List<GenericObjectPool<?>> found = new ArrayList<GenericObjectPool<?>>(1); 249 for(Class<?> clazz : instance.getClass().getInterfaces()){ 250 GenericObjectPool<?> pool = INSTANCE_POOL_CACHE.getIfPresent(clazz) ; 251 if(null !=pool){ 252 found.add(pool); 253 } 254 } 255 checkState(found.size() ==1,"%s is not valid instance of thrift client",instance.getClass().getName()); 256 return (GenericObjectPool<T>) found.get(0); 257 } 258 259 public <T>void closeObjectPool(Class<T> interfaceClass){ 260 if(null != interfaceClass){ 261 GenericObjectPool<?> pool = INSTANCE_POOL_CACHE.getIfPresent(interfaceClass); 262 if(null != pool){ 263 pool.close(); 264 } 265 } 266 } 267 268 /** 269 * thrift client å®žä¾‹ä¸æ˜¯çº¿ç¨‹å®‰å…¨çš„,åªå¯å•线程独å ä½¿ç”¨ï¼Œæ‰€ä»¥æ¯æ¬¡è°ƒç”¨å®žä¾‹æ—¶è¦å‘èµ„æºæ± {@link GenericObjectPool} 270 * 申请一个{@code interfaceClass}的实例,用完åŽè°ƒç”¨{@link #releaseInstance(Object)}归还,其他线程æ‰å¯é‡å¤ä½¿ç”¨ã€‚ 271 * @param interfaceClass 接å£ç±»ï¼Œä¸å¯ä¸º{@code null} 272 * @return 273 */ 274 public <T>T applyInstance(Class<T> interfaceClass) { 275 try { 276 return getObjectPool(interfaceClass).borrowObject(); 277 } catch (Exception e) { 278 Throwables.throwIfUnchecked(e); 279 throw new RuntimeException(e); 280 } 281 } 282 /** 283 * 释放{@code instance}实例使用æƒ,必须和{@link #applyInstance(Class)}é…对使用 284 * @param instance 接å£å®žä¾‹ 285 */ 286 public <T>void releaseInstance(T instance){ 287 if(null != instance){ 288 getObjectPoolByInstance(instance).returnObject(instance); 289 } 290 } 291 private class ClientInstanceFactory<T> implements PooledObjectFactory<T>{ 292 private final Class<T> interfaceClass; 293 294 private ClientInstanceFactory(Class<T> interfaceClass) { 295 checkArgument(null != interfaceClass && interfaceClass.isInterface()); 296 this.interfaceClass = interfaceClass; 297 } 298 private NiftyClientChannel getChannel(PooledObject<T>p){ 299 return (NiftyClientChannel) getClientManager().getRequestChannel(p.getObject()); 300 } 301 @Override 302 public PooledObject<T> makeObject() throws Exception { 303 T obj = getThriftClient(interfaceClass).open(getClientManager().createChannel(getConnector()).get()); 304 return new DefaultPooledObject<T>(obj); 305 } 306 307 @Override 308 public void destroyObject(PooledObject<T> p) throws Exception { 309 getChannel(p).close(); 310 } 311 312 @Override 313 public boolean validateObject(PooledObject<T> p) { 314 return getChannel(p).getNettyChannel().isConnected(); 315 } 316 317 @Override 318 public void activateObject(PooledObject<T> p) throws Exception { 319 // socket连接长时间空闲会被自动关é—, 320 // 为确ä¿borrowObject方法返回的实例有效,åœ¨è¿™é‡Œè¦æ£€æŸ¥å¯¹è±¡æ˜¯å¦è¢«å…³é— 321 // å¦åˆ™è¿”回的对象å¯èƒ½å› 为长时间空闲连接被关é—而在使用时导致连接关é—异常 322 if(!validateObject(p)){ 323 throw new IllegalStateException(); 324 } 325 } 326 327 @Override 328 public void passivateObject(PooledObject<T> p) throws Exception { 329 } 330 331 } 332 public static ClientFactory builder() { 333 return new ClientFactory(); 334 } 335 /** 336 * æž„é€ {@code interfaceClass}实例<br> 337 * thriftyImplClass为nullæ—¶,å‡è®¾destClass为thrifty 异æ¥å®žä¾‹ç±»åž‹ 338 * @param <I> 接å£ç±» 339 * @param <O> 返回实例类型 340 * @param <T> 基于thrifty实现接å£ç±»çš„实例类型 341 * @param interfaceClass 342 * @param thriftyImplClass 343 * @param destClass 返回的实例类型,如果interfaceClasså’ŒthriftyImplClass为null,å¿…é¡»æœ‰å‚æ•°ä¸º{@link ClientFactory}çš„æž„é€ å‡½æ•° 344 * å¦åˆ™å¿…é¡»æœ‰å‚æ•°ç±»åž‹ä¸ºinterfaceClassçš„æž„é€ å‡½æ•° 345 * @return 返回 {@code destClass }实例 346 */ 347 @SuppressWarnings("unchecked") 348 public<I,T extends I,O> O build(final Class<I> interfaceClass,final Class<T> thriftyImplClass,final Class<O> destClass){ 349 try { 350 return (O) CLIENT_CACHE.get(interfaceClass, new Callable<Object>(){ 351 @Override 352 public Object call() throws Exception { 353 if(thriftyImplClass == null){ 354 // destClass ä¸ºå¼‚æ¥æ¨¡å¼å®žä¾‹ 355 return destClass.getDeclaredConstructor(ClientFactory.class).newInstance(ClientFactory.this); 356 } 357 T instance =thriftyImplClass.getDeclaredConstructor(ClientFactory.class).newInstance(ClientFactory.this); 358 if(decorator !=null){ 359 instance = (T) decorator.apply(instance); 360 } 361 return destClass.getDeclaredConstructor(interfaceClass).newInstance(instance); 362 }}); 363 } catch (ExecutionException e) { 364 Throwables.throwIfUnchecked(e.getCause()); 365 throw new RuntimeException(e.getCause()); 366 } 367 } 368 /** 369 * æž„é€ {@code interfaceClass}实例<br> 370 * {@link #build(Class, Class, Class)}的简化版本,当thriftImplClassåªå®žçŽ°äº†ä¸€ä¸ªæŽ¥å£æ—¶ï¼Œè‡ªåŠ¨æŽ¨æ–æŽ¥å£ç±»åž‹ 371 * @param thriftyImplClass 372 * @param destClass 373 * @return 374 * @see #build(Class, Class, Class) 375 */ 376 @SuppressWarnings("unchecked") 377 public<I,O,T extends I> O build(Class<T> thriftyImplClass,Class<O> destClass){ 378 checkArgument(thriftyImplClass != null); 379 checkArgument(thriftyImplClass.getInterfaces().length ==1, 380 "can't determines interface class from %s",thriftyImplClass.getName()); 381 Class<I> interfaceClass = (Class<I>) thriftyImplClass.getInterfaces()[0]; 382 return build(interfaceClass,thriftyImplClass,destClass); 383 } 384 385 /** 386 * 测试当å‰è¿žæŽ¥æ˜¯å¦æœ‰æ•ˆ 387 * @return 连接有效返回{@code true},å¦åˆ™è¿”回{@code false} 388 */ 389 public boolean testConnect(){ 390 try { 391 NiftyClientChannel channel = getClientManager().createChannel(getConnector()).get(); 392 channel.close(); 393 return true; 394 } catch (Exception e) { 395 } 396 return false; 397 } 398 public <V> void addCallback( 399 final ListenableFuture<V> future, 400 final FutureCallback<? super V> callback) { 401 ThriftUtils.addCallback(future, callback, getExecutor()); 402 } 403 @Override 404 public String toString() { 405 StringBuilder builder = new StringBuilder(); 406 builder.append("ClientFactory [hostAndPort="); 407 builder.append(hostAndPort); 408 builder.append(", clientName="); 409 builder.append(clientName); 410 builder.append("]"); 411 return builder.toString(); 412 } 413 /** 414 * å°†{@code future} å°è£…为{@link ListenableFutureDecorator}实例 415 * @param async thrift å¼‚æ¥æŽ¥å£å®žä¾‹ 416 * @param future 异æ¥è¿”回结果实例 417 * @return {@link ListenableFutureDecorator}实例 418 */ 419 @SuppressWarnings("unchecked") 420 public <A,V>ListenableFutureDecorator<A,V>wrap(A async,ListenableFuture<V> future){ 421 if(future instanceof ListenableFutureDecorator){ 422 return (ListenableFutureDecorator<A,V>)future; 423 } 424 return new ListenableFutureDecorator<A, V>(async,future); 425 } 426 /** 427 * {@link ListenableFuture}接å£çš„装饰类, 428 * 用于确ä¿å¼‚æ¥è°ƒç”¨ç»“æŸæ—¶é‡Šæ”¾å¼‚æ¥æŽ¥å£å®žä¾‹,å‚è§{@link ClientFactory#releaseInstance(Object)} 429 * @author guyadong 430 * 431 * @param <A> thrift å¼‚æ¥æŽ¥å£ç±»åž‹ 432 * @param <V> 方法返回值类型 433 */ 434 public class ListenableFutureDecorator<A,V> implements ListenableFuture<V>{ 435 private final A async; 436 private final ListenableFuture<V> future; 437 /** ç¡®ä¿ {@link #releaseAsync()}方法åªè¢«è°ƒç”¨ä¸€æ¬¡çš„æ ‡å¿—å—æ®µ */ 438 private final AtomicBoolean released = new AtomicBoolean(false); 439 public ListenableFutureDecorator(A async, ListenableFuture<V> future) { 440 this.async = checkNotNull(async,"async is null"); 441 this.future = checkNotNull(future,"future is null"); 442 } 443 private void releaseAsync(){ 444 if(released.compareAndSet(false, true)){ 445 releaseInstance(async); 446 } 447 } 448 @Override 449 public boolean cancel(boolean mayInterruptIfRunning) { 450 return future.cancel(mayInterruptIfRunning); 451 } 452 453 @Override 454 public V get() throws InterruptedException, ExecutionException { 455 try{ 456 return future.get(); 457 }finally{ 458 releaseAsync(); 459 } 460 } 461 462 @Override 463 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 464 try{ 465 return future.get(timeout, unit); 466 }finally{ 467 releaseAsync(); 468 } 469 } 470 471 @Override 472 public boolean isCancelled() { 473 return future.isCancelled(); 474 } 475 476 @Override 477 public boolean isDone() { 478 return future.isDone(); 479 } 480 481 @Override 482 public void addListener(Runnable listener, Executor executor) { 483 future.addListener(listener, executor); 484 } 485 } 486 static{ 487 // JVM ç»“æŸæ—¶è‡ªåŠ¨æ¸…é™¤èµ„æºæ± 䏿‰€æœ‰å¯¹è±¡ 488 Runtime.getRuntime().addShutdownHook(new Thread(){ 489 490 @Override 491 public void run() { 492 for(GenericObjectPool<?> pool:INSTANCE_POOL_CACHE.asMap().values()){ 493 pool.close(); 494 } 495 } 496 }); 497 } 498}