首页 > 编程语言 >PandasTA 源码解析(九)

PandasTA 源码解析(九)

时间:2024-04-15 13:59:23浏览次数:32  
标签:PandasTA sum kwargs length 源码 offset close 解析 pandas

.\pandas-ta\pandas_ta\overlap\jma.py

# -*- coding: utf-8 -*-
# 从 numpy 库中导入 average 函数并重命名为 npAverage
# 从 numpy 库中导入 nan 函数并重命名为 npNaN
# 从 numpy 库中导入 log 函数并重命名为 npLog
# 从 numpy 库中导入 power 函数并重命名为 npPower
# 从 numpy 库中导入 sqrt 函数并重命名为 npSqrt
# 从 numpy 库中导入 zeros_like 函数并重命名为 npZeroslike
# 从 pandas 库中导入 Series 类
# 从 pandas_ta.utils 模块中导入 get_offset 和 verify_series 函数

def jma(close, length=None, phase=None, offset=None, **kwargs):
    """Indicator: Jurik Moving Average (JMA)"""
    # 验证参数
    _length = int(length) if length and length > 0 else 7
    phase = float(phase) if phase and phase != 0 else 0
    close = verify_series(close, _length)
    offset = get_offset(offset)
    if close is None: return

    # 定义基本变量
    jma = npZeroslike(close)
    volty = npZeroslike(close)
    v_sum = npZeroslike(close)

    kv = det0 = det1 = ma2 = 0.0
    jma[0] = ma1 = uBand = lBand = close[0]

    # 静态变量
    sum_length = 10
    length = 0.5 * (_length - 1)
    pr = 0.5 if phase < -100 else 2.5 if phase > 100 else 1.5 + phase * 0.01
    length1 = max((npLog(npSqrt(length)) / npLog(2.0)) + 2.0, 0)
    pow1 = max(length1 - 2.0, 0.5)
    length2 = length1 * npSqrt(length)
    bet = length2 / (length2 + 1)
    beta = 0.45 * (_length - 1) / (0.45 * (_length - 1) + 2.0)

    m = close.shape[0]
    for i in range(1, m):
        price = close[i]

        # 价格波动性
        del1 = price - uBand
        del2 = price - lBand
        volty[i] = max(abs(del1),abs(del2)) if abs(del1)!=abs(del2) else 0

        # 相对价格波动性因子
        v_sum[i] = v_sum[i - 1] + (volty[i] - volty[max(i - sum_length, 0)]) / sum_length
        avg_volty = npAverage(v_sum[max(i - 65, 0):i + 1])
        d_volty = 0 if avg_volty ==0 else volty[i] / avg_volty
        r_volty = max(1.0, min(npPower(length1, 1 / pow1), d_volty))

        # Jurik 波动性带
        pow2 = npPower(r_volty, pow1)
        kv = npPower(bet, npSqrt(pow2))
        uBand = price if (del1 > 0) else price - (kv * del1)
        lBand = price if (del2 < 0) else price - (kv * del2)

        # Jurik 动态因子
        power = npPower(r_volty, pow1)
        alpha = npPower(beta, power)

        # 第一阶段 - 通过自适应 EMA 进行初步平滑
        ma1 = ((1 - alpha) * price) + (alpha * ma1)

        # 第二阶段 - 通过 Kalman 滤波器进行一次额外的初步平滑
        det0 = ((price - ma1) * (1 - beta)) + (beta * det0)
        ma2 = ma1 + pr * det0

        # 第三阶段 - 通过独特的 Jurik 自适应滤波器进行最终平滑
        det1 = ((ma2 - jma[i - 1]) * (1 - alpha) * (1 - alpha)) + (alpha * alpha * det1)
        jma[i] = jma[i-1] + det1

    # 移除初始回看数据并转换为 pandas Series
    jma[0:_length - 1] = npNaN
    jma = Series(jma, index=close.index)

    # 偏移
    if offset != 0:
        jma = jma.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        jma.fillna(kwargs["fillna"], inplace=True)
    # 检查是否存在 "fill_method" 参数在传入的关键字参数 kwargs 中
    if "fill_method" in kwargs:
        # 如果存在,使用指定的填充方法对 DataFrame 进行填充,并在原地修改
        jma.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置 JMA 对象的名称为格式化后的字符串,包含长度和相位信息
    jma.name = f"JMA_{_length}_{phase}"
    # 设置 JMA 对象的类别为 "overlap"
    jma.category = "overlap"

    # 返回填充后的 JMA 对象
    return jma
