Skip to content

AI顾投高级策略之一:巴菲特的alpha(建议收藏)

写在前面的话:在这个合集的开头,我们来写写巴菲特的alpha。巴菲特的投资策略和其所实现的超额回报(即Alpha)一直是金融学界和实务界的研究焦点。Alpha是一个衡量投资性能的指标,代表了一个投资组合相对于其基准指数的超额回报。在沃伦·巴菲特的案例中,这意味着他管理的伯克希尔·哈撒韦公司的投资回报超过了一般市场的表现。

经济学家和金融分析师经常研究巴菲特的投资策略,试图量化他的Alpha并理解其能够持续战胜市场的原因。研究显示,巴菲特的Alpha部分来自于他选股的能力和行业配置的策略,而且他那看似简单的价值投资策略背后实际上是复杂和精细的财务分析及市场洞察。

巴菲特的成功和高Alpha也常被视为投资领域中的一个标杆,激励了无数投资者和基金经理去模仿他的策略和原则。不过,巴菲特本人常常强调,耐心、纪律和正确的心态是投资成功的关键。

在《巴菲特的alpha》文章里,后人把巴菲特的收益分成六个维度,分别是市场,估值,规模,动量,质量和波动率六个维度。我们今天就开始复现其中的原理,除去市场维度,我们从其他五个维度分别挑选因子,总共6个,组成6因子模型。

对于因子的处理,由于因子来自不同维度,所以无需进行降维或者因子正交处理来解决它的相关性问题,所以简单进行了去极值和标准化处理。

对于股票列表,已经进行剔除ST,上市未满60天的新股,停牌股和开盘涨停股。

对于打分方式:针对升序因子乘以-1;针对降序因子乘以1,最后进行叠加。

在择时方面,采用RSRS的方式对指数进行择时信号。

资金规模在10000000,20日调仓,回测时间从2010-01-01到2018-11-08 

算法代码如下:

"""
以下代码运行后收益还行,可以使用。

"""
import pandas as pd
import numpy as np
import datetime as dt
import talib as ta
from datetime import date, timedelta
import statsmodels.api as sm

# 初始化账户
def init(context):
    set_params(context)
    set_variables(context)
    set_backtest()
    run_daily(stop_loss)

# 设置策略参数
def set_params(context):
    g.tc = 20  # 调仓频率
    g.t = 0
    g.big_small = 'big'  # big是降序,small为升序
    context.stock = '000300.SH'
    g.long_pct = 0.05
    g.stock = '000300.SH'  # 择时选取的指数
    g.total_positionprevious = 0  # 仓位
    g.N = 18  # RSRS选取的回归长度
    g.M = 1100  # RSRS均值窗口

# 设置变量
def set_variables(context):
    context.X_length = 11
    context.flag = True
    g.buy = 0.7  # 买入阀门
    g.sell = -0.7  # 卖出阀门
    g.ans = []
    g.ans_rightdev = []

# 设置回测
def set_backtest():
    set_benchmark('000300.SH')  # 设置基准
    set_slippage(PriceSlippage(0.002))  # 设置可变滑点

# 个股止损
def stop_loss(context, bar_dict):
    for stock in list(context.portfolio.positions):
        cumulative_return = bar_dict[stock].close / context.portfolio.positions[stock].cost_basis
        if cumulative_return < 0.9:
            order_target_value(stock, 0)

