# encoding=utf-8 import math from pytdx.hq import TdxHq_API import pathlib import multiprocessing as mp from multiprocessing import Pool class myTdx: def __init__(self): self.HqHOSTS = pathlib.Path("HqHOSTS.txt").read_text().split() self.SYMBOLS = pathlib.Path("A.txt").read_text().split() self.SYMBOLS2 = self.getsymbols2() self.HSDICT = dict(zip(self.HqHOSTS, self.SYMBOLS2)) self.manager = mp.Manager self.RESULT = self.manager().list() def getsymbols2(self): symbols = self.SYMBOLS total = len(symbols) page = len(self.HqHOSTS) perpage = math.ceil(total / page) return [symbols[i:i + perpage] for i in range(0, len(symbols), perpage)] def getdata(self, host, slist): # 数据获取接口一般返回list结构, api = TdxHq_API() liveapi = api.connect(host, 7709) if liveapi: r_list = [] for symbol in slist: market_code = 1 if str(symbol[0]) == '6' else 0 data = liveapi.get_transaction_data(market_code, symbol, 0, 30) rdic = {symbol: data} r_list.append(rdic) self.RESULT.extend(r_list) else: print(host, "服務器不可用...") liveapi.disconnect() def save(self): # print(self.RESULT) log = open('r.txt', 'a') print(self.RESULT, file=log) if __name__ == '__main__': a = myTdx() pool = Pool(len(a.HqHOSTS)) for host in a.HqHOSTS: alist = a.HSDICT.get(host) pool.apply_async(func=a.getdata, args=(host, alist)) pool.close() pool.join() a.save()
# encoding=utf-8 from pytdx.hq import TdxHq_API import pathlib import re if __name__ == '__main__': ips_text = pathlib.Path("HQHOSTA.txt").read_text(encoding='utf-8') # 读取文件 ips_list = re.compile(r'\d+\.\d+\.\d+\.\d+').findall(ips_text) # 正则查找所有ip ips_listOK = [] api = TdxHq_API() for ip in ips_list: if api.connect(ip, 7709): ips_listOK.append(ip) if len(ips_listOK) > 0: ips_listOld = pathlib.Path("HqHOSTS.txt").read_text(encoding='utf-8').split() ips_listOld.extend(ips_listOK) ips_new = list(set(ips_listOld)) # 去重 pathlib.Path("HqHOSTS.txt").write_text("\n".join(ips_new)) pathlib.Path('HQHOSTA.txt').unlink() print("完成")
标签:__,HqHOSTS,pytdx,示例,self,list,pathlib,ips,多线程 From: https://www.cnblogs.com/pu369/p/17306338.html