Load Data
데이터 출처 : 증권사 API, 네이버 금융, 금튜형, 유료 데이터 벤더
- 데이터의 cleaning, validation 必
- Missing Value 전처리 必
- 존재하지 않는 지표는 다른 지표로 대체
# 코드를 돌릴 때 warning이 안나오게 하기
import warnings
warnings.filterwarnings('ignore')
# Load
df = pd.read_csv("my_data/fin_statement_new.csv")
df.head()
# Drop column
df = df.drop(["상장일"], axis=1)
# Rename Column
df = df.rename(columns={
"DPS(보통주, 현금+주식, 연간)": "DPS",
"P/E(Adj., FY End)": "PER",
"P/B(Adj., FY End)": "PBR",
"P/S(Adj., FY End)": "PSR",
})
Nan 체크
df.groupby(['year'])['Name'].nunique()
# Sol 1. 'code'-'name' 중복 체크
# 갯수만으로는 정확한 code-name mapping의 비교가 어렵다.
df.groupby(['year'])['Name'].nunique().equals(df.groupby(['year'])['Code'].nunique())
# True
# Sol 2. 'code'-'name' 중복 체크
df.groupby(['year', 'Name'])['Code'].nunique()
df.groupby(['year', 'Name'])['Code'].nunique().nunique()
# 1
Yearly Returns
yearly_price_df = df.pivot(index="year", columns="Name", values="수정주가")
# pivot : 형태 변경 함수.
yearly_price_df.head()
rtn 구하기 : (Pn+1 / Pn) - 1
# 1. year_price_df.pct_change() == year_price_df / year_price_df.shift() - 1
# 2. `shift(-1)`을 하는 이유?
# - 데이터를 "xx년도에서 1년동안 들고있었더니, xx만큼 수익이 났다"로 해석하고 싶기 때문
yearly_rtn_df = yearly_price_df.pct_change(fill_method=None).shift(-1)
yearly_rtn_df.head()
* 상장 폐지 종목은 어떻게 처리?
- 2011/12에 매수했다면, 1년의 rtn value는 보장됨
- 2012/12에 매수했다면, 2013/1 or 2013/12에 상폐됐을 수도 있기에 rtn이 nan 처리됨.
백테스팅의 종류
1. Vectorized backtesting
- 과거 데이터를 하나의 벡터화(numpy, pandas)하여, 벡터들간의 연산으로 진행하는 것
- 수백~수천개의 과거 데이터를 한번에 처리할 수 있기 때문에 빠르다
2. Event-based backtesting
- 데이터가 실제 이용 가능한 시점을 event화 하여, 실제 각각의 데이터를 event마다 받아서 백테스팅하는 방법
- Vectorized 방법보다는 느리지만, 실제 주식 시장에서의 로직을 그대로 반영하기 때문에 최소한의 코드 수정으로 실전에 바로 투입 可
return_df = pd.DataFrame(
[
[np.nan, np.nan, 2 ],
[3, np.nan, 3 ],
[5, 6, np.nan],
],
columns=["삼성", "현대", "SK"]
)
asset_on_df = pd.DataFrame(
[
[0, 1],
[0, 1],
[1, 0],
],
columns=["삼성", "SK"]
)
return_df
asset_on_df
선택하지 않은 종목을 0으로 메꿔버리면, 선택은 했지만 실제 가격 변동이 없어서 수익률이 0인 경우와 선택을 안했는데 0으로 표시한 경우를 구분할 수 없게된다. 따라서 nan으로 메꾸는 것이 더 타당하다.
asset_on_df = asset_on_df.replace(0, np.nan)
return_df * asset_on_df
top_n
indicator = "ROA"
top_n = 10
# 연도별, ROA 수치가 top 10인 종목
top_n_indicator_df = df.groupby(['year'])[indicator].nlargest(top_n).reset_index()
top_n_indicator_df.head()
top_n_indicator_df.tail()
# 종목 indexing
top_n_roa_df = df.loc[top_n_indicator_df['level_1']]
top_n_roa_df.head()
indicator_df = top_n_roa_df.pivot(index="year", columns="Name", values="ROA")
indicator_df.head()
* nan값을 가지고 있는 종목은 아예 고려대상에서 제외됨
Backtest
asset_on_df = indicator_df.notna().astype(int).replace(0, np.nan)
selected_return_df = yearly_rtn_df * asset_on_df
selected_return_df.notna().sum(axis=1)
# year
# 2006 10
# 2007 10
# 2008 10
# 2009 10
# 2010 10
# 2011 10
# 2012 10
# 2013 10
# 2014 10
# 2015 9
# 2016 8
# 2017 0
# dtype: int64
1. nan이 아닌 데이터 조회하기
a = asset_on_df.iloc[0]
a[a.notna()]
# Name
# DMS 1.000
# 루미마이크로 1.000
# 시너지이노베이션 1.000
# 아이디에스 1.000
# 에스엔유 1.000
# 유진테크 1.000
# 이노와이어리스 1.000
# 제우스 1.000
# 제주반도체 1.000
# 플랜티넷 1.000
# Name: 2006, dtype: float64
# 기존값은 2006 부터 시작하므로, 2005를 0으로 설정
rtn_series.loc[2005] = 0
rtn_series = rtn_series.sort_index()
rtn_series
2. 포트폴리오 누적 수익률 데이터
# rtn_series + 1 : 1+a
# cumprod : acummulated production , 누적 곱
cum_rtn_series = (rtn_series + 1).cumprod().dropna()
cum_rtn_series
# year
# 2005 1.000
# 2006 1.057
# 2007 0.417
# 2008 0.672
3. 시각화
%matplotlib inline
import matplotlib.pyplot as plt
fig, axes = plt.subplots(nrows=2, figsize=(15, 6), sharex=True)
axes[0].plot(cum_rtn_series.index, cum_rtn_series, marker='o');
axes[0].set_title("Cum return(line)");
axes[1].bar(rtn_series.index, rtn_series);
axes[1].set_title("Yearly return(bar)");
4. 함수화
def get_return_series(selected_return_df):
rtn_series = selected_return_df.mean(axis=1)
rtn_series.loc[2005] = 0 # 주의: 영상속의 데이터와는 달리, 새로 업로드 된 데이터는 2006부터 존재하므로
# 2004가 아니라 2005를 0으로 설정한 점에 주의해주세요
rtn_series = rtn_series.sort_index()
cum_rtn_series = (rtn_series + 1).cumprod().dropna()
return rtn_series, cum_rtn_series
def plot_return(cum_rtn_series, rtn_series):
fig, axes = plt.subplots(nrows=2, figsize=(15, 6), sharex=True)
axes[0].plot(cum_rtn_series.index, cum_rtn_series, marker='o');
axes[1].bar(rtn_series.index, rtn_series);
axes[0].set_title("Cum return(line)");
axes[1].set_title("Yearly return(bar)");
rtn_series, cum_rtn_series = get_return_series(selected_return_df)
plot_return(cum_rtn_series, rtn_series)
"할 수 있다 퀀트투자" 전략 구현
"소형주 + 저 PBR 전략"
- Filter : 소형주 (시가총액 하위 20%)
- Select : (PBR 0.2) 이상, PBR이 가장 낮은 주식순으로 20~30개 매수
#
# Filter
#
market_cap_quantile_series = df.groupby("year")['시가총액'].quantile(.2)
filtered_df = df.join(market_cap_quantile_series, on="year", how="left", rsuf fix="20%_quantile")
filtered_df = filtered_df[filtered_df['시가총액'] <= filtered_df['시가총액20%_quantile']]
filtered_df.head()
#
# Selector
#
filtered_df = filtered_df[filtered_df['PBR'] >= 0.2]
smallest_pbr_series = filtered_df.groupby("year")['PBR'].nsmallest(15)
smallest_pbr_series
selected_index = smallest_pbr_series.index.get_level_values(1)
selector_df = filtered_df.loc[selected_index].pivot(
index='year', columns="Name", values="PBR"
)
selector_df.head()
asset_on_df = selector_df.notna().astype(int).replace(0, np.nan)
selected_return_df = yearly_rtn_df * asset_on_df
rtn_series, cum_rtn_series = get_return_series(selected_return_df)
plot_return(cum_rtn_series, rtn_series)
"투자전략 24"
- Filter : 시가총액 하위 20%
- Selector : PBR, PCR, PER, PSR 순위를 매김 --> 순위의 sum을 해서 통합 순위를 구함 --> 통합 순위가 가장 높은 종목 50개 매수
#
# Filter
#
market_cap_quantile_series = df.groupby("year")['시가총액'].quantile(.2)
filtered_df = df.join(market_cap_quantile_series, on="year", how="left", rsuffix="20%_quantile")
filtered_df = filtered_df[filtered_df['시가총액'] <= filtered_df['시가총액20%_quantile']]
pbr_rank_series = filtered_df.groupby("year")['PBR'].rank(method="max")
per_rank_series = filtered_df.groupby("year")['PER'].rank(method="max")
psr_rank_series = filtered_df.groupby("year")['PSR'].rank(method="max")
psr_rank_series.head()
# 21 39.000
# 34 43.000
# 40 119.000
# 44 292.000
# 52 29.000
# Name: PSR, dtype: float64
psr_rank_series.sort_values().dropna()
filtered_df = filtered_df.join(pbr_rank_series, how="left", rsuffix="_rank")
filtered_df = filtered_df.join(per_rank_series, how="left", rsuffix="_rank")
filtered_df = filtered_df.join(psr_rank_series, how="left", rsuffix="_rank")
filtered_df.head()
* 어떻게 각 rank column의 nan을 메꿔야 하는가
#
# 주의: 종목을 선택하는 로직에 따라, '가장 작은 rank'로 부여하는게 타당할 수도 있고, '가장 큰 rank'로 부여하는 것이 타당할 수도 있습니다.
# 예를들어, PER이 작을수록 종목 선정에 우선 순위가 있도록 할 예정이고, PER이 작을수록 rank값이 작도록 설정했다면,
# PER이 nan인 종목들은 PER rank가 가장 큰 값(혹은 그 값보다 +1인 값)으로 메꿔져야 penalty를 받을 수 있습니다.
#
# 1. 0으로 메꾸는 법
filtered_df.loc[:, filtered_df.filter(like="rank").columns] = filtered_df.filter(like="rank").fillna(0)
# 2. 각 rank별 max 값 (혹은 그것보다 1 큰 값)으로 메꾸는 법
# filtered_df['PBR_rank'] = filtered_df['PBR_rank'].fillna(filtered_df['PBR_rank'].max() + 1)
# filtered_df['PER_rank'] = filtered_df['PER_rank'].fillna(filtered_df['PER_rank'].max() + 1)
# filtered_df['PSR_rank'] = filtered_df['PSR_rank'].fillna(filtered_df['PSR_rank'].max() + 1)
filtered_df['rank_sum'] = filtered_df.filter(like="_rank").sum(axis=1)
#
# Selector
#
max_rank_series = filtered_df.groupby("year")['rank_sum'].nlargest(15)
selected_index = max_rank_series.index.get_level_values(1)
selector_df = filtered_df.loc[selected_index].pivot(
index='year', columns="Name", values="rank_sum"
)
asset_on_df = selector_df.notna().astype(int).replace(0, np.nan)
selected_return_df = yearly_rtn_df * asset_on_df
rtn_series, cum_rtn_series = get_return_series(selected_return_df)
plot_return(cum_rtn_series, rtn_series)
'Quant' 카테고리의 다른 글
BackTesting : 매매 내역 분석 (0) | 2025.02.03 |
---|---|
Parameter Optimization (0) | 2025.02.03 |
Quant : Visualization - Seaborn (0) | 2022.04.13 |
Quant : Visualization - Matplotlib (0) | 2022.04.13 |
Quant : Pandas - 데이터 합치기 (0) | 2022.04.05 |