# 将 jma 的文档字符串设置为 JMA 指标的说明文档
jma.__doc__ = \
"""Jurik Moving Average Average (JMA)

Mark Jurik's Moving Average (JMA) attempts to eliminate noise to see the "true"
underlying activity. It has extremely low lag, is very smooth and is responsive
to market gaps.

Sources:
    https://c.mql5.com/forextsd/forum/164/jurik_1.pdf
    https://www.prorealcode.com/prorealtime-indicators/jurik-volatility-bands/

Calculation:
    Default Inputs:
        length=7, phase=0

Args:
    close (pd.Series): Series of 'close's
    length (int): Period of calculation. Default: 7
    phase (float): How heavy/light the average is [-100, 100]. Default: 0
    offset (int): How many lengths to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\overlap\kama.py

# -*- coding: utf-8 -*-
# 导入 numpy 库中的 nan 别名 npNaN
from numpy import nan as npNaN
# 导入 pandas 库中的 Series 类
from pandas import Series
# 从 pandas_ta 库中的 utils 模块中导入 get_drift、get_offset、non_zero_range、verify_series 函数
from pandas_ta.utils import get_drift, get_offset, non_zero_range, verify_series


# 定义 Kaufman's Adaptive Moving Average (KAMA) 函数
def kama(close, length=None, fast=None, slow=None, drift=None, offset=None, **kwargs):
    """Indicator: Kaufman's Adaptive Moving Average (KAMA)"""
    # 验证参数
    # 如果 length 不为空且大于 0,则将其转换为整数,否则设为默认值 10
    length = int(length) if length and length > 0 else 10
    # 如果 fast 不为空且大于 0,则将其转换为整数,否则设为默认值 2
    fast = int(fast) if fast and fast > 0 else 2
    # 如果 slow 不为空且大于 0,则将其转换为整数,否则设为默认值 30
    slow = int(slow) if slow and slow > 0 else 30
    # 验证 close 序列,长度为 fast、slow、length 中的最大值
    close = verify_series(close, max(fast, slow, length))
    # 获取 drift 参数
    drift = get_drift(drift)
    # 获取 offset 参数
    offset = get_offset(offset)

    # 如果 close 为空,则返回空
    if close is None: return

    # 计算结果
    # 定义 weight 函数,用于计算权重
    def weight(length: int) -> float:
        return 2 / (length + 1)

    # 计算 fast 和 slow 的权重
    fr = weight(fast)
    sr = weight(slow)

    # 计算绝对差和同侧差
    abs_diff = non_zero_range(close, close.shift(length)).abs()
    peer_diff = non_zero_range(close, close.shift(drift)).abs()
    peer_diff_sum = peer_diff.rolling(length).sum()
    er = abs_diff / peer_diff_sum
    x = er * (fr - sr) + sr
    sc = x * x

    # 获取 close 序列的长度
    m = close.size
    # 初始化结果列表,前 length-1 个值为 npNaN,最后一个值为 0
    result = [npNaN for _ in range(0, length - 1)] + [0]
    # 遍历计算 KAMA
    for i in range(length, m):
        result.append(sc.iloc[i] * close.iloc[i] + (1 - sc.iloc[i]) * result[i - 1])

    # 将结果转换为 Series 类型,索引为 close 序列的索引
    kama = Series(result, index=close.index)

    # 偏移结果
    if offset != 0:
        kama = kama.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        kama.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        kama.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和类别
    kama.name = f"KAMA_{length}_{fast}_{slow}"
    kama.category = "overlap"

    # 返回 KAMA 序列
    return kama


