文章目录
实现长连接负载均衡策略
tcp服务实例注册到了zk,现在sdk需要获取到tcp服务的地址。
注册服务到zookeeper
首先,在tcp服务一启动的时候,就会把自己的ip地址(通过jdk的InetAddress获取本机ip地址)和启动端口(配置文件的形式)注册到zookeeper中
RegistryZK
public class RegistryZK implements Runnable {
private static Logger logger = LoggerFactory.getLogger(RegistryZK.class);
private ZKit zKit;
private String ip;
private BootstrapConfig.TcpConfig tcpConfig;
public RegistryZK(ZKit zKit, String ip, BootstrapConfig.TcpConfig tcpConfig) {
this.zKit = zKit;
this.ip = ip;
this.tcpConfig = tcpConfig;
}
@Override
public void run() {
// 创建3个节点: /im-coreRoot、/im-coreRoot/tcp、/im-coreRoot/web
zKit.createRootNode();
// 创建节点: /im-coreRoot/tcp/{ip}:{tcpPort}
String tcpPath = Constants.ImCoreZkRoot + Constants.ImCoreZkRootTcp + "/" + ip + ":" + tcpConfig.getTcpPort();
zKit.createNode(tcpPath);
logger.info("Registry zookeeper tcpPath success, msg=[{}]", tcpPath);
// 创建节点: /im-coreRoot/web/{ip}:{webSocketPort}
String webPath = Constants.ImCoreZkRoot + Constants.ImCoreZkRootWeb + "/" + ip + ":" + tcpConfig.getWebSocketPort();
zKit.createNode(webPath);
logger.info("Registry zookeeper webPath success, msg=[{}]", tcpPath);
}
}
Starter#registerZK(bootstrapConfig)
启动类中调用RegistryZK,需要引入com.github.sgroschupf / zkclient / 0.1
依赖,org.yaml / snakeyaml / 1.27
依赖
public class Starter {
public static void main(String[] args) throws UnknownHostException {
if(args.length > 0){
start(args[0]);
}
}
private static void start(String path){
try {
Yaml yaml = new Yaml();
InputStream inputStream = new FileInputStream(path);
BootstrapConfig bootstrapConfig = yaml.loadAs(inputStream, BootstrapConfig.class);
new LimServer(bootstrapConfig.getLim()).start();
new LimWebSocketServer(bootstrapConfig.getLim()).start();
// 初始化redisson配置
RedisManager.init(bootstrapConfig);
// 初始化rabbitmq配置
MqFactory.init(bootstrapConfig.getLim().getRabbitmq());
MessageReceiver.init(bootstrapConfig.getLim().getBrokerId()+"");
// 将tcp服务地址和端口号、web服务地址和端口号 注册到zookeeper
registerZK(bootstrapConfig);
}catch (Exception e){
e.printStackTrace();
System.exit(500);
}
}
public static void registerZK(BootstrapConfig config) throws UnknownHostException {
// jdk的InetAddress获取本机ip地址
String hostAddress = InetAddress.getLocalHost().getHostAddress();
// 使用zk的jar包中的类创建zk客户端, 参数为: zookeeper服务的地址、连接超时时间
ZkClient zkClient = new ZkClient(config.getLim().getZkConfig().getZkAddr(), config.getLim().getZkConfig().getZkConnectTimeOut());
// 使用创建的zk客户端来填充ZKit对象
ZKit zKit = new ZKit(zkClient);
// 其实就是 开个线程 使用jdk拿到的主机地址 到zk 中创建 对应的节点
RegistryZK registryZK = new RegistryZK(zKit, hostAddress, config.getLim());
Thread thread = new Thread(registryZK);
thread.start();
}
}
路由策略-RouteHandle接口
当从zookeeper中获取到服务的地址后,需要从这些地址中选出1个地址,返回给客户端使用,因此抽象出1个RouteHandle 接口,定义如下
public interface RouteHandle {
String routeServer(List<String> values, String key);
}
随机-RandomHandle
使用随机数返回
public class RandomHandle implements RouteHandle {
@Override
public String routeServer(List<String> values, String key) {
int size = values.size();
if(size == 0){
throw new ApplicationException(UserErrorCode.SERVER_NOT_AVAILABLE);
}
int i = ThreadLocalRandom.current().nextInt(size);
return values.get(i);
}
}
轮询-LoopHandle
轮询返回
public class LoopHandle implements RouteHandle {
private AtomicLong index = new AtomicLong();
@Override
public String routeServer(List<String> values, String key) {
int size = values.size();
if(size == 0){
throw new ApplicationException(UserErrorCode.SERVER_NOT_AVAILABLE);
}
Long l = index.incrementAndGet() % size;
if(l < 0){
l = 0L;
}
return values.get(l.intValue());
}
}
一致性哈希- ConsistentHashHandle
委托给AbstractConsistentHash去实现,方便拓展不同的实现
public class ConsistentHashHandle implements RouteHandle {
//TreeMap
private AbstractConsistentHash hash;
public void setHash(AbstractConsistentHash hash) {
this.hash = hash;
}
@Override
public String routeServer(List<String> values, String key) {
return hash.process(values,key);
}
}
TreeMapConsistentHash
public class TreeMapConsistentHash extends AbstractConsistentHash {
private TreeMap<Long,String> treeMap = new TreeMap<>();
private static final int NODE_SIZE = 2;
@Override
protected void add(long key, String value) {
// 添加虚拟节点, 防止一致性hash节点分配不均
for (int i = 0; i < NODE_SIZE; i++) {
Long hash = super.hash("node" + key + i);
treeMap.put(hash, value);
}
treeMap.put(key,value);
}
@Override
protected String getFirstNodeValue(String value) {
Long hash = super.hash(value);
SortedMap<Long, String> last = treeMap.tailMap(hash);
if(!last.isEmpty()){
return last.get(last.firstKey());
}
if (treeMap.size() == 0){
throw new ApplicationException(UserErrorCode.SERVER_NOT_AVAILABLE) ;
}
return treeMap.firstEntry().getValue();
}
@Override
protected void processBefore() {
treeMap.clear();
}
}
配置
配置文件相关
appConfig:
appId: 10000
privateKey: 123456
zkAddr: 127.0.0.1:2181 # zk连接地址
zkConnectTimeOut: 50000 #zk超时时间
imRouteWay: 1 # 路由策略1轮训 2随机 3hash
consistentHashWay: 1 # 如果选用一致性hash的话具体hash算法 1 TreeMap 2 自定义Map
tcpPort: 9000 # tcp端口
webSocketPort: 19000 # webSocket端口
needWebSocket: true #是否需要开启webSocket
loginModel: 1
messageRecallTimeOut : 1200000000 #消息可撤回时间,单位毫秒
BeanConfig
@Configuration
public class BeanConfig {
@Autowired
AppConfig appConfig;
@Bean
public ZkClient buildZKClient() {
return new ZkClient(appConfig.getZkAddr(), appConfig.getZkConnectTimeOut());
}
/* 根据定义的枚举, 去创建不同的实现 */
@Bean
public RouteHandle routeHandle() throws Exception {
Integer imRouteWay = appConfig.getImRouteWay();
String routWay = "";
ImUrlRouteWayEnum handler = ImUrlRouteWayEnum.getHandler(imRouteWay);
routWay = handler.getClazz();
RouteHandle routeHandle = (RouteHandle) Class.forName(routWay).newInstance();
if(handler == ImUrlRouteWayEnum.HASH){
Method setHash = Class.forName(routWay).getMethod("setHash", AbstractConsistentHash.class);
Integer consistentHashWay = appConfig.getConsistentHashWay();
String hashWay = "";
RouteHashMethodEnum hashHandler = RouteHashMethodEnum.getHandler(consistentHashWay);
hashWay = hashHandler.getClazz();
AbstractConsistentHash consistentHash = (AbstractConsistentHash) Class.forName(hashWay).newInstance();
setHash.invoke(routeHandle,consistentHash);
}
return routeHandle;
}
@Bean
public EasySqlInjector easySqlInjector () {
return new EasySqlInjector();
}
@Bean
public SnowflakeIdWorker buildSnowflakeSeq() {
return new SnowflakeIdWorker(0);
}
}
获取服务地址接口
/**
*
* im的登录接口,返回im地址
*/
@RequestMapping("/login")
public ResponseVO login(@RequestBody @Validated LoginReq req, Integer appId) {
req.setAppId(appId);
ResponseVO login = imUserService.login(req);
if (login.isOk()) {
List<String> allNode;
if (req.getClientType() == ClientType.WEB.getCode()) {
allNode = zKit.getAllWebNode();
} else {
allNode = zKit.getAllTcpNode();
}
String s = routeHandle.routeServer(allNode, req .getUserId());
RouteInfo parse = RouteInfoParseUtil.parse(s);
return ResponseVO.successResponse(parse);
}
return ResponseVO.errorResponse();
}
Zkit
@Component
public class ZKit {
private static Logger logger = LoggerFactory.getLogger(ZKit.class);
@Autowired
private ZkClient zkClient;
/**
* get all TCP server node from zookeeper
*
* @return
*/
public List<String> getAllTcpNode() {
// 获取指定路径: /im-coreRoot/tcp 下的子节点
// tcp服务启动时, 会向zookeeper中注册
List<String> children = zkClient.getChildren(Constants.ImCoreZkRoot + Constants.ImCoreZkRootTcp);
return children;
}
/**
* get all WEB server node from zookeeper
*
* @return
*/
public List<String> getAllWebNode() {
// 获取指定路径: /im-coreRoot/web 下的子节点
List<String> children = zkClient.getChildren(Constants.ImCoreZkRoot + Constants.ImCoreZkRootWeb);
return children;
}
}
业务回调
准备工作
引入依赖org.apache.httpcomponents / httpclient / 4.5.9
配置GlobalHttpClientConfig
@Configuration
@ConfigurationProperties(prefix = "httpclient")
public class GlobalHttpClientConfig {
private Integer maxTotal; // 最大连接数
private Integer defaultMaxPerRoute; // 最大并发链接数
private Integer connectTimeout; // 创建链接的最大时间
private Integer connectionRequestTimeout; // 链接获取超时时间
private Integer socketTimeout; // 数据传输最长时间
private boolean staleConnectionCheckEnabled; // 提交时检查链接是否可用
PoolingHttpClientConnectionManager manager = null;
HttpClientBuilder httpClientBuilder = null;
// 定义httpClient链接池
@Bean(name = "httpClientConnectionManager")
public PoolingHttpClientConnectionManager getPoolingHttpClientConnectionManager() {
return getManager();
}
private PoolingHttpClientConnectionManager getManager() {
if (manager != null) {
return manager;
}
manager = new PoolingHttpClientConnectionManager();
manager.setMaxTotal(maxTotal); // 设定最大链接数
manager.setDefaultMaxPerRoute(defaultMaxPerRoute); // 设定并发链接数
return manager;
}
/**
* 实例化连接池,设置连接池管理器。 这里需要以参数形式注入上面实例化的连接池管理器
*
* @Qualifier 指定bean标签进行注入
*/
@Bean(name = "httpClientBuilder")
public HttpClientBuilder getHttpClientBuilder(@Qualifier("httpClientConnectionManager") PoolingHttpClientConnectionManager httpClientConnectionManager) {
// HttpClientBuilder中的构造方法被protected修饰,所以这里不能直接使用new来实例化一个HttpClientBuilder,可以使用HttpClientBuilder提供的静态方法create()来获取HttpClientBuilder对象
httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(httpClientConnectionManager);
return httpClientBuilder;
}
/**
* 注入连接池,用于获取httpClient
*
* @param httpClientBuilder
* @return
*/
@Bean
public CloseableHttpClient getCloseableHttpClient(@Qualifier("httpClientBuilder") HttpClientBuilder httpClientBuilder) {
return httpClientBuilder.build();
}
public CloseableHttpClient getCloseableHttpClient() {
if (httpClientBuilder != null) {
return httpClientBuilder.build();
}
httpClientBuilder = HttpClientBuilder.create();
httpClientBuilder.setConnectionManager(getManager());
return httpClientBuilder.build();
}
/**
* Builder是RequestConfig的一个内部类 通过RequestConfig的custom方法来获取到一个Builder对象
* 设置builder的连接信息
*
* @return
*/
@Bean(name = "builder")
public RequestConfig.Builder getBuilder() {
RequestConfig.Builder builder = RequestConfig.custom();
return builder
.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout)
.setStaleConnectionCheckEnabled(staleConnectionCheckEnabled);
}
/**
* 使用builder构建一个RequestConfig对象
*
* @param builder
* @return
*/
@Bean
public RequestConfig getRequestConfig(@Qualifier("builder") RequestConfig.Builder builder) {
return builder.build();
}
public Integer getMaxTotal() {
return maxTotal;
}
public void setMaxTotal(Integer maxTotal) {
this.maxTotal = maxTotal;
}
public Integer getDefaultMaxPerRoute() {
return defaultMaxPerRoute;
}
public void setDefaultMaxPerRoute(Integer defaultMaxPerRoute) {
this.defaultMaxPerRoute = defaultMaxPerRoute;
}
public Integer getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
}
public Integer getConnectionRequestTimeout() {
return connectionRequestTimeout;
}
public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {
this.connectionRequestTimeout = connectionRequestTimeout;
}
public Integer getSocketTimeout() {
return socketTimeout;
}
public void setSocketTimeout(Integer socketTimeout) {
this.socketTimeout = socketTimeout;
}
public boolean isStaleConnectionCheckEnabled() {
return staleConnectionCheckEnabled;
}
public void setStaleConnectionCheckEnabled(boolean staleConnectionCheckEnabled) {
this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
}
}
HttpRequestUtils
@Component
public class HttpRequestUtils {
@Autowired
private CloseableHttpClient httpClient;
@Autowired
private RequestConfig requestConfig;
@Autowired
GlobalHttpClientConfig httpClientConfig;
public String doGet(String url, Map<String, Object> params, String charset) throws Exception {
return doGet(url,params,null,charset);
}
/**
* 通过给的url地址,获取服务器数据
*
* @param url 服务器地址
* @param params 封装用户参数
* @param charset 设定字符编码
* @return
*/
public String doGet(String url, Map<String, Object> params, Map<String, Object> header, String charset) throws Exception {
if (StringUtils.isEmpty(charset)) {
charset = "utf-8";
}
URIBuilder uriBuilder = new URIBuilder(url);
// 判断是否有参数
if (params != null) {
// 遍历map,拼接请求参数
for (Map.Entry<String, Object> entry : params.entrySet()) {
uriBuilder.setParameter(entry.getKey(), entry.getValue().toString());
}
}
// 声明 http get 请求
HttpGet httpGet = new HttpGet(uriBuilder.build());
httpGet.setConfig(requestConfig);
if (header != null) {
// 遍历map,拼接header参数
for (Map.Entry<String, Object> entry : header.entrySet()) {
httpGet.addHeader(entry.getKey(),entry.getValue().toString());
}
}
String result = "";
try {
// 发起请求
CloseableHttpResponse response = httpClient.execute(httpGet);
// 判断状态码是否为200
if (response.getStatusLine().getStatusCode() == 200) {
// 返回响应体的内容
result = EntityUtils.toString(response.getEntity(), charset);
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return result;
}
/**
* GET请求, 含URL 参数
*
* @param url
* @param params
* @return 如果状态码为200,则返回body,如果不为200,则返回null
* @throws Exception
*/
public String doGet(String url, Map<String, Object> params) throws Exception {
return doGet(url, params, null);
}
/**
* GET 请求,不含URL参数
*
* @param url
* @return
* @throws Exception
*/
public String doGet(String url) throws Exception {
return doGet(url, null, null);
}
public String doPost(String url, Map<String, Object> params, String jsonBody, String charset) throws Exception {
return doPost(url,params,null,jsonBody,charset);
}
/**
* 带参数的post请求
*
* @param url
* @return
* @throws Exception
*/
public String doPost(String url, Map<String, Object> params, Map<String, Object> header, String jsonBody, String charset) throws Exception {
if (StringUtils.isEmpty(charset)) {
charset = "utf-8";
}
URIBuilder uriBuilder = new URIBuilder(url);
// 判断是否有参数
if (params != null) {
// 遍历map,拼接请求参数
for (Map.Entry<String, Object> entry : params.entrySet()) {
uriBuilder.setParameter(entry.getKey(), entry.getValue().toString());
}
}
// 声明httpPost请求
HttpPost httpPost = new HttpPost(uriBuilder.build());
// 加入配置信息
httpPost.setConfig(requestConfig);
// 判断map是否为空,不为空则进行遍历,封装from表单对象
if (StringUtils.isNotEmpty(jsonBody)) {
StringEntity s = new StringEntity(jsonBody, charset);
s.setContentEncoding(charset);
s.setContentType("application/json");
// 把json body放到post里
httpPost.setEntity(s);
}
if (header != null) {
// 遍历map,拼接header参数
for (Map.Entry<String, Object> entry : header.entrySet()) {
httpPost.addHeader(entry.getKey(),entry.getValue().toString());
}
}
String result = "";
// CloseableHttpClient httpClient = HttpClients.createDefault(); // 单个
CloseableHttpResponse response = null;
try {
// 发起请求
response = httpClient.execute(httpPost);
// 判断状态码是否为200
if (response.getStatusLine().getStatusCode() == 200) {
// 返回响应体的内容
result = EntityUtils.toString(response.getEntity(), charset);
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return result;
}
/**
* 不带参数post请求
* @param url
* @return
* @throws Exception
*/
public String doPost(String url) throws Exception {
return doPost(url, null,null,null);
}
/**
* get 方法调用的通用方式
* @param url
* @param tClass
* @param map
* @param charSet
* @return
* @throws Exception
*/
public <T> T doGet(String url, Class<T> tClass, Map<String, Object> map, String charSet) throws Exception {
String result = doGet(url, map, charSet);
if (StringUtils.isNotEmpty(result))
return JSON.parseObject(result, tClass);
return null;
}
/**
* get 方法调用的通用方式
* @param url
* @param tClass
* @param map
* @param charSet
* @return
* @throws Exception
*/
public <T> T doGet(String url, Class<T> tClass, Map<String, Object> map, Map<String, Object> header, String charSet) throws Exception {
String result = doGet(url, map, header, charSet);
if (StringUtils.isNotEmpty(result))
return JSON.parseObject(result, tClass);
return null;
}
/**
* post 方法调用的通用方式
* @param url
* @param tClass
* @param map
* @param jsonBody
* @param charSet
* @return
* @throws Exception
*/
public <T> T doPost(String url, Class<T> tClass, Map<String, Object> map, String jsonBody, String charSet) throws Exception {
String result = doPost(url, map,jsonBody,charSet);
if (StringUtils.isNotEmpty(result))
return JSON.parseObject(result, tClass);
return null;
}
public <T> T doPost(String url, Class<T> tClass, Map<String, Object> map, Map<String, Object> header, String jsonBody, String charSet) throws Exception {
String result = doPost(url, map, header,jsonBody,charSet);
if (StringUtils.isNotEmpty(result))
return JSON.parseObject(result, tClass);
return null;
}
/**
* post 方法调用的通用方式
* @param url
* @param map
* @param jsonBody
* @param charSet
* @return
* @throws Exception
*/
public String doPostString(String url, Map<String, Object> map, String jsonBody, String charSet) throws Exception {
return doPost(url, map,jsonBody,charSet);
}
/**
* post 方法调用的通用方式
* @param url
* @param map
* @param jsonBody
* @param charSet
* @return
* @throws Exception
*/
public String doPostString(String url, Map<String, Object> map, Map<String, Object> header, String jsonBody, String charSet) throws Exception {
return doPost(url, map, header, jsonBody,charSet);
}
}
CallbackService
@Component
public class CallbackService {
private Logger logger = LoggerFactory.getLogger(CallbackService.class);
@Autowired
HttpRequestUtils httpRequestUtils;
@Autowired
AppConfig appConfig;
@Autowired
ShareThreadPool shareThreadPool;
// 回调方法
public void callback(Integer appId, String callbackCommand, String jsonBody) {
// 使用线程池异步的方式去做
shareThreadPool.submit(() -> {
try {
httpRequestUtils.doPost(
appConfig.getCallbackUrl(),
Object.class,
builderUrlParams(appId, callbackCommand),
jsonBody,
null
);
} catch (Exception e) {
logger.error("callback 回调{} : {}出现异常 : {} ", callbackCommand, appId, e.getMessage());
}
});
}
// 前置回调方法
public ResponseVO beforeCallback(Integer appId, String callbackCommand, String jsonBody) {
try {
// 同步去调用
ResponseVO responseVO = httpRequestUtils.doPost(
"",
ResponseVO.class,
builderUrlParams(appId, callbackCommand),
jsonBody,
null
);
return responseVO;
} catch (Exception e) {
logger.error("callback 之前 回调{} : {}出现异常 : {} ", callbackCommand, appId, e.getMessage());
return ResponseVO.successResponse();
}
}
public Map builderUrlParams(Integer appId, String command) {
Map map = new HashMap();
map.put("appId", appId);
map.put("command", command);
return map;
}
}
回调
这里所谓的回调,允许用户配置1个回调地址,在业务操作的时候(可以在业务操作之前,或业务操作完成之后),会主动发送1个请求,并携带业务操作命令,业务请求参数到指定的回调地址上,从而让用户知道该业务操作是否完成或修改该业务操作参数。
用户资料变更回调
@Override
@Transactional
public ResponseVO modifyUserInfo(ModifyUserInfoReq req) {
QueryWrapper query = new QueryWrapper<>();
query.eq("app_id", req.getAppId());
query.eq("user_id", req.getUserId());
query.eq("del_flag", DelFlagEnum.NORMAL.getCode());
ImUserDataEntity user = imUserDataMapper.selectOne(query);
if (user == null) {
throw new ApplicationException(UserErrorCode.USER_IS_NOT_EXIST);
}
ImUserDataEntity update = new ImUserDataEntity();
BeanUtils.copyProperties(req, update);
update.setAppId(null);
update.setUserId(null);
int update1 = imUserDataMapper.update(update, query);
if (update1 == 1) {
UserModifyPack pack = new UserModifyPack();
BeanUtils.copyProperties(req, pack);
messageProducer.sendToUser(
req.getUserId(),
req.getClientType(),
req.getImei(),
UserEventCommand.USER_MODIFY,
pack, req.getAppId()
);
// 用户资料变更之后回调开关 是否开启
// 如果开启了的话, 在变更完用户信息成功之后, 向指定的url发起回调请求
if (appConfig.isModifyUserAfterCallback()) {
callbackService.callback(
req.getAppId(),
// user.modify.after
Constants.CallbackCommand.ModifyUserAfter,
JSONObject.toJSONString(req)
);
}
return ResponseVO.successResponse();
}
throw new ApplicationException(UserErrorCode.MODIFY_USER_ERROR);
}
好友模块回调
略
群组模块回调
略
本文含有隐藏内容,请 开通VIP 后查看