"""Screen KRX stocks against moving-average, volume, MACD and RSI rules.

For every corporation selected on the command line the script loads recent
price rows from ``stock.db``, evaluates a fixed set of rules for a few
sessions, and emits one JSON document per rule category (to files or to
standard output).
"""
import argparse
import json
import os
import sqlite3
from typing import Dict, List
from render import *
import db as database
import pandas as pd
import tqdm


class DataStore:
    """Access to the ``stock.db`` SQLite database plus a per-code price cache."""

    def __init__(self) -> None:
        self.db = sqlite3.connect("stock.db")
        # Price frames keyed by stock code; emptied by clearCache().
        self.pricesCache: Dict[str, pd.DataFrame] = {}

    def getAllKRXCorp(self) -> List[database.KRXCorp]:
        """Return every corporation known to the database module."""
        return database.GetAllKRXCorp(self.db)

    def _getCorpsInCorpGroup(self, table_name: str) -> List[database.KRXCorp]:
        """Return the KRXCorp rows whose Name also appears in ``table_name``.

        ``table_name`` is interpolated directly into the SQL text (table names
        cannot be bound as parameters), so it must only ever be a trusted
        constant.
        """
        cursor = self.db.execute(
            f"select c.* from {table_name} as k INNER JOIN KRXCorp as c on k.Name = c.Name"
        )
        return [database.KRXCorp.from_db(row) for row in cursor]

    def getKosdaq(self) -> List[database.KRXCorp]:
        """Return the corporations listed in the KOSDAQ table."""
        return self._getCorpsInCorpGroup("KOSDAQ")

    def getKospi(self) -> List[database.KRXCorp]:
        """Return the corporations listed in the KOSPI table."""
        return self._getCorpsInCorpGroup("KOSPI")

    def getKosdaqAndKospi(self) -> List[database.KRXCorp]:
        """Return the KOSPI corporations followed by the KOSDAQ corporations."""
        return self.getKospi() + self.getKosdaq()

    def getStockPrice(self, code: str, length: int) -> pd.DataFrame:
        """Return a price frame for ``code`` indexed by its ``DATE`` column.

        A cached frame is reused when it holds at least ``length`` rows;
        otherwise the rows are fetched again and the cache entry is replaced.
        """
        cached = self.pricesCache.get(code)
        if cached is not None and len(cached) >= length:
            return cached
        # NOTE(review): GetStockPriceFrom is not defined in this file; it is
        # presumably supplied by `from render import *` - confirm.
        rows = GetStockPriceFrom(self.db, code, length)
        frame = pd.DataFrame(rows, columns=list(database.STOCK_INDEX.__members__))
        frame.set_index("DATE", inplace=True)
        self.pricesCache[code] = frame
        return frame

    def clearCache(self) -> None:
        """Drop every cached price frame."""
        self.pricesCache = {}

    def __del__(self) -> None:
        # `db` is missing when sqlite3.connect() raised inside __init__.
        connection = getattr(self, "db", None)
        if connection is not None:
            connection.close()


class OutputCollectorElement:
    """One result category: a name, a description and the hits grouped by date."""

    def __init__(self, name: str, description: str) -> None:
        self.name = name
        self.description = description
        # date -> corporations that matched this category on that date
        self.corpListByDate: Dict[str, List[database.KRXCorp]] = {}

    def __str__(self) -> str:
        return f"OutputCollectorElement:{self.name}"

    def addCorp(self, date, corp) -> None:
        """Append ``corp`` to the list of hits recorded for ``date``."""
        self.corpListByDate.setdefault(date, []).append(corp)

    def toDict(self) -> Dict:
        """Return a plain-dict form, calling ``toDict()`` on every stored corp."""
        return {
            "name": self.name,
            "description": self.description,
            "corpListByDate": {
                date: [corp.toDict() for corp in corps]
                for date, corps in self.corpListByDate.items()
            },
        }