# 设置 KAMA 函数的文档字符串
kama.__doc__ = \
"""Kaufman's Adaptive Moving Average (KAMA)

Developed by Perry Kaufman, Kaufman's Adaptive Moving Average (KAMA) is a moving average
designed to account for market noise or volatility. KAMA will closely follow prices when
the price swings are relatively small and the noise is low. KAMA will adjust when the
price swings widen and follow prices from a greater distance. This trend-following indicator
can be used to identify the overall trend, time turning points and filter price movements.

Sources:
    https://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:kaufman_s_adaptive_moving_average
    https://www.tradingview.com/script/wZGOIz9r-REPOST-Indicators-3-Different-Adaptive-Moving-Averages/

Calculation:
    Default Inputs:
        length=10

Args:
    close (pd.Series): Series of 'close's
    length (int): It's period. Default: 10
    fast (int): Fast MA period. Default: 2
    slow (int): Slow MA period. Default: 30
    drift (int): The difference period. Default: 1
    offset (int): How many periods to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\overlap\linreg.py

# -*- coding: utf-8 -*-
# 从 numpy 库导入 array 并重命名为 npArray,导入 arctan 并重命名为 npAtan,导入 nan 并重命名为 npNaN,导入 pi 并重命名为 npPi,从 numpy 版本导入 version 并重命名为 npVersion
from numpy import array as npArray
from numpy import arctan as npAtan
from numpy import nan as npNaN
from numpy import pi as npPi
from numpy.version import version as npVersion
# 从 pandas 库导入 Series,从 pandas_ta.utils 导入 get_offset 和 verify_series 函数
from pandas import Series
from pandas_ta.utils import get_offset, verify_series


def linreg(close, length=None, offset=None, **kwargs):
    """Indicator: Linear Regression"""
    # 验证参数
    # 如果 length 存在且大于 0,则将其转换为整数,否则设为默认值 14
    length = int(length) if length and length > 0 else 14
    # 验证 close 是否为有效的 Series,并设定长度
    close = verify_series(close, length)
    # 获取偏移量
    offset = get_offset(offset)
    # 获取其他参数:angle、intercept、degrees、r、slope、tsf
    angle = kwargs.pop("angle", False)
    intercept = kwargs.pop("intercept", False)
    degrees = kwargs.pop("degrees", False)
    r = kwargs.pop("r", False)
    slope = kwargs.pop("slope", False)
    tsf = kwargs.pop("tsf", False)

    # 如果 close 为空,则返回 None
    if close is None: return

    # 计算结果
    # 生成 x 轴的值,范围从 1 到 length
    x = range(1, length + 1)  # [1, 2, ..., n] from 1 to n keeps Sum(xy) low
    # 计算 x 的和
    x_sum = 0.5 * length * (length + 1)
    # 计算 x 的平方和
    x2_sum = x_sum * (2 * length + 1) / 3
    # 计算除数
    divisor = length * x2_sum - x_sum * x_sum

    # 定义线性回归函数
    def linear_regression(series):
        # 计算 y 的和
        y_sum = series.sum()
        # 计算 x*y 的和
        xy_sum = (x * series).sum()

        # 计算斜率
        m = (length * xy_sum - x_sum * y_sum) / divisor
        # 如果 slope 为 True,则返回斜率
        if slope:
            return m
        # 计算截距
        b = (y_sum * x2_sum - x_sum * xy_sum) / divisor
        # 如果 intercept 为 True,则返回截距
        if intercept:
            return b

        # 如果 angle 为 True,则计算角度
        if angle:
            theta = npAtan(m)
            # 如果 degrees 为 True,则将角度转换为度
            if degrees:
                theta *= 180 / npPi
            return theta

        # 如果 r 为 True,则计算相关系数
        if r:
            # 计算 y^2 的和
            y2_sum = (series * series).sum()
            # 计算相关系数的分子
            rn = length * xy_sum - x_sum * y_sum
            # 计算相关系数的分母
            rd = (divisor * (length * y2_sum - y_sum * y_sum)) ** 0.5
            return rn / rd

        # 如果 tsf 为 True,则进行时间序列调整
        return m * length + b if tsf else m * (length - 1) + b

    # 定义滚动窗口函数
    def rolling_window(array, length):
        """https://github.com/twopirllc/pandas-ta/issues/285"""
        strides = array.strides + (array.strides[-1],)
        shape = array.shape[:-1] + (array.shape[-1] - length + 1, length)
        return as_strided(array, shape=shape, strides=strides)

    # 如果 numpy 版本大于等于 1.20.0,则使用 sliding_window_view 函数
    if npVersion >= "1.20.0":
        from numpy.lib.stride_tricks import sliding_window_view
        # 对于滑动窗口内的每个窗口,应用线性回归函数
        linreg_ = [linear_regression(_) for _ in sliding_window_view(npArray(close), length)]
    else:
        # 否则,使用 rolling_window 函数
        from numpy.lib.stride_tricks import as_strided
        # 对于滚动窗口内的每个窗口,应用线性回归函数
        linreg_ = [linear_regression(_) for _ in rolling_window(npArray(close), length)]

    # 创建 Series 对象,索引为 close 的索引,值为线性回归的结果
    linreg = Series([npNaN] * (length - 1) + linreg_, index=close.index)

    # 偏移结果
    if offset != 0:
        linreg = linreg.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        linreg.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        linreg.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置名称和分类
    linreg.name = f"LR"
    if slope: linreg.name += "m"
    if intercept: linreg.name += "b"
    if angle: linreg.name += "a"
    if r: linreg.name += "r"

    linreg.name += f"_{length}"
    linreg.category = "overlap"

    return linreg