# 处理K线
def handle_bar(context, bar_dict):
    stock = g.stock
    beta = 0
    r2 = 0
    prices = history(stock, ['high', 'low'], g.N, '1d', False, 'pre', is_panel=1)
    highs = prices.high
    lows = prices.low
    X = sm.add_constant(lows)
    model = sm.OLS(highs, X)
    # 得到beta
    beta = model.fit().params[1]
    # 将新的beta添加到装有历史数据列表
    g.ans.append(beta)
    # 得到rsquare数据
    r2 = model.fit().rsquared
    # 将新的rsquare添加到装有历史数据列表
    g.ans_rightdev.append(r2)
    # 为了标准化当下的beta数值,拿过去1100天的数据作为均值的窗口
    section = g.ans[-g.M:]
    # 计算均值序列
    mu = np.mean(section)
    # 计算标准化RSRS指标序列
    sigma = np.std(section)
    zscore = (section[-1] - mu) / sigma
    # 计算右偏RSRS标准分,就是将标准化后的beta数据乘以原始beta再乘以拟合度
    zscore_rightdev = zscore * beta * r2
    # 根据交易信号买入卖出
    if zscore_rightdev > g.buy:
        total_position = 1
    elif zscore_rightdev < g.sell:
        total_position = 0
    else:
        total_position = g.total_positionprevious
    if (g.total_positionprevious!= total_position) or (g.t % g.tc == 0):
        g.total_positionprevious = total_position
    last_date = get_last_datetime().strftime('%Y%m%d')
    stock_list = list(get_all_securities('stock', date=last_date).index)
    # 对stock_list进行去除st,停牌等处理
    stock_list = fun_unpaused(bar_dict, stock_list)
    stock_list = fun_st(bar_dict, stock_list)
    stock_list = fun_highlimit(bar_dict, stock_list)
    stock_list = fun_remove_new(stock_list, 60)
    # 以下是各单因子
    # 规模因子
    cap_df = market_cap(stock_list, 'valuation_market_cap', last_date)
    cap_df = cap_df * -1
    # 估值因子
    PB_df = PB(stock_list, 'valuation_pb', last_date)
    PB_df = PB_df * -1
    # 动量因子
    MTM20_df = MTM20(stock_list, 'MTM20')
    MTM20_df = MTM20_df * -1
    # 质量因子
    # 1.ROE(高利润)
    roe_df = roe(stock_list, 'profit_roe_ths', last_date)
    # 2.净利润同比增长率(高成长)
    net_profit_growth_ratio_df = net_profit_growth_ratio(stock_list, 'growth_net_profit_growth_ratio', last_date)
    # 波动率因子
    ATR20_df = ATR20(stock_list, 'ATR20')
    ATR20_df = ATR20_df * -1
    # 合并多因子
    concat_obj = [cap_df, PB_df, MTM20_df, roe_df, net_profit_growth_ratio_df, ATR20_df]
    df = pd.concat(concat_obj, axis=1)
    df = df.dropna()
    # log.info(type(df))
    sum = df.sum(axis=1)
    # log.info(sum)
    # 进行排序
    if g.big_small == 'big':
        # 按照大排序
        sum.sort_values(ascending=False, inplace=True)
    if g.big_small =='small':
        # 按照小排序
        sum.sort_values(ascending=True, inplace=True)
    # 根据比例取出排序后靠前部分
    stock_list1 = sum[0:int(len(stock_list) * g.long_pct)].index
    # log.info(stock_list1)
    buy_list = []
    for stock in stock_list1:
        buy_list.append(stock)
    # 买卖操作
    for stock in list(context.portfolio.positions):
        if stock not in buy_list:
            order_target(stock, 0)
    cash = context.portfolio.portfolio_value
    position = cash * g.total_positionprevious
    num = int(len(stock_list) * g.long_pct)
    ## 买入
    for stock in buy_list:
        order_target_value(stock, position / num)
    g.t = g.t + 1

# 以下是单因子
def market_cap(stocklist, factor, last_date):
    # 取数据
    df = get_fundamentals(query(valuation.symbol, valuation.market_cap).filter(valuation.symbol.in_(stocklist)),
                          date=last_date)
    # log.info(df)
    df = df.set_index('valuation_symbol')
    # 绝对中位数法取极值
    after_MAD = MAD(factor, df)
    # z-score法标准化
    after_zscore = zscore(factor, after_MAD)
    return after_zscore

def PB(stocklist, factor, last_date):
    # 取数据
    df = get_fundamentals(query(valuation.symbol, valuation.pb).filter(valuation.symbol.in_(stocklist)),
                          date=last_date)
    df = df.set_index('valuation_symbol')
    # 绝对中位数法取极值
    after_MAD = MAD(factor, df)
    # z-score法标准化
    after_zscore = zscore(factor, after_MAD)
    return after_zscore

def MTM20(stocklist, factor):
    # 取数据
    for stock in stocklist:
        df1 = history(stock, ['close'], 20, '1d')
        # log.info(df1)
        s = pd.DataFrame([(df1['close'][-1] - df1['close'][0]) / df1['close'][0]], index=[stock])
        # log.info(s)
        if 'df' in locals():
            df = df.append(s)
        else:
            df = s
    # log.info(df)
    df.columns = ['MTM20']
    df.index.name = 'valuation_symbol'
    # 绝对中位数法取极值
    after_MAD = MAD(factor, df)
    # z-score法标准化
    after_zscore = zscore(factor, after_MAD)
    return after_zscore

def roe(stocklist, factor, last_date):
    # 取数据
    df = get_fundamentals(query(valuation.symbol, profit.roe_ths).filter(valuation.symbol.in_(stocklist)),
                          date=last_date)