class OutputCollector:
    """Registry of result categories keyed by category name."""

    def __init__(self) -> None:
        self.data: Dict[str, OutputCollectorElement] = {}

    def addResult(self, key, help=""):
        """Register (or replace) the output category ``key`` described by ``help``."""
        self.data[key] = OutputCollectorElement(key, help)

    def collect(self, key, corp, date):
        """Record ``corp`` under category ``key`` for ``date``.

        Raises ``KeyError`` when ``key`` was never registered with addResult().
        """
        self.data[key].addCorp(date, corp)


def isVolumeNTimes(stock: pd.DataFrame, mul: float, nday: int, order=1) -> bool:
    """Return True when VOLUME at row ``nday`` exceeds ``mul`` times VOLUME at row ``nday + order``."""
    return stock.iloc[nday]['VOLUME'] > stock.iloc[nday + order]['VOLUME'] * mul


def isVolumeMulPriceGreaterThan(stock: pd.DataFrame, threshold: int, nday: int) -> bool:
    """Return True when VOLUME * CLOSE at row ``nday`` exceeds ``threshold``."""
    row = stock.iloc[nday]
    return row['VOLUME'] * row['CLOSE'] > threshold


def isMACDCrossSignal(signal: pd.Series, macd: pd.Series, nday: int, order=1) -> bool:
    """Return True when ``signal`` is above ``macd`` at ``nday`` and below it at ``nday + order``."""
    return (signal.iloc[nday] > macd.iloc[nday]
            and signal.iloc[nday + order] < macd.iloc[nday + order])


def isRelativeDiffLessThan(a: pd.Series, b: pd.Series, threshold: float, nday: int) -> bool:
    """Return True when ``|a[nday] - b[nday + 1]| / b[nday + 1]`` is below ``threshold``.

    NOTE(review): ``b`` is sampled at ``nday + 1`` while ``a`` is sampled at
    ``nday``; confirm that this one-row offset is intended.
    """
    reference = b.iloc[nday + 1]
    return abs(a.iloc[nday] - reference) / reference < threshold


def isDiffGreaterThan(a: pd.Series, b: pd.Series, nday: int) -> bool:
    """Return True when ``a`` is greater than ``b`` at position ``nday``."""
    return a.iloc[nday] > b.iloc[nday]


def calc_rsi(price: pd.Series, period: int = 14):
    """Return a Series named ``RSI`` built from simple rolling means of gains and losses.

    The first rows are NaN until ``period`` differences are available.
    """
    delta = price.diff()
    up, down = delta.copy(), delta.copy()
    up[up < 0] = 0
    down[down > 0] = 0
    roll_up1 = up.rolling(period).mean()
    roll_down1 = down.abs().rolling(period).mean()
    RS1 = roll_up1 / roll_down1
    return pd.Series(100 - (100 / (1 + RS1)), name='RSI')


def prepareCollector(collector: OutputCollector) -> None:
    """Register every category listed in ``pages.GenLists`` on ``collector``."""
    import pages
    for item in pages.GenLists:
        collector.addResult(item["name"], item["description"])


def every(f, xs):
    """Return True when ``f(x)`` is truthy for every ``x`` in ``xs`` (True for empty ``xs``)."""
    return all(f(x) for x in xs)


