OKEX之websocket客户端java实现
符号
阅读:773
2021-04-01 11:16:07
评论:0
1.创建配置类,需要配置代理
/**
 * Static settings for the OKEx WebSocket client, plus two helpers that open a
 * connection either anonymously or followed by a login.
 *
 * The credential strings and the proxy host/port in this listing are redacted
 * placeholders and must be replaced with real values before the class is used.
 */
public class OKExWebSocketConfig {
// OKEx WebSocket endpoint URL
private static final String SERVICE_URL = "wss://real.okex.com:10442/ws/v3?_compress=false";
// API key (redacted placeholder)
private static final String API_KEY = "**";
// Secret key (redacted placeholder)
private static final String SECRET_KEY = "**";
// Passphrase (redacted placeholder)
private static final String PASSPHRASE = "**";
// Proxy IP address (redacted placeholder)
public static final String PROXY_IP = "192.*.*.*";
// Proxy port (redacted placeholder; not valid Java until replaced with an int literal)
public static final int PROXY_PORT = **;
// Whether connections should be routed through the proxy configured above
public static final boolean USE_PROXY = true;
/**
 * Opens a connection to SERVICE_URL without logging in.
 *
 * @param webSocketClient client on which connection(SERVICE_URL) is invoked
 */
public static void publicConnect(OKExWebSocketClient webSocketClient) {
webSocketClient.connection(SERVICE_URL);
}
/**
 * Opens a connection to SERVICE_URL and then sends a login request built from
 * API_KEY, PASSPHRASE and SECRET_KEY.
 *
 * @param webSocketClient client on which connection(...) and login(...) are invoked
 */
public static void loginConnect(OKExWebSocketClient webSocketClient) {
// Establish the connection to the server
webSocketClient.connection(SERVICE_URL);
// Log in; the caller must supply api-key, passphrase and secret-key. Do not disclose these values.
webSocketClient.login(API_KEY , PASSPHRASE , SECRET_KEY);
}
}
2.创建客户端类
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.apache.commons.compress.compressors.deflate64.Deflate64CompressorInputStream;
import org.apache.commons.lang3.time.DateFormatUtils;
import bw.okcoin.api.rest.spot.bean.SpotDepthItem;
import bw.okcoin.api.util.CharsetEnum;
import bw.okcoin.api.util.DateUtils;
import bw.okcoin.api.websocket.OKExWebSocketConfig;
import com.alibaba.fastjson.JSONArray;
/**
 * Minimal OKEx WebSocket client built on OkHttp.
 *
 * <p>It opens the socket, sends a {@code "ping"} text frame every 25 seconds while the
 * connection is open, offers login / subscribe / unsubscribe helpers, and prints every
 * message it sends or receives to standard output.
 *
 * <p>Connection state lives in {@code static} fields, so all instances of this class share one
 * socket and one pair of login/connect flags. The flags are {@code volatile} because they are
 * written inside listener callbacks and read through the public getters.
 */
public class OKExWebSocketClient {
    /** Delay before the first heartbeat and interval between heartbeats, in seconds. */
    private static final long HEARTBEAT_INTERVAL_SECONDS = 25;

    private static volatile WebSocket webSocket = null;
    private static volatile boolean isLogin = false;
    private static volatile boolean isConnect = false;
    /** Scheduler that drives the heartbeat; guarded by the class monitor. */
    private static ScheduledExecutorService heartbeatService = null;

    /**
     * Prints a message prefixed with the current time formatted as {@code DateUtils.TIME_STYLE_S4}.
     *
     * @param message text appended directly after the timestamp (callers supply any leading space)
     */
    private static void log(final String message) {
        System.out.println(DateFormatUtils.format(new Date(), DateUtils.TIME_STYLE_S4) + message);
    }

