import csv
import datetime
import re
import sqlite3

from db import GetAllStockCode


def _resolve_date(date):
    """Return *date* unchanged, or today's date when *date* is None."""
    return datetime.date.today() if date is None else date


def GetMovingAverageAt(db, code: str, day=5, date=None):
    """Return simple moving average(s) of the closing price as of *date*.

    Args:
        db: object exposing a DB-API style ``execute(sql, params)`` method
            (for example a ``sqlite3.Connection``) over the STOCK table.
        code: stock code to look up.
        day: window length counted in rows, or a list of window lengths.
        date: last date to include. ``None`` (the default) means today,
            resolved on every call rather than once at import time.

    Returns:
        A list with one average per requested window, in the order the
        windows were given (a single-element list for a scalar *day*).

    Raises:
        ValueError: if *day* is an empty list.
        ZeroDivisionError: if a window length is 0.

    Note:
        When fewer rows than a window length exist, the sum of the rows that
        are available is still divided by the full window length.
        Assumes the Date column holds ISO-8601 text so that string comparison
        matches date order - TODO confirm against the schema.
    """
    date = _resolve_date(date)
    windows = day if isinstance(day, list) else [day]
    # Fetch just enough rows for the widest window, newest first, ending at
    # *date* itself so the averages describe the requested day.
    rows = list(db.execute(
        "SELECT Date,Close FROM STOCK WHERE Code = ? AND Date <= ? ORDER BY Date DESC LIMIT ?",
        [code, date.isoformat(), max(windows)],
    ))
    closes = [row[1] for row in rows]
    return [sum(closes[:window]) / window for window in windows]


def GetStockPriceFrom(db, code: str, n, date=None):
    """Return up to *n* most recent STOCK rows for *code* on or before *date*.

    Args:
        db: object exposing a DB-API style ``execute(sql, params)`` method.
        code: stock code to look up.
        n: maximum number of rows to return.
        date: last date to include. ``None`` (the default) means today,
            resolved on every call.

    Returns:
        A list of rows, newest first. The query selects every column; the
        column order is presumably
        (code, date, close, diff, open, high, low, volume) - verify against
        the table definition.
    """
    date = _resolve_date(date)
    return list(db.execute(
        "SELECT * FROM STOCK WHERE Code = ? AND Date <= ? ORDER BY Date DESC LIMIT ?",
        [code, date.isoformat(), n],
    ))


def GetAllStockPrice(db, n, date=None):
    """Return the STOCK rows of every code dated after ``date - n`` days.

    Args:
        db: object exposing a DB-API style ``execute(sql, params)`` method.
        n: size of the look-back window in calendar days.
        date: end of the look-back window. ``None`` (the default) means
            today, resolved on every call.

    Returns:
        Whatever ``db.execute`` returns (a cursor for sqlite3); rows come
        newest first.
    """
    date = _resolve_date(date)
    window_start = date - datetime.timedelta(days=n)
    return db.execute(
        "SELECT * FROM STOCK WHERE Date > ? ORDER BY Date DESC",
        [window_start.isoformat()],
    )


def makeMovingAveragePoint(n, arr, reversed=False):
    """Return every *n*-point moving average of *arr*.

    Args:
        n: window length; must be positive.
        arr: sequence of numbers supporting ``len`` and slicing.
        reversed: when False the window slides from the front of *arr*
            towards the back; when True it starts on the last *n* items and
            slides towards the front.

    Returns:
        A list of ``len(arr) - n + 1`` averages in sliding order.

    Raises:
        ValueError: if *n* is not positive.
        IndexError: if *arr* holds fewer than *n* items.
    """
    if n <= 0:
        raise ValueError(f"window size must be positive, got {n}")
    if len(arr) < n:
        raise IndexError(f"need at least {n} values, got {len(arr)}")

    if reversed:
        window_sum = sum(arr[-n:])
        values = arr[::-1]
    else:
        window_sum = sum(arr[:n])
        values = arr

    averages = [window_sum / n]
    # Slide by one: drop the item leaving the window, add the one entering.
    for leaving, entering in zip(values, values[n:]):
        window_sum = window_sum - leaving + entering
        averages.append(window_sum / n)
    return averages


def detectTF(d5, d20) -> bool:
    """Tell whether *d5* is above *d20* at index 0 but not at index 1.

    With newest-first series (latest value at the front) this means the
    short average has just moved above the long one - presumably a
    5-day/20-day crossover signal; confirm with the caller.

    Raises:
        IndexError: if fewer than two pairs can be formed from the inputs.
    """
    above = [short > long for short, long in zip(d5, d20)]
    if len(above) < 2:
        raise IndexError("detectTF needs at least two points in each series")
    return above[0] and (not above[1])


if __name__ == "__main__":
    print("start")
    db = sqlite3.connect("stock.db")
    try:
        # Scratch area: the experiments below are disabled. They stay inside
        # the try block so the connection is closed even if one of them
        # fails after being re-enabled.
        #
        # stock_codes = GetAllStockCode(db)
        #
        # for stock_code in stock_codes:
        #     arr = GetStockPriceFrom(db,stock_code,25)
        #     if len(arr) < 25:
        #         print(f"stock_code {stock_code} : lack of data")
        #         continue
        #     #print([*map(lambda x: x[2],arr)])
        #     d5 = makeMovingAveragePoint(5,[*map(lambda x: x[2],arr)])
        #     d20 = makeMovingAveragePoint(20,[*map(lambda x: x[2],arr)])
        #     higher = detectTF(d5[:5],d20[:5])
        #     if higher:
        #         print(f"stock_code {stock_code} : {higher} {d5} {d20}")
        #
        # print(GetMovingAverageAt(db,"155660",day=[5,20]))
        # arr = GetStockPriceFrom(db,"155660",25)
        # arr.sort(key=lambda x:x[2])
        # print([*map(lambda x: x[2],arr)])
        # print(makeMovingAveragePoint(2,[*map(lambda x: x[2],arr)]))
        # print(makeMovingAveragePoint(2,[*map(lambda x: x[2],arr)],reversed=True))
        # d5 = makeMovingAveragePoint(5,[*map(lambda x: x[2],arr)])
        # d20 = makeMovingAveragePoint(20,[*map(lambda x: x[2],arr)])
        # print(d5)
        # print(d20)
        # print(detectTF(d5,d20))
        # print(GetAllStockCode(db))
        pass
    finally:
        db.close()