def collect(data: DataStore, collector: OutputCollector, corp: database.KRXCorp,
            ndays: List[int]) -> None:
    """Evaluate every screening rule for ``corp`` at each position in ``ndays``.

    Hits are recorded on ``collector`` under the date found at that position of
    the price index. Nothing is recorded when 245 or fewer price rows are
    available, or when any requested position has a non-positive VOLUME.

    Rows appear to be ordered newest-first: position ``nday + 1`` is treated
    as the session before ``nday``. All Series reads are positional (``.iloc``)
    because the frame is indexed by DATE, not by row number.
    """
    stock = data.getStockPrice(corp.Code, 250)
    if len(stock) <= 245:
        return
    if any(stock.iloc[nday]['VOLUME'] <= 0 for nday in ndays):
        return

    close = stock["CLOSE"]
    openv = stock["OPEN"]

    # Each moving average only needs enough rows to cover the offsets read below.
    fetch_len = len(ndays) + 10

    def _rolling(n):
        # Reverse so the window is applied in the opposite row order; callers
        # reverse the result back to the original order.
        return close.iloc[:(n + fetch_len)].iloc[::-1].rolling(window=n)

    def d(n):
        """Rolling mean of CLOSE over ``n`` rows, in the frame's row order."""
        return _rolling(n).mean().dropna().iloc[::-1]

    def d_std(n):
        """Rolling standard deviation of CLOSE over ``n`` rows."""
        return _rolling(n).std().dropna().iloc[::-1]

    def mark(key, nday):
        collector.collect(key, corp, stock.index[nday])

    d5 = d(5)
    d20 = d(20)
    d25 = d(25)
    d45 = d(45)
    d60 = d(60)
    d120 = d(120)
    d240 = d(240)
    # standard deviation
    d_std25 = d_std(25)
    bollinger_upperband = d25 + 2 * d_std25
    cluster_mas = [d5, d20, d45]
    # (threshold, category, category additionally requiring a rising 120-row mean)
    cluster_rules = (
        (0.05, "뭉침", "뭉침5% 120선 상승"),
        (0.01, "뭉침01", "뭉침1% 120선 상승"),
        (0.03, "뭉침03", "뭉침3% 120선 상승"),
    )

    # NOTE(review): the nesting of the dependent checks below (the "volume"
    # chain and the "떠있음" chain in particular) was inferred from the
    # category names; confirm against the intended screening rules.
    for nday in ndays:
        prev = nday + 1
        prev2 = nday + 2

        if (openv.iloc[nday] <= d240.iloc[nday]
                and d240.iloc[nday] <= close.iloc[nday]
                and d240.iloc[prev] < d240.iloc[nday]):
            mark("양봉사이240일선증가", nday)
        if (d5.iloc[prev] < d5.iloc[nday]
                and d5.iloc[prev2] > d5.iloc[prev]
                and d20.iloc[prev] < d20.iloc[nday]):
            mark("5일선반등120선증가", nday)
        if openv.iloc[nday] <= d20.iloc[nday] and d20.iloc[nday] <= close.iloc[nday]:
            mark("양봉사이20일선", nday)
        if bollinger_upperband.iloc[nday] <= close.iloc[nday]:
            mark("볼린저 밴드 25", nday)

        for threshold, key, key_rising in cluster_rules:
            if all(isRelativeDiffLessThan(ma, close, threshold, nday) for ma in cluster_mas):
                mark(key, nday)
                if d120.iloc[prev] < d120.iloc[nday]:
                    mark(key_rising, nday)

        if d5.iloc[nday] > d20.iloc[nday] and d5.iloc[prev] < d20.iloc[prev]:
            mark("cross d20 and d5", nday)
        if isDiffGreaterThan(d5, d20, nday):
            mark("d20d5", nday)
            if isVolumeNTimes(stock, 5, nday):
                mark("d20d5VolumeX5", nday)
        if isVolumeNTimes(stock, 3, nday):
            mark("volume", nday)
            if isVolumeMulPriceGreaterThan(stock, 50000000, nday):
                mark("volume5", nday)
                if isVolumeNTimes(stock, 5, nday):
                    mark("volumeX5", nday)

        D240BiggerThanYesterDay = d240.iloc[prev] <= d240.iloc[nday]
        D240Bounce = d240.iloc[prev2] >= d240.iloc[prev] and D240BiggerThanYesterDay
        D120Bounce = (d120.iloc[prev2] >= d120.iloc[prev]
                      and d120.iloc[prev] <= d120.iloc[nday])
        if D240Bounce:
            mark("240일선 반등", nday)
        if D120Bounce:
            mark("120일선 반등", nday)
        if D240BiggerThanYesterDay:
            mark("240일 증가", nday)

        highest_ma = max(ma.iloc[nday] for ma in (d20, d60, d120))
        if highest_ma < min(close.iloc[nday], openv.iloc[nday]):
            mark("떠있음", nday)
            if d60.iloc[prev] < d60.iloc[nday]:
                mark("정배열60", nday)
                if d5.iloc[prev2] >= d5.iloc[prev] and d5.iloc[prev] <= d5.iloc[nday]:
                    mark("60일 5일선 반등", nday)
                if d20.iloc[prev] < d20.iloc[nday]:
                    mark("정배열20", nday)
                    if D240BiggerThanYesterDay:
                        mark("정배열240", nday)
                        if (d5.iloc[prev] <= d5.iloc[nday]
                                and d120.iloc[prev] <= d120.iloc[nday]):
                            mark("모두 정배열", nday)

        if (d120.iloc[prev] <= d120.iloc[nday]
                and d120.iloc[prev] < d240.iloc[nday]
                and d120.iloc[nday] >= d240.iloc[nday]):
            mark("120선240선추월", nday)
        if d5.iloc[prev] < d20.iloc[prev] and d20.iloc[nday] < d5.iloc[nday]:
            mark("d20d5돌파", nday)

    # Exponentially weighted means are computed on the reversed series and
    # flipped back, mirroring how the rolling means above are built.
    ewm5 = close.iloc[::-1].ewm(span=5).mean().iloc[::-1]
    ewm10 = close.iloc[::-1].ewm(span=10).mean().iloc[::-1]
    macd = ewm5 - ewm10
    signal = macd.iloc[::-1].ewm(span=4).mean().iloc[::-1]
    rsi = calc_rsi(close.iloc[::-1], 14).dropna().iloc[::-1]

    for nday in ndays:
        # Passed as (macd, signal): fires when macd is above signal at nday
        # and below it at nday + 1.
        if isMACDCrossSignal(macd, signal, nday):
            mark("macd", nday)
        if d45.iloc[2 + nday] > d45.iloc[1 + nday] and d45.iloc[1 + nday] < d45.iloc[nday]:
            mark("45일선 반등", nday)
        if d20.iloc[2 + nday] > d20.iloc[1 + nday] and d20.iloc[1 + nday] < d20.iloc[nday]:
            mark("20일선 반등", nday)

    for nday in ndays:
        if rsi.iloc[nday] < 30:
            mark("RSI 30 이하", nday)