log.info(df)
df = df.set_index('valuation_symbol')
绝对中位数法取极值
after_MAD = MAD(factor, df)
z-score法标准化
after_zscore = zscore(factor, after_MAD)
return after_zscore
def net_profit_growth_ratio(stocklist, factor, last_date):
取数据
df = get_fundamentals(query(valuation.symbol, growth.net_profit_growth_ratio).filter(valuation.symbol.in_(stocklist)),
date=last_date)
log.info(df)
df = df.set_index('valuation_symbol')
绝对中位数法取极值
after_MAD = MAD(factor, df)
z-score法标准化
after_zscore = zscore(factor, after_MAD)
return after_zscore
def ATR20(stocklist, new_factor):
取数据
for stock in stocklist:
Data_ATR = history(stock, ['close', 'high', 'low'], 20, '1d')
close_ATR = np.array(Data_ATR['close'])
high_ATR = np.array(Data_ATR['high'])
low_ATR = np.array(Data_ATR['low'])
'''
if np.isnan(close_ATR).any():
continue
'''
ATR = ta.ATR(high_ATR, low_ATR, close_ATR, timeperiod=1)
indices = ~np.isnan(ATR)
result = np.average(ATR[indices])
s = pd.Series(result.astype(float), index=[stock])
if 'ATR_df' in locals():
ATR_df = ATR_df.append(s)
else:
ATR_df = s
df = ATR_df.to_frame()
df.index.name = 'valuation_symbol'
df.columns = [new_factor]
绝对中位数法取极值
after_MAD = MAD(new_factor, df)
z-score法标准化
after_zscore = zscore(new_factor, after_MAD)
return after_zscore
以下是进行因子数据处理,对因子进行MAD去极值,以及标准化处理
def MAD(factor, df):
取得中位数
median = df[factor].median()
取得数据与中位数差值
df1 = df - median
取得差值绝对值
df1 = df1.abs()
取得绝对中位数
MAD = df1[factor].median()
得到数据上下边界
extreme_upper = median + 3 * 1.483 * MAD
extreme_lower = median - 3 * 1.483 * MAD
将数据上下边界外的数值归到边界上
df.ix[(df[factor] < extreme_lower), factor] = extreme_lower
df.ix[(df[factor] > extreme_upper), factor] = extreme_upper
return df
z-score标准化
def zscore(factor, df):
取得均值
mean = df[factor].mean()
取得标准差
std = df[factor].std()
取得标准化后数据
df = (df - mean) / std
return df
以下对股票列表进行去除ST,停牌,去新股,以及去除开盘涨停股
去除开盘涨停股票
def fun_highlimit(bar_dict, stock_list):
return [stock for stock in stock_list if bar_dict[stock].open!= bar_dict[stock].high_limit]
去除st股票
def fun_st(bar_dict, stock_list):
return [stock for stock in stock_list if not bar_dict[stock].is_st]
def fun_unpaused(bar_dict, stock_list):
return [s for s in stock_list if not bar_dict[s].is_paused]
def fun_remove_new(_stock_list, days):
deltaDate = get_datetime() - dt.timedelta(days)
stock_list = []
for stock in _stock_list:
if get_security_info(stock).listed_date < deltaDate:
stock_list.append(stock)
return stock_list
import pandas as pd
import numpy as np
import datetime as dt
import talib as ta
from datetime import date, timedelta
import statsmodels.api as sm

# 初始化账户
def init(context):
    set_params(context)
    set_variables(context)
    set_backtest()
    run_daily(stop_loss)

# 设置策略参数
def set_params(context):
    g.tc = 20  # 调仓频率
    g.t = 0
    g.big_small = 'big'  # big是降序,small为升序
    context.stock = '000300.SH'
    g.long_pct = 0.05
    g.stock = '000300.SH'  # 择时选取的指数
    g.total_positionprevious = 0  # 仓位
    g.N = 18  # RSRS选取的回归长度
    g.M = 1100  # RSRS均值窗口

# 设置变量
def set_variables(context):
    context.X_length = 11
    context.flag = True
    g.buy = 0.7  # 买入阀门
    g.sell = -0.7  # 卖出阀门
    g.ans = []
    g.ans_rightdev = []

# 设置回测
def set_backtest():
    set_benchmark('000300.SH')  # 设置基准
    set_slippage(PriceSlippage(0.002))  # 设置可变滑点

# 个股止损
def stop_loss(context, bar_dict):
    for stock in list(context.portfolio.positions):
        cumulative_return = bar_dict[stock].close / context.portfolio.positions[stock].cost_basis
        if cumulative_return < 0.9:
            order_target_value(stock, 0)

