沪深websocket level2/level1行情推送接入示例

发布于:2024-05-05 ⋅ 阅读:(27) ⋅ 点赞:(0)

行情接入包

golang package:

package hangqing

import (
	"bufio"
	"bytes"
	"compress/flate"
	"encoding/json"
	"github.com/gorilla/websocket"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"
)

// ServerAddrRsp is the JSON payload decoded from the server-address query
// issued by initServer.
type ServerAddrRsp struct {
	// Code is the status field; initServer treats any value other than "0" as a failure.
	Code   string `json:"code"`
	// Server is the allocated server address; initServer rejects an empty value.
	Server string `json:"server"`
}

// Hq is a websocket quote client. It holds the connection, the queue of
// outgoing commands, and the callbacks invoked for level1 and level2 quotes.
// It is set up by Construct.
type Hq struct {
	token    string          // jvQuant token
	server   string          // websocket server address
	conn     *websocket.Conn // websocket connection
	cmdChan  chan string     // outgoing commands, filled by SendRawCmd and drained by cmd
	exitChan chan int        // tells receive to stop
	lv1Deal  func(string string) // level1 quote handler
	lv2Deal  func(string string) // level2 quote handler
	wg       *sync.WaitGroup // tracks the receive and cmd goroutines started by Construct
}

// Construct initialises the receiver in place, dials the quote server and
// starts the two worker goroutines.
//
// token is the jvQuant token. serAddr is the websocket server address; when it
// is empty the address is obtained from initServer instead. lv1Handle and
// lv2Handle are stored as the level1 and level2 quote handlers.
//
// The dial is performed synchronously through connect. One goroutine running
// receive and one running cmd are then started; both are registered with the
// wait group so that Wait blocks until they finish.
func (hq *Hq) Construct(token, serAddr string, lv1Handle, lv2Handle func(string string)) {
	hq.token = token
	// An explicitly supplied address takes precedence; the lookup is only a
	// fallback. Without this assignment a caller-provided address would be
	// lost and connect would dial an empty host.
	hq.server = serAddr
	if hq.server == "" {
		hq.server = hq.initServer()
	}
	hq.lv1Deal = lv1Handle
	hq.lv2Deal = lv2Handle
	hq.conn = hq.connect()
	hq.wg = &sync.WaitGroup{}
	hq.cmdChan = make(chan string, 128)
	hq.exitChan = make(chan int)

	// Register both workers before starting either one so Wait can never
	// observe a zero counter while a worker is still about to start.
	hq.wg.Add(2)
	// Receiving goroutine.
	go func() {
		defer hq.wg.Done()
		hq.receive()
	}()
	// Sending goroutine.
	go func() {
		defer hq.wg.Done()
		hq.cmd()
	}()
}

// initServer asks http://jvquant.com/query/server for a websocket quote server
// address, sending the market "ab", the type "websocket" and the receiver's
// token as query parameters, with a 3000 ms timeout.
//
// It returns the "server" field of the JSON reply. Any failure (request error,
// undecodable JSON, a code other than "0", or an empty server) is reported
// through log.Fatalln, which terminates the process.
func (hq *Hq) initServer() (server string) {
	query := url.Values{}
	query.Set("market", "ab")
	query.Set("type", "websocket")
	query.Set("token", hq.token)
	req := "http://jvquant.com/query/server?" + query.Encode()

	body, err := HttpOnce(req, nil, nil, 3000)
	if err != nil {
		log.Fatalln("获取行情服务器地址失败:", req, err)
	}

	var rsp ServerAddrRsp
	if err := json.Unmarshal(body, &rsp); err != nil {
		log.Fatalln("解析行情服务器地址失败:", string(body), err)
	}
	// A well-formed reply can still signal failure through its code field or
	// by omitting the address.
	if rsp.Code != "0" || rsp.Server == "" {
		log.Fatalln("解析行情服务器地址失败:", string(body))
	}

	log.Println("获取行情服务器地址成功:", rsp.Server)
	return rsp.Server
}

// connect dials the websocket server at hq.server, passing the token as the
// "token" query parameter, and returns the established connection.
//
// A dial failure is reported through log.Fatalln, which terminates the process.
func (hq Hq) connect() (conn *websocket.Conn) {
	// Escape the token so characters such as '+', '&' or '=' cannot corrupt
	// the query string; initServer already encodes it the same way via
	// url.Values.
	wsURL := hq.server + "?token=" + url.QueryEscape(hq.token)
	conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
	if err != nil {
		log.Fatalln("行情服务器连接错误:", err)
	}
	return conn
}

// AddLv1 queues a level1 subscription command for the given codes.
//
// Each code is prefixed with "lv1_", the results are joined with commas and
// the command "add=<list>" is handed to SendRawCmd.
func (hq Hq) AddLv1(codeArr []string) {
	topics := make([]string, len(codeArr))
	for i := range codeArr {
		topics[i] = "lv1_" + codeArr[i]
	}
	hq.SendRawCmd("add=" + strings.Join(topics, ","))
}

// AddLv2 queues a level2 subscription command for the given codes.
//
// Each code is prefixed with "lv2_", the results are joined with commas and
// the command "add=<list>" is handed to SendRawCmd.
func (hq Hq) AddLv2(codeArr []string) {
	topics := make([]string, len(codeArr))
	for i := range codeArr {
		topics[i] = "lv2_" + codeArr[i]
	}
	hq.SendRawCmd("add=" + strings.Join(topics, ","))
}

// SendRawCmd places cmd on the outgoing command queue, from which the cmd
// goroutine writes it to the websocket.
//
// The call blocks while the queue is full. Close closes the queue, so calling
// SendRawCmd after Close panics.
func (hq Hq) SendRawCmd(cmd string) {
	queue := hq.cmdChan
	queue <- cmd
}