# 将文档字符串赋值给线性回归移动平均函数的__doc__属性,用于函数说明和文档生成
linreg.__doc__ = \
"""Linear Regression Moving Average (linreg)

Linear Regression Moving Average (LINREG). This is a simplified version of a
Standard Linear Regression. LINREG is a rolling regression of one variable. A
Standard Linear Regression is between two or more variables.

Source: TA Lib

Calculation:
    Default Inputs:
        length=14
    x = [1, 2, ..., n]
    x_sum = 0.5 * length * (length + 1)
    x2_sum = length * (length + 1) * (2 * length + 1) / 6
    divisor = length * x2_sum - x_sum * x_sum

    # 定义线性回归函数lr(series),用于计算移动平均
    lr(series):
        # 计算系列的总和、平方和以及x和y的乘积和
        y_sum = series.sum()
        y2_sum = (series* series).sum()
        xy_sum = (x * series).sum()

        # 计算回归线的斜率m和截距b
        m = (length * xy_sum - x_sum * y_sum) / divisor
        b = (y_sum * x2_sum - x_sum * xy_sum) / divisor
        return m * (length - 1) + b

    # 使用rolling函数对close进行移动窗口处理,并应用lr函数计算移动平均
    linreg = close.rolling(length).apply(lr)

Args:
    close (pd.Series): Series of 'close's
    length (int): It's period.  Default: 10
    offset (int): How many periods to offset the result.  Default: 0

Kwargs:
    angle (bool, optional): If True, returns the angle of the slope in radians.
        Default: False.
    degrees (bool, optional): If True, returns the angle of the slope in
        degrees. Default: False.
    intercept (bool, optional): If True, returns the angle of the slope in
        radians. Default: False.
    r (bool, optional): If True, returns it's correlation 'r'. Default: False.
    slope (bool, optional): If True, returns the slope. Default: False.
    tsf (bool, optional): If True, returns the Time Series Forecast value.
        Default: False.
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\overlap\ma.py

# 设置文件编码为 UTF-8
# 导入 Series 类
from pandas import Series

# 导入不同的移动平均方法
# 从模块中导入指定的函数
from .dema import dema
from .ema import ema
from .fwma import fwma
from .hma import hma
from .linreg import linreg
from .midpoint import midpoint
from .pwma import pwma
from .rma import rma
from .sinwma import sinwma
from .sma import sma
from .swma import swma
from .t3 import t3
from .tema import tema
from .trima import trima
from .vidya import vidya
from .wma import wma
from .zlma import zlma

# 定义移动平均函数,用于简化移动平均的选择
def ma(name:str = None, source:Series = None, **kwargs) -> Series:
    """Simple MA Utility for easier MA selection

    Available MAs:
        dema, ema, fwma, hma, linreg, midpoint, pwma, rma,
        sinwma, sma, swma, t3, tema, trima, vidya, wma, zlma

    Examples:
        ema8 = ta.ma("ema", df.close, length=8)
        sma50 = ta.ma("sma", df.close, length=50)
        pwma10 = ta.ma("pwma", df.close, length=10, asc=False)

    Args:
        name (str): One of the Available MAs. Default: "ema"
        source (pd.Series): The 'source' Series.

    Kwargs:
        Any additional kwargs the MA may require.

    Returns:
        pd.Series: New feature generated.
    """

    # 支持的移动平均方法列表
    _mas = [
        "dema", "ema", "fwma", "hma", "linreg", "midpoint", "pwma", "rma",
        "sinwma", "sma", "swma", "t3", "tema", "trima", "vidya", "wma", "zlma"
    ]
    # 如果没有指定移动平均方法和数据源,则返回支持的移动平均方法列表
    if name is None and source is None:
        return _mas
    # 如果指定的移动平均方法是字符串,并且在支持的方法列表中,则转换为小写
    elif isinstance(name, str) and name.lower() in _mas:
        name = name.lower()
    else: # "ema"
        # 如果未指定移动平均方法,则默认选择 EMA
        name = _mas[1]

    # 根据选择的移动平均方法调用相应的函数
    if   name == "dema": return dema(source, **kwargs)
    elif name == "fwma": return fwma(source, **kwargs)
    elif name == "hma": return hma(source, **kwargs)
    elif name == "linreg": return linreg(source, **kwargs)
    elif name == "midpoint": return midpoint(source, **kwargs)
    elif name == "pwma": return pwma(source, **kwargs)
    elif name == "rma": return rma(source, **kwargs)
    elif name == "sinwma": return sinwma(source, **kwargs)
    elif name == "sma": return sma(source, **kwargs)
    elif name == "swma": return swma(source, **kwargs)
    elif name == "t3": return t3(source, **kwargs)
    elif name == "tema": return tema(source, **kwargs)
    elif name == "trima": return trima(source, **kwargs)
    elif name == "vidya": return vidya(source, **kwargs)
    elif name == "wma": return wma(source, **kwargs)
    elif name == "zlma": return zlma(source, **kwargs)
    else: return ema(source, **kwargs)

.\pandas-ta\pandas_ta\overlap\mcgd.py

# -*- coding: utf-8 -*-
# 导入所需模块和函数
from pandas_ta.utils import get_offset, verify_series


def mcgd(close, length=None, offset=None, c=None, **kwargs):
    """Indicator: McGinley Dynamic Indicator"""
    # 验证参数有效性
    # 如果 length 存在且大于 0,则将其转换为整数;否则,默认为 10
    length = int(length) if length and length > 0 else 10
    # 如果 c 存在且在 0 到 1 之间,则将其转换为浮点数;否则,默认为 1
    c = float(c) if c and 0 < c <= 1 else 1
    # 验证 close 是否为有效的 Series,并将其长度限制为 length
    close = verify_series(close, length)
    # 获取偏移量
    offset = get_offset(offset)

    # 如果 close 为 None,则返回空值
    if close is None: return

    # 计算结果
    # 复制 close Series,避免直接修改原始数据
    close = close.copy()

    # 定义 McGinley Dynamic Indicator 计算函数
    def mcg_(series):
        # 计算分母
        denom = (c * length * (series.iloc[1] / series.iloc[0]) ** 4)
        # 计算 McGinley Dynamic Indicator
        series.iloc[1] = (series.iloc[0] + ((series.iloc[1] - series.iloc[0]) / denom))
        return series.iloc[1]

    # 应用 mcg_ 函数到 rolling window 上,计算 McGinley Dynamic Indicator
    mcg_cell = close[0:].rolling(2, min_periods=2).apply(mcg_, raw=False)
    # 将第一个值添加回结果 Series 中
    mcg_ds = close[:1].append(mcg_cell[1:])

    # 偏移结果 Series
    if offset != 0:
        mcg_ds = mcg_ds.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        mcg_ds.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        mcg_ds.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置结果 Series 的名称和类别
    mcg_ds.name = f"MCGD_{length}"
    mcg_ds.category = "overlap"

    return mcg_ds


# 设置 McGinley Dynamic Indicator 的文档字符串
mcgd.__doc__ = \
"""McGinley Dynamic Indicator

