导入依赖

Java 程序多用 Maven 构建,需要导入以下依赖包:

1
2
3
4
5
6
<!-- Aerospike key-value cache: Java client library -->
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>4.1.11</version>
</dependency>

Aerospike 客户端原生支持 Netty。如果需要使用 Netty 的事件循环(EventLoop)来处理 IO,需要在 POM 中增加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
<!-- Aerospike Java client -->
<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-client</artifactId>
<version>4.1.11</version>
</dependency>

<!-- Netty artifacts needed when the client is driven by Netty event loops -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.11.Final</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.1.11.Final</version>
</dependency>

<!-- Only needed when using epoll event loops on linux -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>4.1.11.Final</version>
</dependency>
</dependencies>

配置客户端连接

单点连接

1
2
3
4
5
6
7
8
9
10
11
12
// Option 1: the simplest form, connecting with the default client policy.
AerospikeClient aerospikeClient = new AerospikeClient("10.57.30.214", 3000);

// Option 2 (alternative to option 1): connect with a hand-built client policy.
// Client-level policy that will carry the default write policy.
ClientPolicy clientPolicy = new ClientPolicy();
// Global write policy; the timeout is expressed in milliseconds.
WritePolicy writePolicy = new WritePolicy();
writePolicy.setTimeout(3000);
clientPolicy.writePolicyDefault = writePolicy;
// Create the client from the custom policy.
AerospikeClient aerospikeClient = new AerospikeClient(clientPolicy, "10.57.30.214", 3000);

集群连接

集群同步方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Cluster endpoints: a comma-separated list of host:port pairs.
String hostStr = "10.57.30.214:3000,10.57.30.215:3000";

// Convert every "host:port" entry into an Aerospike Host.
String[] arr = hostStr.split(",");
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
    String[] hostPort = StringUtils.split(arr[i], ":");
    // parseInt yields the primitive port directly instead of boxing and unboxing an Integer.
    hosts[i] = new Host(hostPort[0], Integer.parseInt(hostPort[1]));
}

// Default write policy with a 3000 ms timeout.
WritePolicy wp = new WritePolicy();
wp.setTimeout(3000);

ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.writePolicyDefault = wp;

// Synchronous client built from all configured hosts.
AerospikeClient aerospikeClient = new AerospikeClient(clientPolicy, hosts);

集群异步方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Cluster endpoints: a comma-separated list of host:port pairs.
String hostStr = "10.57.30.214:3000,10.57.30.215:3000";

// Convert every "host:port" entry into an Aerospike Host.
String[] arr = hostStr.split(",");
Host[] hosts = new Host[arr.length];
for (int i = 0; i < arr.length; i++) {
    String[] hostPort = StringUtils.split(arr[i], ":");
    // parseInt yields the primitive port directly instead of boxing and unboxing an Integer.
    hosts[i] = new Host(hostPort[0], Integer.parseInt(hostPort[1]));
}

// Default write policy with a 3000 ms timeout and a timeoutDelay of 50.
WritePolicy wp = new WritePolicy();
wp.setTimeout(3000);
wp.timeoutDelay = 50;

AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
clientPolicy.writePolicyDefault = wp;

// Asynchronous client built from all configured hosts.
AsyncClient aerospikeClient = new AsyncClient(clientPolicy, hosts);

Springboot 配置

1
2
3
4
5
6
7
8
9
10
#--------------- aerospike -----------------#

# Aerospike settings: endpoints is a comma-separated list of host:port pairs
aerospike.endpoints=10.57.30.214:3000
aerospike.namespace=ns1
aerospike.timeout=3000
# Max TPS of a single Aerospike node; raise freely for load tests, production uses 1000000.
# The original note gives the unit as "one minute" (NOTE(review): confirm whether this is per minute or per second)
aerospike.max.tps=1000000
# Max length allowed for a single record (8388608 = 8 * 1024 * 1024; presumably bytes, TODO confirm)
aerospike.data.maxLen=8388608
对应的 Spring Boot 配置类如下:
package cn.openmind.conf;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes the Aerospike client and a default write policy as beans.
 *
 * <p>Values are injected from the {@code aerospike.namespace}, {@code aerospike.endpoints}
 * (comma-separated {@code host:port} pairs) and {@code aerospike.timeout} properties.
 *
 * @author zhoujunwen
 * @date 2020-09-16 10:37
 */
@Configuration
@Getter
@Setter
@Slf4j
public class AerospikeConf {
    @Value("${aerospike.namespace}")
    private String namespace;
    @Value("${aerospike.endpoints}")
    private String hostStr;
    @Value("${aerospike.timeout}")
    private Integer timeout;