    /**
     * Inflates a Deflate64-compressed binary frame and decodes the result as UTF-8 text.
     *
     * @param bytes compressed payload
     * @return the decompressed text
     * @throws RuntimeException wrapping the {@link IOException} if decompression fails
     */
    private static String uncompress(final byte[] bytes) {
        try (final ByteArrayOutputStream out = new ByteArrayOutputStream();
             final ByteArrayInputStream in = new ByteArrayInputStream(bytes);
             final Deflate64CompressorInputStream zin = new Deflate64CompressorInputStream(in)) {
            final byte[] buffer = new byte[1024];
            int read;
            while (-1 != (read = zin.read(buffer))) {
                out.write(buffer, 0, read);
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            return out.toString("UTF-8");
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the periodic {@code "ping"} heartbeat, replacing any heartbeat that is still running
     * so that repeated connections do not accumulate scheduler threads.
     */
    private void startHeartbeat() {
        synchronized (OKExWebSocketClient.class) {
            stopHeartbeat();
            heartbeatService = Executors.newSingleThreadScheduledExecutor();
            // Second argument: delay before the first run; third argument: interval between runs.
            heartbeatService.scheduleAtFixedRate(() -> sendMessage("ping"),
                    HEARTBEAT_INTERVAL_SECONDS, HEARTBEAT_INTERVAL_SECONDS, TimeUnit.SECONDS);
        }
    }

    /** Stops the heartbeat scheduler, if one is running. */
    private static synchronized void stopHeartbeat() {
        if (null != heartbeatService) {
            heartbeatService.shutdownNow();
            heartbeatService = null;
        }
    }

    /** Clears the connect/login flags and stops the heartbeat after the connection has gone away. */
    private static void handleDisconnect() {
        isConnect = false;
        // A login is only valid for the connection it was made on.
        isLogin = false;
        stopHeartbeat();
    }

    /**
     * Opens a WebSocket connection to the given URL, through the proxy from
     * {@code OKExWebSocketConfig} when {@code USE_PROXY} is set. The call returns once the
     * connection attempt has been handed to OkHttp; success or failure is reported later
     * through the listener (see {@link #getIsConnect()}).
     *
     * @param url WebSocket URL of the server
     */
    public void connection(final String url) {
        final OkHttpClient.Builder builder = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS);
        if (OKExWebSocketConfig.USE_PROXY) {
            builder.proxy(new Proxy(Proxy.Type.HTTP,
                    new InetSocketAddress(OKExWebSocketConfig.PROXY_IP, OKExWebSocketConfig.PROXY_PORT)));
        }
        final OkHttpClient client = builder.build();
        final Request request = new Request.Builder().url(url).build();
        webSocket = client.newWebSocket(request, new WebSocketListener() {
            @Override
            public void onOpen(final WebSocket socket, final Response response) {
                isConnect = true;
                log(" Connected to the server success!");
                // Keep the connection alive by pinging the server every 25 seconds.
                startHeartbeat();
            }

            @Override
            public void onMessage(final WebSocket socket, final String s) {
                log(" Receive: " + s);
                // A text frame that mentions "login" and ends with "true}" is taken as a successful login.
                if (null != s && s.contains("login") && s.endsWith("true}")) {
                    isLogin = true;
                }
            }

            @Override
            public void onMessage(final WebSocket socket, final ByteString bytes) {
                final String s = OKExWebSocketClient.uncompress(bytes.toByteArray());
                log(" Receive: " + s);
            }

            @Override
            public void onClosing(final WebSocket socket, final int code, final String reason) {
                log(" Connection is disconnected !!!");
                socket.close(1000, "Long time not to send and receive messages! ");
                // Clear the shared field itself (not the callback parameter), and only if it still
                // refers to the socket that is closing.
                if (OKExWebSocketClient.webSocket == socket) {
                    OKExWebSocketClient.webSocket = null;
                }
                handleDisconnect();
            }

            @Override
            public void onClosed(final WebSocket socket, final int code, final String reason) {
                System.out.println("Connection has been disconnected.");
                handleDisconnect();
            }

            @Override
            public void onFailure(final WebSocket socket, final Throwable t, final Response response) {
                t.printStackTrace();
                System.out.println("Connection failed!");
                handleDisconnect();
            }
        });
    }

    /**
     * Computes the Base64-encoded HMAC-SHA256 signature of a message.
     *
     * @param message text to sign
     * @param secret  secret key used for the HMAC
     * @return the Base64 signature, or an empty string if signing failed (the error is printed)
     */
    private String sha256_HMAC(final String message, final String secret) {
        String hash = "";
        try {
            final Mac mac = Mac.getInstance("HmacSHA256");
            final SecretKeySpec keySpec = new SecretKeySpec(secret.getBytes(CharsetEnum.UTF_8.charset()), "HmacSHA256");
            mac.init(keySpec);
            final byte[] digest = mac.doFinal(message.getBytes(CharsetEnum.UTF_8.charset()));
            hash = Base64.getEncoder().encodeToString(digest);
        } catch (final Exception e) {
            log("Error HmacSHA256 ===========" + e.getMessage());
        }
        return hash;
    }

    /**
     * Serializes a list of strings as a JSON array.
     *
     * @param list values to serialize
     * @return JSON array text
     */
    private String listToJson(final List<String> list) {
        final JSONArray jsonArray = new JSONArray();
        jsonArray.addAll(list);
        return jsonArray.toJSONString();
    }

    /**
     * Sends a login request. The signature is the HMAC-SHA256 of
     * {@code timestamp + "GET" + "/users/self/verify"} keyed with {@code secretKey}.
     *
     * @param apiKey     API key
     * @param passPhrase passphrase
     * @param secretKey  secret key used to sign the request
     */
    public void login(final String apiKey, final String passPhrase, final String secretKey) {
        // NOTE(review): 28800 looks like an 8-hour offset in seconds - confirm against the server clock.
        // NOTE(review): the timestamp is rendered with Double.toString, which switches to scientific
        // notation for large values - confirm the server accepts that form.
        final String timestamp = (Double.parseDouble(DateUtils.getEpochTime()) + 28800) + "";
        final String message = timestamp + "GET" + "/users/self/verify";
        final String sign = sha256_HMAC(message, secretKey);
        final String str = "{\"op\":\"login\",\"args\":[\"" + apiKey + "\",\"" + passPhrase + "\",\""
                + timestamp + "\",\"" + sign + "\"]}";
        sendMessage(str);
    }

    /**
     * Subscribes to the given channels in one request.
     *
     * @param list channel names
     */
    public void subscribe(final List<String> list) {
        final String s = listToJson(list);
        final String str = "{\"op\": \"subscribe\", \"args\":" + s + "}";
        sendMessage(str);
    }

    /**
     * Unsubscribes from the given channels in one request.
     *
     * @param list channel names
     */
    public void unsubscribe(final List<String> list) {
        final String s = listToJson(list);
        final String str = "{\"op\": \"unsubscribe\", \"args\":" + s + "}";
        sendMessage(str);
    }

    /**
     * Sends a text frame after a one-second pause, or prints a hint when no connection is open.
     *
     * @param str text to send
     */
    private void sendMessage(final String str) {
        // Read the shared field once so a concurrent disconnect cannot null it between check and use.
        final WebSocket socket = webSocket;
        if (null != socket) {
            try {
                Thread.sleep(1000);
            } catch (final InterruptedException e) {
                // Keep the best-effort send, but do not lose the interrupt request.
                Thread.currentThread().interrupt();
            }
            log(" Send: " + str);
            if (isConnect) {
                socket.send(str);
                return;
            }
        }
        log(" Please establish a connection before operation !!!");
    }

    /**
     * Closes the connection, stops the heartbeat and clears the connect/login flags.
     */
    public void closeConnection() {
        final WebSocket socket = webSocket;
        if (null != socket) {
            socket.close(1000, "User close connect !!!");
        } else {
            log(" Please establish a connection before operation !!!");
        }
        handleDisconnect();
    }

    /**
     * @return {@code true} once a successful login response has been seen on the current connection
     */
    public boolean getIsLogin() {
        return isLogin;
    }

    /**
     * @return {@code true} while the connection is open
     */
    public boolean getIsConnect() {
        return isConnect;
    }
}
3.测试订阅websocket
/**
 * Manual smoke test: opens a public connection, subscribes to one ticker channel and keeps the
 * test thread alive so the pushed messages can be watched on standard output.
 */
public class OKExTest {
    private static final OKExWebSocketClient webSocketClient = new OKExWebSocketClient();

    /** Opens the public (no-login) connection, then waits one second before the test runs. */
    @Before
    public void connect() {
        OKExWebSocketConfig.publicConnect(webSocketClient);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the test body can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Subscribes to {@code spot/ticker:BTC-USDT} and then blocks until the test thread is
     * interrupted, closing the connection on the way out.
     */
    @Test
    public void wsTest() {
        // Build the list of channel names to subscribe to.
        final ArrayList<String> list = new ArrayList<>();
        list.add("spot/ticker:BTC-USDT");
        // Further channels that can be enabled:
        // list.add("spot/ticker:ETH-USDT");
        // list.add("spot/candle60s:BTC-USDT");
        // list.add("spot/candle180s:ETH-USDT");
        // list.add("spot/depth5:BTC-USDT");
        // list.add("spot/depth5:ETH-USDT");
        webSocketClient.subscribe(list);
        // Keep the test alive by sleeping rather than spinning, so no CPU core is burned.
        try {
            while (true) {
                Thread.sleep(10000000);
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            webSocketClient.closeConnection();
        }
    }
}
4.输出结果
声明
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。