作者:钱文锦
场景:
天润融通是一家云呼叫中心服务商,其中CTI-Cloud为大量头部客户提供高效、稳定的呼叫中心服务。现在,天润通过T-Phone SDK将CTI-Cloud的功能延伸到移动端,为客户提供移动端的呼叫服务。
在天润的T-Phone SDK中,我们需要采集WebRTC信息来进行数据的分析并作出优化的建议,所以需要将SDK中采集到的相关日志进行上报;为了精简日志上报的数据,我们只针对其中的传输数据每隔5秒在双方接通后上传,针对传输中网络的抖动和链接状态可以数据化展示,提供对每一通话的数据分析,以便在后续SDK演进中提供数据支撑;另外我们对每一通电话做了操作日志,记录了接口被调用的操作和时间,为用户在某一通电话的操作记录做还原,分析可能的误操作等,为客户提供更好的交互体验。
因为现在仍处于项目初期,我们更关心用户在某一个时间段内的使用情况,在大量使用的场景中是否仍然能保证较高的通话质量,同时我们应该尽可能做到对每个座席都可以进行分析,做到每一个座席都应该有自己的数据表。
举个例子:如果我们要查询企业7000001的座席9001 2020年2月14日12:00-12:10分的一通通话的WebRTC日志,如果没有按照座席进行分表,SQL语句应该是这样:
select log_time, audio_bytes_sent,... from aladdin.webrtc_log where device_id = '70000019001' and where log_time between 1581652800000 and 1581653400000;
如果要提升查询的速度,我们首先要对device_id
和log_time
字段建立索引,但是当数据量比较大的时候,索引的存储也会是问题,所以要考虑分表(我们之前使用的数据库是aws的rds,所以没有分库的概念)
分表的选择有两种,按照时间分表或者按照座席分表。为什么我们要按照座席分表?如果按照时间分表,这样就会出现不同表的数据量差异过大,甚至存在某个表里没有数据的情况,因为很少有人半夜做外呼。但是我们也不能这样武断的不为半夜的时间段建立表,万一人打的是国际长途呢(手动滑稽)?但是一个座席不可能存在不外呼的情况,而且对于移动端的应用,我们在排查问题时更多是通过某个座席向我们反馈发生的问题,我们再针对这个座席进行排查,所以在查询的时候device_id
这个字段是必须要体现的,如果按照device_id
进行分表,我们在查询的时候就不再需要对这个字段建立索引了。因而选择按照座席进行分表。
如果要使用传统的数据库做分表,我们在插入数据之前一定要先判断这张表是否存在,同时我们还需要提前创建好这些表。这种步骤在我看来就显得很鸡肋。如果能有数据库可以做到在插入数据时指定表名,如果存在则插入,如果不存在则自动创建表,这样就方便多了。
日志上报的整体处理流程
整个流程需要T-Phone SDK,CTI-Cloud的Interface模块(CTI-Cloud对客户开放的接口)和日志上报模块相互协作
设计:
考虑到日志上报的频率较高,对IO吞吐的要求比较高。我们可以通过全异步的方式进行数据的采集。这次使用了Vert.x作为全异步项目开发的工具。
在数据存储上基于以下几点考虑我们选择了TDEngine:
- 不管是WebRTC日志还是操作日志,都是按照时间产生的数据流。而TDEngine正好是一个专门为物联网结构化数据流设计的时序数据库
- WebRTC日志和操作日志存储的数据格式都是一致的,但是如果要做到都每个使用的座席都可以进行分析,最好的方式是每个座席都能有一张自己的数据表。TDEngine提供了超级表,在超级表中定义数据结构,并按照tag区分,只要在插入数据时指定表名即可做到分表。显然解决了上述的鸡肋问题。按照TDEngine官网上的介绍:
为充分利用其数据的时序性和其他数据特点,TDengine要求对每个数据采集点单独建表
其实我们的座席就相当于是一个独立的数据采集点,TDEngine在我们的场景中是很贴合业务的。
- 时间。时间也是我们在查询中重点关注的部分,在传统的数据库中,我们需要通过对字段建立索引来提升查询速度,可是我们仍然不想建立索引,因为索引仍需要占用存储空间,我们是否可以通过类似分表的方式来取代索引呢?答案是肯定的:
TDengine中写入的数据在硬盘上是按时间维度进行分片的。同一个vnode中的表在同一时间范围内的数据都存放在同一文件组中。这一数据分片方式可以大大简化数据在时间维度的查询,提高查询速度。在默认配置下,硬盘上的每个数据文件存放10天数据。用户可根据需要修改系统配置参数daysPerFile进行个性化配置。
- 插入和查询的速度要快,稳定。
在我们的开发服务器上尝试了一下TDEngine。和官网上介绍的出入不大,查询和存储速度确实很快,而且也不依赖其他文件系统,所以就使用TDEngine作为这个模块的存储引擎。由于TDEngine中对列有长度限制,最长4096,而且我们上报的字段比较多,所以尽量分配好每个字段的长度。
在数据的采集过程中,TPhone SDK不会直接和我们进行数据交互,而是会先将数据存储到SQS中,我们再从SQS中拉取数据,然后对数据处理后进行存储。
先来创建一个超级表,tdengine提供的超级表在我看来还是很方便的,我们可以直接利用超级表来做到自动的对数据进行分表存储
create database aladdin; use aladdin; create table webrtc_log( createTime timestamp, deviceId binary(100), audioBytesSent bigint, audioBytesReceived bigint, ... ssrcSendGoogCurrentDelayMs int, ssrcSendGoogJitterBufferMs int ) tags ( deviceIdTag binary(100) );
tdengine提供了非常多的连接方式,为了更好的配合Vertx进行异步存储,我们在这里使用了Rest方式进行数据库操作。
开始
在有了整体思路之后我们开始上手开发:
- 应用配置:
{ "aws.region": "<your aws region>", "aws.accessKey": "<your aws ak>", "aws.secretAccessKey": "<your aws sk>", "aladdin.maxPool": 100, "aladdin.maxWaitQueue": 1500, "aladdin.queue.name": ["queuename1","queuename2"], "aladdin.cache.expireAfterWrite": 30, "aladdin.cache.expireAfterAccess": 30, "tdengine.host": "<your tdengine host>", "tdengine.port": 6020, "tdengine.user": "root", "tdengine.password": "<your tdengine password>" }
- 重写Launcher
import com.tinet.twatch.aladdin.config.Configurer; import io.vertx.core.DeploymentOptions; import io.vertx.core.Launcher; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.SLF4JLogDelegateFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author qianwj * @since v1.0 */ public class AladdinLauncher extends Launcher { private static Configurer configurer = new Configurer(); private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class); public static void main(String[] args) { System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName()); new AladdinLauncher().dispatch(args); } @Override public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) { logger.info("Loading config starting..."); JsonObject config = configurer.load(); JsonObject local = deploymentOptions.getConfig(); if (!config.isEmpty()) { // 将consul配置注入到context中 local.mergeIn(config); deploymentOptions.setConfig(local); } super.beforeDeployingVerticle(deploymentOptions); logger.info("Loading config completed, config: {}", deploymentOptions.getConfig()); } @Override public void afterConfigParsed(JsonObject config) { logger.info("Loading local config complete, local config: {}", config.encodePrettily()); } @Override public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) { logger.error("Deploy verticle occur exception: {}, App will be closed immediately!", cause.getLocalizedMessage(), cause); vertx.close(); } }
其实写完第二步就可以知道这个配置文件存在不是必要的,我们使用了Consul作为配置中心来进行集中配置,这一步主要是为了注入consul的配置以及加载日志。
- 拉取SQS中的数据
import com.amazonaws.AmazonServiceException; import com.tinet.ctilink.yun.entity.YunMessage; import com.tinet.twatch.aladdin.service.AwsSQSService; import com.tinet.twatch.aladdin.config.Configurer; import io.vertx.core.AbstractVerticle; import io.vertx.core.eventbus.EventBus; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class DataCollectVerticle extends AbstractVerticle { private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class); private volatile boolean shutdown = false; @Override public void start() throws Exception { logger.info("DataCollectVerticle starting..."); AwsSQSService sqsService = Configurer.sqsService(); EventBus bus = vertx.eventBus(); vertx.setPeriodic(1000, id -> { try { if (shutdown) { vertx.cancelTimer(id); } JsonArray array = config().getJsonArray(Configurer.QUEUE_URL); List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString(0)); List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1)); bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs)); } catch (AmazonServiceException e) { logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e); } }); } @Override public void stop() throws Exception { shutdown = true; logger.info("DataCollectVerticle closing..."); } }
- 将数据存储到TDEngine中
import com.github.benmanes.caffeine.cache.Cache; import com.tinet.ctilink.yun.entity.YunMessage; import com.tinet.twatch.aladdin.DataOperator; import com.tinet.twatch.aladdin.config.Configurer; import com.tinet.twatch.aladdin.model.WebRTCLog; import io.vertx.core.AbstractVerticle; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; public class SaveVerticle extends AbstractVerticle { private Logger logger = LoggerFactory.getLogger(SaveVerticle.class); @Override public void start() throws Exception { logger.info("SaveVerticle starting...."); // 从event bus接收数据 EventBus bus = vertx.eventBus(); bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> { JsonArray coming = new JsonArray(msg.body()); if (coming != null) save(coming); }); } private void save(JsonArray array) { WebClient client = Configurer.tdClient(); List<WebRTCLog> data = new ArrayList<>(); Cache<String, WebRTCLog> cache = Configurer.cache(); if (array.size() > 0) { final WebRTCLog empty = new WebRTCLog(); for (int i = 0; i < array.size(); i++) { String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody(); try { JsonObject json = DataOperator.toJsonObject(message); WebRTCLog log = json.mapTo(WebRTCLog.class); String cacheKey = log.getDeviceId(); WebRTCLog org = cache.get(cacheKey, k -> empty); if (!Objects.equals(org, empty)) { // 如果不是第一次插入 DataOperator.merge(log, org); } cache.put(cacheKey, log); data.add(log); } catch (Exception e) { logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e); } } client.post("/rest/sql") .basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password")) .sendBuffer(insert(data), ar -> { if (ar.succeeded()) { HttpResponse<Buffer> response = ar.result(); if (response != null) { JsonObject res = response.bodyAsJsonObject(); if (!"succ".equals(res.getString("status"))) { logger.warn("data insert failed! data: {}, cause: {}", Json.encode(data), res.getString("desc")); } } } else { logger.error("data insert failed! {}", Json.encode(data), ar.cause()); } }); } } private Buffer insert(WebRTCLog log) throws Exception { String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " + " USING ALADDIN.WEBRTC_LOG TAGS(%s) " + "VALUES(%s)"; String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)); return Buffer.buffer(sql); } private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException { StringBuilder sqlBuilder = new StringBuilder("INSERT INTO "); String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) "; for (WebRTCLog log : data) { sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log))); } return Buffer.buffer(sqlBuilder.toString()); } @Override public void stop() throws Exception { logger.info("SaveVerticle closing...."); } }
- 部署Verticle
import com.tinet.twatch.aladdin.config.Configurer; import com.tinet.twatch.aladdin.verticle.DataCollectVerticle; import com.tinet.twatch.aladdin.verticle.SaveVerticle; import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; import io.vertx.core.Promise; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.ext.web.client.WebClientOptions; public class MainVerticle extends AbstractVerticle { private Logger logger = LoggerFactory.getLogger(MainVerticle.class); @Override public void start(Promise<Void> startPromise) throws Exception { logger.info("MainVerticle starting..."); // 初始化sqs String region = config().getString("aws.region"); String accessKey = config().getString("aws.accessKey"); String secretKey = config().getString("aws.secretAccessKey"); Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL)); DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions(); dataCollectDeploymentOptions.setInstances(1); dataCollectDeploymentOptions.setConfig(config()); dataCollectDeploymentOptions.setWorker(true); Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"), config().getInteger("aladdin.cache.expireAfterAccess")); vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> { if (ar.succeeded()) { logger.info("DataCollectVerticle started!"); } else { logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause()); } }); // 初始化webclient WebClientOptions options = new WebClientOptions(); options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue")); options.setMaxPoolSize(config().getInteger("aladdin.maxPool")); options.setDefaultHost(config().getString("tdengine.host")); options.setDefaultPort(config().getInteger("tdengine.port")); Configurer.initTDClient(vertx, options); DeploymentOptions saveDeploymentOptions = new DeploymentOptions(); saveDeploymentOptions.setInstances(1); saveDeploymentOptions.setConfig(config()); vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> { if (ar.succeeded()) { logger.info("SaveVerticle started!"); } else { logger.warn("SaveVerticle deploy failed!"); } }); } }
这样就快速实现了一个日志上报的模块,且多个实例部署时相互之间不会产生影响,当然在实际的生产环境中,我们需要考虑的会更多。
当然,日志上报只是开始。在之后的项目开发中,我还会继续向大家介绍TDEngine在数据分析中的应用实践,感谢观看。
关于天润融通
天润融通创立于2006年,是一家中国领先的智慧云联络中心服务商,我们提供全云、全渠道的联络中心系统平台、应用软件及服务,依托人工智能赋能企业与客户沟通,让联络中心更智能。我们致力于帮助客户实现其商业成功,利用市场领先的云计算与人工智能技术、卓越的联络中心专业知识和网络、通信管理服务经验,以及开放多元的合作伙伴生态系统。
公司业务覆盖保险、教育、企业服务、交通物流、旅游、本地生活、中介服务等十多个领域,赢得太平洋保险、大地保险、百度、美团、贝壳找房、掌门1对1、51Talk等1000+家企业信赖。目前公司规模近300人,总部设在北京,并在上海、深圳设立华东、华南总部。
了解天润融通,请访问:www.ti-net.com.cn
电话:10109099