# 处理K线
def handle_bar(context, bar_dict):
    stock = g.stock
    beta = 0
    r2 = 0
    prices = history(stock, ['high', 'low'], g.N, '1d', False, 'pre', is_panel=1)
    highs = prices.high
    lows = prices.low
    X = sm.add_constant(lows)
    model = sm.OLS(highs, X)
    # 得到beta
    beta = model.fit().params[1]
    # 将新的beta添加到装有历史数据列表
    g.ans.append(beta)
    # 得到rsquare数据
    r2 = model.fit().rsquared
    # 将新的rsquare添加到装有历史数据列表
    g.ans_rightdev.append(r2)
    # 为了标准化当下的beta数值,拿过去1100天的数据作为均值的窗口
    section = g.ans[-g.M:]
    # 计算均值序列
    mu = np.mean(section)
    # 计算标准化RSRS指标序列
    sigma = np.std(section)
    zscore = (section[-1] - mu) / sigma
    # 计算右偏RSRS标准分,就是将标准化后的beta数据乘以原始beta再乘以拟合度
    zscore_rightdev = zscore * beta * r2
    # 根据交易信号买入卖出
    if zscore_rightdev > g.buy:
        total_position = 1
    elif zscore_rightdev < g.sell:
        total_position = 0
    else:
        total_position = g.total_positionprevious
    if (g.total_positionprevious!= total_position) or (g.t % g.tc == 0):
        g.total_positionprevious = total_position
    last_date = get_last_datetime().strftime('%Y%m%d')
    stock_list = list(get_all_securities('stock', date=last_date).index)
    # 对stock_list进行去除st,停牌等处理
    stock_list = fun_unpaused(bar_dict, stock_list)
    stock_list = fun_st(bar_dict, stock_list)
    stock_list = fun_highlimit(bar_dict, stock_list)
    stock_list = fun_remove_new(stock_list, 60)
    # 以下是各单因子
    # 规模因子
    cap_df = market_cap(stock_list, 'valuation_market_cap', last_date)
    cap_df = cap_df * -1
    # 估值因子
    PB_df = PB(stock_list, 'valuation_pb', last_date)
    PB_df = PB_df * -1
    # 动量因子
    MTM20_df = MTM20(stock_list, 'MTM20')
    MTM20_df = MTM20_df * -1
    # 质量因子
    # 1.ROE(高利润)
    roe_df = roe(stock_list, 'profit_roe_ths', last_date)
    # 2.净利润同比增长率(高成长)
    net_profit_growth_ratio_df = net_profit_growth_ratio(stock_list, 'growth_net_profit_growth_ratio', last_date)
    # 波动率因子
    ATR20_df = ATR20(stock_list, 'ATR20')
    ATR20_df = ATR20_df * -1
    # 合并多因子
    concat_obj = [cap_df, PB_df, MTM20_df, roe_df, net_profit_growth_ratio_df, ATR20_df]
    df = pd.concat(concat_obj, axis=1)
    df = df.dropna()
    # log.info(type(df))
    sum = df.sum(axis=1)
    # log.info(sum)
    # 进行排序
    if g.big_small == 'big':
        # 按照大排序
        sum.sort_values(ascending=False, inplace=True)
    if g.big_small =='small':
        # 按照小排序
        sum.sort_values(ascending=True, inplace=True)
    # 根据比例取出排序后靠前部分
    stock_list1 = sum[0:int(len(stock_list) * g.long_pct)].index
    # log.info(stock_list1)
    buy_list = []
    for stock in stock_list1:
        buy_list.append(stock)
    # 买卖操作
    for stock in list(context.portfolio.positions):
        if stock not in buy_list:
            order_target(stock, 0)
    cash = context.portfolio.portfolio_value
    position = cash * g.total_positionprevious
    num = int(len(stock_list) * g.long_pct)
    ## 买入
    for stock in buy_list:
        order_target_value(stock, position / num)
    g.t = g.t + 1

# 以下是单因子
def market_cap(stocklist, factor, last_date):
    # 取数据
    df = get_fundamentals(query(valuation.symbol, valuation.market_cap).filter(valuation.symbol.in_(stocklist)),
                          date=last_date)
    # log.info(df)
    df = df.set_index('valuation_symbol')
    # 绝对中位数法取极值
    after_MAD = MAD(factor, df)
    # z-score法标准化
    after_zscore = zscore(factor, after_MAD)
    return after_zscore

