首页 > 编程语言 >PandasTA 源码解析(十四)

PandasTA 源码解析(十四)

时间:2024-04-15 13:57:31浏览次数:33  
标签:return PandasTA Series kwargs returns 源码 import close 解析

.\pandas-ta\pandas_ta\trend\xsignals.py

# -*- coding: utf-8 -*-
# 从 numpy 中导入 nan 并重命名为 npNaN
from numpy import nan as npNaN
# 从 pandas 中导入 DataFrame
from pandas import DataFrame
# 从当前包中导入 tsignals 模块
from .tsignals import tsignals
# 从 pandas_ta.utils._signals 中导入 cross_value 函数
from pandas_ta.utils._signals import cross_value
# 从 pandas_ta.utils 中导入 get_offset 和 verify_series 函数
from pandas_ta.utils import get_offset, verify_series

# 定义函数 xsignals,用于计算交叉信号
def xsignals(signal, xa, xb, above:bool=True, long:bool=True, asbool:bool=None, trend_reset:int=0, trade_offset:int=None, offset:int=None, **kwargs):
    """Indicator: Cross Signals"""
    # 验证参数
    signal = verify_series(signal)
    offset = get_offset(offset)

    # 计算结果
    if above:
        # 如果 above 为 True,计算 signal 与 xa 交叉的位置
        entries = cross_value(signal, xa)
        # 计算 signal 与 xb 交叉的位置,注意指定 above=False
        exits = -cross_value(signal, xb, above=False)
    else:
        # 如果 above 为 False,计算 signal 与 xa 交叉的位置,注意指定 above=False
        entries = cross_value(signal, xa, above=False)
        # 计算 signal 与 xb 交叉的位置
        exits = -cross_value(signal, xb)
    # 计算交叉信号
    trades = entries + exits

    # 修改交叉信号以填充趋势间的间隙
    trades.replace({0: npNaN}, inplace=True)
    trades.interpolate(method="pad", inplace=True)
    trades.fillna(0, inplace=True)

    # 将交叉信号转换为趋势
    trends = (trades > 0).astype(int)
    if not long:
        trends = 1 - trends

    # 构建传递给 tsignals 函数的关键字参数字典
    tskwargs = {
        "asbool":asbool,
        "trade_offset":trade_offset,
        "trend_reset":trend_reset,
        "offset":offset
    }
    # 调用 tsignals 函数计算趋势信号
    df = tsignals(trends, **tskwargs)

    # 处理偏移,由 tsignals 函数处理
    DataFrame({
        f"XS_LONG": df.TS_Trends,
        f"XS_SHORT": 1 - df.TS_Trends
    })

    # 处理填充
    if "fillna" in kwargs:
        df.fillna(kwargs["fillna"], inplace=True)
    if "fill_method" in kwargs:
        df.fillna(method=kwargs["fill_method"], inplace=True)

    # 设定名称和类别
    df.name = f"XS"
    df.category = "trend"

    return df

# 设定函数文档字符串
xsignals.__doc__ = \
"""Cross Signals (XSIGNALS)

Cross Signals returns Trend Signal (TSIGNALS) results for Signal Crossings. This
is useful for indicators like RSI, ZSCORE, et al where one wants trade Entries
and Exits (and Trends).

Cross Signals has two kinds of modes: above and long.

The first mode 'above', default True, xsignals determines if the signal first
crosses above 'xa' and then below 'xb'. If 'above' is False, xsignals determines
if the signal first crosses below 'xa' and then above 'xb'.

The second mode 'long', default True, passes the long trend result into
tsignals so it can determine the appropriate Entries and Exits. When 'long' is
False, it does the same but for the short side.

Example:
# These are two different outcomes and depends on the indicator and it's
# characteristics. Please check BOTH outcomes BEFORE making an Issue.
rsi = df.ta.rsi()
# Returns tsignal DataFrame when RSI crosses above 20 and then below 80
ta.xsignals(rsi, 20, 80, above=True)
# Returns tsignal DataFrame when RSI crosses below 20 and then above 80
ta.xsignals(rsi, 20, 80, above=False)

Source: Kevin Johnson

Calculation:
    Default Inputs:
        asbool=False, trend_reset=0, trade_offset=0, drift=1

    trades = trends.diff().shift(trade_offset).fillna(0).astype(int)
    entries = (trades > 0).astype(int)
    exits = (trades < 0).abs().astype(int)

Args:
"""
    # 定义一个布尔值,表示信号是在'xa'之上首次穿越,然后再穿越'xb',还是在'xa'之下首次穿越,然后再穿越'xb'
    above (bool): When the signal crosses above 'xa' first and then 'xb'. When
        False, then when the signal crosses below 'xa' first and then 'xb'.
        Default: True
    # 将长期趋势传递给tsignals的趋势参数。当为False时,将短期趋势传递给tsignals的趋势参数
    long (bool): Passes the long trend into tsignals' trend argument. When
        False, it passes the short trend into tsignals trend argument.
        Default: True
    # 差异期。默认值为1
    drift (int): The difference period. Default: 1
    # 结果的偏移量。默认值为0
    offset (int): How many periods to offset the result. Default: 0

    # TSIGNAL传递参数
    # 如果为True,则将Trends、Entries和Exits列转换为布尔值。当为布尔值时,也可用于使用vectorbt的Portfolio.from_signal(close, entries, exits)进行回测
    asbool (bool): If True, it converts the Trends, Entries and Exits columns to
        booleans. When boolean, it is also useful for backtesting with
        vectorbt's Portfolio.from_signal(close, entries, exits) Default: False
    # 用于识别趋势是否结束的值。默认值为0
    trend_reset (value): Value used to identify if a trend has ended. Default: 0
    # 用于移动交易进出的值。使用1进行回测,使用0进行实时交易。默认值为0
    trade_offset (value): Value used shift the trade entries/exits Use 1 for
        backtesting and 0 for live. Default: 0
# 函数参数说明,使用关键字参数传递给函数的参数列表
Kwargs:
    # fillna参数,用于填充缺失值的值,采用pd.DataFrame.fillna(value)方式
    fillna (value, optional): pd.DataFrame.fillna(value)
    # fill_method参数,填充缺失值的方法类型
    fill_method (value, optional): Type of fill method