The McGinley Dynamic looks like a moving average line, yet it is actually a
smoothing mechanism for prices that minimizes price separation, price whipsaws,
and hugs prices much more closely. Because of the calculation, the Dynamic Line
speeds up in down markets as it follows prices yet moves more slowly in up
markets. The indicator was designed by John R. McGinley, a Certified Market
Technician and former editor of the Market Technicians Association's Journal
of Technical Analysis.

Sources:
    https://www.investopedia.com/articles/forex/09/mcginley-dynamic-indicator.asp

Calculation:
    Default Inputs:
        length=10
        offset=0
        c=1

    def mcg_(series):
        denom = (constant * length * (series.iloc[1] / series.iloc[0]) ** 4)
        series.iloc[1] = (series.iloc[0] + ((series.iloc[1] - series.iloc[0]) / denom))
        return series.iloc[1]
    mcg_cell = close[0:].rolling(2, min_periods=2).apply(mcg_, raw=False)
    mcg_ds = close[:1].append(mcg_cell[1:])

Args:
    close (pd.Series): Series of 'close's
    length (int): Indicator's period. Default: 10
    offset (int): Number of periods to offset the result. Default: 0
    c (float): Multiplier for the denominator, sometimes set to 0.6. Default: 1

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\overlap\midpoint.py

# 设置文件编码为 UTF-8
# -*- coding: utf-8 -*-

# 从 pandas_ta 包中导入 Imports 模块
from pandas_ta import Imports
# 从 pandas_ta.utils 模块中导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义函数 midpoint,用于计算 Midpoint 指标
def midpoint(close, length=None, talib=None, offset=None, **kwargs):
    """Indicator: Midpoint"""
    # 验证参数
    # 如果 length 不为空且大于 0,则转换为整数,否则设为默认值 2
    length = int(length) if length and length > 0 else 2
    # 如果 kwargs 中包含 "min_periods" 键并且值不为空,则转换为整数,否则设为与 length 相同的值
    min_periods = int(kwargs["min_periods"]) if "min_periods" in kwargs and kwargs["min_periods"] is not None else length
    # 验证 close 是否为有效序列,长度至少为 length 或 min_periods
    close = verify_series(close, max(length, min_periods))
    # 获取偏移量
    offset = get_offset(offset)
    # 判断是否使用 talib 库,默认为 True
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    # 如果 close 为空,则返回 None
    if close is None: return

    # 计算结果
    # 如果导入了 talib 并且 mode_tal 为 True
    if Imports["talib"] and mode_tal:
        # 从 talib 库中导入 MIDPOINT 函数,计算 Midpoint 指标
        from talib import MIDPOINT
        midpoint = MIDPOINT(close, length)
    else:
        # 使用 rolling 函数计算最低价和最高价的移动窗口,然后计算 Midpoint
        lowest = close.rolling(length, min_periods=min_periods).min()
        highest = close.rolling(length, min_periods=min_periods).max()
        midpoint = 0.5 * (lowest + highest)

    # 偏移结果
    if offset != 0:
        midpoint = midpoint.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        midpoint.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        midpoint.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和类别
    midpoint.name = f"MIDPOINT_{length}"
    midpoint.category = "overlap"

    # 返回 Midpoint 指标
    return midpoint

