"""Screen KRX stocks against a set of technical rules and write the matches.

For every listed corporation the most recent rows of price data are checked
against volume, moving-average and MACD rules.  Matches are grouped per rule
and per date, then rendered as HTML (via a Jinja2 template) or dumped as JSON.
"""
import argparse
import json
import os
import sqlite3
from typing import Dict, List

# NOTE: the wildcard import is deliberately kept ahead of the imports below
# (its original position) so that any name it exports cannot shadow them.
from render import *
import db as database
from jinja2 import Environment, PackageLoader, select_autoescape
import pandas as pd
import tqdm


class DataStore:
    """SQLite-backed access to corporation and price data with a price cache."""

    def __init__(self) -> None:
        self.db = sqlite3.connect("stock.db")
        # Price frames keyed by stock code.
        self.pricesCache: Dict[str, pd.DataFrame] = {}

    def getAllKRXCorp(self) -> List[database.KRXCorp]:
        """Return every corporation known to the database layer."""
        return database.GetAllKRXCorp(self.db)

    def getStockPrice(self, code, length) -> pd.DataFrame:
        """Return price rows for ``code`` as a DataFrame indexed by ``DATE``.

        A cached frame is reused when it already holds at least ``length``
        rows; otherwise the rows are fetched again and the cache entry is
        replaced.  Columns are named after the members of
        ``database.STOCK_INDEX``.
        """
        cached = self.pricesCache.get(code)
        if cached is not None and len(cached) >= length:
            return cached

        # NOTE(review): GetStockPriceFrom is not qualified with `database.`;
        # it must be provided by `from render import *` - confirm.
        rows = GetStockPriceFrom(self.db, code, length)
        frame = pd.DataFrame(
            rows, columns=list(database.STOCK_INDEX.__members__)
        )
        frame.set_index("DATE", inplace=True)
        self.pricesCache[code] = frame
        return frame

    def clearCache(self) -> None:
        """Drop every cached price frame."""
        self.pricesCache = {}

    def __del__(self) -> None:
        self.db.close()


class OutputCollectorElement:
    """One result category: its name, description and matches per date."""

    def __init__(self, name: str, description: str) -> None:
        self.name = name
        self.description = description
        # Each date maps to the list of corporations collected for it.
        self.corpListByDate: Dict[str, List[database.KRXCorp]] = {}

    def __str__(self) -> str:
        return f"OutputCollectorElement:{self.name}"

    def addCorp(self, date, corp) -> None:
        """Append ``corp`` to the list stored under ``date``."""
        self.corpListByDate.setdefault(date, []).append(corp)

    def toDict(self) -> Dict:
        """Return a plain-dict form, calling ``toDict()`` on every corp."""
        return {
            "name": self.name,
            "description": self.description,
            "corpListByDate": {
                date: [corp.toDict() for corp in corps]
                for date, corps in self.corpListByDate.items()
            },
        }


class OutputCollector:
    """Registry of result categories keyed by category name."""

    def __init__(self) -> None:
        self.data: Dict[str, OutputCollectorElement] = {}

    def addResult(self, key, help=""):
        """Add an output category to collect."""
        self.data[key] = OutputCollectorElement(key, help)

    def collect(self, key, corp, date):
        """Record ``corp`` under ``date`` in the category ``key``.

        Raises ``KeyError`` when ``key`` was never registered with
        ``addResult``.
        """
        self.data[key].addCorp(date, corp)


def isVolumeNTimes(stock: pd.DataFrame, mul: float, nday: int, order=1) -> bool:
    """Return True when VOLUME at row ``nday`` exceeds ``mul`` times the
    VOLUME at row ``nday + order``."""
    return stock.iloc[nday]['VOLUME'] > stock.iloc[nday + order]['VOLUME'] * mul


def isVolumeMulPriceGreaterThan(stock: pd.DataFrame, threshold: int, nday: int) -> bool:
    """Return True when VOLUME * CLOSE at row ``nday`` exceeds ``threshold``."""
    return stock.iloc[nday]['VOLUME'] * stock.iloc[nday]['CLOSE'] > threshold


def isMACDCrossSignal(signal: pd.Series, macd: pd.Series, nday: int, order=1) -> bool:
    """Return True when ``signal`` is below ``macd`` at row ``nday`` but above
    it at row ``nday + order``."""
    return (signal.iloc[nday] < macd.iloc[nday]
            and signal.iloc[nday + order] > macd.iloc[nday + order])


def isRelativeDiffLessThan(a: pd.Series, b: pd.Series, threshold: float, nday: int) -> bool:
    """Return True when ``(a - b) / b`` at row ``nday`` is below ``threshold``.

    The difference is signed, so any negative gap satisfies the test.
    NOTE(review): an absolute difference may have been intended - confirm.
    """
    return (a.iloc[nday] - b.iloc[nday]) / b.iloc[nday] < threshold


def isDiffGreaterThan(a: pd.Series, b: pd.Series, nday: int) -> bool:
    """a is bigger than b"""
    return (a.iloc[nday] > b.iloc[nday])


def prepareCollector(collector: OutputCollector) -> None:
    """Register one result category for every entry of ``pages.GenLists``."""
    import pages
    for item in pages.GenLists:
        collector.addResult(item.name, item.description)


def _movingAverage(series: pd.Series, window: int) -> pd.Series:
    """Rolling mean computed over the reversed series, NaNs dropped, then
    restored to the input row order."""
    return series.loc[::-1].rolling(window=window).mean().dropna().loc[::-1]