# 返回值说明,返回一个pd.DataFrame对象,其包含以下列:
Returns:
    # Trends列,趋势(有趋势: 1,无趋势: 0)
    Trends (trend: 1, no trend: 0),
    # Trades列,交易(进入: 1,退出: -1,其他: 0)
    Trades (Enter: 1, Exit: -1, Otherwise: 0),
    # Entries列,入口(入口: 1,无: 0)
    Entries (entry: 1, nothing: 0),
    # Exits列,出口(出口: 1,无: 0)
    Exits (exit: 1, nothing: 0)

.\pandas-ta\pandas_ta\trend\__init__.py

# -*- coding: utf-8 -*-  
# 指定文件编码为 UTF-8,确保正确处理中文字符

# 导入各个指标模块
from .adx import adx  # 导入 adx 指标模块
from .amat import amat  # 导入 amat 指标模块
from .aroon import aroon  # 导入 aroon 指标模块
from .chop import chop  # 导入 chop 指标模块
from .cksp import cksp  # 导入 cksp 指标模块
from .decay import decay  # 导入 decay 指标模块
from .decreasing import decreasing  # 导入 decreasing 指标模块
from .dpo import dpo  # 导入 dpo 指标模块
from .increasing import increasing  # 导入 increasing 指标模块
from .long_run import long_run  # 导入 long_run 指标模块
from .psar import psar  # 导入 psar 指标模块
from .qstick import qstick  # 导入 qstick 指标模块
from .short_run import short_run  # 导入 short_run 指标模块
from .tsignals import tsignals  # 导入 tsignals 指标模块
from .ttm_trend import ttm_trend  # 导入 ttm_trend 指标模块
from .vhf import vhf  # 导入 vhf 指标模块
from .vortex import vortex  # 导入 vortex 指标模块
from .xsignals import xsignals  # 导入 xsignals 指标模块

.\pandas-ta\pandas_ta\utils\data\alphavantage.py

# -*- coding: utf-8 -*-
# 导入 DataFrame 类
from pandas import DataFrame
# 导入 Imports 对象,RATE 对象,version 对象
from pandas_ta import Imports, RATE, version

# 定义 av 函数,获取 alphaVantage 数据
def av(ticker: str, **kwargs):
    # 打印关键字参数 kwargs
    print(f"[!] kwargs: {kwargs}")
    # 从 kwargs 中弹出 verbose 参数,默认为 False
    verbose = kwargs.pop("verbose", False)
    # 从 kwargs 中弹出 kind 参数,默认为 "history"
    kind = kwargs.pop("kind", "history")
    # 将 kind 转换为小写
    kind = kind.lower()
    # 从 kwargs 中弹出 interval 参数,默认为 "D"
    interval = kwargs.pop("interval", "D")
    # 从 kwargs 中弹出 show 参数,默认为 None
    show = kwargs.pop("show", None)
    # 从 kwargs 中弹出 last 参数,但是没有使用到

    # 如果 ticker 不为空且是字符串类型,则将其转换为大写,否则为 None
    ticker = ticker.upper() if ticker is not None and isinstance(ticker, str) else None

    # 如果 alphaVantage-api 可用且 ticker 不为空
    if Imports["alphaVantage-api"] and ticker is not None:
        # 导入 alphaVantageAPI 模块并重命名为 AV
        import alphaVantageAPI as AV
        # 定义 AVC 字典,包含 API 密钥和其他参数
        AVC = {"api_key": "YOUR API KEY", "clean": True, "export": False, "output_size": "full", "premium": False}
        # 从 kwargs 中获取 av_kwargs 参数,如果不存在则使用 AVC
        _config = kwargs.pop("av_kwargs", AVC)
        # 创建 AlphaVantage 对象 av
        av = AV.AlphaVantage(**_config)

        # 从 kwargs 中获取 period 参数,默认为 av.output_size
        period = kwargs.pop("period", av.output_size)

        # 定义 _all 列表和 div 变量
        _all, div = ["all"], "=" * 53 # Max div width is 80

        # 如果 kind 在 _all 列表中或者 verbose 为真,则执行下面的代码
        if kind in _all or verbose: pass

        # 如果 kind 在 _all 列表或者 ["history", "h"] 列表中
        if kind in _all + ["history", "h"]:
            # 如果 verbose 为真
            if verbose:
                # 打印信息,显示 Pandas TA 版本和 alphaVantage-api
                print("\n====  Chart History       " + div + f"\n[*] Pandas TA v{version} & alphaVantage-api")
                # 打印下载信息,显示下载的股票信息和时间间隔
                print(f"[+] Downloading {ticker}[{interval}:{period}] from {av.API_NAME} (https://www.alphavantage.co/)")
            # 获取股票数据并保存到 df 变量中
            df = av.data(ticker, interval)
            # 设置 DataFrame 的名称为 ticker
            df.name = ticker
            # 如果 show 不为空且是正整数且大于 0
            if show is not None and isinstance(show, int) and show > 0:
                # 打印 DataFrame 最后几行数据
                print(f"\n{df.name}\n{df.tail(show)}\n")
            # 返回 DataFrame 对象
            return df

    # 如果上述条件都不满足,则返回一个空的 DataFrame 对象
    return DataFrame()

# `.\pandas-ta\pandas_ta\utils\data\yahoofinance.py`