// Close shuts the quote client down: it stops the sending goroutine, tells the
// receiving goroutine to exit and closes the websocket connection.
//
// Close must be called at most once; a second call panics because the channels
// are already closed.
func (hq Hq) Close() {
	// Closing the queue ends the range loop in cmd.
	close(hq.cmdChan)
	// Close the exit channel instead of sending on it. A send on this
	// unbuffered channel blocks until receive gets back to its select, which
	// it cannot do while it is parked inside ReadMessage on a quiet
	// connection, so the connection below would never be closed.
	close(hq.exitChan)
	// Closing the connection makes a pending ReadMessage return, letting
	// receive observe the closed exit channel.
	if err := hq.conn.Close(); err != nil {
		log.Println("关闭行情连接错误:", err)
	}
}

// Wait blocks the caller until both goroutines registered with the wait group
// by Construct (receive and cmd) have returned.
func (hq Hq) Wait() {
	workers := hq.wg
	workers.Wait()
}

// cmd is the sending loop: it takes commands off the queue one at a time, logs
// each one and writes it to the websocket as a text message.
//
// A write error is logged and the loop carries on with the next command. The
// loop returns once the queue has been closed and drained.
func (hq Hq) cmd() {
	for {
		text, open := <-hq.cmdChan
		if !open {
			return
		}
		log.Println("发送指令:" + text)
		if err := hq.conn.WriteMessage(websocket.TextMessage, []byte(text)); err != nil {
			log.Println("指令发送错误:", err)
		}
	}
}

// receive is the reading loop: it reads websocket messages until it is told to
// exit or the connection fails.
//
// Text messages are only logged. Binary messages are passed through
// DeCompress and split into lines; each line of the form "<key>=<value>" (with
// exactly one "=") is routed by key prefix: "lv1_" values go to the level1
// handler and "lv2_" values to the level2 handler. Lines of any other shape
// are ignored, as are quotes whose handler is nil.
func (hq Hq) receive() {
	for {
		// Check for an exit request without blocking before each read.
		select {
		case <-hq.exitChan:
			log.Print("接收协程退出")
			return
		default:
		}

		// Blocking read.
		messageType, rb, err := hq.conn.ReadMessage()
		if err != nil {
			log.Print("接收错误:", err)
			// A failed connection keeps returning the same error, and reading
			// it repeatedly makes the websocket library panic, so stop
			// reading. Wait for the exit signal rather than returning at once
			// so that a sender on exitChan always finds a receiver.
			<-hq.exitChan
			log.Print("接收协程退出")
			return
		}

		switch messageType {
		case websocket.TextMessage:
			log.Println("Text响应:", string(rb))
		case websocket.BinaryMessage:
			text := string(DeCompress(rb))
			for _, line := range strings.Split(text, "\n") {
				parts := strings.Split(line, "=")
				if len(parts) != 2 {
					continue
				}
				code, quote := parts[0], parts[1]
				if strings.HasPrefix(code, "lv1_") && hq.lv1Deal != nil {
					hq.lv1Deal(quote)
				}
				if strings.HasPrefix(code, "lv2_") && hq.lv2Deal != nil {
					hq.lv2Deal(quote)
				}
			}
		}
	}
}

// DeCompress inflates b, which is expected to hold a raw DEFLATE stream, and
// returns the decompressed bytes.
//
// If the stream is empty, truncated or corrupt, the error is logged and
// whatever was decoded before the failure (possibly nothing) is returned.
func DeCompress(b []byte) []byte {
	// Read straight from the input slice; no intermediate copy is needed.
	reader := flate.NewReader(bytes.NewReader(b))
	defer reader.Close()

	var result bytes.Buffer
	if _, err := result.ReadFrom(reader); err != nil {
		log.Println("行情数据解压错误:", err)
	}
	return result.Bytes()
}
// HttpOnce performs a single HTTP request and returns the response body.
//
// Url is the request URL. headers are added to the request as given. When
// postData is non-empty the request is a POST whose body is postData
// form-encoded, and the Content-Type header is set to
// application/x-www-form-urlencoded; otherwise the request is a GET.
// msTimeOut is the client timeout in milliseconds.
//
// The caller's headers map is never modified. On failure the returned error is
// non-nil; the response status code is not inspected.
func HttpOnce(Url string, headers, postData map[string]string, msTimeOut int) (r []byte, err error) {
	r = []byte{}

	// Work on a private copy so that adding Content-Type below cannot leak
	// into the map owned by the caller.
	reqHeaders := make(map[string]string, len(headers)+1)
	for k, v := range headers {
		reqHeaders[k] = v
	}

	method := http.MethodGet
	if len(postData) != 0 {
		method = http.MethodPost
		reqHeaders["Content-Type"] = "application/x-www-form-urlencoded"
	}

	postParam := url.Values{}
	for k, v := range postData {
		postParam.Set(k, v)
	}
	req, err := http.NewRequest(method, Url, bytes.NewBufferString(postParam.Encode()))
	if err != nil {
		return r, err
	}
	for k, v := range reqHeaders {
		req.Header.Add(k, v)
	}

	client := &http.Client{
		Timeout: time.Duration(msTimeOut) * time.Millisecond,
	}
	resp, err := client.Do(req)
	if err != nil {
		return r, err
	}
	defer resp.Body.Close()

	r, err = ioutil.ReadAll(bufio.NewReader(resp.Body))
	return r, err
}

参考地址:https://github.com/jvQuant/OpenAPIDemo


网站公告

今日签到

点亮在社区的每一天
去签到