.\pandas-ta\pandas_ta\overlap\midprice.py

# 设置文件编码为 UTF-8
# -*- coding: utf-8 -*-

# 从 pandas_ta 库导入 Imports 类
from pandas_ta import Imports
# 从 pandas_ta.utils 模块导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series


def midprice(high, low, length=None, talib=None, offset=None, **kwargs):
    """Indicator: Midprice"""
    # 验证参数
    # 如果 length 存在且大于 0,则转换为整数;否则,默认为 2
    length = int(length) if length and length > 0 else 2
    # 如果 kwargs 中存在 "min_periods",则将其转换为整数;否则,默认为 length
    min_periods = int(kwargs["min_periods"]) if "min_periods" in kwargs and kwargs["min_periods"] is not None else length
    # 计算有效期长度
    _length = max(length, min_periods)
    # 验证 high 和 low 系列
    high = verify_series(high, _length)
    low = verify_series(low, _length)
    # 获取偏移量
    offset = get_offset(offset)
    # 判断是否启用 talib 模式
    mode_tal = bool(talib) if isinstance(talib, bool) else True

    # 如果 high 或 low 为 None,则返回
    if high is None or low is None: return

    # 计算结果
    if Imports["talib"] and mode_tal:
        # 使用 talib 计算 MIDPRICE 指标
        from talib import MIDPRICE
        midprice = MIDPRICE(high, low, length)
    else:
        # 计算最低低点和最高高点的滚动窗口
        lowest_low = low.rolling(length, min_periods=min_periods).min()
        highest_high = high.rolling(length, min_periods=min_periods).max()
        # 计算中间价
        midprice = 0.5 * (lowest_low + highest_high)

    # 偏移结果
    if offset != 0:
        midprice = midprice.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        midprice.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        midprice.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置指标名称和分类
    midprice.name = f"MIDPRICE_{length}"
    midprice.category = "overlap"

    # 返回中间价指标结果
    return midprice

.\pandas-ta\pandas_ta\overlap\ohlc4.py

# -*- coding: utf-8 -*-
# 从 pandas_ta.utils 模块导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义一个名为 ohlc4 的函数,用于计算 OHLC4 指标
def ohlc4(open_, high, low, close, offset=None, **kwargs):
    """Indicator: OHLC4"""
    # 验证输入参数,确保它们都是 pandas Series 类型
    open_ = verify_series(open_)
    high = verify_series(high)
    low = verify_series(low)
    close = verify_series(close)
    # 获取位移值
    offset = get_offset(offset)

    # 计算 OHLC4 指标的值,使用开盘价、最高价、最低价和收盘价的均值
    ohlc4 = 0.25 * (open_ + high + low + close)

    # 如果存在位移值,则将 OHLC4 指标的值向前位移相应数量的周期
    if offset != 0:
        ohlc4 = ohlc4.shift(offset)

    # 设置 OHLC4 指标的名称和类别
    ohlc4.name = "OHLC4"
    ohlc4.category = "overlap"

    # 返回计算得到的 OHLC4 指标
    return ohlc4

.\pandas-ta\pandas_ta\overlap\pwma.py

# -*- coding: utf-8 -*-

# 从 pandas_ta.utils 模块中导入 get_offset, pascals_triangle, verify_series, weights 函数
from pandas_ta.utils import get_offset, pascals_triangle, verify_series, weights

# 定义函数:Pascals Weighted Moving Average (PWMA)
def pwma(close, length=None, asc=None, offset=None, **kwargs):
    """Indicator: Pascals Weighted Moving Average (PWMA)"""
    
    # 验证参数
    length = int(length) if length and length > 0 else 10
    asc = asc if asc else True
    close = verify_series(close, length)
    offset = get_offset(offset)

    if close is None: return

    # 计算结果
    triangle = pascals_triangle(n=length - 1, weighted=True)
    pwma = close.rolling(length, min_periods=length).apply(weights(triangle), raw=True)

    # 偏移
    if offset != 0:
        pwma = pwma.shift(offset)

    # 处理填充
    if "fillna" in kwargs:
        pwma.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        pwma.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置名称和类别
    pwma.name = f"PWMA_{length}"
    pwma.category = "overlap"

    return pwma


