CTP接口封装

  • 简介
  • 一、安装
  • 二、CtpPlus文件介绍
  • 三.应用范例
  • 1.配置文件:FutureAccount.py
  • 2.获取实时行情和交易接口测试
  • 1.1获取实时行情
  • 1.2 交易接口测试
  • 总结

  • 简介

    CtpPlus是上期技术CTP API的Python封装,具有以下特点:

  • 易使用:Python语言,结构清晰,注释完整,文档详尽。
  • 低延时:基于Cython释放GIL;支持多路行情源;无需主事件引擎,实现去中心化。
  • 忠实于CTP官方特性:充分利用CTP的异步、多线程特性。

  • CtpPlus开源地址:https://gitee.com/syealfalfa/CtpPlus-master


    CTP上期技术官网下载地址:
    api接口和chm开发文档:http://www.sfit.com.cn/5_2_DocumentDown_2.htm

    一、安装

    首先配置Anaconda环境,
    以下为安装方法:
    请添加图片描述
    项目中已提供了python3.8和python3.10的window安装包,cd到对应的安装包路径即可

    然后使用pip命令安装:

    pip install CtpPlus-1.0-cp38-cp38-win_amd64.whl
    

    二、CtpPlus文件介绍

  • api:存放的是Linux和windows版本CTP链接库
  • c2cython:实现CTP回调函数功能
  • cython2c:使用cython调用CTP接口
  • MdApiBase.py:封装后的CTP行情接口
  • TraderApiBase.py:封装后的CTP交易接口
  • MdApi.py和TraderApi.py:接口案例文件
  • ApiConst.py:对应CTP的ThostFtdcUserApiDataType.h中的数据类型
  • ApiStruct.py:对应CTP的ThostFtdcUserApiStruct.h中的数据结构
  • FutureAccount.py:配置文件
  • 三.应用范例

    1.配置文件:FutureAccount.py

    代码如下(示例):

    # encoding:utf-8
    
    import os
    
    BASE_LOCATION = "./log"
    MD_LOCATION = BASE_LOCATION
    TD_LOCATION = BASE_LOCATION
    
    SIMULATE_SERVER = {
        '电信1': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10201", 'MDServer': '180.168.146.187:10211', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
        '电信2': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10202", 'MDServer': '180.168.146.187:10212', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
        '移动': {'BrokerID': 9999, 'TDServer': "218.202.237.33:10203", 'MDServer': '218.202.237.33:10213', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
        'TEST': {'BrokerID': 9999, 'TDServer': "180.168.146.187:10130", 'MDServer': '180.168.146.187:10131', 'AppID': 'simnow_client_test', 'AuthCode': '0000000000000000'},
        'N视界': {'BrokerID': 10010, 'TDServer': "210.14.72.12:4600", 'MDServer': '210.14.72.12:4602', 'AppID': '', 'AuthCode': ''},
    }
    
    
    class FutureAccount:
        def __init__(self, broker_id, server_dict, reserve_server_dict, investor_id, password, app_id, auth_code, subscribe_list, md_flow_path=MD_LOCATION, td_flow_path=TD_LOCATION):
            self.broker_id = broker_id  # 期货公司BrokerID
            self.server_dict = server_dict  # 登录的服务器地址
            self.reserve_server_dict = reserve_server_dict  # 备用服务器地址
            self.investor_id = investor_id  # 账户
            self.password = password  # 密码
            self.app_id = app_id  # 认证使用AppID
            self.auth_code = auth_code  # 认证使用授权码
            self.subscribe_list = subscribe_list  # 订阅合约列表[]
            self.md_flow_path = md_flow_path  # MdApi流文件存储地址,默认MD_LOCATION
            self.td_flow_path = td_flow_path  # TraderApi流文件存储地址,默认TD_LOCATION
    
    
    def get_simulate_account(investor_id, password, subscribe_list=None, server_name='电信1', md_flow_path=MD_LOCATION, td_flow_path=TD_LOCATION):
        if server_name not in SIMULATE_SERVER.keys():
            print(f'{server_name}不在可选列表[电信1, 电信2, 移动, TEST]中,默认使用电信1。')
            server_name = '电信1'
    
        if subscribe_list is None:
            subscribe_list = []
    
        investor_id = investor_id if isinstance(investor_id, bytes) else investor_id.encode(encoding='utf-8')
        password = password if isinstance(password, bytes) else password.encode(encoding='utf-8')
        return FutureAccount(
            broker_id=SIMULATE_SERVER[server_name]['BrokerID'],  # 期货公司BrokerID
            server_dict=SIMULATE_SERVER[server_name],  # TDServer为交易服务器,MDServer为行情服务器。服务器地址格式为"ip:port。"
            reserve_server_dict={},
            investor_id=investor_id,  # 账户
            password=password,  # 密码
            app_id=SIMULATE_SERVER[server_name]['AppID'],  # 认证使用AppID
            auth_code=SIMULATE_SERVER[server_name]['AuthCode'],  # 认证使用授权码
            subscribe_list=subscribe_list,  # 订阅合约列表
            md_flow_path=md_flow_path,  # MdApi流文件存储地址,默认MD_LOCATION
            td_flow_path=td_flow_path  # TraderApi流文件存储地址,默认TD_LOCATION
        )
    

    2.获取实时行情和交易接口测试

    1.1获取实时行情

    演示这个功能的例子是examples/test_MdApi.py,运行之后可以看到如下的输出结果:

    # -*- codeing:utf-8 -*-
    '''
    @author: syealfalfa
    @datetime: 2024/2/22 11:00
    @Blog: 获取tick数据
    '''
    from CtpPlus.CTP.ApiStruct import ReqUserLoginField, QryMulticastInstrumentField
    from CtpPlus.CTP.FutureAccount import get_simulate_account, FutureAccount
    from CtpPlus.CTP.MdApi import run_api
    from CtpPlus.CTP.MdApiBase import MdApiBase
    
    
    class TickEngine(MdApiBase):
        def __init__(self, broker_id, md_server, investor_id, password, app_id, auth_code, instrument_id_list,
                     md_queue_list=None, page_dir='', using_udp=False, multicast=False, *args, **kwargs):
            super(TickEngine, self).__init__()
    
        def OnRtnDepthMarketData(self, pDepthMarketData):
            print(pDepthMarketData)
    
        def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
            self.write_log('OnRspUserLogin', pRspUserLogin)
    
        def OnRspQryMulticastInstrument(self, pMulticastInstrument, pRspInfo, nRequestID, bIsLast):
            print(f'OnRspQryMulticastInstrument: {pMulticastInstrument}')
    
    
    if __name__ == '__main__':
        subscribe_list = [b'rb2410']
    
        future_account = get_simulate_account(
            investor_id='',  # SimNow账户
            password='',  # SimNow账户密码
            subscribe_list=subscribe_list,  # 合约列表
            server_name='TEST'  # 电信1、电信2、移动、TEST
        )
    
        run_api(TickEngine, future_account)
    
    

    请添加图片描述从输出日志可以看到,AlgoPlus第一步连接服务器,第二步登陆账户,第三步订阅行情,最后就是接收行情数据。

    1.2 交易接口测试

    # -*- codeing:utf-8 -*-
    '''
    @author: syealfalfa
    @datetime: 2024/3/4 9:56
    @Blog:
    '''
    import time
    
    from CtpPlus.CTP.ApiStruct import QryRULEIntraParameterField, QryProductGroupField, QryExchangeField, QryInstrumentField
    from CtpPlus.CTP.FutureAccount import get_simulate_account, FutureAccount
    from CtpPlus.CTP.TraderApiBase import TraderApiBase
    from CtpPlus.utils.base_field import to_str
    
    
    class TraderEngine(TraderApiBase):
        def __init__(self, broker_id, td_server, investor_id, password, app_id, auth_code, md_queue=None, flow_path='', private_resume_type=2, public_resume_type=2):
            super(TraderEngine, self).__init__()
    
        def OnRspSettlementInfoConfirm(self, pSettlementInfoConfirm, pRspInfo, nRequestID, bIsLast):
            """投资者结算结果确认响应"""
            self.write_log('pSettlementInfoConfirm', pSettlementInfoConfirm)
    
        def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
            """登录请求响应"""
            self.write_log('OnRspUserLogin', pRspUserLogin)
    
            # 请求RULE品种内对锁仓折扣参数查询
            # pQryRULEIntraParameter = QryRULEIntraParameterField(ExchangeID=b'9080', ProdFamilyCode=b'MA')
            # ret = self.ReqQryRULEIntraParameter(pQryRULEIntraParameter)
            # print(f'ReqQryRULEIntraParameter: ret = {ret}')
    
            # 请求查询产品组
            # pQryProductGroup = QryProductGroupField(ProductID=b'ag', ExchangeID=b'SHFE')
            # ret = self.ReqQryProductGroup(pQryProductGroup)
            # print(f'ReqQryProductGroup: ret = {ret}')
    
            # 买开仓
            ret = self.buy_open(b'SHFE', b'rb2405', 3720, 1)
            if not ret:
                self.write_log("ReqOrderInsert", "买入开仓成功")
    
            """查询持仓明细"""
            # self.query_position_detail()
    
            """请求查询交易所"""
            # pQryExchange = QryExchangeField(ExchangeID=b'SHFE')
            # self.ReqQryExchange(pQryExchange)
    
            """请求查询合约,填空可以查询到所有合约"""
            # pQryInstrument = QryInstrumentField()
            # self.ReqQryInstrument(pQryInstrument)
            # self.query_instrument()
    
        def OnRspQryInstrument(self, pInstrument, pRspInfo, nRequestID, bIsLast):
            """请求查询合约响应"""
            pInstrument['InstrumentName'] = to_str(pInstrument['InstrumentName'])
            self.write_log("OnRspQryInstrument", pInstrument, pRspInfo)
    
        def OnRspQryRULEIntraParameter(self, pRULEIntraParameter, pRspInfo, nRequestID, bIsLast):
            self.write_log('OnRspQryRULEIntraParameter', pRULEIntraParameter)
    
        def OnRspQryProductGroup(self, pProductGroup, pRspInfo, nRequestID, bIsLast):
            self.write_log('OnRspQryProductGroup', pProductGroup, pRspInfo, nRequestID, bIsLast)
    
        def OnRspQryExchange(self, pExchange, pRspInfo, nRequestID, bIsLast):
            pExchange['ExchangeName'] = to_str(pExchange['ExchangeName'])
            self.write_log('OnRspQryExchange', pExchange)
    
    
    def run_api(api_cls, account):
        if isinstance(account, FutureAccount):
            trader_engine = api_cls(
                account.broker_id,
                account.server_dict['TDServer'],
                account.investor_id,
                account.password,
                account.app_id,
                account.auth_code,
                None,
                account.td_flow_path
            )
            trader_engine.Join()
    
    
    if __name__ == '__main__':
        subscribe_list = [b'rb2410']
    
        future_account = get_simulate_account(
            investor_id='******',  # SimNow账户
            password='******',  # SimNow账户密码
            subscribe_list=subscribe_list,  # 合约列表
            server_name='TEST'  # 电信1、电信2、移动、TEST、实盘
        )
    
        run_api(TraderEngine, future_account)
    

    测试结果如下:请添加图片描述

    总结

    请自行下载源码研究,文档中微信号,欢迎各位量化交易者添加,相互探讨

    作者:syealfalfa

    物联沃分享整理
    物联沃-IOTWORD物联网 » python-期货CTP接口封装

    发表回复