def _ema(series: pd.Series, span: int) -> pd.Series:
    """Exponential moving average computed over the reversed series, then
    restored to the input row order."""
    return series.loc[::-1].ewm(span=span).mean().loc[::-1]


def collect(data: DataStore, collector: OutputCollector, corp: database.KRXCorp,
            nday: int) -> None:
    """Evaluate every screening rule for ``corp`` at row ``nday``.

    Corporations with fewer than 70 price rows are skipped.  Each satisfied
    rule records ``corp`` in the collector under ``stock.index[nday]``.
    """
    stock = data.getStockPrice(corp.Code, 70)
    if len(stock) < 70:
        return

    close = stock["CLOSE"]
    date = stock.index[nday]

    d5 = _movingAverage(close, 5)
    d20 = _movingAverage(close, 20)
    d60 = _movingAverage(close, 60)

    volumeX3 = isVolumeNTimes(stock, 3, nday)
    volumeX5 = isVolumeNTimes(stock, 5, nday)

    if (isRelativeDiffLessThan(d5, d20, 0.01, nday)
            and isRelativeDiffLessThan(d5, d60, 0.01, nday)):
        collector.collect("cross 2", corp, date)

    # Fixed: the volume test used a hard-coded row 0, so a spike on row 0 was
    # recorded under every scanned date instead of only its own.
    if volumeX3 and isVolumeMulPriceGreaterThan(stock, 100000, nday):
        collector.collect("cross 3", corp, date)

    if (isRelativeDiffLessThan(d20, d60, 0.01, nday)
            and isVolumeMulPriceGreaterThan(stock, 1000000, nday)):
        collector.collect("cross 4", corp, date)

    if isDiffGreaterThan(d5, d20, nday):
        collector.collect("d20d5", corp, date)

    if volumeX5:
        collector.collect("d20d5VolumeX5", corp, date)

    if (isRelativeDiffLessThan(d5, d20, 0.03, nday)
            and isRelativeDiffLessThan(d5, d60, 0.03, nday)
            and volumeX3):
        collector.collect("DiffDistance", corp, date)

    if volumeX3:
        collector.collect("volume", corp, date)

    if isVolumeMulPriceGreaterThan(stock, 50000000, nday):
        collector.collect("volume5", corp, date)

    if volumeX5:
        collector.collect("volumeX5", corp, date)

    macd = _ema(close, 12) - _ema(close, 26)
    # Fixed: the signal line is smoothed in the same direction as the price
    # EMAs.  Running ewm() directly over `macd` seeded the average at row 0,
    # making signal equal macd there, so a cross at nday 0 was never detected.
    signal = _ema(macd, 9)

    # NOTE(review): isMACDCrossSignal declares (signal, macd) but is called
    # with (macd, signal); the call order is kept as-is - confirm which
    # crossing direction is intended.
    if isMACDCrossSignal(macd, signal, nday):
        collector.collect("macd", corp, date)


parser = argparse.ArgumentParser(description="주식 검색 정보를 출력합니다.")
parser.add_argument("--format", "-f", choices=["json", "html"], default="html",
                    help="출력 포맷을 지정합니다. 기본값은 html입니다.")
parser.add_argument("--dir", "-d", default=".", help="출력할 폴더를 지정합니다.")
parser.add_argument("--corp", "-c",
                    help="주식 코드를 지정합니다. 지정하지 않으면 모든 주식을 검색합니다.")
parser.add_argument("--printStdout", action="store_true",
                    help="출력한 결과를 표준 출력으로 출력합니다.")
parser.add_argument("--version", "-v", action="version", version="%(prog)s 1.0")
parser.add_argument("--verbose", "-V", action="store_true",
                    help="출력할 내용을 자세히 표시합니다.")


def main() -> None:
    """Scan the selected corporations and emit one output per category."""
    args = parser.parse_args()
    dataStore = DataStore()
    krx_corps = dataStore.getAllKRXCorp()
    if args.corp:
        krx_corps = [corp for corp in krx_corps if corp.Code == args.corp]

    env = Environment(
        loader=PackageLoader('render', 'templates'),
        autoescape=select_autoescape(['html', 'xml'])
    )
    collector = OutputCollector()
    prepareCollector(collector)

    for corp in tqdm.tqdm(krx_corps):
        # Rows 0..4 are scanned for every corporation.
        for nday in range(0, 5):
            collect(dataStore, collector, corp, nday)
        dataStore.clearCache()

    for k, v in collector.data.items():
        if args.format == "json":
            data = json.dumps(v.toDict(), indent=4, ensure_ascii=False)
            if args.printStdout:
                print(k)
                print(data)
            else:
                # Fixed: `encoding` was passed to os.path.join() instead of
                # open(), which raised TypeError on every JSON file write.
                with open(os.path.join(args.dir, k + ".json"), "w",
                          encoding="UTF-8") as f:
                    f.write(data)
        else:
            template = env.get_template("Lists.html")
            # Only the five greatest date keys are handed to the template.
            days = sorted(v.corpListByDate.keys(), reverse=True)[:5]
            html = template.render(collected=v, title=k, days=days)
            if args.printStdout:
                print(html)
            else:
                with open(os.path.join(args.dir, k + ".html"), "w",
                          encoding="UTF-8") as f:
                    f.write(html)


if __name__ == "__main__":
    main()