本文字数:3850字
预计阅读时间:15 分钟
业务端生成的业务数据发送给审核中台。审核中台前置有机器审核,机器审核通过后再经过人工审核。审核结束后,审核中台将审核结果发送给业务端。业务端根据审核结果对业务数据做出相应的处理:数据露出或者数据删除。审核中台是为了把控用户生成数据的安全性与合法性。
1.1.3 领域模型
领域模型(Domain Model),是完成从需求分析到面向对象设计的一座桥梁,领域模型是指对需求所涉及的领域的建模,所以也叫业务对象模型,是描述业务用例实现的对象模型。它是对业务角色和业务实体之间应该如何联系和协作以执行业务的一种抽象。下文中提到的业务领域模型是对业务端数据进审的业务流程进行建模的模型。模型名称为:MBusiness,关键属性为inTopic(进审消息topic),inGroup(进审消息消费者group),outTopic(出审消息topic),outGroup(出审消息生产者group),type(业务类型)。(inTopic、outTopic:是每个业务端对应一组。)
a、inTopic为进审topic,在inTopic下业务端作为Producer生产消息,审核中台作为Consumer消费消息。
b、outTopic为出审topic,在outTopic下审核中台作为Producer生产消息,业务端作为Consumer消费消息。
@Slf4j
@Component
public class ConsumerManager implements ApplicationRunner, DisposableBean {
private Map<String, Consumer> consumerMap = Collections.synchronizedMap(new HashMap<>());
@Resource
private VideoConsumerCallback videoConsumerCallback;
@Resource
private ContentConsumerCallback contentConsumerCallback;
@Resource
private PlayListConsumerCallback playListConsumerCallback;
@Resource
private BroadlistConsumerCallback broadlistConsumerCallback;
@Resource
private MBusinessDao businessDao;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
flush();
}
private void flush() {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
init();
}
}, 1000 * 60 * 5L, 1000 * 60 * 5L);
}
private void init() {
List<MBusiness> businessList = businessDao.listValid();
if (CollectionUtils.isEmpty(businessList)) {
return;
}
for (MBusiness business : businessList) {
if (consumerMap.containsKey(business.getInTopic())) {
continue;
}
Consumer consumer = initConsumer(business);
if (consumer == null) {
continue;
}
consumer.start();
consumerMap.put(business.getInTopic(), consumer);
log.info("topic:{} group:{} start consumer.", business.getInTopic(), business.getInGroup());
}
}
@Override
public void destroy() throws Exception {
if (consumerMap.isEmpty()) {
return;
}
for (Consumer consumer : consumerMap.values()) {
consumer.shutdown();
}
}
private Consumer initConsumer(MBusiness business) {
if (StringUtils.isBlank(business.getInGroup()) || StringUtils.isBlank(business.getInTopic())) {
return null;
}
Consumer consumer = new Consumer(business.getInGroup(), business.getInTopic());
if (EnterType.VIDEO.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(videoConsumerCallback);
}
if (EnterType.CONTENT.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(contentConsumerCallback);
}
if (EnterType.PLAYLIST.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(playListConsumerCallback);
}
if (EnterType.BROADLIST.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(broadlistConsumerCallback);
}
consumer.setPullThresholdForQueue(1);
return consumer;
}
public Set<String> listTopic() {
return consumerMap.keySet();
}
public Consumer getConsumer(String topic) {
return consumerMap.get(topic);
}
}
@Slf4j
@Component
public class ProducerManager implements ApplicationRunner, DisposableBean {
private Map<String, Producer> producerMap = Collections.synchronizedMap(new HashMap<>());
private Map<String, Producer> producerTypeMap = Collections.synchronizedMap(new HashMap<>());
private Map<String, String> webhookMap = Collections.synchronizedMap(new HashMap<>());
@Resource
private MBusinessDao businessDao;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
flush();
}
private void flush() {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
init();
}
}, 1000 * 10 * 3L, 1000 * 60 * 3L);
}
private void init() {
List<MBusiness> businessList = businessDao.listValid();
if (CollectionUtils.isEmpty(businessList)) {
return;
}
for (MBusiness business : businessList) {
if (StringUtils.isNotBlank(business.getOutWebhook()) && !webhookMap.containsKey(business.getType())) {
webhookMap.put(business.getType(), business.getOutWebhook());
}
if (producerMap.containsKey(business.getOutTopic())) {
if (!producerTypeMap.containsKey(business.getType())) {
producerTypeMap.put(business.getType(), producerMap.get(business.getOutTopic()));
log.info("business type:{} topic:{} group:{} init producer.", business.getType(), business.getOutTopic(), business.getOutGroup());
}
continue;
}
Producer producer = initProducer(business);
if (producer == null) {
continue;
}
producer.start();
producerMap.put(business.getOutTopic(), producer);
producerTypeMap.put(business.getType(), producer);
log.info("business type:{} topic:{} group:{} init producer.", business.getType(), business.getOutTopic(), business.getOutGroup());
}
}
@Override
public void destroy() throws Exception {
if (producerMap.isEmpty()) {
return;
}
for (Producer producer : producerMap.values()) {
producer.shutdown();
}
}
public Producer initProducer(MBusiness business) {
if (StringUtils.isBlank(business.getOutGroup()) || StringUtils.isBlank(business.getOutTopic())) {
return null;
}
return new Producer(business.getOutGroup(), business.getOutTopic());
}
public String getWebhook(String type) {
if (StringUtils.isBlank(type)) {
return null;
}
return webhookMap.get(type);
}
public Producer getProducer(String type) {
if (StringUtils.isBlank(type)) {
return null;
}
return producerTypeMap.get(type);
}
}