0x00 需求背景
我们部门计划要做一个公共服务管理平台,主要给后端业务提供配置化服务,将配置中心的部分配置迁移到服务平台的可视化界面中开放给运营侧配置。当然我们也提供了文图音视任务状态的管理能力,为补偿失败任务提供了益中解决方案;同时对各个应用集群节点也进行了监控,对于需要将数据放在内存场景的情况进行管控。
算法研发同学也对于服务平台“寄予厚望”,希望我们能提供一套AB测试方案,能后帮助他们对比算法优化前后的效果。一番需求沟通和评审之后,我们对ABtest的设计也就有了一个具体的方案,此处不对设计方案做过多的说明。

图1 ABtest设计方案
为了减少对业务代码的入侵,在上图的设计中,服务心跳、ABTEST的逻辑封装在服务平台提供的SDK中。其中,ABTEST使用了AspectJ实现对业务代码的修改和增强。
为了不破坏原有业务的吞吐量,在开发过程中用到了消息队列kafka同步消息,但是kafka集群的配置是在配置中心提供的,不同的环境、不同的机房配置不一样,问题:如何将动态配置注入到AspectJ的切面中呢?
0x01 使用AspectJ实现业务增强
服务平台封装的SDK是纯Java项目,没有引入Spring框架相关的包。因此,不能简单的使用@Autowire @Resource注解引入spring的对象。
在SDK中我们封装了两个对象。
一个是AbTest的注解类,目的是在需要切入的方法上增加@AbTest(indicator="porn_model_indicator",stream=DemoStrem.class, parser=DemoParser)
就可以实现ABtest的功能,其中indicator与创建任务是关联的,目的是定位到该注解indicator和创建的任务是对应则执行abtest,否则不执行;stream是分流器,实现分类策略,SDK定义了Stream接口并提供默认的分流器实现类;parser是解析器,将执行结果解析为符合服务平台所需的数据报文格式,同样SDK定义了Parser接口并提供了默认的解析器实现类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Target({ElementType.METHOD,ElementType.TYPE,ElementType.FIELD,ElementType.LOCAL_VARIABLE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AbTest {
String indicator();
Class<? extends Stream> stream() default Stream.class;
Class<? extends Parser> parser() default Parser.class; }
|
另一个是AbTestAspect的切面类,目前是对有@AbTest
注解的方法进行增强,并将结果发送到消息队列工服务平台消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| package xxx; @Aspect public class ABTestAspect { private static final Logger logger = LoggerFactory.getLogger(ABTestAspect.class);
IProducer producer; private static ThreadFactory threadName = new ThreadFactoryBuilder() .setNameFormat("PyxisABTestAspect").build(); private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(9, 9, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), threadName);
public ABTestAspect() { }
@Around("execution(!void *(..)) && @annotation(xxx.annotation.AbTest)") public Object around(ProceedingJoinPoint point) throws Throwable {
Object s1 = point.proceed(); EXECUTOR_SERVICE.submit(new Runnable() { @Override public void run() { try { MethodSignature signature = (MethodSignature) point.getSignature(); AbTest abTest = signature.getMethod().getDeclaredAnnotation(AbTest.class); Object streamObject = abTest.stream().getConstructor().newInstance(); String indicatorList = abTest.indicator(); String[] indicators = indicatorList.split(","); Stream stream = null; if (streamObject instanceof DemoStream) { stream = (DemoStream) streamObject; } else { logger.error("ABTestAspect 注解参数有误,分流器不存在"); return; }
Object[] args = point.getArgs(); String[] parameterNames = signature.getParameterNames(); HashMap<String, Object> paraMap = new HashMap<>(); for (int index = 0; index < args.length; index++) { paraMap.put(parameterNames[index], args[index]); }
upload(resultArraylist); } catch (Throwable e) { logger.error("ABtest切面执行异常"); e.printStackTrace(); } } });
return s1; }
private void upload(List<Result> resultArraylist) { for (Result result : resultArraylist) { try { final String msgKey = UUID.randomUUID().toString(); producer.produce("report_abtest_result", msgKey, JSONObject.toJSONString(result).getBytes("UTF-8")); } catch (Exception e) { e.printStackTrace(); } } } }
|
0x10 解决问题
有@AspectJ
注解的切面是在Spring容器之外创建的,因此不受Spring容器管理声明周期,并且该实例是单例,在@AspectJ
注解的类中,编译后我们就会发现多一个方法:aspectOf(),Aspect切面对象也就是由该方法创建。
想要在Spring中对切面进行有效注入,那么必须将该类交由Spring管理,这就很明显了,创建一个bean放在Spring的容器中,创建方法必须是aspectOf()
。
**此外,需要对注入的对象增加setter方法。**我们在ABTestAspect
中增加下面方法
1 2 3
| public void setProducer(IProducer producer) { this.producer = producer; }
|
1 2 3 4
| <bean id="aBTestAspect" class="xxx.ABTestAspect" factory-method="aspectOf" > <property name="producer" ref="syncProducer"></property> </bean>
|
1 2 3 4 5 6 7 8 9 10 11 12
|
@Autowire private SimpleProducer producer;
@Bean public ABTestAspect theAspect() { ABTestAspect aspect = Aspects.aspectOf(ABTestAspect.class); aspect.setProducer(producer) return aspect; }
|