def PB(stocklist, factor, last_date):
    # 取数据
    df = get_fundamentals(query(valuation.symbol, valuation.pb).filter(valuation.symbol.in_(stocklist)),
                          date=last_date)
    df = df.set_index('valuation_symbol')
    # 绝对中位数法取极值
    after_MAD = MAD(factor, df)
    # z-score法标准化
    after_zscore = zscore(factor, after_MAD)
    return after_zscore

def MTM20(stocklist, factor):
    # 取数据
    for stock in stocklist:
        df1 = history(stock, ['close'], 20, '1d')
        # log.info(df1)
        s = pd.DataFrame([(df1['close'][-1] - df1['close'][0]) / df1['close'][0]], index=[stock])
        # log.info(s)
        if 'df' in locals():
            df = df.append(s)
        else:
            df = s
    # log.info(df)
    df.columns = ['MTM20']
    df.index.name = 'valuation_symbol'
    # 绝对中位数法取极值
    after_MAD = MAD(factor, df)
    # z-score法标准化
    after_zscore = zscore(factor, after_MAD)
    return after_zscore

def roe(stocklist, factor, last_date):
    # 取数据
    df = get_fundamentals(query(valuation.symbol, profit.roe_ths).filter(valuation.symbol.in_(stocklist)),
                          date=last_date)
log.info(df)
df = df.set_index('valuation_symbol')
绝对中位数法取极值
after_MAD = MAD(factor, df)
z-score法标准化
after_zscore = zscore(factor, after_MAD)
return after_zscore
def net_profit_growth_ratio(stocklist, factor, last_date):
取数据
df = get_fundamentals(query(valuation.symbol, growth.net_profit_growth_ratio).filter(valuation.symbol.in_(stocklist)),
date=last_date)
log.info(df)
df = df.set_index('valuation_symbol')
绝对中位数法取极值
after_MAD = MAD(factor, df)
z-score法标准化
after_zscore = zscore(factor, after_MAD)
return after_zscore
def ATR20(stocklist, new_factor):
取数据
for stock in stocklist:
Data_ATR = history(stock, ['close', 'high', 'low'], 20, '1d')
close_ATR = np.array(Data_ATR['close'])
high_ATR = np.array(Data_ATR['high'])
low_ATR = np.array(Data_ATR['low'])
'''
if np.isnan(close_ATR).any():
continue
'''
ATR = ta.ATR(high_ATR, low_ATR, close_ATR, timeperiod=1)
indices = ~np.isnan(ATR)
result = np.average(ATR[indices])
s = pd.Series(result.astype(float), index=[stock])
if 'ATR_df' in locals():
ATR_df = ATR_df.append(s)
else:
ATR_df = s
df = ATR_df.to_frame()
df.index.name = 'valuation_symbol'
df.columns = [new_factor]
绝对中位数法取极值
after_MAD = MAD(new_factor, df)
z-score法标准化
after_zscore = zscore(new_factor, after_MAD)
return after_zscore
以下是进行因子数据处理,对因子进行MAD去极值,以及标准化处理
def MAD(factor, df):
取得中位数
median = df[factor].median()
取得数据与中位数差值
df1 = df - median
取得差值绝对值
df1 = df1.abs()
取得绝对中位数
MAD = df1[factor].median()
得到数据上下边界
extreme_upper = median + 3 * 1.483 * MAD
extreme_lower = median - 3 * 1.483 * MAD
将数据上下边界外的数值归到边界上
df.ix[(df[factor] < extreme_lower), factor] = extreme_lower
df.ix[(df[factor] > extreme_upper), factor] = extreme_upper
return df
z-score标准化
def zscore(factor, df):
取得均值
mean = df[factor].mean()
取得标准差
std = df[factor].std()
取得标准化后数据
df = (df - mean) / std
return df
以下对股票列表进行去除ST,停牌,去新股,以及去除开盘涨停股
去除开盘涨停股票
def fun_highlimit(bar_dict, stock_list):
return [stock for stock in stock_list if bar_dict[stock].open!= bar_dict[stock].high_limit]
去除st股票
def fun_st(bar_dict, stock_list):
return [stock for stock in stock_list if not bar_dict[stock].is_st]
def fun_unpaused(bar_dict, stock_list):
return [s for s in stock_list if not bar_dict[s].is_paused]
def fun_remove_new(_stock_list, days):
deltaDate = get_datetime() - dt.timedelta(days)
stock_list = []
for stock in _stock_list:
if get_security_info(stock).listed_date < deltaDate:
stock_list.append(stock)
return stock_list

Published inAI&Invest专栏

Be First to Comment

    发表回复