parser = argparse.ArgumentParser(description="주식 검색 정보를 출력합니다.")
parser.add_argument("--dir", "-d", default=".", help="출력할 폴더를 지정합니다.")
parser.add_argument("--corp", "-c", help="주식 코드를 지정합니다. 지정하지 않으면 kosdaq과 kospi만 검색합니다.")
parser.add_argument("--fullSearch", help="모든 주식을 검색합니다.", action='store_true')
parser.add_argument("--printStdout", action="store_true", help="출력한 결과를 표준 출력으로 출력합니다.")
parser.add_argument("--version", "-v", action="version", version="%(prog)s 1.0")
parser.add_argument("--verbose", "-V", action="store_true", help="출력할 내용을 자세히 표시합니다.")


def main() -> None:
    """Parse the command line, screen the selected corporations, emit JSON."""
    args = parser.parse_args()
    dataStore = DataStore()
    if args.fullSearch:
        krx_corps = dataStore.getAllKRXCorp()
    else:
        krx_corps = dataStore.getKosdaqAndKospi()
    if args.corp:
        krx_corps = [corp for corp in krx_corps if corp.Code == args.corp]

    collector = OutputCollector()
    prepareCollector(collector)

    ndays = list(range(5))
    for corp in tqdm.tqdm(krx_corps):
        collect(dataStore, collector, corp, ndays)
    dataStore.clearCache()

    if not args.printStdout:
        # Create the target directory up front so the writes below cannot
        # fail merely because it does not exist yet.
        os.makedirs(args.dir, exist_ok=True)

    for k, v in collector.data.items():
        data = json.dumps(v.toDict(), ensure_ascii=False)
        if args.printStdout:
            print(k)
            print(data)
        else:
            with open(os.path.join(args.dir, k + ".json"), "w", encoding="UTF-8") as f:
                f.write(data)


if __name__ == "__main__":
    main()