我使用 API 为盈透证券构建了一个交易机器人。但是,我既无法让它返回“TOP-PER_GAIN”,也无法从我的帐户返回可用现金。
你们中的任何人都可以告诉我哪里出了问题吗?这是相关的代码。
from ib_insync import *
from ib_insync import IB, Stock, MarketOrder
import threading
import time
import datetime
import asyncio
from ibapi.wrapper import *
from ibapi.contract import Contract
class IBapi(Wrapper, Client):
def __init__(self):
EClient.__init__(self, self)
# Listen for real time bars
def realtimeBar(self, reqId, time, open_, high, low, close, volume, wap, count):
super().realtimeBar(reqId, time, open_, high, low, close, volume, wap, count)
try:
bot.on_bar_update(reqId, time, open_, high, low, close, volume, wap, count)
except Exception as e:
print(e)
def error(self, id, errorCode, errorMsg):
print(f"Error: {errorCode}, {errorMsg}")
class bot:
def __init__(self, account_id):
self.account_id = account_id
# Connect to IB on init
self.ib = IB()
self.ib_thread = threading.Thread(target=self.run_loop, daemon=True)
self.ib_thread.start()
time.sleep(1)
self.connect_ib()
# Request symbols from the scanner
self.symbols = self.search_stocks()
if not self.symbols:
print("No symbols found from scanner.")
return
# IB Contract Object
self.contract = Contract()
self.contract.secType = "STK"
self.contract.exchange = "SMART"
self.contract.currency = "USD"
self.contract.symbol = symbol
# Request Market Data
self.ib.reqRealTimeBars(0, self.contract, 5, "TRADES", 1, [])
print("Market data requested")
def connect_ib(self):
self.ib.connect('127.0.0.1', 7496, clientId=1) # Adjust the port if needed
print("Connection successful")
def run_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.ib.run()
# Pass real time bar data back to the bot object
def on_bar_update(self, reqId, time, open_, high, low, close, volume, wap, count):
print(f"Close price: {close}")
def filter_by_price_change(self, symbols):
filtered_symbols = []
for symbol in symbols:
contract = Stock(symbol, 'SMART', 'USD')
self.ib.qualifyContracts(contract)
ticker = self.ib.reqMktData(contract)
self.ib.sleep(1) # Wait for data to be fetched
if ticker.close and ticker.previousClose:
price_change = ((ticker.close - ticker.previousClose) / ticker.previousClose) * 100
if price_change >= 5:
filtered_symbols.append(symbol)
self.ib.cancelMktData(contract)
return filtered_symbols
def search_stocks(self):
# Define the scanner subscription criteria
scanner = ScannerSubscription(
instrument='STK',
locationCode='STK.US.MAJOR',
scanCode='TOP_PERC_GAIN',
abovePrice=1,
aboveVolume=50000
)
# Request the scanner data
scanDataList = self.ib.reqScannerSubscription(scanner)
time.sleep(20) # Wait a moment for the scanner to gather data
print("Stocks Scanned")
# Extract stock symbols from the scanner data
symbols = [scan.contractDetails.contract.symbol for scan in scanDataList]
print(symbols)
# Filter symbols by a 5% increase in price from the previous day
filtered_symbols = self.filter_by_price_change(symbols)
print(filtered_symbols)
return filtered_symbols
def get_available_cash(self):
try:
account_summary = self.ib.reqAccountSummary(9001, "All", AccountSummaryTags.AllTags)
if not account_summary:
print("No account summary data returned.")
return 0.0
for item in account_summary:
if item.tag == 'AvailableFunds' and item.currency == 'USD'and item.account == self.account_id:
return float(item.value)
print(item.value)
print("Account summary data does not contain available funds information.")
return 0.0
except Exception as e:
print(f"Error retrieving account summary: {e}")
return 0.0
def buy_stock(self, stock, initial_cash):
# Rule 3: Spend up to 1/4 of available money on a stock
max_investment = initial_cash / 4
# Get current price
data = self.ib.reqMktData(stock, '', False, False)
self.ib.sleep(2) # Let IB fetch the data
if data and data.last != 0:
current_price = data.last
quantity = max_investment // current_price
# Place market order to buy the stock
order = MarketOrder('BUY', quantity)
trade = self.ib.placeOrder(stock, order)
return trade, current_price
return None, None
def sell_stock(self, stock, quantity):
# Place market order to sell the stock
order = MarketOrder('SELL', quantity)
trade = self.ib.placeOrder(stock, order)
return trade`your text`
def trading_bot(account_id):
my_bot = bot(account_id)
# Get initial available cash
available_cash = my_bot.get_available_cash()
print(available_cash)
portfolio = []
while True:
current_time = datetime.datetime.now().time()
# Execute only within trading hours, adjust the time as per your need
if datetime.time(9, 30) <= current_time <= datetime.time(16, 0):
# Rule 1: Check for stocks to buy until 9:50 a.m.
if current_time <= datetime.time(10, 30):
candidates = my_bot.search_stocks()
这是当前的输出:
连接成功 股票扫描 [] [] 扫描仪中未找到符号。 检索帐户摘要时出错:未定义名称“AccountSummaryTags” 0.0
代码中有几个问题需要解决:
-
AccountSummaryTags
未定义: -
需要从
ib_insync.ib
模块导入AccountSummaryTags
:python from ib_insync import IB, Stock, MarketOrder, AccountSummaryTags
-
扫描器问题:
-
TOP_PERC_GAIN
不存在: IB API 中没有TOP_PERC_GAIN
扫描代码。应该使用诸如TOP_PERC_GAINERS
之类的代码。有关可用扫描代码的完整列表,请参阅 IB 网站上的 https://interactivebrokers.github.io/tws-api/available_scans.html 。 -
扫描器延迟:
reqScannerSubscription
方法会异步返回数据。正在使用time.sleep(20)
,但这并不能保证扫描仪已在此时间内返回数据。使用ib.sleep()
代替它,它会妥善处理异步操作:python scanDataList = self.ib.reqScannerSubscription(scanner) self.ib.sleep(5) # 等待扫描仪数据
-
reqMktData
用于历史数据: -
正在使用
reqMktData
获取股票的当前价格,但此方法主要用于获取历史数据。虽然它可能起作用,但更可靠的方法是使用reqTickers
或reqMarketDataType(4)
来获取实时价格。 -
其他改进:
- 错误处理: 添加适当的错误处理机制来处理来自 IB API 的错误。
-
循环逻辑:
while True
循环内的当前逻辑似乎不完整。可能希望添加条件来打破循环或定期检查新股票。
以下是更新后的代码,其中包含所述修复程序和改进:
from ib_insync import *
from ib_insync import IB, Stock, MarketOrder, AccountSummaryTags
import threading
import time
import datetime
import asyncio
from ibapi.wrapper import *
from ibapi.contract import Contract
class IBapi(Wrapper, Client):
# ...(代码的其余部分与以前相同)
class bot:
def __init__(self, account_id):
# ...(代码的其余部分与以前相同)
def search_stocks(self):
# Define the scanner subscription criteria
scanner = ScannerSubscription(
instrument='STK',
locationCode='STK.US.MAJOR',
scanCode='TOP_PERC_GAINERS', # 使用正确的扫描代码
abovePrice=1,
aboveVolume=50000
)
# Request the scanner data
scanDataList = self.ib.reqScannerSubscription(scanner)
self.ib.sleep(5) # 等待扫描仪数据
print("股票扫描")
# Extract stock symbols from the scanner data
symbols = [scan.contractDetails.contract.symbol for scan in scanDataList]
print(symbols)
# Filter symbols by a 5% increase in price from the previous day
filtered_symbols = self.filter_by_price_change(symbols)
print(filtered_symbols)
return filtered_symbols
def filter_by_price_change(self, symbols):
# ...(代码的其余部分与以前相同)
def get_available_cash(self):
try:
account_summary = self.ib.reqAccountSummary(9001, "All", AccountSummaryTags.AllTags)
if not account_summary:
print("未返回帐户摘要数据。")
return 0.0
for item in account_summary:
if item.tag == 'AvailableFunds' and item.currency == 'USD' and item.account == self.account_id:
return float(item.value)
print("帐户摘要数据不包含可用资金信息。")
return 0.0
except Exception as e:
print(f"检索帐户摘要时出错:{e}")
return 0.0
# ...(代码的其余部分与以前相同)
def trading_bot(account_id):
# ...(代码的其余部分与以前相同)
更改这些内容应该可以解决遇到的问题,并使能够从扫描仪获取股票数据并从的帐户获取可用现金。
标签:python,api,trading,interactive-brokers From: 78818775