    /**
     * Creates the Aerospike client for all configured endpoints.
     *
     * <p>The configured write policy is installed as the client's default write policy so that
     * code reading {@code getWritePolicyDefault()} sees the configured timeout.
     *
     * @return the connected client
     * @throws IllegalStateException if the endpoint list is blank or malformed
     */
    @Bean
    public AerospikeClient aerospikeClient() {
        ClientPolicy clientPolicy = new ClientPolicy();
        clientPolicy.writePolicyDefault = writePolicy();
        return new AerospikeClient(clientPolicy, getHost());
    }

    /**
     * Creates a write policy whose timeout is taken from {@code aerospike.timeout}.
     *
     * @return the write policy
     */
    @Bean
    public WritePolicy writePolicy() {
        WritePolicy policy = new WritePolicy();
        policy.setTimeout(getTimeout());
        return policy;
    }

    /**
     * Parses the configured endpoint string into Aerospike hosts.
     *
     * @return one {@link Host} per comma-separated {@code host:port} entry
     * @throws IllegalStateException if no endpoints are configured or an entry is not
     *     of the form {@code host:port}
     * @throws NumberFormatException if a port is not a valid integer
     */
    private Host[] getHost() {
        if (StringUtils.isBlank(getHostStr())) {
            // Fail with a clear message instead of handing a null host array to the client.
            throw new IllegalStateException(
                    "Cannot initialize the Aerospike client: aerospike.endpoints is not configured");
        }
        String[] entries = getHostStr().split(",");
        Host[] hosts = new Host[entries.length];
        for (int i = 0; i < entries.length; i++) {
            // Tolerate whitespace around entries such as "a:3000, b:3000".
            String[] hostPort = StringUtils.split(entries[i].trim(), ":");
            if (hostPort == null || hostPort.length != 2) {
                throw new IllegalStateException(
                        "Invalid Aerospike endpoint '" + entries[i] + "', expected host:port");
            }
            hosts[i] = new Host(hostPort[0], Integer.parseInt(hostPort[1]));
        }
        return hosts;
    }

}

在需要的地方通过注解注入即可。比如:

1
2
// Field-inject the AerospikeClient bean wherever it is needed.
@Autowired
private AerospikeClient aerospikeClient;

定义接口

一般而言,项目中往往同时存在多种缓存机制。大多数时候我们会优先选择 Redis,对 Redis 的数据结构也相对熟悉。因此在定义接口时,可以借鉴 Redis 客户端的方法命名,方便日后在不同缓存之间迁移。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package cn.openmind.cache;

import java.util.List;

/**
* Key-value storage abstraction whose method names follow Redis client conventions.
*
* @author zhoujunwen
* @date 2020-09-16 09:51
*/
public interface IKVStorage {
/**
* Sets the expiration time of a key.
*
* @param key cache key
* @param ttl time-to-live to apply (NOTE(review): unit is not stated here, presumably seconds; confirm)
* @return whether the operation succeeded
* @throws Exception if the underlying store reports an error
*/
boolean touch(String key, int ttl) throws Exception;

/**
* Increments the numeric value stored under a key.
*
* @param key cache key
* @param by increment step
* @param ttl time-to-live to apply to the key
* @return the resulting value
* @throws Exception if the underlying store reports an error
*/
long incr(String key, long by, int ttl) throws Exception;

/**
* Deletes a key.
*
* @param key cache key
* @return the result reported by the store for the delete
* @throws Exception if the underlying store reports an error
*/
boolean delete(String key) throws Exception;

/**
* Checks whether a key exists.
*
* @param key cache key
* @return {@code true} if the key exists
* @throws Exception if the underlying store reports an error
*/
boolean exists(String key) throws Exception;

/**
* Stores a string value under a key.
*
* @param key cache key
* @param value string value to store
* @param ttl time-to-live to apply to the key
* @return whether the value was stored
*/
boolean setString(String key, String value, int ttl);

/**
* Reads the string value stored under a key.
*
* @param key cache key
* @return the stored string
*/
String getString(String key);


/**
* Pushes one or more values onto the list stored under a key.
*
* @param key cache key
* @param value values to push
* @return whether the push succeeded
* @throws Exception if the underlying store reports an error
*/
<T> boolean lpush(String key, T... value) throws Exception;

/**
* Removes an element from the list stored under a key.
*
* @param key cache key
* @param value element to remove
* @return whether the operation succeeded
*/
<T> boolean lrem(String key, T value);

/**
* Returns members of the list stored under a key.
*
* @param key cache key
* @param start start index of the range
* @param end end index of the range
* @return the list members in the requested range
*/
List<?> lrange(String key, int start, int end);
}

实现接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package cn.openmind.cache.impl;

import cn.openmind.conf.AerospikeConf;
import cn.tongdun.kara.biz.service.dal.IKVStorage;
import com.aerospike.client.*;
import com.aerospike.client.cdt.*;
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;

