需求和设计方案
实时质检的复杂度在于外呼系统数据流的边界切割,为了便于和催收平台对接,我们重新设计了接口,把音频流的切割放在AI平台,外呼系统仅需要将音频流数据通过socket推送到AI质检系统即可。
尽管如此,依然无法完全解决产品需求,Socket Header不能携带更多的质检配置信息,亦不能在数据流中既推送文本配置信息(比如授权、规则集等)又推送流媒体,因此,我们又鉴于此设计了Http接口,上传需要本次催收通话中语音质检的配置信息。
上图,是初步设计的数据流时序图。催收平台通过HTTP接口提交质检所需的数据(appKey,appSecret,strategySign)以及一个全局唯一的会话字段session-uuid
,AI质检系统将其保存在本地缓存中,为后续的质检提供服务。之后,催收系统在调用外呼系统时,将session-uuid
绑到socket协议的header中,AI系统解析外呼系统推送的socket中header信息,并在本地缓存中查找与之对应的质检配置信息,然后处理音频流,切割成不同的片段质检,并将质检结果通过回调接口传给催收平台(AI平台有质检记录,可通过session-uuid查看)。
上述设计中,参考了智能调查系统。因此,如果催收需要我们回调数据,则需要提供回调接口。
Netty解决粘包和拆包
粘包/拆包
所谓的TCP粘包是指一次会话接收的数据不能完整的体现出一个完整的消息。为何TCP会出现粘包?主要因为TCP是以流的形式在网络中传递数据,在加上网络上的最大传输单元(MTU:Maximum Transmission Unit)往往小于在应用处理的消息,所以就会引发一次处理的消息无法满足消息数据的需要,导致粘包的存在。处理粘包的唯一方式就是自定义应用层的数据通讯协议,通过定制协议来控制现有接收的数据是否满足消息数据的需要。
解决方法
消息定长: 报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。
包尾添加特殊分隔符: 例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。
**将消息分为消息头和消息体:**消息头中包含表示信息的总长度(或者消息体长度)的字段。
自定义协议。解决TCP粘包/拆包
自定义协议
1 2 3 4 5 6 7 8 9 10 11 12 13 SVTP(Session Voice Transport Protocol) 基于TCP协议的应用层协议 数据包格式 +——----——-------+——-----————----——-------+——----——+ |协议开始标志 | 长度 | 数据 | |PT(包类型) 8bit | PL(包长度)16bit | | +——----——-------+——-----————----——-------+——----——+ PT: 0(EMPTY)|1(INFO)|2(DATA)|3(TEXT) 的一种 PL: 0 ~ 65535 的正整数,代表包正文部分的长度, DATA: 包正文(文本或二进制数据流) 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 2.传输数据的长度contentLength,int类型 3.要传输的数据
协议标记
PT: 0(EMPTY)|1(INFO)|2(DATA)|3(TEXT) 的一种
定义为枚举类型:
EMPTY为空包。
INFO在语音数据流开始和结束时发送。
DATA为正式的语音流数据包。
TEXT为文本数据流。
1 2 3 public enum SVTPType { EMPTY, INFO, DATA, TEXT }
编解码器
将应用程序的数据转换为网络格式,以及将网络格式转换为应用程序的数据的组件分别叫作编码器和解码器,同时具有这两种功能的单一组件叫作编解码器。Netty 提供了一系列用来创建所有这些编码器、解码器以及编解码器的工具,从专门为知名协议(如 HTTP 以及 Base64)预构建的类,到你可以按需定制的通用的消息转换编解码器,应有尽有。
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列——它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。
——《Netty实战》第二部分编解码器
将字节解码为消息——ByteToMessageDecoder和ReplayingDecoder;
将一种消息类型解码为另一种——MessageToMessageDecoder。
CODE实现
SOCKET服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package cn.socket.server; import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j @Component public class AsrTcpServer { @Value("${socket.port:8888}") private int port; private void startServer () { EventLoopGroup boss = new NioEventLoopGroup (); EventLoopGroup work = new NioEventLoopGroup (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new SocketChannelInitializer ()) .option(ChannelOption.SO_BACKLOG, 1024 ) .childOption(ChannelOption.SO_KEEPALIVE, true ); log.info("【服务器启动成功========端口:" + port + "】" ); Channel channel = bootstrap.bind(port).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } @PostConstruct() public void init () { new Thread (() -> startServer()).start(); } }
Handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package cn.socket.server;import cn.socket.decoder.OutboundDecoder;import cn.socket.encoder.OutboundEncoder;import cn.socket.handler.SocketServerHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;public class SocketChannelInitializer extends ChannelInitializer <SocketChannel> { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline .addLast("chunked" , new ChunkedWriteHandler ()) .addLast(new IdleStateHandler (60 , 30 , 60 * 30 , TimeUnit.SECONDS)) .addLast("svtp-encode" , new OutboundEncoder ()) .addLast("svtp-decoder" , new OutboundDecoder ()) .addLast(new SocketServerHandler ("/" , null , false )); } }
自定义协议:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 package cn.socket.protocol;import cn.socket.server.SVTPPacket;import com.google.common.base.Charsets;import com.google.common.collect.Maps;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang3.StringUtils;import java.util.Map;@Slf4j @Getter public class OutboundProtocol { public enum SVTPType { EMPTY, INFO, DATA, TEXT } private SVTPType type; private int length; private byte [] data; private Map<String, String> infoMap; public OutboundProtocol (SVTPType type, byte [] data) { this .type = type; this .data = data; this .length = data.length; parseInfo(); } public byte [] toBytes() { byte [] bytes = new byte [data.length + 3 ]; bytes[0 ] = (byte ) type.ordinal(); bytes[1 ] = (byte ) (length >> 8 & 0xFF ); bytes[2 ] = (byte ) (length & 0xFF ); System.arraycopy(data, 0 , bytes, 3 , length); return bytes; } public String getInfo (String key) { if (infoMap == null || infoMap.isEmpty() || key == null ) { return null ; } String val = infoMap.get(key.toLowerCase()); if (StringUtils.isBlank(val)) { return null ; } return val; } private void parseInfo () { if (SVTPPacket.SVTPType.INFO.equals(type)) { String infoStr = new String (data, Charsets.UTF_8); log.debug("Package content: " + infoStr); String[] lines = StringUtils.split(infoStr, "\n" ); infoMap = Maps.newHashMap(); for (String line : lines) { line = StringUtils.trimToEmpty(line); String[] infoParts = StringUtils.split(line, ":" , 2 ); if (infoParts.length == 2 ) { String key = StringUtils.trimToNull(infoParts[0 ]); String val = StringUtils.trimToNull(infoParts[1 ]); if (key != null ) { infoMap.put(key.toLowerCase(), val); } } } } } }
编码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package cn.socket.encoder;import cn.socket.protocol.OutboundProtocol;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;public class OutboundEncoder extends MessageToByteEncoder <OutboundProtocol> { @Override protected void encode (ChannelHandlerContext channelHandlerContext, OutboundProtocol msg, ByteBuf byteBuf) throws Exception { byteBuf.writeByte(msg.getType().ordinal()); int length = msg.getLength() & 0x0000FFFF ; byte lowEndian = (byte ) (length & 0x00FF ); byte highEndian = (byte ) ((length >> 8 ) & 0xFF ); byteBuf.writeByte(highEndian); byteBuf.writeByte(lowEndian); byteBuf.writeBytes(msg.getData()); } }
解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 package cn.socket.decoder;import cn.socket.protocol.OutboundProtocol;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;public class OutboundDecoder extends ByteToMessageDecoder { public final int BASE_LENGTH = 1 + 2 ; @Override protected void decode (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (byteBuf.readableBytes() > BASE_LENGTH) { int beginReader; OutboundProtocol.SVTPType type; while (true ) { beginReader = byteBuf.readerIndex(); byteBuf.markReaderIndex(); int pktType = byteBuf.readByte(); if (pktType >= 0 && pktType < OutboundProtocol.SVTPType.values().length) { type = OutboundProtocol.SVTPType.values()[pktType]; break ; } byteBuf.resetReaderIndex(); byteBuf.readByte(); if (byteBuf.readableBytes() < BASE_LENGTH) { return ; } } int ll = byteBuf.readByte(); int lr = byteBuf.readByte(); int length = (0xFF00 & (ll << 8 )) | (0x00FF & lr); if (byteBuf.readableBytes() < length) { byteBuf.readerIndex(beginReader); return ; } byte [] data = new byte [length]; byteBuf.readBytes(data); OutboundProtocol protocol = new OutboundProtocol (type, data); list.add(protocol); } } }
Netty中处理TCP会话session
在同步阻塞的网络编程中,代码都是按照TCP操作顺序编写的,即创建连接、多次读写、关闭连接,这样很容易判断这一系列操作是否是同一个TCP连接。而在事件驱动的异步编程网络模型中,IO操作都会触发一个事件Event调用事件函数处理该事件,例如接收到客户端的新数据,Mina会调用messageReceived,Netty会调用channelRead, Twisted会调用dataReceived,同一个TCP连接的多次请求和多个客户端的请求是一样的。
传统阻塞网络编程中,每个TCP连接都是独立的一个线程,同一个连接的数据流不会发送到其他的TCP连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 public class CServer extends Thread { private static final Logger LOGGER = LoggerFactory.getLogger(CServer.class); private ServerSocket serverSocket; private int port = 4004 ; private int maxConnections = 1024 ; private String host = "0.0.0.0" ; private int clientNumber = 400 ; private void startSVTPServer () { while (Boolean.TRUE) { try { new CNetWorker (serverSocket.accept(), clientNumber++).start(); LOGGER.info("accept socket, clientNumber: {}" , clientNumber); } catch (IOException ignore) { } } } @Override public void run () { try { InetAddress address = InetAddress.getByName(host); serverSocket = new ServerSocket (port, maxConnections, address); LOGGER.info("Server started at {}:{}" , host, port); startSVTPServer(); } catch (UnknownHostException e) { LOGGER.error("Bind wrong ip {}." , host, e); } catch (IOException e) { LOGGER.error("IO Exception happend: " , e); } finally { if (serverSocket != null ) { try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static class CNetWorker extends Thread { enum State { CONNECTED, ESTABLISHED, CLOSED } private Socket socket; private int socketTimeout = 100000 ; private int clientNumber; private State state; private ByteArrayOutputStream dataBuffer; private OutputStream wavFile; private String uuid; private int rate; private InputStream socketIn; private OutputStream socketOut; public CNetWorker (Socket socket, int clientNumber) { this .socket = socket; try { socket.setSoTimeout(socketTimeout); } catch (SocketException ignore) { } this .clientNumber = clientNumber; state = State.CONNECTED; dataBuffer = new ByteArrayOutputStream (); } public void run () { try { socketIn = new BufferedInputStream (socket.getInputStream()); socketOut = socket.getOutputStream(); while (Boolean.TRUE) { SVTPPacket pkt = new SVTPPacket (socketIn); if (State.CONNECTED.equals(state)) { if (SVTPPacket.SVTPType.INFO.equals(pkt.getType())) { String action = pkt.getInfo("action" ); if ("start" .equalsIgnoreCase(action)) { String samplerate = pkt.getInfo("sample-rate" ); this .rate = 8000 ; if (rate == 8000 || rate == 16000 ) { this .rate = rate; } String uuid = pkt.getInfo("session-uuid" ); this .uuid = uuid; LOGGER.info("Session-uuid: {} started with samplerate: {}" , uuid, rate); state = State.ESTABLISHED; } } } else if (State.ESTABLISHED.equals(state)) { if (SVTPPacket.SVTPType.INFO.equals(pkt.getType())) { if ("stop" .equalsIgnoreCase(pkt.getInfo("action" ))) { state = State.CLOSED; LOGGER.info("Session-uuid: {}, stoped." , uuid); break ; } } else if (SVTPPacket.SVTPType.DATA.equals(pkt.getType())) { } } } } catch (IOException e) { } } } }
在异步网络编程中,多个连接使用同一个ChannelHandler,如果同一个会话中的多个完整的消息数据流才能聚集成一个最终的数据包,比如语音流,需要处理session问题。
Mina定义了IoSession接口存储session信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface IoSession { getAttribute(Object key) getAttribute(Object key, Object defaultValue) setAttribute(Object key) setAttribute(Object key, Object value) setAttributeIfAbsent(Object key) setAttributeIfAbsent(Object key, Object value) replaceAttribute(Object key, Object oldValue, Object newValue) removeAttribute(Object key) removeAttribute(Object key, Object value) containsAttribute(Object key) getAttributeKeys() }
Netty将这种看似Map的功能进一步抽象,形成了AttributeMap接口:
1 2 3 public interface AttributeMap { <T> Attribute<T> attr (AttributeKey<T> key) ; }
AttributeMap接口只有一个attr()方法,接收一个AttributeKey类型的key,返回一个Attribute类型的value。按照Javadoc,AttributeMap实现必须是线程安全的。
Netty中,所有的Channle和ChannelHandlerContext都实现了AttributeMap接口。
上图来源:https://blog.csdn.net/zxhoo/article/details/17719333
AttributeMap的key的类型全部是AttributeKey,AttributeKey是个泛型类,灵活方便。同时,AttributeKey继承了UniqueName类,内部使用ConcurrentHashMap来保证name的唯一性。
1 2 3 4 5 6 7 8 9 10 public interface Attribute <T> { AttributeKey<T> key () ; T get () ; void set (T value) ; T getAndSet (T value) ; T setIfAbsent (T value) ; T getAndRemove () ; boolean compareAndSet (T oldValue, T newValue) ; void remove () ; }
参考文章/书籍
[1] Netty 实战
[2] Netty之解决TCP粘包拆包(自定义协议)
[3] Netty处理TCP连接的session
[4] Netty4学习笔记(7)-- AttributeMap