# 设置函数文档字符串
pwma.__doc__ = \
"""Pascal's Weighted Moving Average (PWMA)

Pascal's Weighted Moving Average is similar to a symmetric triangular window
except PWMA's weights are based on Pascal's Triangle.

Source: Kevin Johnson

Calculation:
    Default Inputs:
        length=10

    def weights(w):
        def _compute(x):
            return np.dot(w * x)
        return _compute

    triangle = utils.pascals_triangle(length + 1)
    PWMA = close.rolling(length)_.apply(weights(triangle), raw=True)

Args:
    close (pd.Series): Series of 'close's
    length (int): It's period.  Default: 10
    asc (bool): Recent values weigh more. Default: True
    offset (int): How many periods to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\overlap\rma.py

# -*- coding: utf-8 -*-
# 从 pandas_ta.utils 模块中导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义函数 rma,计算 wildeR 的移动平均值(RMA)
def rma(close, length=None, offset=None, **kwargs):
    """Indicator: wildeR's Moving Average (RMA)"""
    # 验证参数
    # 如果 length 存在且大于 0,则将其转换为整数,否则设为 10
    length = int(length) if length and length > 0 else 10
    # 计算 alpha 值,如果 length 大于 0 则为 1/length,否则为 0.5
    alpha = (1.0 / length) if length > 0 else 0.5
    # 验证 close 参数是否为有效序列,并指定长度为 length
    close = verify_series(close, length)
    # 获取偏移量
    offset = get_offset(offset)

    # 如果 close 为空,则返回空值
    if close is None: return

    # 计算结果
    # 使用指数加权移动平均(Exponential Moving Average,EMA)计算 RMA
    rma = close.ewm(alpha=alpha, min_periods=length).mean()

    # 偏移结果
    if offset != 0:
        rma = rma.shift(offset)

    # 处理填充值
    if "fillna" in kwargs:
        rma.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        rma.fillna(method=kwargs["fill_method"], inplace=True)

    # 设置名称和类别
    rma.name = f"RMA_{length}"
    rma.category = "overlap"

    return rma


# 设置 rma 函数的文档字符串
rma.__doc__ = \
"""wildeR's Moving Average (RMA)

The WildeR's Moving Average is simply an Exponential Moving Average (EMA) with
a modified alpha = 1 / length.

Sources:
    https://tlc.thinkorswim.com/center/reference/Tech-Indicators/studies-library/V-Z/WildersSmoothing
    https://www.incrediblecharts.com/indicators/wilder_moving_average.php

Calculation:
    Default Inputs:
        length=10
    EMA = Exponential Moving Average
    alpha = 1 / length
    RMA = EMA(close, alpha=alpha)

Args:
    close (pd.Series): Series of 'close's
    length (int): It's period. Default: 10
    offset (int): How many periods to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

.\pandas-ta\pandas_ta\overlap\sinwma.py

# -*- coding: utf-8 -*-
# 导入必要的库
from numpy import pi as npPi  # 导入 numpy 库中的 pi 并重命名为 npPi
from numpy import sin as npSin  # 导入 numpy 库中的 sin 函数并重命名为 npSin
from pandas import Series  # 导入 pandas 库中的 Series 类
from pandas_ta.utils import get_offset, verify_series, weights  # 从 pandas_ta.utils 模块导入 get_offset, verify_series, weights 函数


def sinwma(close, length=None, offset=None, **kwargs):
    """Indicator: Sine Weighted Moving Average (SINWMA) by Everget of TradingView"""
    # Validate Arguments
    # 验证参数
    length = int(length) if length and length > 0 else 14  # 将 length 转换为整数,如果未提供或小于等于 0,则设置为默认值 14
    close = verify_series(close, length)  # 验证 close 参数是否为 Series 类型,并确保长度为 length
    offset = get_offset(offset)  # 获取偏移量

    if close is None: return

    # Calculate Result
    # 计算结果
    sines = Series([npSin((i + 1) * npPi / (length + 1)) for i in range(0, length)])  # 生成长度为 length 的正弦周期权重序列
    w = sines / sines.sum()  # 将权重序列标准化

    sinwma = close.rolling(length, min_periods=length).apply(weights(w), raw=True)  # 使用权重计算 SINWMA

    # Offset
    # 偏移结果
    if offset != 0:
        sinwma = sinwma.shift(offset)  # 将结果向前或向后偏移 offset 个周期

    # Handle fills
    # 处理填充
    if "fillna" in kwargs:
        sinwma.fillna(kwargs["fillna"], inplace=True)  # 使用指定的值填充缺失值
    if "fill_method" in kwargs:
        sinwma.fillna(method=kwargs["fill_method"], inplace=True)  # 使用指定的填充方法填充缺失值

    # Name & Category
    # 设置指标名称和类别
    sinwma.name = f"SINWMA_{length}"  # 设置指标名称
    sinwma.category = "overlap"  # 设置指标类别为 overlap

    return sinwma  # 返回计算结果


sinwma.__doc__ = \
"""Sine Weighted Moving Average (SWMA)