/**
* AspKVStorageImpl
*
* @author zhoujunwen
* @date 2020-09-16 09:52
* @desc
*/
@Slf4j
@Component("aspKVStorage")
public class AspKVStorageImpl implements IKVStorage {
@Autowired
@Setter
private AerospikeClient aerospikeClient;
@Autowired
@Setter
private AerospikeConf aerospikeConf;

// as 的数据集名称,相当于 mysql 表名
private static final String dataSetName = "kara";

private static final String BIN = "data";

private static final Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();

/**
* 设置 key 的过期时间
*
* @param key 缓存 key
* @param ttl ttl
* @return
* @throws Exception
*/
@Override
public boolean touch(String key, int ttl) throws Exception {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
try {
WritePolicy wp = new WritePolicy();
wp.expiration = ttl;
wp.setTimeout(aerospikeClient.getWritePolicyDefault().totalTimeout);
aerospikeClient.touch(wp, k);
} catch (AerospikeException e) {
log.warn("it happens error when set ttl {} for key {} ", ttl, key);
throw new Exception(e);
} finally {
}
return true;
}

/**
* 自增
*
* @param key 缓存 key
* @param by 自增步长
* @param ttl
* @return
* @throws Exception
*/
@Override
public long incr(String key, long by, int ttl) throws Exception {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);

long res;

try {
WritePolicy wp = new WritePolicy();
wp.expiration = ttl;
wp.setTimeout(aerospikeClient.getWritePolicyDefault().totalTimeout);
Operation[] opt = new Operation[]{Operation.add(new Bin(BIN, by)), Operation.get(BIN)};

Record record = aerospikeClient.operate(wp, k, opt);
if (record == null) {
return 0L;
}
res = record.getLong(BIN);
} catch (AerospikeException e) {
throw new Exception(e);
} finally {
}

return res;
}

/**
* 删除
*
* @param key
* @return
* @throws Exception
*/
@Override
public boolean delete(String key) throws Exception {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);

boolean res;
try {
res = aerospikeClient.delete(null, k);
} catch (AerospikeException e) {
throw new Exception(e);
} finally {
}

return res;
}

/**
* 判断 key 是否存在
*
* @param key
* @return
*/
@Override
public boolean exists(String key) {
if (StringUtils.isBlank(key)) {
return false;
}
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
return aerospikeClient.exists(null, k);
}

/**
* 设值
*
* @param key key
* @param value 值, 支持对象和普通字符串
* @param ttl ttl
* @return
*/
@Override
public boolean set(String key, Object value, int ttl) {
if (value == null) {
return false;
}
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Bin bin = new Bin(BIN, value);
WritePolicy wp = new WritePolicy();
wp.expiration = ttl;
wp.maxRetries = 1;
wp.recordExistsAction = RecordExistsAction.REPLACE;
aerospikeClient.put(wp, k, bin);
return true;
}

/**
* @param key
* @return
*/
public String get(String key) {
if (key == null) {
return null;
}

try {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Record r = aerospikeClient.get(null, k);
return r != null ? r.getString(BIN) : null;
} catch (Throwable ex) {
throw ex;
}
}

/**
* lpush, push 时去重,支持多个值
*
* @param key
* @param value
* @return
*/
public <T> boolean lpush(String key, T... value) {
if (value == null || value.length == 0) {
return false;
}
WritePolicy policy = new WritePolicy();
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
ListPolicy listPolicy = new ListPolicy(ListOrder.UNORDERED, ListWriteFlags.ADD_UNIQUE);
for (T v : value) {
if (v == null) {
continue;
}
Value val = Value.get(v);
Operation opt = ListOperation.append(listPolicy, BIN, val);
try {
Record rec = aerospikeClient.operate(policy, k, opt);
log.info("asp lpush return record: [{}]", rec);
} catch (AerospikeException e) {
if (e.getResultCode() == ResultCode.ELEMENT_EXISTS) {
log.warn("element [{}] already exists in [{}] key", v, key);
continue;
} else {
return false;
}
} finally {
}
}
return true;
}

@Override
public <T> boolean lrem(String key, T value) {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Operation opt = ListOperation.removeByValue(BIN, Value.get(value), ListReturnType.COUNT);
Record r = aerospikeClient.operate(null, k, opt);
int count = r != null ? r.getInt(BIN) : 0;
log.info("asp lrem from key:[{}] remove value:[{}] count:[{}]", key, Value.getAsBlob(value), count);
return true;
}

/**
* 获取队列成员
*
* @param key
* @param start
* @param end
* @return
*/
@Override
public List<?> lrange(String key, int start, int end) {
Key k = new Key(aerospikeConf.getNamespace(), dataSetName, key);
Operation opt = ListOperation.getRange(BIN, start);

Record r = aerospikeClient.operate(null, k, opt);
return r != null ? r.getList(BIN) : Collections.emptyList();
}
}