# Stock/gen.py
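"""Scan KRX stock prices stored in stock.db for various technical-analysis
conditions (moving-average setups, volume spikes, Bollinger band, MACD and RSI
signals) and emit the matching corporations as JSON files or HTML pages."""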

import argparse
import datetime
import json
import os
import sqlite3
from typing import Dict, List

import pandas as pd
import tqdm
from jinja2 import Environment, PackageLoader, select_autoescape

import db as database
from render import *  # GetStockPriceFrom and the rendering helpers come from render


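# Thin helper around the sqlite stock database with a per-code price cache.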
class DataStore:
    def __init__(self) -> None:
        self.db = sqlite3.connect("stock.db")
        self.pricesCache: Dict[str, pd.DataFrame] = {}

    def getAllKRXCorp(self) -> List[database.KRXCorp]:
        return database.GetAllKRXCorp(self.db)

    def _getCorpsInCorpGroup(self, table_name: str) -> List[database.KRXCorp]:
        cursor = self.db.execute(
            f"select c.* from {table_name} as k "
            "INNER JOIN KRXCorp as c on k.Name = c.Name")
        return [database.KRXCorp.from_db(r) for r in cursor]

    def getKosdaq(self) -> List[database.KRXCorp]:
        return self._getCorpsInCorpGroup("KOSDAQ")

    def getKospi(self) -> List[database.KRXCorp]:
        return self._getCorpsInCorpGroup("KOSPI")

    def getKosdaqAndKospi(self) -> List[database.KRXCorp]:
        return self.getKospi() + self.getKosdaq()

    def getStockPrice(self, code, length) -> pd.DataFrame:
        # Serve from the cache when the cached frame is at least `length` rows long.
        if code in self.pricesCache and len(self.pricesCache[code]) >= length:
            return self.pricesCache[code]
        s = GetStockPriceFrom(self.db, code, length)
        s = pd.DataFrame(s, columns=list(database.STOCK_INDEX.__members__.keys()))
        s.set_index("DATE", inplace=True)
        self.pricesCache[code] = s
        return self.pricesCache[code]

    def clearCache(self) -> None:
        self.pricesCache = {}

    def __del__(self) -> None:
        self.db.close()


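# Collected matches for a single screening category, grouped by matching date.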
class OutputCollectorElement:
    def __init__(self, name: str, description: str) -> None:
        self.name = name
        self.description = description
        self.corpListByDate: Dict[str, List[database.KRXCorp]] = {}

    def __str__(self) -> str:
        return f"OutputCollectorElement:{self.name}"

    def addCorp(self, date, corp):
        self.corpListByDate.setdefault(date, []).append(corp)

    def toDict(self) -> Dict:
        return {
            "name": self.name,
            "description": self.description,
            "corpListByDate": {k: [d.toDict() for d in v]
                               for k, v in self.corpListByDate.items()}
        }


class OutputCollector:
    def __init__(self) -> None:
        self.data: Dict[str, OutputCollectorElement] = {}

    def addResult(self, key, help=""):
        """Register an output category to collect results under."""
        self.data[key] = OutputCollectorElement(key, help)

    def collect(self, key, corp, date):
        self.data[key].addCorp(date, corp)


def isVolumeNTimes(stock: pd.DataFrame, mul: float, nday: int, order=1) -> bool:
    return stock.iloc[nday]['VOLUME'] > stock.iloc[nday + order]['VOLUME'] * mul


def isVolumeMulPriceGreaterThan(stock: pd.DataFrame, threshold: int, nday: int) -> bool:
    return stock.iloc[nday]['VOLUME'] * stock.iloc[nday]['CLOSE'] > threshold


def isMACDCrossSignal(signal: pd.Series, macd: pd.Series, nday: int, order=1) -> bool:
    return (signal.iloc[nday] > macd.iloc[nday] and
            signal.iloc[nday + order] < macd.iloc[nday + order])


def isRelativeDiffLessThan(a: pd.Series, b: pd.Series, threshold: float, nday: int) -> bool:
    return abs(a.iloc[nday] - b.iloc[nday]) / b.iloc[nday] < threshold


def isDiffGreaterThan(a: pd.Series, b: pd.Series, nday: int) -> bool:
    """a is bigger than b"""
    return a.iloc[nday] > b.iloc[nday]


def calc_rsi(price: pd.Series, period: int = 14):
    delta = price.diff()
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    roll_up1 = up.rolling(period).mean()
    roll_down1 = down.abs().rolling(period).mean()
    RS1 = roll_up1 / roll_down1
    return pd.Series(100 - (100 / (1 + RS1)), name='RSI')
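
# Note: calc_rsi above averages gains and losses with simple rolling means
# (a Cutler-style RSI); Wilder's original RSI uses exponential smoothing,
# so values can differ slightly from charting tools that follow Wilder.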

def prepareCollector(collector: OutputCollector) -> None:
    import pages
    for item in pages.GenLists:
        collector.addResult(item["name"], item["description"])


def every(f, xs):
    for x in xs:
        if not f(x):
            return False
    return True
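
# Price frames from DataStore are ordered newest-first (hence the .loc[::-1]
# reversals before the rolling calculations below), so nday == 0 is the most
# recent trading day and nday + 1 is the day before it.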

def collect(data: DataStore, collector: OutputCollector, corp: database.KRXCorp,
            ndays: List[int]) -> None:
    stock = data.getStockPrice(corp.Code, 250)
    if len(stock) <= 245:
        return
    for nday in ndays:
        if stock.iloc[nday]['VOLUME'] <= 0:
            return
    close = stock["CLOSE"]
    openv = stock["OPEN"]
    # high = stock["HIGH"]
    # low = stock["LOW"]
    # d3 = close.loc[::-1].rolling(window=3).mean().dropna().loc[::-1]
    fetch_len = len(ndays) + 10

    def d(n):
        # n-day simple moving average of the close, kept newest-first.
        return (close.iloc[:(n + fetch_len)].loc[::-1]
                .rolling(window=n).mean().dropna().loc[::-1])

    def d_std(n):
        # n-day rolling standard deviation of the close, kept newest-first.
        return (close.iloc[:(n + fetch_len)].loc[::-1]
                .rolling(window=n).std().dropna().loc[::-1])

    d5 = d(5)
    d20 = d(20)
    d25 = d(25)
    d30 = d(30)
    d45 = d(45)
    d60 = d(60)
    d120 = d(120)
    d240 = d(240)
    # standard deviation
    d_std25 = d_std(25)
    # Bollinger upper band: 25-day moving average plus two standard deviations.
    bollinger_upperband = d25 + 2 * d_std25
    a = [d20, d30, d60, d120]
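    # Check every requested day offset against each screening condition and
    # record the matches under the corresponding category name.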
    for nday in ndays:
        if openv[nday] <= d20[nday] and d20[nday] <= close[nday]:
            collector.collect("양봉사이20일선", corp, stock.index[nday])
        if bollinger_upperband[nday] <= close[nday]:
            collector.collect("볼린저 밴드 25", corp, stock.index[nday])
        if every(lambda i: isRelativeDiffLessThan(i, close, 0.05, nday), a):
            collector.collect("뭉침", corp, stock.index[nday])
            if d120[nday + 1] < d120[nday]:
                collector.collect("뭉침5% 120선 상승", corp, stock.index[nday])
        if every(lambda i: isRelativeDiffLessThan(i, close, 0.01, nday), a):
            collector.collect("뭉침01", corp, stock.index[nday])
        if every(lambda i: isRelativeDiffLessThan(i, close, 0.03, nday), a):
            collector.collect("뭉침03", corp, stock.index[nday])
            if d120[nday + 1] < d120[nday]:
                collector.collect("뭉침3% 120선 상승", corp, stock.index[nday])
        if every(lambda i: isRelativeDiffLessThan(i, close, 0.05, nday),
                 [d20, d30, d60, d120, d240]):
            collector.collect("뭉침 240선까지", corp, stock.index[nday])
        if d5[nday] > d20[nday] and d5[nday + 1] < d20[nday + 1]:
            collector.collect("cross d20 and d5", corp, stock.index[nday])
        if isDiffGreaterThan(d5, d20, nday):
            collector.collect("d20d5", corp, stock.index[nday])
            if isVolumeNTimes(stock, 5, nday):
                collector.collect("d20d5VolumeX5", corp, stock.index[nday])
        if isVolumeNTimes(stock, 3, nday):
            collector.collect("volume", corp, stock.index[nday])
            if isVolumeMulPriceGreaterThan(stock, 50000000, nday):
                collector.collect("volume5", corp, stock.index[nday])
            if isVolumeNTimes(stock, 5, nday):
                collector.collect("volumeX5", corp, stock.index[nday])
        D240BiggerThanYesterDay = d240[nday + 1] <= d240[nday]
        D240Bounce = d240[nday + 2] >= d240[nday + 1] and D240BiggerThanYesterDay
        D120Bounce = d120[nday + 2] >= d120[nday + 1] and d120[nday + 1] <= d120[nday]
        # D240Cross = low[nday] <= d240[nday] and d240[nday] <= high[nday]
        if D240Bounce:
            collector.collect("240일선 반등", corp, stock.index[nday])
        if D120Bounce:
            collector.collect("120일선 반등", corp, stock.index[nday])
        if D240BiggerThanYesterDay:
            collector.collect("240일 증가", corp, stock.index[nday])
        if d60[nday + 1] < d60[nday]:
            collector.collect("정배열60", corp, stock.index[nday])
            if d20[nday + 1] < d20[nday]:
                collector.collect("정배열20", corp, stock.index[nday])
                if D240BiggerThanYesterDay:
                    collector.collect("정배열240", corp, stock.index[nday])
                    if (d5[nday + 1] <= d5[nday] and
                            d120[nday + 1] <= d120[nday]):
                        collector.collect("모두 정배열", corp, stock.index[nday])
        if d5[nday + 1] < d20[nday + 1] and d20[nday] < d5[nday]:
            collector.collect("d20d5돌파", corp, stock.index[nday])
    ewm5 = close.loc[::-1].ewm(span=5).mean().loc[::-1]
    ewm10 = close.loc[::-1].ewm(span=10).mean().loc[::-1]
    macd = ewm5 - ewm10
    signal = macd.loc[::-1].ewm(span=4).mean().loc[::-1]
    rsi = calc_rsi(close.loc[::-1], 14).dropna().loc[::-1]
    rsi.reset_index(drop=True, inplace=True)
    for nday in ndays:
        if isMACDCrossSignal(macd, signal, nday):
            collector.collect("macd", corp, stock.index[nday])
        if d45[2 + nday] > d45[1 + nday] and d45[1 + nday] < d45[nday]:
            collector.collect("45일선 반등", corp, stock.index[nday])
        if d60[10 + nday] <= d60[nday]:
            collector.collect("60일 10일 반등", corp, stock.index[nday])
        if d20[2 + nday] > d20[1 + nday] and d20[1 + nday] < d20[nday]:
            collector.collect("20일선 반등", corp, stock.index[nday])
    for nday in ndays:
        if rsi[nday] < 30:
            collector.collect("RSI 30 이하", corp, stock.index[nday])
    # rsi_signal = macd.loc[::-1].ewm(span=7).mean().loc[::-1]


parser = argparse.ArgumentParser(description="Print stock screening results.")
parser.add_argument("--format", "-f", choices=["json", "html"], default="html",
                    help="Output format. Defaults to html.")
parser.add_argument("--dir", "-d", default=".", help="Directory to write the output to.")
parser.add_argument("--corp", "-c", help="Restrict the search to this stock code. "
                    "If omitted, only KOSDAQ and KOSPI corporations are searched.")
parser.add_argument("--fullSearch", help="Search every listed corporation.", action='store_true')
parser.add_argument("--printStdout", action="store_true", help="Print the results to standard output.")
parser.add_argument("--version", "-v", action="version", version="%(prog)s 1.0")
parser.add_argument("--verbose", "-V", action="store_true", help="Show detailed output.")


if __name__ == "__main__":
    args = parser.parse_args()
    dataStore = DataStore()
    if args.fullSearch:
        krx_corps = dataStore.getAllKRXCorp()
    else:
        krx_corps = dataStore.getKosdaqAndKospi()
    if args.corp:
        krx_corps = [corp for corp in krx_corps if corp.Code == args.corp]

    env = Environment(
        loader=PackageLoader('render', 'templates'),
        autoescape=select_autoescape(['html', 'xml'])
    )

    collector = OutputCollector()
    prepareCollector(collector)
    for corp in tqdm.tqdm(krx_corps):
        ndays = list(range(0, 5))
        collect(dataStore, collector, corp, ndays)
        dataStore.clearCache()

    for k, v in collector.data.items():
        if args.format == "json":
            data = json.dumps(v.toDict(), indent=4, ensure_ascii=False)
            if args.printStdout:
                print(k)
                print(data)
            else:
                with open(os.path.join(args.dir, k + ".json"), "w", encoding="UTF-8") as f:
                    f.write(data)
        else:
            template = env.get_template("Lists.html")
            days = sorted(v.corpListByDate.keys(), reverse=True)[:5]
            html = template.render(collected=v, title=k, days=days,
                                   lastUpdate=datetime.date.today().isoformat())
            if args.printStdout:
                print(html)
            else:
                with open(os.path.join(args.dir, k + ".html"), "w", encoding="UTF-8") as f:
                    f.write(html)
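
# Example invocations (assuming stock.db has already been populated):
#   python gen.py --fullSearch --format json --dir out
#   python gen.py --corp 005930 --printStdout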