A weighted average using sine cycles. The middle term(s) of the average have the
highest weight(s).

Source:
    https://www.tradingview.com/script/6MWFvnPO-Sine-Weighted-Moving-Average/
    Author: Everget (https://www.tradingview.com/u/everget/)

Calculation:
    Default Inputs:
        length=10

    def weights(w):
        def _compute(x):
            return np.dot(w * x)
        return _compute

    sines = Series([sin((i + 1) * pi / (length + 1)) for i in range(0, length)])
    w = sines / sines.sum()
    SINWMA = close.rolling(length, min_periods=length).apply(weights(w), raw=True)

Args:
    close (pd.Series): Series of 'close's
    length (int): It's period. Default: 10
    offset (int): How many periods to offset the result. Default: 0

Kwargs:
    fillna (value, optional): pd.DataFrame.fillna(value)
    fill_method (value, optional): Type of fill method

Returns:
    pd.Series: New feature generated.
"""

标签:PandasTA,sum,kwargs,length,源码,offset,close,解析,pandas
From: https://www.cnblogs.com/apachecn/p/18135782

相关文章

  • PandasTA 源码解析(十一)
    .\pandas-ta\pandas_ta\overlap\wcp.py#-*-coding:utf-8-*-#从pandas_ta库中导入Imports模块frompandas_taimportImports#从pandas_ta.utils中导入get_offset和verify_series函数frompandas_ta.utilsimportget_offset,verify_series#定义函数wcp......
  • PandasTA 源码解析(十)
    .\pandas-ta\pandas_ta\overlap\sma.py#-*-coding:utf-8-*-#从pandas_ta库中导入Imports对象frompandas_taimportImports#从pandas_ta.utils模块中导入get_offset和verify_series函数frompandas_ta.utilsimportget_offset,verify_series#定义简单......
  • PandasTA 源码解析(十三)
    .\pandas-ta\pandas_ta\trend\decreasing.py#-*-coding:utf-8-*-#从pandas_ta.utils模块中导入所需函数和类frompandas_ta.utilsimportget_drift,get_offset,is_percent,verify_series#定义一个名为decreasing的函数,用于计算序列是否递减defdecreasing(clo......
  • PandasTA 源码解析(十二)
    .\pandas-ta\pandas_ta\statistics\stdev.py#-*-coding:utf-8-*-#从numpy导入sqrt函数,并将其命名为npsqrtfromnumpyimportsqrtasnpsqrt#从variance模块导入variance函数from.varianceimportvariance#从pandas_ta模块导入Imports类frompandas_......
  • PandasTA 源码解析(十四)
    .\pandas-ta\pandas_ta\trend\xsignals.py#-*-coding:utf-8-*-#从numpy中导入nan并重命名为npNaNfromnumpyimportnanasnpNaN#从pandas中导入DataFramefrompandasimportDataFrame#从当前包中导入tsignals模块from.tsignalsimporttsignals#从......
  • PandasTA 源码解析(十六)
    .\pandas-ta\pandas_ta\volatility\kc.py#-*-coding:utf-8-*-#从pandas库中导入DataFrame类frompandasimportDataFrame#从.true_range模块中导入true_range函数from.true_rangeimporttrue_range#从pandas_ta.overlap模块中导入ma函数frompandas_......
  • PandasTA 源码解析(十五)
    .\pandas-ta\pandas_ta\utils\_signals.py#-*-coding:utf-8-*-#导入DataFrame和Series类frompandasimportDataFrame,Series#导入自定义函数from._coreimportget_offset,verify_seriesfrom._mathimportzero#定义函数_above_below,用于比较两个Seri......
  • PandasTA 源码解析(十八)
    .\pandas-ta\pandas_ta\volume\pvol.py#-*-coding:utf-8-*-#导入所需的库和函数frompandas_ta.utilsimportget_offset,signed_series,verify_series#定义函数pvol,计算价格和成交量的乘积defpvol(close,volume,offset=None,**kwargs):"""Indicator:Pr......
  • PandasTA 源码解析(十七)
    .\pandas-ta\pandas_ta\volume\adosc.py#-*-coding:utf-8-*-#导入ad模块from.adimportad#从pandas_ta库中导入Imports模块frompandas_taimportImports#从pandas_ta.overlap模块中导入ema函数frompandas_ta.overlapimportema#从pandas_ta.util......
  • PandasTA 源码解析(二十)
    .\pandas-ta\tests\test_indicator_cycles.py#从config模块中导入error_analysis,sample_data,CORRELATION,CORRELATION_THRESHOLD,VERBOSE变量#从context模块中导入pandas_tafrom.configimporterror_analysis,sample_data,CORRELATION,CORRELATION_THRESHOLD,VER......