首页 > 编程语言 >PandasTA 源码解析(七)

PandasTA 源码解析(七)

时间:2024-04-15 13:44:05浏览次数:19  
标签:slow PandasTA kwargs length 源码 offset close fillna 解析

.\pandas-ta\pandas_ta\momentum\stc.py

# -*- coding: utf-8 -*-
从 pandas 库中导入 DataFrame 和 Series 类
从 pandas_ta.overlap 模块中导入 ema 函数
从 pandas_ta.utils 模块中导入 get_offset、non_zero_range 和 verify_series 函数


# 定义函数:Schaff Trend Cycle (STC)
def stc(close, tclength=None, fast=None, slow=None, factor=None, offset=None, **kwargs):
    # 验证参数
    tclength = int(tclength) if tclength and tclength > 0 else 10
    fast = int(fast) if fast and fast > 0 else 12
    slow = int(slow) if slow and slow > 0 else 26
    factor = float(factor) if factor and factor > 0 else 0.5
    # 如果慢线小于快线,则交换它们的值
    if slow < fast:                
        fast, slow = slow, fast
    # 计算所需数据的长度,取最大值
    _length = max(tclength, fast, slow)
    # 验证收盘价数据,返回验证后的 Series 对象
    close = verify_series(close, _length)
    # 获取偏移量
    offset = get_offset(offset)

    # 如果收盘价为 None,则返回
    if close is None: return

    # kwargs 允许传递三个更多的 Series(ma1、ma2 和 osc),这些可以在这里传递,
    # ma1 和 ma2 输入会抵消内部的 ema 计算,osc 替代了两个 ma。
    ma1 = kwargs.pop("ma1", False)
    ma2 = kwargs.pop("ma2", False)
    osc = kwargs.pop("osc", False)

    # 3 种不同的计算模式..
    if isinstance(ma1, Series) and isinstance(ma2, Series) and not osc:
        # 验证输入的两个外部 Series 对象
        ma1 = verify_series(ma1, _length)
        ma2 = verify_series(ma2, _length)

        # 如果其中一个为 None,则返回
        if ma1 is None or ma2 is None: return
        # 根据外部提供的 Series 计算结果
        xmacd = ma1 - ma2
        # 调用共享计算函数
        pff, pf = schaff_tc(close, xmacd, tclength, factor)

    elif isinstance(osc, Series):
        # 验证输入的振荡器 Series 对象
        osc = verify_series(osc, _length)
        # 如果为 None,则返回
        if osc is None: return
        # 根据提供的振荡器计算结果(应在 0 轴附近)
        xmacd = osc
        # 调用共享计算函数
        pff, pf = schaff_tc(close, xmacd, tclength, factor)

    else:
        # 计算结果..(传统/完整)
        # MACD 线
        fastma = ema(close, length=fast)
        slowma = ema(close, length=slow)
        xmacd = fastma - slowma
        # 调用共享计算函数
        pff, pf = schaff_tc(close, xmacd, tclength, factor)

    # 结果 Series
    stc = Series(pff, index=close.index)
    macd = Series(xmacd, index=close.index)
    stoch = Series(pf, index=close.index)

    # 偏移
    if offset != 0:
        stc = stc.shift(offset)
        macd = macd.shift(offset)
        stoch = stoch.shift(offset)

    # 填充缺失值
    if "fillna" in kwargs:
        stc.fillna(kwargs["fillna"], inplace=True)
        macd.fillna(kwargs["fillna"], inplace=True)
        stoch.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        stc.fillna(method=kwargs["fill_method"], inplace=True)
        macd.fillna(method=kwargs["fill_method"], inplace=True)
        stoch.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名和分类
    _props = f"_{tclength}_{fast}_{slow}_{factor}"
    stc.name = f"STC{_props}"
    macd.name = f"STCmacd{_props}"
    # 设置 stoch 对象的名称属性为包含 _props 的字符串
    stoch.name = f"STCstoch{_props}"
    # 设置 stc 和 macd 对象的 category 属性为 "momentum"
    stc.category = macd.category = stoch.category ="momentum"

    # 准备要返回的 DataFrame
    # 创建一个字典,包含 stc.name、macd.name 和 stoch.name 作为键,对应的对象为值
    data = {stc.name: stc, macd.name: macd, stoch.name: stoch}
    # 用 data 字典创建 DataFrame 对象
    df = DataFrame(data)
    # 设置 DataFrame 对象的名称属性为包含 _props 的字符串
    df.name = f"STC{_props}"
    # 设置 DataFrame 对象的 category 属性为 stc 对象的 category 属性
    df.category = stc.category

    # 返回 DataFrame 对象
    return df