```py
# -*- coding: utf-8 -*-
# 导入 DataFrame 类
from pandas import DataFrame
# 导入 Imports、RATE、version 变量
from pandas_ta import Imports, RATE, version
# 导入 _camelCase2Title 函数和 ytd 函数
from .._core import _camelCase2Title
from .._time import ytd

# 定义函数 yf,用于包装 yfinance
def yf(ticker: str, **kwargs):
    """yf - yfinance wrapper

    It retrieves market data (ohlcv) from Yahoo Finance using yfinance.
    To install yfinance. (pip install yfinance) This method can also pull
    additional data using the 'kind' kwarg. By default kind=None and retrieves
    Historical Chart Data.

    Other options of 'kind' include:
    * All: "all"
        - Prints everything below but only returns Chart History to Pandas TA
    * Company Information: "info"
    * Institutional Holders: "institutional_holders" or "ih"
    * Major Holders: "major_holders" or "mh"
    * Mutual Fund Holders: "mutualfund_holders" or "mfh"
    * Recommendations (YTD): "recommendations" or "rec"
    * Earnings Calendar: "calendar" or "cal"
    * Earnings: "earnings" or "earn"
    * Sustainability/ESG Scores: "sustainability", "sus" or "esg"
    * Financials: "financials" or "fin"
        - Returns in order: Income Statement, Balance Sheet and Cash Flow
    * Option Chain: "option_chain" or "oc"
        - Uses the nearest expiration date by default
        - Change the expiration date using kwarg "exp"
        - Show ITM options, set kwarg "itm" to True. Or OTM options, set
        kwarg "itm" to False.
    * Chart History:
        - The only data returned to Pandas TA.

    Args:
        ticker (str): Any string for a ticker you would use with yfinance.
            Default: "SPY"
    Kwargs:
        calls (bool): When True, prints only Option Calls for the Option Chain.
            Default: None
        desc (bool): Will print Company Description when printing Company
            Information. Default: False
        exp (str): Used to print other Option Chains for the given Expiration
            Date. Default: Nearest Expiration Date for the Option Chains
        interval (str): A yfinance argument. Default: "1d"
        itm (bool): When printing Option Chains, shows ITM Options when True.
            When False, it shows OTM Options: Default: None
        kind (str): Options see above. Default: None
        period (str): A yfinance argument. Default: "max"
        proxy (dict): Proxy for yfinance to use. Default: {}
        puts (bool): When True, prints only Option Puts for the Option Chain.
            Default: None
        show (int > 0): How many last rows of Chart History to show.
            Default: None
        snd (int): How many recent Splits and Dividends to show in Company
            Information. Default: 5
        verbose (bool): Prints Company Information "info" and a Chart History
            header to the screen. Default: False

    Returns:
        Exits if the DataFrame is empty or None
        Otherwise it returns a DataFrame of the Chart History
    """
    # 从 kwargs 中获取 verbose 参数,默认为 False
    verbose = kwargs.pop("verbose", False)
    # 如果 ticker 不为空且为字符串类型且长度大于0,则将 ticker 转换为大写
    if ticker is not None and isinstance(ticker, str) and len(ticker):
        ticker = ticker.upper()
    else:
        # 如果 ticker 为空或不是字符串类型或长度为0,则将 ticker 设置为 "SPY"
        ticker = "SPY"

    # 从 kwargs 中弹出 "kind" 键对应的值,如果不存在则为 None
    kind = kwargs.pop("kind", None)
    # 如果 kind 不为空且为字符串类型且长度大于0,则将 kind 转换为小写
    if kind is not None and isinstance(kind, str) and len(kind):
        kind = kind.lower()

    # 从 kwargs 中弹出 "period" 键对应的值,如果不存在则为 "max"
    period = kwargs.pop("period", "max")
    # 从 kwargs 中弹出 "interval" 键对应的值,如果不存在则为 "1d"
    interval = kwargs.pop("interval", "1d")
    # 从 kwargs 中弹出 "proxy" 键对应的值,如果不存在则为一个空字典
    proxy = kwargs.pop("proxy", {})
    # 从 kwargs 中弹出 "show" 键对应的值,如果不存在则为 None
    show = kwargs.pop("show", None)

    # 如果 Imports 中没有 yfinance 模块,则打印提示信息并返回
    if not Imports["yfinance"]:
        print(f"[X] Please install yfinance to use this method. (pip install yfinance)")
        return
    else:
        # 如果有 yfinance 模块,则返回一个空的 DataFrame 对象
        return DataFrame()

.\pandas-ta\pandas_ta\utils\data\__init__.py

# 设置文件编码为 UTF-8,以支持包含非 ASCII 字符的内容
# 导入自定义模块中的 alphavantage 和 yahoofinance 子模块
from .alphavantage import av
from .yahoofinance import yf

.\pandas-ta\pandas_ta\utils\_candles.py

# -*- coding: utf-8 -*-
# 导入 Series 类
from pandas import Series
# 导入 non_zero_range 函数
from ._core import non_zero_range

# 计算蜡烛图的颜色
def candle_color(open_: Series, close: Series) -> Series:
    # 复制收盘价 Series,并将其类型转换为整数
    color = close.copy().astype(int)
    # 当收盘价大于等于开盘价时,将颜色设置为1
    color[close >= open_] = 1
    # 当收盘价小于开盘价时,将颜色设置为-1
    color[close < open_] = -1
    # 返回颜色 Series
    return color

# 计算最高价和最低价的范围
def high_low_range(high: Series, low: Series) -> Series:
    # 调用 non_zero_range 函数计算高低价的范围
    return non_zero_range(high, low)

# 计算实体部分(实体部分指收盘价与开盘价之间的绝对值)
def real_body(open_: Series, close: Series) -> Series:
    # 调用 non_zero_range 函数计算实体部分
    return non_zero_range(close, open_)

.\pandas-ta\pandas_ta\utils\_core.py

# 设置文件编码为 utf-8
# 导入 re 模块并重命名为 re_
# 从 pathlib 模块中导入 Path 类
# 从 sys 模块中导入 float_info 并重命名为 sflt
# 从 numpy 模块中导入 argmax 和 argmin 函数
# 从 pandas 模块中导入 DataFrame 和 Series 类
# 从 pandas 模块中导入 is_datetime64_any_dtype 函数
# 从 pandas_ta 模块中导入 Imports

def _camelCase2Title(x: str):
    """将驼峰命名转换为标题格式"""
    return re_.sub("([a-z])([A-Z])","\g<1> \g<2>", x).title()

def category_files(category: str) -> list:
    """返回类别目录中所有文件名的帮助函数"""
    files = [
        x.stem
        for x in list(Path(f"pandas_ta/{category}/").glob("*.py"))
        if x.stem != "__init__"
    ]
    return files

def get_drift(x: int) -> int:
    """如果不为零,则返回一个整数,否则默认为一"""
    return int(x) if isinstance(x, int) and x != 0 else 1

def get_offset(x: int) -> int:
    """返回一个整数,否则默认为零"""
    return int(x) if isinstance(x, int) else 0

def is_datetime_ordered(df: DataFrame or Series) -> bool:
    """如果索引是日期时间且有序,则返回 True"""
    index_is_datetime = is_datetime64_any_dtype(df.index)
    try:
        ordered = df.index[0] < df.index[-1]
    except RuntimeWarning:
        pass
    finally:
        return True if index_is_datetime and ordered else False

def is_percent(x: int or float) -> bool:
    """检查是否为百分比"""
    if isinstance(x, (int, float)):
        return x is not None and x >= 0 and x <= 100
    return False

def non_zero_range(high: Series, low: Series) -> Series:
    """返回两个序列的差异,并对任何零值添加 epsilon。在加密数据中常见情况是 'high' = 'low'。"""
    diff = high - low
    if diff.eq(0).any().any():
        diff += sflt.epsilon
    return diff

def recent_maximum_index(x):
    """返回最近最大值的索引"""
    return int(argmax(x[::-1]))

def recent_minimum_index(x):
    """返回最近最小值的索引"""
    return int(argmin(x[::-1]))

def signed_series(series: Series, initial: int = None) -> Series:
    """返回带有或不带有初始值的有符号序列"""
    series = verify_series(series)
    sign = series.diff(1)
    sign[sign > 0] = 1
    sign[sign < 0] = -1
    sign.iloc[0] = initial
    return sign

def tal_ma(name: str) -> int:
    """返回 TA Lib 的 MA 类型的枚举值"""
    # 检查是否导入了 talib 模块,并且 name 是否是字符串类型,并且长度大于1
    if Imports["talib"] and isinstance(name, str) and len(name) > 1:
        # 如果满足条件,从 talib 模块导入 MA_Type
        from talib import MA_Type
        # 将 name 转换为小写
        name = name.lower()
        # 根据 name 的不同取值返回对应的 MA_Type 枚举值
        if   name == "sma":   return MA_Type.SMA   # 0
        elif name == "ema":   return MA_Type.EMA   # 1
        elif name == "wma":   return MA_Type.WMA   # 2
        elif name == "dema":  return MA_Type.DEMA  # 3
        elif name == "tema":  return MA_Type.TEMA  # 4
        elif name == "trima": return MA_Type.TRIMA # 5
        elif name == "kama":  return MA_Type.KAMA  # 6
        elif name == "mama":  return MA_Type.MAMA  # 7
        elif name == "t3":    return MA_Type.T3    # 8
    # 如果不满足条件,返回默认值 0,代表 SMA
    return 0 # Default: SMA -> 0
# 定义一个函数,计算给定 Series 的无符号差值
def unsigned_differences(series: Series, amount: int = None, **kwargs) -> Series:
    """Unsigned Differences
    返回两个 Series,一个是原始 Series 的无符号正差值,另一个是无符号负差值。
    正差值 Series 仅包含增加值,负差值 Series 仅包含减少值。

    默认示例:
    series   = Series([3, 2, 2, 1, 1, 5, 6, 6, 7, 5, 3]) 返回
    positive  = Series([0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0])
    negative = Series([0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1])
    """
    # 如果未提供 amount 参数,则默认为 1
    amount = int(amount) if amount is not None else 1
    # 计算 Series 的负差值
    negative = series.diff(amount)
    # 将 NaN 值填充为 0
    negative.fillna(0, inplace=True)
    # 复制负差值 Series 以备后用
    positive = negative.copy()

    # 将正差值 Series 中小于等于 0 的值设置为 0
    positive[positive <= 0] = 0
    # 将正差值 Series 中大于 0 的值设置为 1
    positive[positive > 0] = 1

    # 将负差值 Series 中大于等于 0 的值设置为 0
    negative[negative >= 0] = 0
    # 将负差值 Series 中小于 0 的值设置为 1
    negative[negative < 0] = 1

    # 如果 kwargs 中包含 asint 参数且值为 True,则将 Series 转换为整数类型
    if kwargs.pop("asint", False):
        positive = positive.astype(int)
        negative = negative.astype(int)

    # 返回正差值 Series 和负差值 Series
    return positive, negative


# 定义一个函数,验证给定的 Series 是否满足指示器的最小长度要求
def verify_series(series: Series, min_length: int = None) -> Series:
    """If a Pandas Series and it meets the min_length of the indicator return it."""
    # 判断是否指定了最小长度,并且最小长度是整数类型
    has_length = min_length is not None and isinstance(min_length, int)
    # 如果给定的 series 不为空且是 Pandas Series 类型
    if series is not None and isinstance(series, Series):
        # 如果指定了最小长度,并且 series 的大小小于最小长度,则返回 None,否则返回 series
        return None if has_length and series.size < min_length else series

.\pandas-ta\pandas_ta\utils\_math.py

# 设置文件编码为 UTF-8
# 导入 functools 模块中的 reduce 函数
# 从 math 模块中导入 floor 函数并将其命名为 mfloor
# 从 operator 模块中导入 mul 函数
# 从 sys 模块中导入 float_info 对象并将其命名为 sflt
# 从 typing 模块中导入 List、Optional 和 Tuple 类型
from functools import reduce
from math import floor as mfloor
from operator import mul
from sys import float_info as sflt
from typing import List, Optional, Tuple

# 从 numpy 模块中导入 ones、triu、all、append、array、corrcoef、dot、fabs、exp、log、nan、ndarray、seterr、sqrt 和 sum 函数
from numpy import ones, triu
from numpy import all as npAll
from numpy import append as npAppend
from numpy import array as npArray
from numpy import corrcoef as npCorrcoef
from numpy import dot as npDot
from numpy import fabs as npFabs
from numpy import exp as npExp
from numpy import log as npLog
from numpy import nan as npNaN
from numpy import ndarray as npNdArray
from numpy import seterr
from numpy import sqrt as npSqrt
from numpy import sum as npSum

# 从 pandas 模块中导入 DataFrame 和 Series 类
from pandas import DataFrame, Series

# 从 pandas_ta 包中导入 Imports 模块
from pandas_ta import Imports
# 从 ._core 模块中导入 verify_series 函数
from ._core import verify_series

# 定义 combination 函数,接收任意关键字参数并返回整型值
def combination(**kwargs: dict) -> int:
    """https://stackoverflow.com/questions/4941753/is-there-a-math-ncr-function-in-python"""
    # 从 kwargs 中取出键为 "n" 的值,若不存在则默认为 1,并转换为整型
    n = int(npFabs(kwargs.pop("n", 1)))
    # 从 kwargs 中取出键为 "r" 的值,若不存在则默认为 0,并转换为整型
    r = int(npFabs(kwargs.pop("r", 0)))

    # 如果参数中存在 "repetition" 或 "multichoose" 键,则执行以下操作
    if kwargs.pop("repetition", False) or kwargs.pop("multichoose", False):
        # 计算修正后的 n 值
        n = n + r - 1

    # 若 r 小于 0,则返回 None
    # 如果 r 大于 n,则令 r 等于 n
    r = min(n, n - r)
    # 若 r 为 0,则返回 1
    if r == 0:
        return 1

    # 计算组合数的分子部分
    numerator = reduce(mul, range(n, n - r, -1), 1)
    # 计算组合数的分母部分
    denominator = reduce(mul, range(1, r + 1), 1)
    # 返回组合数结果
    return numerator // denominator


# 定义错误函数 erf(x),接收一个参数 x,返回 erf(x) 的值
def erf(x):
    """Error Function erf(x)
    The algorithm comes from Handbook of Mathematical Functions, formula 7.1.26.
    Source: https://stackoverflow.com/questions/457408/is-there-an-easily-available-implementation-of-erf-for-python
    """
    # 保存 x 的符号
    sign = 1 if x >= 0 else -1
    x = abs(x)

    # 定义常数
    a1 =  0.254829592
    a2 = -0.284496736
    a3 =  1.421413741
    a4 = -1.453152027
    a5 =  1.061405429
    p  =  0.3275911

    # 使用 A&S 公式 7.1.26 计算 erf(x) 的值
    t = 1.0 / (1.0 + p * x)
    y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * npExp(-x * x)
    # 返回 erf(x) 的值,若 x 为负数,则取相反数
    return sign * y # erf(-x) = -erf(x)


# 定义斐波那契数列函数 fibonacci,接收一个整型参数 n 和任意关键字参数,返回一个 numpy 数组
def fibonacci(n: int = 2, **kwargs: dict) -> npNdArray:
    """Fibonacci Sequence as a numpy array"""
    # 将 n 转换为非负整数
    n = int(npFabs(n)) if n >= 0 else 2

    # 从 kwargs 中取出键为 "zero" 的值,若存在且为 True,则斐波那契数列以 0 开头
    zero = kwargs.pop("zero", False)
    if zero:
        a, b = 0, 1
    else:
        # 若不以 0 开头,则斐波那契数列从 1 开始,n 减 1
        n -= 1
        a, b = 1, 1

    # 初始化结果数组,包含斐波那契数列的第一个元素
    result = npArray([a])
    # 循环生成斐波那契数列
    for _ in range(0, n):
        a, b = b, a + b
        result = npAppend(result, a)

    # 从 kwargs 中取出键为 "weighted" 的值,若存在且为 True,则返回加权后的斐波那契数列
    weighted = kwargs.pop("weighted", False)
    if weighted:
        # 计算斐波那契数列的总和
        fib_sum = npSum(result)
        # 若总和大于 0,则返回斐波那契数列的每个元素除以总和的结果
        if fib_sum > 0:
            return result / fib_sum
        else:
            # 若总和小于等于 0,则直接返回斐波那契数列
            return result
    else:
        # 若不加权,则直接返回斐波那契数
    """Classic Linear Regression in Numpy or Scikit-Learn"""
    # 确保 x 和 y 是 Series 类型的数据
    x, y = verify_series(x), verify_series(y)
    # 获取 x 和 y 的大小
    m, n = x.size, y.size

    # 如果 x 和 y 的大小不相等,则打印错误信息并返回空字典
    if m != n:
        print(f"[X] Linear Regression X and y have unequal total observations: {m} != {n}")
        return {}

    # 如果导入了 sklearn 模块,则使用 sklearn 进行线性回归
    if Imports["sklearn"]:
        return _linear_regression_sklearn(x, y)
    # 否则使用 numpy 进行线性回归
    else:
        return _linear_regression_np(x, y)
# 返回给定序列的对数几何平均值
def log_geometric_mean(series: Series) -> float:
    n = series.size  # 获取序列的大小
    if n < 2: return 0  # 如果序列大小小于2,则返回0
    else:
        series = series.fillna(0) + 1  # 将序列中的空值填充为0,并加1
        if npAll(series > 0):  # 检查序列中的所有值是否大于0
            return npExp(npLog(series).sum() / n) - 1  # 计算序列的对数和的均值的指数,然后减去1
        return 0  # 如果序列中存在小于等于0的值,则返回0


# 返回帕斯卡三角形的第n行
def pascals_triangle(n: int = None, **kwargs: dict) -> npNdArray:
    n = int(npFabs(n)) if n is not None else 0  # 将n转换为整数并取绝对值,如果n为None则设为0

    # 计算
    triangle = npArray([combination(n=n, r=i) for i in range(0, n + 1)])  # 创建帕斯卡三角形的第n行
    triangle_sum = npSum(triangle)  # 计算三角形的总和
    triangle_weights = triangle / triangle_sum  # 计算每个元素的权重
    inverse_weights = 1 - triangle_weights  # 计算逆权重

    weighted = kwargs.pop("weighted", False)  # 获取weighted参数,默认为False
    inverse = kwargs.pop("inverse", False)  # 获取inverse参数,默认为False
    if weighted and inverse:  # 如果weighted和inverse都为True
        return inverse_weights  # 返回逆权重
    if weighted:  # 如果weighted为True
        return triangle_weights  # 返回权重
    if inverse:  # 如果inverse为True
        return None  # 返回None

    return triangle  # 返回帕斯卡三角形的第n行


# 返回对称三角形的第n行
def symmetric_triangle(n: int = None, **kwargs: dict) -> Optional[List[int]]:
    n = int(npFabs(n)) if n is not None else 2  # 将n转换为整数并取绝对值,如果n为None则设为2

    triangle = None
    if n == 2:  # 如果n为2
        triangle = [1, 1]  # 返回固定的列表

    if n > 2:  # 如果n大于2
        if n % 2 == 0:  # 如果n为偶数
            front = [i + 1 for i in range(0, mfloor(n / 2))]  # 创建前半部分列表
            triangle = front + front[::-1]  # 创建对称三角形
        else:
            front = [i + 1 for i in range(0, mfloor(0.5 * (n + 1)))]  # 创建前半部分列表
            triangle = front.copy()  # 复制前半部分列表
            front.pop()  # 移除最后一个元素
            triangle += front[::-1]  # 创建对称三角形

    if kwargs.pop("weighted", False) and isinstance(triangle, list):  # 如果weighted为True且triangle是列表类型
        triangle_sum = npSum(triangle)  # 计算三角形的总和
        triangle_weights = triangle / triangle_sum  # 计算每个元素的权重
        return triangle_weights  # 返回权重

    return triangle  # 返回对称三角形的第n行


# 返回权重与值x的点积
def weights(w: npNdArray):
    def _dot(x):
        return npDot(w, x)
    return _dot


# 如果值接近于零,则返回零,否则返回自身
def zero(x: Tuple[int, float]) -> Tuple[int, float]:
    return 0 if abs(x) < sflt.epsilon else x


# DataFrame相关性分析辅助函数
def df_error_analysis(dfA: DataFrame, dfB: DataFrame, **kwargs: dict) -> DataFrame:
    corr_method = kwargs.pop("corr_method", "pearson")  # 获取相关性计算方法,默认为pearson

    # 计算它们的差异和相关性
    diff = dfA - dfB  # 计算DataFrame的差异
    corr = dfA.corr(dfB, method=corr_method)  # 计算DataFrame的相关性

    # 用于绘图
    if kwargs.pop("plot", False):  # 如果plot为True
        diff.hist()  # 绘制差异的直方图
        if diff[diff > 0].any():  # 如果差异中存在大于0的值
            diff.plot(kind="kde")  # 绘制密度曲线图

    if kwargs.pop("triangular", False):  # 如果triangular为True
        return corr.where(triu(ones(corr.shape)).astype(bool))  # 返回上三角部分的相关性矩阵

    return corr  # 返回相关性矩阵


# 私有函数
# 使用 Numpy 实现简单的线性回归,适用于没有安装 sklearn 包的环境,接受两个一维数组作为输入
def _linear_regression_np(x: Series, y: Series) -> dict:
    # 初始化结果字典,所有值设为 NaN
    result = {"a": npNaN, "b": npNaN, "r": npNaN, "t": npNaN, "line": npNaN}
    # 计算 x 和 y 的总和
    x_sum = x.sum()
    y_sum = y.sum()

    # 如果 x 的总和不为 0
    if int(x_sum) != 0:
        # 计算 x 和 y 之间的相关系数
        r = npCorrcoef(x, y)[0, 1]

        m = x.size
        # 计算回归系数 b
        r_mix = m * (x * y).sum() - x_sum * y_sum
        b = r_mix // (m * (x * x).sum() - x_sum * x_sum)
        # 计算截距 a 和回归线
        a = y.mean() - b * x.mean()
        line = a + b * x

        # 临时保存 Numpy 的错误设置
        _np_err = seterr()
        # 忽略除零和无效值的错误
        seterr(divide="ignore", invalid="ignore")
        # 更新结果字典
        result = {
            "a": a, "b": b, "r": r,
            "t": r / npSqrt((1 - r * r) / (m - 2)),
            "line": line,
        }
        # 恢复 Numpy 的错误设置
        seterr(divide=_np_err["divide"], invalid=_np_err["invalid"])

    return result

# 使用 Scikit Learn 实现简单的线性回归,适用于安装了 sklearn 包的环境,接受两个一维数组作为输入
def _linear_regression_sklearn(x: Series, y: Series) -> dict:
    # 导入 LinearRegression 类
    from sklearn.linear_model import LinearRegression

    # 将 x 转换为 DataFrame,创建 LinearRegression 模型并拟合数据
    X = DataFrame(x)
    lr = LinearRegression().fit(X, y=y)
    # 计算决定系数
    r = lr.score(X, y=y)
    # 获取截距和斜率
    a, b = lr.intercept_, lr.coef_[0]

    # 更新结果字典
    result = {
        "a": a, "b": b, "r": r,
        "t": r / npSqrt((1 - r * r) / (x.size - 2)),
        "line": a + b * x
    }
    return result

.\pandas-ta\pandas_ta\utils\_metrics.py

# -*- coding: utf-8 -*-
# 引入需要的类型提示模块
from typing import Tuple

# 引入 numpy 库的 log、nan、sqrt 函数
from numpy import log as npLog
from numpy import nan as npNaN
from numpy import sqrt as npSqrt

# 引入 pandas 库的 Series、Timedelta 类
from pandas import Series, Timedelta

# 引入自定义的核心模块中的 verify_series 函数
from ._core import verify_series

# 引入自定义的时间模块中的 total_time 函数
from ._time import total_time

# 引入自定义的数学模块中的 linear_regression、log_geometric_mean 函数
from ._math import linear_regression, log_geometric_mean

# 引入 pandas_ta 库中的 RATE 常量
from pandas_ta import RATE

# 引入 pandas_ta 库中的性能模块中的 drawdown、log_return、percent_return 函数
from pandas_ta.performance import drawdown, log_return, percent_return


def cagr(close: Series) -> float:
    """复合年增长率

    Args:
        close (pd.Series): 'close' 的序列

    >>> result = ta.cagr(df.close)
    """
    # 确保 close 是有效的 Series
    close = verify_series(close)
    # 获取序列的起始和结束值
    start, end = close.iloc[0], close.iloc[-1]
    # 计算并返回复合年增长率
    return ((end / start) ** (1 / total_time(close))) - 1


def calmar_ratio(close: Series, method: str = "percent", years: int = 3) -> float:
    """Calmar 比率通常是在过去三年内的最大回撤率的百分比。

    Args:
        close (pd.Series): 'close' 的序列
        method (str): 最大回撤计算选项:'dollar'、'percent'、'log'。默认值:'dollar'
        years (int): 使用的年数。默认值:3

    >>> result = ta.calmar_ratio(close, method="percent", years=3)
    """
    if years <= 0:
        # 如果年数参数小于等于 0,则打印错误消息并返回
        print(f"[!] calmar_ratio 'years' 参数必须大于零。")
        return
    # 确保 close 是有效的 Series
    close = verify_series(close)

    # 获取指定年数前的日期
    n_years_ago = close.index[-1] - Timedelta(days=365.25 * years)
    # 从指定日期开始截取序列
    close = close[close.index > n_years_ago]

    # 计算并返回 Calmar 比率
    return cagr(close) / max_drawdown(close, method=method)


def downside_deviation(returns: Series, benchmark_rate: float = 0.0, tf: str = "years") -> float:
    """Sortino 比率的下行偏差。假定基准利率是年化的。根据数据中每年的期数进行调整。

    Args:
        returns (pd.Series): 'returns' 的序列
        benchmark_rate (float): 要使用的基准利率。默认值:0.0
        tf (str): 时间范围选项:'days'、'weeks'、'months'、'years'。默认值:'years'

    >>> result = ta.downside_deviation(returns, benchmark_rate=0.0, tf="years")
    """
    # 用于去年化基准利率和年化结果的天数
    # 确保 returns 是有效的 Series
    returns = verify_series(returns)
    days_per_year = returns.shape[0] / total_time(returns, tf)

    # 调整后的基准利率
    adjusted_benchmark_rate = ((1 + benchmark_rate) ** (1 / days_per_year)) - 1

    # 计算下行偏差
    downside = adjusted_benchmark_rate - returns
    downside_sum_of_squares = (downside[downside > 0] ** 2).sum()
    downside_deviation = npSqrt(downside_sum_of_squares / (returns.shape[0] - 1))
    return downside_deviation * npSqrt(days_per_year)


def jensens_alpha(returns: Series, benchmark_returns: Series) -> float:
    """一系列与基准的 Jensen's 'Alpha'。

    Args:
        returns (pd.Series): 'returns' 的序列
        benchmark_returns (pd.Series): 'benchmark_returns' 的序列

    >>> result = ta.jensens_alpha(returns, benchmark_returns)
    """
    # 确保 returns 是有效的 Series
    returns = verify_series(returns)
    # 确保 benchmark_returns 是一个 Series 对象,并返回一个验证过的 Series 对象
    benchmark_returns = verify_series(benchmark_returns)
    
    # 对 benchmark_returns 进行插值处理,使得其中的缺失值被填充,原地修改(不创建新对象)
    benchmark_returns.interpolate(inplace=True)
    
    # 对 benchmark_returns 和 returns 进行线性回归分析,并返回其中的斜率参数(截距参数不返回)
    return linear_regression(benchmark_returns, returns)["a"]
def log_max_drawdown(close: Series) -> float:
    """Calculate the logarithmic maximum drawdown of a series.

    Args:
        close (pd.Series): Series of 'close' prices.

    >>> result = ta.log_max_drawdown(close)
    """
    # Ensure 'close' series is valid
    close = verify_series(close)
    # Calculate the log return from the beginning to the end of the series
    log_return = npLog(close.iloc[-1]) - npLog(close.iloc[0])
    # Return the log return minus the maximum drawdown of the series
    return log_return - max_drawdown(close, method="log")


def max_drawdown(close: Series, method:str = None, all:bool = False) -> float:
    """Calculate the maximum drawdown from a series of closing prices.

    Args:
        close (pd.Series): Series of 'close' prices.
        method (str): Options for calculating max drawdown: 'dollar', 'percent', 'log'.
            Default: 'dollar'.
        all (bool): If True, return all three methods as a dictionary.
            Default: False.

    >>> result = ta.max_drawdown(close, method="dollar", all=False)
    """
    # Ensure 'close' series is valid
    close = verify_series(close)
    # Calculate the maximum drawdown using the drawdown function
    max_dd = drawdown(close).max()

    # Dictionary containing maximum drawdown values for different methods
    max_dd_ = {
        "dollar": max_dd.iloc[0],
        "percent": max_dd.iloc[1],
        "log": max_dd.iloc[2]
    }
    # If 'all' is True, return all methods as a dictionary
    if all: return max_dd_

    # If 'method' is specified and valid, return the corresponding value
    if isinstance(method, str) and method in max_dd_.keys():
        return max_dd_[method]
    # Default to dollar method if 'method' is not specified or invalid
    return max_dd_["dollar"]


def optimal_leverage(
        close: Series, benchmark_rate: float = 0.0,
        period: Tuple[float, int] = RATE["TRADING_DAYS_PER_YEAR"],
        log: bool = False, capital: float = 1., **kwargs
    ) -> float:
    """Calculate the optimal leverage of a series. WARNING: Incomplete. Do NOT use.

    Args:
        close (pd.Series): Series of 'close' prices.
        benchmark_rate (float): Benchmark Rate to use. Default: 0.0.
        period (int, float): Period to use to calculate Mean Annual Return and
            Annual Standard Deviation.
            Default: None or the default sharpe_ratio.period().
        log (bool): If True, calculates log_return. Otherwise, it returns
            percent_return. Default: False.

    >>> result = ta.optimal_leverage(close, benchmark_rate=0.0, log=False)
    """
    # Ensure 'close' series is valid
    close = verify_series(close)

    # Check if use_cagr is specified
    use_cagr = kwargs.pop("use_cagr", False)
    # Calculate returns based on whether log or percent return is specified
    returns = percent_return(close=close) if not log else log_return(close=close)

    # Calculate period mean and standard deviation
    period_mu = period * returns.mean()
    period_std = npSqrt(period) * returns.std()

    # Calculate mean excess return and optimal leverage
    mean_excess_return = period_mu - benchmark_rate
    opt_leverage = (period_std ** -2) * mean_excess_return

    # Calculate the amount based on capital and optimal leverage
    amount = int(capital * opt_leverage)
    return amount


def pure_profit_score(close: Series) -> Tuple[float, int]:
    """Calculate the pure profit score of a series.

    Args:
        close (pd.Series): Series of 'close' prices.

    >>> result = ta.pure_profit_score(df.close)
    """
    # Ensure 'close' series is valid
    close = verify_series(close)
    # Create a series of zeros with the same index as 'close'
    close_index = Series(0, index=close.reset_index().index)

    # Calculate the linear regression 'r' value
    r = linear_regression(close_index, close)["r"]
    # If 'r' value is not NaN, return 'r' multiplied by CAGR of 'close'
    if r is not npNaN:
        return r * cagr(close)
    # Otherwise, return 0
    return 0
# 计算夏普比率的函数
def sharpe_ratio(close: Series, benchmark_rate: float = 0.0, log: bool = False, use_cagr: bool = False, period: int = RATE["TRADING_DAYS_PER_YEAR"]) -> float:
    """Sharpe Ratio of a series.

    Args:
        close (pd.Series): Series of 'close's
        benchmark_rate (float): Benchmark Rate to use. Default: 0.0
        log (bool): If True, calculates log_return. Otherwise it returns
            percent_return. Default: False
        use_cagr (bool): Use cagr - benchmark_rate instead. Default: False
        period (int, float): Period to use to calculate Mean Annual Return and
            Annual Standard Deviation.
            Default: RATE["TRADING_DAYS_PER_YEAR"] (currently 252)

    >>> result = ta.sharpe_ratio(close, benchmark_rate=0.0, log=False)
    """
    # 验证输入的数据是否为Series类型
    close = verify_series(close)
    # 根据log参数选择计算百分比收益率或对数收益率
    returns = percent_return(close=close) if not log else log_return(close=close)

    # 如果使用cagr参数,则返回年复合增长率与波动率的比率
    if use_cagr:
        return cagr(close) / volatility(close, returns, log=log)
    else:
        # 计算期望收益率和标准差
        period_mu = period * returns.mean()
        period_std = npSqrt(period) * returns.std()
        return (period_mu - benchmark_rate) / period_std


# 计算Sortino比率的函数
def sortino_ratio(close: Series, benchmark_rate: float = 0.0, log: bool = False) -> float:
    """Sortino Ratio of a series.

    Args:
        close (pd.Series): Series of 'close's
        benchmark_rate (float): Benchmark Rate to use. Default: 0.0
        log (bool): If True, calculates log_return. Otherwise it returns
            percent_return. Default: False

    >>> result = ta.sortino_ratio(close, benchmark_rate=0.0, log=False)
    """
    # 验证输入的数据是否为Series类型
    close = verify_series(close)
    # 根据log参数选择计算百分比收益率或对数收益率
    returns = percent_return(close=close) if not log else log_return(close=close)

    # 计算Sortino比率
    result  = cagr(close) - benchmark_rate
    result /= downside_deviation(returns)
    return result


# 计算波动率的函数
def volatility(close: Series, tf: str = "years", returns: bool = False, log: bool = False, **kwargs) -> float:
    """Volatility of a series. Default: 'years'

    Args:
        close (pd.Series): Series of 'close's
        tf (str): Time Frame options: 'days', 'weeks', 'months', and 'years'.
            Default: 'years'
        returns (bool): If True, then it replace the close Series with the user
            defined Series; typically user generated returns or percent returns
            or log returns. Default: False
        log (bool): If True, calculates log_return. Otherwise it calculates
            percent_return. Default: False

    >>> result = ta.volatility(close, tf="years", returns=False, log=False, **kwargs)
    """
    # 验证输入的数据是否为Series类型
    close = verify_series(close)

    # 如果returns参数为False,则计算百分比收益率或对数收益率
    if not returns:
        returns = percent_return(close=close) if not log else log_return(close=close)
    else:
        returns = close

    # 计算对数几何平均值的标准差作为波动率
    returns = log_geometric_mean(returns).std()
    return returns

标签:return,PandasTA,Series,kwargs,returns,源码,import,close,解析
From: https://www.cnblogs.com/apachecn/p/18135794

相关文章

  • PandasTA 源码解析(十六)
    .\pandas-ta\pandas_ta\volatility\kc.py#-*-coding:utf-8-*-#从pandas库中导入DataFrame类frompandasimportDataFrame#从.true_range模块中导入true_range函数from.true_rangeimporttrue_range#从pandas_ta.overlap模块中导入ma函数frompandas_......
  • PandasTA 源码解析(十五)
    .\pandas-ta\pandas_ta\utils\_signals.py#-*-coding:utf-8-*-#导入DataFrame和Series类frompandasimportDataFrame,Series#导入自定义函数from._coreimportget_offset,verify_seriesfrom._mathimportzero#定义函数_above_below,用于比较两个Seri......
  • PandasTA 源码解析(十八)
    .\pandas-ta\pandas_ta\volume\pvol.py#-*-coding:utf-8-*-#导入所需的库和函数frompandas_ta.utilsimportget_offset,signed_series,verify_series#定义函数pvol,计算价格和成交量的乘积defpvol(close,volume,offset=None,**kwargs):"""Indicator:Pr......
  • PandasTA 源码解析(十七)
    .\pandas-ta\pandas_ta\volume\adosc.py#-*-coding:utf-8-*-#导入ad模块from.adimportad#从pandas_ta库中导入Imports模块frompandas_taimportImports#从pandas_ta.overlap模块中导入ema函数frompandas_ta.overlapimportema#从pandas_ta.util......
  • PandasTA 源码解析(二十)
    .\pandas-ta\tests\test_indicator_cycles.py#从config模块中导入error_analysis,sample_data,CORRELATION,CORRELATION_THRESHOLD,VERBOSE变量#从context模块中导入pandas_tafrom.configimporterror_analysis,sample_data,CORRELATION,CORRELATION_THRESHOLD,VER......
  • PandasTA 源码解析(十九)
    .\pandas-ta\tests\test_ext_indicator_overlap_ext.py#从当前包中导入sample_data和pandas_ta模块from.configimportsample_datafrom.contextimportpandas_ta#从unittest模块中导入skip和TestCase类fromunittestimportskip,TestCase#从pandas模块......
  • PandasTA 源码解析(二十一)
    .\pandas-ta\tests\test_indicator_performance.py#导入所需的模块和函数from.configimportsample_datafrom.contextimportpandas_ta#从unittest模块中导入TestCase类fromunittestimportTestCase#从pandas模块中导入Series类frompandasimportSeries......
  • PandasTA 源码解析(一)
    .\pandas-ta\docs\conf.py#-*-coding:utf-8-*-##ConfigurationfilefortheSphinxdocumentationbuilder.##Thisfiledoesonlycontainaselectionofthemostcommonoptions.Fora#fulllistseethedocumentation:#http://www.sphinx-doc.org/e......
  • PandasTA 源码解析(二)
    .\pandas-ta\pandas_ta\candles\cdl_inside.py#-*-coding:utf-8-*-#从pandas_ta.utils中导入candle_color和get_offset函数frompandas_ta.utilsimportcandle_color,get_offset#从pandas_ta.utils中导入verify_series函数frompandas_ta.utilsimportve......
  • PandasTA 源码解析(四)
    .\pandas-ta\pandas_ta\momentum\cg.py#-*-coding:utf-8-*-#从pandas_ta.utils中导入get_offset,verify_series,weights函数frompandas_ta.utilsimportget_offset,verify_series,weights#定义CenterofGravity(CG)指标函数defcg(close,length=None,......