# 设置 stc 的文档字符串,描述 Schaff Trend Cycle(STC)指标的计算方法和用法
stc.__doc__ = \
"""Schaff Trend Cycle (STC)

The Schaff Trend Cycle is an evolution of the popular MACD incorportating two
cascaded stochastic calculations with additional smoothing.

The STC returns also the beginning MACD result as well as the result after the
first stochastic including its smoothing. This implementation has been extended
for Pandas TA to also allow for separatly feeding any other two moving Averages
(as ma1 and ma2) or to skip this to feed an oscillator (osc), based on which the
Schaff Trend Cycle should be calculated.

Feed external moving averages:
Internally calculation..
    stc = ta.stc(close=df["close"], tclen=stc_tclen, fast=ma1_interval, slow=ma2_interval, factor=stc_factor)
becomes..
    extMa1 = df.ta.zlma(close=df["close"], length=ma1_interval, append=True)
    extMa2 = df.ta.ema(close=df["close"], length=ma2_interval, append=True)
    stc = ta.stc(close=df["close"], tclen=stc_tclen, ma1=extMa1, ma2=extMa2, factor=stc_factor)

The same goes for osc=, which allows the input of an externally calculated oscillator, overriding ma1 & ma2.


Sources:
    Implemented by rengel8 based on work found here:
    https://www.prorealcode.com/prorealtime-indicators/schaff-trend-cycle2/

Calculation:
    STCmacd = Moving Average Convergance/Divergance or Oscillator
    STCstoch = Intermediate Stochastic of MACD/Osc.
    2nd Stochastic including filtering with results in the
    STC = Schaff Trend Cycle

Args:
    close (pd.Series): Series of 'close's, used for indexing Series, mandatory
    tclen (int): SchaffTC Signal-Line length.  Default: 10 (adjust to the half of cycle)
    fast (int): The short period.   Default: 12
    slow (int): The long period.   Default: 26
    factor (float): smoothing factor for last stoch. calculation.   Default: 0.5
    offset (int): How many periods to offset the result.  Default: 0

Kwargs:
    ma1: 1st moving average provided externally (mandatory in conjuction with ma2)
    ma2: 2nd moving average provided externally (mandatory in conjuction with ma1)
    osc: an externally feeded osillator
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.DataFrame: stc, macd, stoch
"""

# 定义 Schaff Trend Cycle(STC)计算函数
def schaff_tc(close, xmacd, tclength, factor):
    # 实际计算部分,这部分计算适用于不同的操作模式
    # 1. MACD 的 Stochastic
    # 计算区间 tclen 内 xmacd 的最小值
    lowest_xmacd = xmacd.rolling(tclength).min()  # min value in interval tclen
    # 计算区间 tclen 内 xmacd 的范围(最大值 - 最小值)
    xmacd_range = non_zero_range(xmacd.rolling(tclength).max(), lowest_xmacd)
    # 获取 xmacd 的长度
    m = len(xmacd)

    # 计算 MACD 的快速 %K
    # 初始化 stoch1 和 pf 列表
    stoch1, pf = list(xmacd), list(xmacd)
    # 第一个元素的值为 0
    stoch1[0], pf[0] = 0, 0
    # 循环计算 stoch1 和 pf 列表中的值
    for i in range(1, m):
        # 如果 lowest_xmacd[i] 大于 0,则计算快速 %K
        if lowest_xmacd[i] > 0:
            stoch1[i] = 100 * ((xmacd[i] - lowest_xmacd[i]) / xmacd_range[i])
        else:
            # 否则保持前一个值不变
            stoch1[i] = stoch1[i - 1]
        # 计算平滑后的 %D
        pf[i] = round(pf[i - 1] + (factor * (stoch1[i] - pf[i - 1])), 8)
    # 将 pf 转换为 Series 类型,并以 close 的索引为索引
    pf = Series(pf, index=close.index)

    # 计算平滑后的 Percent Fast D, 'PF' 的随机指标
    # 计算滚动窗口为 tclength 的最小值
    lowest_pf = pf.rolling(tclength).min()
    # 计算 pf 在滚动窗口为 tclength 的范围内的非零范围
    pf_range = non_zero_range(pf.rolling(tclength).max(), lowest_pf)

    # 计算 % Fast K of PF
    stoch2, pff = list(xmacd), list(xmacd)
    stoch2[0], pff[0] = 0, 0
    for i in range(1, m):
        if pf_range[i] > 0:
            # 计算 % Fast K of PF
            stoch2[i] = 100 * ((pf[i] - lowest_pf[i]) / pf_range[i])
        else:
            stoch2[i] = stoch2[i - 1]
        # 计算平滑后的 % Fast D of PF
        # 使用平滑因子 factor 进行平滑计算
        pff[i] = round(pff[i - 1] + (factor * (stoch2[i] - pff[i - 1])), 8)

    # 返回平滑后的 % Fast D of PF 和原始的 PF
    return [pff, pf]

# `.\pandas-ta\pandas_ta\momentum\stoch.py`

```py
# -*- coding: utf-8 -*-
# 导入DataFrame类
from pandas import DataFrame
# 从pandas_ta.overlap模块中导入ma函数
from pandas_ta.overlap import ma
# 从pandas_ta.utils模块中导入get_offset、non_zero_range和verify_series函数
from pandas_ta.utils import get_offset, non_zero_range, verify_series


# 定义Stochastic Oscillator (STOCH)函数
def stoch(high, low, close, k=None, d=None, smooth_k=None, mamode=None, offset=None, **kwargs):
    """Indicator: Stochastic Oscillator (STOCH)"""
    # 校验参数
    # 如果k为正数则使用k,否则默认为14
    k = k if k and k > 0 else 14
    # 如果d为正数则使用d,否则默认为3
    d = d if d and d > 0 else 3
    # 如果smooth_k为正数则使用smooth_k,否则默认为3
    smooth_k = smooth_k if smooth_k and smooth_k > 0 else 3
    # 计算_max(k, d, smooth_k)
    _length = max(k, d, smooth_k)
    # 校验high、low和close的长度是否为_length
    high = verify_series(high, _length)
    low = verify_series(low, _length)
    close = verify_series(close, _length)
    # 获取offset值
    offset = get_offset(offset)
    # 如果mamode不是字符串则设为"sma"
    mamode = mamode if isinstance(mamode, str) else "sma"

    # 如果high、low或close有任何一个为None,则返回空值
    if high is None or low is None or close is None: return

    # 计算结果
    # 计算过去k个周期的最低值
    lowest_low = low.rolling(k).min()
    # 计算过去k个周期的最高值
    highest_high = high.rolling(k).max()

    # 计算stoch值
    stoch = 100 * (close - lowest_low)
    stoch /= non_zero_range(highest_high, lowest_low)

    # 计算stoch_k和stoch_d
    stoch_k = ma(mamode, stoch.loc[stoch.first_valid_index():,], length=smooth_k)
    stoch_d = ma(mamode, stoch_k.loc[stoch_k.first_valid_index():,], length=d)

    # 偏移处理
    if offset != 0:
        stoch_k = stoch_k.shift(offset)
        stoch_d = stoch_d.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        stoch_k.fillna(kwargs["fillna"], inplace=True)
        stoch_d.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        stoch_k.fillna(method=kwargs["fill_method"], inplace=True)
        stoch_d.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置名称和分类
    _name = "STOCH"
    _props = f"_{k}_{d}_{smooth_k}"
    stoch_k.name = f"{_name}k{_props}"
    stoch_d.name = f"{_name}d{_props}"
    stoch_k.category = stoch_d.category = "momentum"

    # 准备要返回的DataFrame
    data = {stoch_k.name: stoch_k, stoch_d.name: stoch_d}
    df = DataFrame(data)
    df.name = f"{_name}{_props}"
    df.category = stoch_k.category
    return df


# 设置stoch函数的文档字符串
stoch.__doc__ = \
"""Stochastic (STOCH)

The Stochastic Oscillator (STOCH) was developed by George Lane in the 1950's.
He believed this indicator was a good way to measure momentum because changes in
momentum precede changes in price.

It is a range-bound oscillator with two lines moving between 0 and 100.
The first line (%K) displays the current close in relation to the period's
high/low range. The second line (%D) is a Simple Moving Average of the %K line.
The most common choices are a 14 period %K and a 3 period SMA for %D.

Sources:
    https://www.tradingview.com/wiki/Stochastic_(STOCH)
    https://www.sierrachart.com/index.php?page=doc/StudiesReference.php&ID=332&Name=KD_-_Slow

Calculation:
    Default Inputs:
        k=14, d=3, smooth_k=3
    SMA = Simple Moving Average
    LL  = low for last k periods
    HH  = high for last k periods

    STOCH = 100 * (close - LL) / (HH - LL)
    STOCHk = SMA(STOCH, smooth_k)
    STOCHd = SMA(FASTK, d)

Args:
    high (pd.Series): Series of 'high's

"""
    # 表示传入函数的参数,分别为低价序列
    low (pd.Series): Series of 'low's
    # 表示传入函数的参数,分别为收盘价序列
    close (pd.Series): Series of 'close's
    # 表示传入函数的参数,表示快速 %K 的周期,默认为 14
    k (int): The Fast %K period. Default: 14
    # 表示传入函数的参数,表示慢速 %K 的周期,默认为 3
    d (int): The Slow %K period. Default: 3
    # 表示传入函数的参数,表示慢速 %D 的周期,默认为 3
    smooth_k (int): The Slow %D period. Default: 3
    # 表示传入函数的参数,参见 ta.ma 的帮助文档。默认为 'sma'
    mamode (str): See ```help(ta.ma)```py. Default: 'sma'
    # 表示传入函数的参数,表示结果的偏移周期数,默认为 0
    offset (int): How many periods to offset the result. Default: 0
# 参数说明:
# - fillna (value, optional): 使用 value 对 pd.DataFrame 进行填充
# - fill_method (value, optional): 填充方法的类型

# 返回值:
# - 返回一个 pd.DataFrame,包含 %K 和 %D 列

.\pandas-ta\pandas_ta\momentum\stochrsi.py

# -*- coding: utf-8 -*-
# 从 pandas 库中导入 DataFrame 类
from pandas import DataFrame
# 从 .rsi 模块中导入 rsi 函数
from .rsi import rsi
# 从 pandas_ta.overlap 模块中导入 ma 函数
from pandas_ta.overlap import ma
# 从 pandas_ta.utils 模块中导入 get_offset, non_zero_range, verify_series 函数
from pandas_ta.utils import get_offset, non_zero_range, verify_series


# 定义函数 stochrsi,计算 Stochastic RSI Oscillator (STOCHRSI)
def stochrsi(close, length=None, rsi_length=None, k=None, d=None, mamode=None, offset=None, **kwargs):
    """Indicator: Stochastic RSI Oscillator (STOCHRSI)"""
    # 校验参数
    length = length if length and length > 0 else 14
    rsi_length = rsi_length if rsi_length and rsi_length > 0 else 14
    k = k if k and k > 0 else 3
    d = d if d and d > 0 else 3
    # 校验 close 序列
    close = verify_series(close, max(length, rsi_length, k, d))
    offset = get_offset(offset)
    # 确定 mamode 默认为 "sma",如果 mamode 不是字符串则设为 "sma"
    mamode = mamode if isinstance(mamode, str) else "sma"

    # 如果 close 为 None,返回空值
    if close is None: return

    # 计算结果
    # 计算 RSI
    rsi_ = rsi(close, length=rsi_length)
    # 计算最低 RSI
    lowest_rsi = rsi_.rolling(length).min()
    # 计算最高 RSI
    highest_rsi = rsi_.rolling(length).max()

    # 计算 stoch 值
    stoch = 100 * (rsi_ - lowest_rsi)
    stoch /= non_zero_range(highest_rsi, lowest_rsi)

    # 计算 STOCHRSI 的 %K 线和 %D 线
    stochrsi_k = ma(mamode, stoch, length=k)
    stochrsi_d = ma(mamode, stochrsi_k, length=d)

    # 偏移
    if offset != 0:
        stochrsi_k = stochrsi_k.shift(offset)
        stochrsi_d = stochrsi_d.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        stochrsi_k.fillna(kwargs["fillna"], inplace=True)
        stochrsi_d.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        stochrsi_k.fillna(method=kwargs["fill_method"], inplace=True)
        stochrsi_d.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名并分类
    _name = "STOCHRSI"
    _props = f"_{length}_{rsi_length}_{k}_{d}"
    stochrsi_k.name = f"{_name}k{_props}"
    stochrsi_d.name = f"{_name}d{_props}"
    stochrsi_k.category = stochrsi_d.category = "momentum"

    # 准备返回的 DataFrame
    data = {stochrsi_k.name: stochrsi_k, stochrsi_d.name: stochrsi_d}
    df = DataFrame(data)
    df.name = f"{_name}{_props}"
    df.category = stochrsi_k.category

    return df


# 设置 stochrsi 函数的文档字符串
stochrsi.__doc__ = \
"""Stochastic (STOCHRSI)

"Stochastic RSI and Dynamic Momentum Index" was created by Tushar Chande and Stanley Kroll and published in Stock & Commodities V.11:5 (189-199)

It is a range-bound oscillator with two lines moving between 0 and 100.
The first line (%K) displays the current RSI in relation to the period's
high/low range. The second line (%D) is a Simple Moving Average of the %K line.
The most common choices are a 14 period %K and a 3 period SMA for %D.

Sources:
    https://www.tradingview.com/wiki/Stochastic_(STOCH)

Calculation:
    Default Inputs:
        length=14, rsi_length=14, k=3, d=3
    RSI = Relative Strength Index
    SMA = Simple Moving Average

    RSI = RSI(high, low, close, rsi_length)
    LL  = lowest RSI for last rsi_length periods
    HH  = highest RSI for last rsi_length periods

    STOCHRSI  = 100 * (RSI - LL) / (HH - LL)
    STOCHRSIk = SMA(STOCHRSI, k)
    STOCHRSId = SMA(STOCHRSIk, d)

Args:
    high (pd.Series): Series of 'high's

"""
    low (pd.Series): 存储股价的最低价序列
    close (pd.Series): 存储股价的收盘价序列
    length (int): STOCHRSI 的周期。默认为 14
    rsi_length (int): RSI 的周期。默认为 14
    k (int): 快速 %K 的周期。默认为 3
    d (int): 慢速 %K 的周期。默认为 3
    mamode (str): 查看 ```help(ta.ma)```py。默认为 'sma'(简单移动平均)
    offset (int): 结果偏移的周期数。默认为 0
# 参数说明部分,描述函数的参数和返回值
Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

# 返回值说明部分,描述函数返回的数据类型和列名
Returns:
    pd.DataFrame: RSI %K, RSI %D columns.

.\pandas-ta\pandas_ta\momentum\td_seq.py

# -*- coding: utf-8 -*-
# 从 numpy 库中导入 where 函数并重命名为 npWhere
from numpy import where as npWhere
# 从 pandas 库中导入 DataFrame 和 Series 类
from pandas import DataFrame, Series
# 从 pandas_ta.utils 模块中导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义函数 td_seq,用于计算 Tom Demark Sequential(TD_SEQ)指标
def td_seq(close, asint=None, offset=None, **kwargs):
    """Indicator: Tom Demark Sequential (TD_SEQ)"""
    # 验证参数 close 是否为有效的 Series 对象
    close = verify_series(close)
    # 获取偏移量
    offset = get_offset(offset)
    # 如果 asint 不为布尔值,则设置为 False
    asint = asint if isinstance(asint, bool) else False
    # 获取参数中的 show_all,如果不存在则设置默认值为 True
    show_all = kwargs.setdefault("show_all", True)

    # 定义函数 true_sequence_count,用于计算连续的真值序列数量
    def true_sequence_count(series: Series):
        # 找到最后一个为 False 的索引
        index = series.where(series == False).last_valid_index()

        if index is None:
            # 如果索引为空,则返回序列的总数
            return series.count()
        else:
            # 否则,返回索引之后的序列数量
            s = series[series.index > index]
            return s.count()

    # 定义函数 calc_td,用于计算 TD_SEQ
    def calc_td(series: Series, direction: str, show_all: bool):
        # 计算 TD_SEQ 的布尔值
        td_bool = series.diff(4) > 0 if direction=="up" else series.diff(4) < 0
        # 根据布尔值计算 TD_SEQ 数值
        td_num = npWhere(
            td_bool, td_bool.rolling(13, min_periods=0).apply(true_sequence_count), 0
        )
        td_num = Series(td_num)

        if show_all:
            # 如果 show_all 为 True,则保留所有 TD_SEQ 值
            td_num = td_num.mask(td_num == 0)
        else:
            # 否则,只保留在 6 到 9 之间的 TD_SEQ 值
            td_num = td_num.mask(~td_num.between(6,9))

        return td_num

    # 计算上升序列的 TD_SEQ
    up_seq = calc_td(close, "up", show_all)
    # 计算下降序列的 TD_SEQ
    down_seq = calc_td(close, "down", show_all)

    # 如果需要将结果转换为整数
    if asint:
        if up_seq.hasnans and down_seq.hasnans:
            # 填充缺失值为 0
            up_seq.fillna(0, inplace=True)
            down_seq.fillna(0, inplace=True)
        # 转换结果为整数类型
        up_seq = up_seq.astype(int)
        down_seq = down_seq.astype(int)

    # 如果偏移量不为 0
    if offset != 0:
        # 对结果进行偏移
        up_seq = up_seq.shift(offset)
        down_seq = down_seq.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        up_seq.fillna(kwargs["fillna"], inplace=True)
        down_seq.fillna(kwargs["fillna"], inplace=True)

    if "fill_method" in kwargs:
        up_seq.fillna(method=kwargs["fill_method"], inplace=True)
        down_seq.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置上升序列和下降序列的名称和分类
    up_seq.name = f"TD_SEQ_UPa" if show_all else f"TD_SEQ_UP"
    down_seq.name = f"TD_SEQ_DNa" if show_all else f"TD_SEQ_DN"
    up_seq.category = down_seq.category = "momentum"

    # 准备要返回的 DataFrame
    df = DataFrame({up_seq.name: up_seq, down_seq.name: down_seq})
    df.name = "TD_SEQ"
    df.category = up_seq.category

    return df

# 设置函数文档字符串
td_seq.__doc__ = \
"""TD Sequential (TD_SEQ)

Tom DeMark's Sequential indicator attempts to identify a price point where an
uptrend or a downtrend exhausts itself and reverses.

Sources:
    https://tradetrekker.wordpress.com/tdsequential/

Calculation:
    Compare current close price with 4 days ago price, up to 13 days. For the
    consecutive ascending or descending price sequence, display 6th to 9th day
    value.

Args:
    close (pd.Series): Series of 'close's
    asint (bool): If True, fillnas with 0 and change type to int. Default: False
    offset (int): How many periods to offset the result. Default: 0

Kwargs:

"""
    # 定义函数参数show_all,用于控制展示范围,默认为True,即展示1到13;如果设置为False,仅展示6到9。
    show_all (bool): Show 1 - 13. If set to False, show 6 - 9. Default: True
    # 定义函数参数fillna,用于填充缺失值,参数value为填充的数值,默认为空。
    fillna (value, optional): pd.DataFrame.fillna(value)
# 返回类型说明:返回的是一个 Pandas DataFrame,其中包含了生成的新特征。

.\pandas-ta\pandas_ta\momentum\trix.py

# -*- coding: utf-8 -*-
# 从 pandas 库中导入 DataFrame 类
from pandas import DataFrame
# 从 pandas_ta 库中导入 overlap 模块下的 ema 函数
from pandas_ta.overlap.ema import ema
# 从 pandas_ta 库中导入 utils 模块
from pandas_ta.utils import get_drift, get_offset, verify_series

# 定义 Trix 指标函数,用于计算 Trix (TRIX) 指标
def trix(close, length=None, signal=None, scalar=None, drift=None, offset=None, **kwargs):
    """Indicator: Trix (TRIX)"""
    # 验证参数
   length = int(length) if length and length > 0 else 30
    signal = int(signal) if signal and signal > 0 else 9
    scalar = float(scalar) if scalar else 100
    close = verify_series(close, max(length, signal))
    drift = get_drift(drift)
    offset = get_offset(offset)

    # 如果 close 为空,则返回空值
    if close is None: return

    # 计算结果
    ema1 = ema(close=close, length=length, **kwargs)
    ema2 = ema(close=ema1, length=length, **kwargs)
    ema3 = ema(close=ema2, length=length, **kwargs)
    trix = scalar * ema3.pct_change(drift)

    trix_signal = trix.rolling(signal).mean()

    # 偏移
    if offset != 0:
        trix = trix.shift(offset)
        trix_signal = trix_signal.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        trix.fillna(kwargs["fillna"], inplace=True)
        trix_signal.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        trix.fillna(method=kwargs["fill_method"], inplace=True)
        trix_signal.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置名称和类别
    trix.name = f"TRIX_{length}_{signal}"
    trix_signal.name = f"TRIXs_{length}_{signal}"
    trix.category = trix_signal.category = "momentum"

    # 准备返回的 DataFrame
    df = DataFrame({trix.name: trix, trix_signal.name: trix_signal})
    df.name = f"TRIX_{length}_{signal}"
    df.category = "momentum"

    return df

# 设置 trix 函数的文档字符串
trix.__doc__ = \
"""Trix (TRIX)

TRIX is a momentum oscillator to identify divergences.

Sources:
    https://www.tradingview.com/wiki/TRIX

Calculation:
    Default Inputs:
        length=18, drift=1
    EMA = Exponential Moving Average
    ROC = Rate of Change
    ema1 = EMA(close, length)
    ema2 = EMA(ema1, length)
    ema3 = EMA(ema2, length)
    TRIX = 100 * ROC(ema3, drift)

Args:
    close (pd.Series): Series of 'close's
    length (int): It's period. Default: 18
    signal (int): It's period. Default: 9
    scalar (float): How much to magnify. Default: 100
    drift (int): The difference period. Default: 1
    offset (int): How many periods to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\momentum\tsi.py

# -*- coding: utf-8 -*-
# 从 pandas 库导入 DataFrame 类
from pandas import DataFrame
# 从 pandas_ta.overlap 模块导入 ema, ma 函数
from pandas_ta.overlap import ema, ma
# 从 pandas_ta.utils 模块导入 get_drift, get_offset, verify_series 函数
from pandas_ta.utils import get_drift, get_offset, verify_series


# 定义 True Strength Index (TSI) 指标函数
def tsi(close, fast=None, slow=None, signal=None, scalar=None, mamode=None, drift=None, offset=None, **kwargs):
    """Indicator: True Strength Index (TSI)"""
    # 验证参数有效性
    # 如果 fast 参数存在且大于 0,则将其转换为整数,否则设为默认值 13
    fast = int(fast) if fast and fast > 0 else 13
    # 如果 slow 参数存在且大于 0,则将其转换为整数,否则设为默认值 25
    slow = int(slow) if slow and slow > 0 else 25
    # 如果 signal 参数存在且大于 0,则将其转换为整数,否则设为默认值 13
    signal = int(signal) if signal and signal > 0 else 13
    # 如果 close 序列为 None,则返回 None
    if close is None: return
    # 如果 scalar 存在,则将其转换为浮点数,否则设为默认值 100
    scalar = float(scalar) if scalar else 100
    # 获取漂移值,用于处理偏移
    drift = get_drift(drift)
    # 获取偏移量,用于处理偏移
    offset = get_offset(offset)
    # 如果 mamode 不是字符串类型,则将其设为默认值 "ema"
    mamode = mamode if isinstance(mamode, str) else "ema"
    # 如果 kwargs 中包含 "length" 键,则将其移除
    if "length" in kwargs: kwargs.pop("length")

    # 计算结果
    # 计算 close 序列的一阶差分
    diff = close.diff(drift)
    # 计算 slow 期 EMA
    slow_ema = ema(close=diff, length=slow, **kwargs)
    # 计算 fast 期 EMA
    fast_slow_ema = ema(close=slow_ema, length=fast, **kwargs)

    # 计算绝对差分
    abs_diff = diff.abs()
    # 计算 slow 期绝对差分的 EMA
    abs_slow_ema = ema(close=abs_diff, length=slow, **kwargs)
    # 计算 fast 期绝对差分的 EMA
    abs_fast_slow_ema = ema(close=abs_slow_ema, length=fast, **kwargs)

    # 计算 TSI
    tsi = scalar * fast_slow_ema / abs_fast_slow_ema
    # 计算 TSI 的信号线
    tsi_signal = ma(mamode, tsi, length=signal)

    # 处理偏移
    if offset != 0:
        tsi = tsi.shift(offset)
        tsi_signal = tsi_signal.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        tsi.fillna(kwargs["fillna"], inplace=True)
        tsi_signal.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        tsi.fillna(method=kwargs["fill_method"], inplace=True)
        tsi_signal.fillna(method=kwargs["fill_method"], inplace=True)

    # 命名并分类化指标
    tsi.name = f"TSI_{fast}_{slow}_{signal}"
    tsi_signal.name = f"TSIs_{fast}_{slow}_{signal}"
    tsi.category = tsi_signal.category =  "momentum"

    # 准备返回的 DataFrame
    df = DataFrame({tsi.name: tsi, tsi_signal.name: tsi_signal})
    df.name = f"TSI_{fast}_{slow}_{signal}"
    df.category = "momentum"

    return df


# 设置 tsi 函数的文档字符串
tsi.__doc__ = \
"""True Strength Index (TSI)

The True Strength Index is a momentum indicator used to identify short-term
swings while in the direction of the trend as well as determining overbought
and oversold conditions.

Sources:
    https://www.investopedia.com/terms/t/tsi.asp

Calculation:
    Default Inputs:
        fast=13, slow=25, signal=13, scalar=100, drift=1
    EMA = Exponential Moving Average
    diff = close.diff(drift)

    slow_ema = EMA(diff, slow)
    fast_slow_ema = EMA(slow_ema, slow)

    abs_diff_slow_ema = absolute_diff_ema = EMA(ABS(diff), slow)
    abema = abs_diff_fast_slow_ema = EMA(abs_diff_slow_ema, fast)

    TSI = scalar * fast_slow_ema / abema
    Signal = EMA(TSI, signal)

Args:
    close (pd.Series): Series of 'close's
    fast (int): The short period. Default: 13
    slow (int): The long period. Default: 25
    signal (int): The signal period. Default: 13
"""
    scalar (float): How much to magnify. Default: 100
    # 定义一个浮点数变量 scalar,表示放大倍数,默认值为 100

    mamode (str): Moving Average of TSI Signal Line.
        See ```help(ta.ma)```py. Default: 'ema'
    # 定义一个字符串变量 mamode,表示 TSI 信号线的移动平均方式,默认值为 'ema',可查看 ta.ma 的帮助文档

    drift (int): The difference period. Default: 1
    # 定义一个整数变量 drift,表示差分周期,默认值为 1

    offset (int): How many periods to offset the result. Default: 0
    # 定义一个整数变量 offset,表示结果的偏移周期数,默认值为 0
# 函数参数说明部分,kwargs表示可变关键字参数
Kwargs:
    # fillna参数,用于填充缺失值的值,类型为任意
    fillna (value, optional): pd.DataFrame.fillna(value)
    # fill_method参数,填充方法的类型
    fill_method (value, optional): Type of fill method

# 返回值说明部分,返回一个pandas DataFrame对象,包含tsi和signal两列
Returns:
    # 返回的pandas DataFrame对象,包含tsi和signal两列数据
    pd.DataFrame: tsi, signal.

.\pandas-ta\pandas_ta\momentum\uo.py

# -*- coding: utf-8 -*-

# 导入 DataFrame 类
from pandas import DataFrame
# 导入 Imports 类
from pandas_ta import Imports
# 导入 get_drift 和 get_offset 函数
from pandas_ta.utils import get_drift, get_offset, verify_series


# 定义 Ultimate Oscillator(UO)指标函数
def uo(high, low, close, fast=None, medium=None, slow=None, fast_w=None, medium_w=None, slow_w=None, talib=None, drift=None, offset=None, **kwargs):
    """Indicator: Ultimate Oscillator (UO)"""
    # 验证参数
    fast = int(fast) if fast and fast > 0 else 7
    fast_w = float(fast_w) if fast_w and fast_w > 0 else 4.0
    medium = int(medium) if medium and medium > 0 else 14
    medium_w = float(medium_w) if medium_w and medium_w > 0 else 2.0
    slow = int(slow) if slow and slow > 0 else 28
    slow_w = float(slow_w) if slow_w and slow_w > 0 else 1.0
    _length = max(fast, medium, slow)
    # 验证 high、low、close 序列长度
    high = verify_series(high, _length)
    low = verify_series(low, _length)
    close = verify_series(close, _length)
    # 获取漂移和偏移量
    drift = get_drift(drift)
    offset = get_offset(offset)
    # 设置是否使用 talib 模式
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    # 如果 high、low、close 有任何一个为 None,则返回空
    if high is None or low is None or close is None: return

    # 计算结果
    if Imports["talib"] and mode_tal:
        # 使用 talib 计算 UO 指标
        from talib import ULTOSC
        uo = ULTOSC(high, low, close, fast, medium, slow)
    else:
        # 否则,使用自定义计算方法
        tdf = DataFrame({
            "high": high,
            "low": low,
            f"close_{drift}": close.shift(drift)
        })
        # 获取最大最小值
        max_h_or_pc = tdf.loc[:, ["high", f"close_{drift}"]].max(axis=1)
        min_l_or_pc = tdf.loc[:, ["low", f"close_{drift}"]].min(axis=1)
        del tdf

        # 计算 buying pressure(bp)和 true range(tr)
        bp = close - min_l_or_pc
        tr = max_h_or_pc - min_l_or_pc

        # 计算 fast、medium、slow 平均值
        fast_avg = bp.rolling(fast).sum() / tr.rolling(fast).sum()
        medium_avg = bp.rolling(medium).sum() / tr.rolling(medium).sum()
        slow_avg = bp.rolling(slow).sum() / tr.rolling(slow).sum()

        # 计算总权重和加权平均值
        total_weight = fast_w + medium_w + slow_w
        weights = (fast_w * fast_avg) + (medium_w * medium_avg) + (slow_w * slow_avg)
        uo = 100 * weights / total_weight

    # 考虑偏移量
    if offset != 0:
        uo = uo.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        uo.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        uo.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和分类
    uo.name = f"UO_{fast}_{medium}_{slow}"
    uo.category = "momentum"

    # 返回 Ultimate Oscillator(UO)指标
    return uo


# 设置 Ultimate Oscillator(UO)指标的文档字符串
uo.__doc__ = \
"""Ultimate Oscillator (UO)

The Ultimate Oscillator is a momentum indicator over three different
periods. It attempts to correct false divergence trading signals.

Sources:
    https://www.tradingview.com/wiki/Ultimate_Oscillator_(UO)

Calculation:
    Default Inputs:
        fast=7, medium=14, slow=28,
        fast_w=4.0, medium_w=2.0, slow_w=1.0, drift=1
    min_low_or_pc  = close.shift(drift).combine(low, min)
    max_high_or_pc = close.shift(drift).combine(high, max)

    bp = buying pressure = close - min_low_or_pc
    tr = true range = max_high_or_pc - min_low_or_pc

    fast_avg = SUM(bp, fast) / SUM(tr, fast)
"""
    # 计算中等速度的平均值,中等速度报告的总和除以中等速度报告的数量
    medium_avg = SUM(bp, medium) / SUM(tr, medium)
    
    # 计算慢速度的平均值,慢速度报告的总和除以慢速度报告的数量
    slow_avg = SUM(bp, slow) / SUM(tr, slow)
    
    # 计算所有速度权重的总和
    total_weight = fast_w + medium_w + slow_w
    
    # 计算加权平均值,每个速度报告的权重乘以其对应的平均值,然后相加
    weights = (fast_w * fast_avg) + (medium_w * medium_avg) + (slow_w * slow_avg)
    
    # 计算不确定性指标(UO),即权重总和乘以100除以所有权重的总和
    UO = 100 * weights / total_weight
# 参数说明:
# high (pd.Series): 'high' 数据序列
# low (pd.Series): 'low' 数据序列
# close (pd.Series): 'close' 数据序列
# fast (int): 快速 %K 周期。默认值:7
# medium (int): 慢速 %K 周期。默认值:14
# slow (int): 慢速 %D 周期。默认值:28
# fast_w (float): 快速 %K 周期。默认值:4.0
# medium_w (float): 慢速 %K 周期。默认值:2.0
# slow_w (float): 慢速 %D 周期。默认值:1.0
# talib (bool): 如果安装了 TA Lib 并且 talib 为 True,则返回 TA Lib 版本。默认值:True
# drift (int): 差异周期。默认值:1
# offset (int): 结果的偏移周期数。默认值:0

# 可选参数:
# fillna (value, optional): pd.DataFrame.fillna(value) 的填充值
# fill_method (value, optional): 填充方法的类型

# 返回值:
# pd.Series: 生成的新特征序列

.\pandas-ta\pandas_ta\momentum\willr.py

# -*- coding: utf-8 -*-
# 从 pandas_ta 库中导入 Imports 类
from pandas_ta import Imports
# 从 pandas_ta.utils 模块中导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series


def willr(high, low, close, length=None, talib=None, offset=None, **kwargs):
    """Indicator: William's Percent R (WILLR)"""
    # 验证参数
    # 如果 length 存在且大于 0,则将其转换为整数,否则默认为 14
    length = int(length) if length and length > 0 else 14
    # 如果 kwargs 中存在 "min_periods",并且其值不为 None,则将其转换为整数,否则使用 length 的值
    min_periods = int(kwargs["min_periods"]) if "min_periods" in kwargs and kwargs["min_periods"] is not None else length
    # 确定最终使用的长度为 length 和 min_periods 中的较大值
    _length = max(length, min_periods)
    # 验证 high、low、close 系列,并设置它们的长度为 _length
    high = verify_series(high, _length)
    low = verify_series(low, _length)
    close = verify_series(close, _length)
    # 获取偏移量
    offset = get_offset(offset)
    # 确定是否使用 TA-Lib
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    # 如果 high、low、close 中有任何一个为 None,则返回空
    if high is None or low is None or close is None: return

    # 计算结果
    if Imports["talib"] and mode_tal:
        # 如果 TA-Lib 可用且 mode_tal 为 True,则使用 TA-Lib 中的 WILLR 函数计算
        from talib import WILLR
        willr = WILLR(high, low, close, length)
    else:
        # 否则,使用自定义方法计算 WILLR
        # 计算长度为 length 的最低低点
        lowest_low = low.rolling(length, min_periods=min_periods).min()
        # 计算长度为 length 的最高高点
        highest_high = high.rolling(length, min_periods=min_periods).max()
        # 计算 WILLR
        willr = 100 * ((close - lowest_low) / (highest_high - lowest_low) - 1)

    # 根据偏移量对结果进行偏移
    if offset != 0:
        willr = willr.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        willr.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        willr.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和类别
    willr.name = f"WILLR_{length}"
    willr.category = "momentum"

    # 返回计算结果
    return willr


# 设置函数文档字符串
willr.__doc__ = \
"""William's Percent R (WILLR)

William's Percent R is a momentum oscillator similar to the RSI that
attempts to identify overbought and oversold conditions.

Sources:
    https://www.tradingview.com/wiki/Williams_%25R_(%25R)

Calculation:
    Default Inputs:
        length=20
    LL = low.rolling(length).min()
    HH = high.rolling(length).max()

    WILLR = 100 * ((close - LL) / (HH - LL) - 1)

Args:
    high (pd.Series): Series of 'high's
    low (pd.Series): Series of 'low's
    close (pd.Series): Series of 'close's
    length (int): It's period. Default: 14
    talib (bool): If TA Lib is installed and talib is True, Returns the TA Lib
        version. Default: True
    offset (int): How many periods to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

标签:slow,PandasTA,kwargs,length,源码,offset,close,fillna,解析
From: https://www.cnblogs.com/apachecn/p/18135776

相关文章

  • 【合合TextIn】智能文档处理系列—电子文档解析技术全格式解析
    一、引言在当今的数字化时代,电子文档已成为信息存储和交流的基石。从简单的文本文件到复杂的演示文档,各种格式的电子文档承载着丰富的知识与信息,支撑着教育、科研、商业和日常生活的各个方面。随着信息量的爆炸性增长,如何高效、准确地处理和分析这些电子文档,已经成为信息技术领......
  • 2、APIView执行流程以及request对象源码分析
    一、基于View编写5个接口1、创建模型表models.pyfromdjango.dbimportmodelsclassBook(models.Model):name=models.CharField(max_length=64)price=models.IntegerField()publish=models.CharField(max_length=32)2、视图函数views.pyfrom......
  • JDK 源码阅读:java.lang.Object 类
    记录下自己阅读过程的笔记,如有错误,欢迎指正!源码参考:https://github.com/kangjianwei/LearningJDK/tree/master1.基本介绍在Java中,Object类是类层次结构的根类几乎每个Java类都直接或间接继承自Object类,意味着每个类都继承了Object的方法类结构:2.源码分析2.......
  • AddHandler导致的解析漏洞
    https://www.freebuf.com/vuls/303745.html在etc/apache/site-enabled下设置配置文件添加AddHandlerapplication/x-httpd-php.php然后重启apache服务配置问题导致总结:1)如果在Apache的/etc/apache2/apache2.conf里有这样的配置<FilesMatch"YLion.jpg">​SetHandler......
  • populateBean方法解析
    populateBean方法实现的功能autowired解析Autowired实例,code如下:packagecom.gientech.populateBean.annotation;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Controller;@ControllerpublicclassBook......
  • AOP解析
    AOPbean准备1AOPsample1.1创建切面packagecom.gientech.aop.xml.util;importorg.aspectj.lang.JoinPoint;importorg.aspectj.lang.ProceedingJoinPoint;importorg.aspectj.lang.Signature;importjava.util.Arrays;publicclassLogUtil{publicvoid......
  • SRPCore GenerateHLSL解析及其扩展
    序言在之前的项目开发HDRP的时候,就觉得绑定RT的管理就像一坨屎一样难受,改了管线RT的绑定顺序,原来的Shader输出又会有问题(主要是SV_Target顺序问题),没办法只能够在管线原来的绑定上面,重新写了一份文件,让Shader的共用Pass引用,这样就能规范不同Shaderpass的输出了。但是这只是解......
  • 大模型时代的PDF解析工具
    去年(2023年)是大模型爆发元年。但是大模型具有两个缺点:缺失私有领域知识和幻觉。缺失私有领域知识是指大模型训练时并没有企业私有数据/知识,所以无法正确回答相关问题。并且在这种情况下,大模型会一本正经地胡说八道(即幻觉),给出错误的回答。那么如何解决这两个缺点?目前主要有两种方......
  • 短视频app源码,一文带你轻松搞懂前端大文件上传思路
    短视频app源码,一文带你轻松搞懂前端大文件上传思路文件上传是我们在平时开发短视频app源码中经常会遇到的业务,如果只是简单的文件上传那还不足以作为项目亮点,而当我们给它加上切片、续传的功能,就不一样了。本文会带大家搞明白这些功能的实现思路,主要聚焦于前端部分,基于Vue3......
  • 视频直播源码,不同业务场景需选择不同方案去缓存数据
    视频直播源码,不同业务场景需选择不同方案去缓存数据在开发视频直播源码时,针对不同业务场景,我们应该选择不同的方案去缓存数据。本文就针对最常见的存储方案和场景做一些分类和介绍一些在Vue/React中的高阶用法,助力前端开发体验和应用的稳定性。前端缓存方案确定不同场......