🧓 ปู่ชวนไปเที่ยว
🎮 เล่นเกม
⏰ Time:
🌡️ Temp: Loading...

📌 ❤️❤️Pu@Bot💋💋Ally V.2.0

โดย puk

2025-09-02 00:12

"""Indicator-voting trading bot with backtesting, SQLite logging and Telegram alerts.

The script offers a console menu (live trading / backtest / exit).  Signals are
produced by counting votes from SMA, MACD, RSI, ATR, Bollinger Band and
Stochastic checks; positions use ATR-based stop losses and Fibonacci/ATR-based
take-profit levels.
"""
import asyncio
import logging
import os
import random
import sqlite3
import time
from contextlib import closing
from datetime import datetime, timedelta
from itertools import islice, product

import ccxt.async_support as ccxt_async
import numpy as np
import pandas as pd
import requests
import ta
from colorama import init, Fore, Style
from dotenv import load_dotenv
from tqdm import tqdm

# Load environment variables
load_dotenv()

# Setup logging
logging.basicConfig(filename='trading_bot.log', level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Initialize colorama
init(autoreset=True)

LOG_HEADER = "❤️❤️Pu@Bot💋💋Ally V.2.1"
DB_PATH = 'trades.db'
TELEGRAM_COOLDOWN_SECONDS = 10
TELEGRAM_TIMEOUT_SECONDS = 10
TELEGRAM_RETRIES = 3
OHLCV_PAGE_LIMIT = 1000
SIGNAL_VOTE_THRESHOLD = 6
BACKTEST_START_BALANCE = 10000  # Starting balance in USDT

# Unix time of the last successfully delivered Telegram alert.
last_alert_time = 0


# Setup SQLite database
def init_db():
    """Create the ``trades`` and ``backtest_results`` tables if they are missing."""
    # sqlite3's own context manager only commits/rolls back; ``closing`` is
    # what actually releases the connection.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            c = conn.cursor()
            c.execute('''CREATE TABLE IF NOT EXISTS trades
                         (timestamp TEXT, exchange TEXT, symbol TEXT, type TEXT,
                          entry_price REAL, exit_price REAL, amount REAL, profit REAL)''')
            c.execute('''CREATE TABLE IF NOT EXISTS backtest_results
                         (timestamp TEXT, start_date TEXT, end_date TEXT, win_rate REAL,
                          total_profit REAL, total_trades INTEGER, parameters TEXT)''')
    log_status("Database initialized successfully")


# Log status to console, log file, and Telegram with custom header
def log_status(message, color=Fore.CYAN, notify=True):
    """Print *message*, write it to the log file and optionally push it to Telegram.

    Args:
        message: Text to report (the bot header is prepended).
        color: colorama foreground colour used for the console line.
        notify: When False the Telegram push is skipped.  High-volume callers
            (per-candle backtest output) and the Telegram error path itself
            use this to avoid blocking cooldowns and infinite recursion.
    """
    full_message = f"{LOG_HEADER} | {message}"
    print(f"{color}{full_message}{Style.RESET_ALL}")
    logging.info(full_message)
    if notify:
        send_telegram_alert(full_message)


def _redact(text, secret):
    """Return *text* with *secret* masked so it never reaches logs."""
    return text.replace(secret, '***') if secret else text


# Telegram alert function with rate limiting
def send_telegram_alert(message):
    """Send *message* to the configured Telegram chat, retrying on HTTP 429.

    Does nothing when ``TELEGRAM_BOT_TOKEN`` / ``TELEGRAM_CHAT_ID`` are unset.
    Failures are reported locally only (never back through Telegram).

    NOTE(review): this uses blocking ``time.sleep``/``requests`` and is called
    from coroutines, so it stalls the event loop while it waits.
    """
    global last_alert_time
    if message.startswith(f"{LOG_HEADER} | Waiting for next check"):
        return  # Skip non-critical messages

    token = os.getenv('TELEGRAM_BOT_TOKEN')
    chat_id = os.getenv('TELEGRAM_CHAT_ID')
    if not token or not chat_id:
        return  # Telegram is not configured; console/file logging still happened

    elapsed = time.time() - last_alert_time
    if elapsed < TELEGRAM_COOLDOWN_SECONDS:
        time.sleep(TELEGRAM_COOLDOWN_SECONDS - elapsed)

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {'chat_id': chat_id, 'text': message}
    for attempt in range(TELEGRAM_RETRIES):
        try:
            response = requests.post(url, json=payload, timeout=TELEGRAM_TIMEOUT_SECONDS)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                log_status(f"Telegram Rate Limit Exceeded. Retrying in 10 seconds... "
                           f"(Attempt {attempt + 1}/{TELEGRAM_RETRIES})",
                           Fore.YELLOW, notify=False)
                time.sleep(10)
                continue
            # The exception text embeds the request URL, which contains the token.
            log_status(f"Telegram Alert Error: {_redact(str(e), token)}", Fore.RED, notify=False)
            return
        except Exception as e:
            log_status(f"Telegram Alert Error: {_redact(str(e), token)}", Fore.RED, notify=False)
            return
        else:
            last_alert_time = time.time()
            return


# Calculate Fibonacci levels
def calculate_fibonacci_levels(high, low):
    """Return retracement/extension price levels for the ``low``..``high`` range.

    Keys are percentage labels ('0.0%' .. '161.8%'); the two extension levels
    are measured upward from ``high``.
    """
    diff = high - low
    return {
        '0.0%': low,
        '23.6%': low + diff * 0.236,
        '38.2%': low + diff * 0.382,
        '50.0%': low + diff * 0.50,
        '61.8%': low + diff * 0.618,
        '78.6%': low + diff * 0.786,
        '100.0%': high,
        '127.2%': high + diff * 0.272,
        '161.8%': high + diff * 0.618
    }


# Initialize exchanges
def init_exchanges():
    """Build the dict of async ccxt exchange clients (currently only Binance)."""
    exchanges = {
        'binance': ccxt_async.binance({
            'apiKey': os.getenv('BINANCE_API_KEY'),
            'secret': os.getenv('BINANCE_SECRET'),
            'enableRateLimit': True,
        })
    }
    log_status("Exchanges initialized successfully")
    return exchanges


# Fetch historical data for backtesting
async def fetch_historical_data(exchange, symbol, timeframe, start_date, end_date):
    """Download OHLCV candles between *start_date* and *end_date*.

    Pages through the exchange in chunks of ``OHLCV_PAGE_LIMIT`` candles so
    ranges longer than one page are not silently truncated.

    Returns:
        A DataFrame with timestamp/open/high/low/close/volume columns, or
        None when the download fails.
    """
    log_status(f"Loading historical OHLCV data for {symbol} from "
               f"{start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}...")
    try:
        since = int(start_date.timestamp() * 1000)
        candles = []
        while True:
            page = await exchange.fetch_ohlcv(symbol, timeframe, since=since,
                                              limit=OHLCV_PAGE_LIMIT)
            if not page:
                break
            candles.extend(page)
            newest = page[-1][0]
            # Stop on a short page, once the requested end is covered, or if
            # the exchange made no forward progress (avoids an endless loop).
            if (len(page) < OHLCV_PAGE_LIMIT
                    or newest < since
                    or pd.to_datetime(newest, unit='ms') >= end_date):
                break
            since = newest + 1
        df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df = df[df['timestamp'] <= end_date]
        log_status(f"Historical data loaded successfully: {len(df)} candles")
        return df
    except Exception as e:
        log_status(f"Error fetching historical data: {str(e)}", Fore.RED)
        return None


# Apply indicators
def apply_indicators(df, params, notify=True):
    """Add SMA, RSI, MACD, ATR, Bollinger Band and Stochastic columns to *df*.

    The frame is modified in place and also returned.  ``notify`` controls
    whether the progress messages are pushed to Telegram.
    """
    log_status("Applying technical indicators...", notify=notify)
    close, high, low = df['close'], df['high'], df['low']

    df['sma_fast'] = ta.trend.SMAIndicator(close=close, window=params['sma_fast_window']).sma_indicator()
    df['sma_slow'] = ta.trend.SMAIndicator(close=close, window=params['sma_slow_window']).sma_indicator()
    df['rsi'] = ta.momentum.RSIIndicator(close=close, window=params['rsi_window']).rsi()

    macd = ta.trend.MACD(close=close, window_fast=params['macd_fast'],
                         window_slow=params['macd_slow'], window_sign=params['macd_signal'])
    df['macd'] = macd.macd()
    df['macd_signal'] = macd.macd_signal()
    df['macd_hist'] = macd.macd_diff()

    df['atr'] = ta.volatility.AverageTrueRange(high=high, low=low, close=close,
                                               window=params['atr_window']).average_true_range()

    bb = ta.volatility.BollingerBands(close=close, window=params['bb_window'],
                                      window_dev=params['bb_std'])
    df['bb_upper'] = bb.bollinger_hband()
    df['bb_lower'] = bb.bollinger_lband()
    df['bb_mid'] = bb.bollinger_mavg()
    df['bb_percent'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])

    # NOTE(review): params['stoch_d'] is part of the config/grid but is not used here.
    stoch = ta.momentum.StochasticOscillator(high=high, low=low, close=close,
                                             window=params['stoch_k'],
                                             smooth_window=params['stoch_smooth'])
    df['stoch_k'] = stoch.stoch()
    df['stoch_d'] = stoch.stoch_signal()
    log_status("Indicators applied successfully", notify=notify)
    return df


# Evaluate signals
def evaluate_signals(df, params, exchange_name, symbol, log=True):
    """Vote on the last two candles of *df* and return ``(signal, descriptions)``.

    ``signal`` is 'BUY', 'SELL' or None.  A side wins when its directional
    votes plus the direction-neutral volatility votes (``BUY_SELL_*``) reach
    ``SIGNAL_VOTE_THRESHOLD``; BUY is checked first.

    Args:
        df: Frame that already carries the indicator columns.
        params: Dict with the RSI/ATR thresholds.
        exchange_name: Label used in log messages.
        symbol: Trading pair (kept for interface compatibility; unused).
        log: When False nothing is printed/logged (used by the backtest loop).
    """
    if len(df) < 2:
        # A crossover needs a previous candle to compare against.
        if log:
            log_status("No signal detected")
        return None, []

    last_row = df.iloc[-1]
    prev_row = df.iloc[-2]
    signals = []
    signal_desc = []

    def vote(tag, description):
        signals.append(tag)
        signal_desc.append(description)

    # 1. SMA Crossover
    if prev_row['sma_fast'] < prev_row['sma_slow'] and last_row['sma_fast'] > last_row['sma_slow']:
        vote('BUY_SMA_CROSS', "SMA Crossover (Fast > Slow)")
    elif prev_row['sma_fast'] > prev_row['sma_slow'] and last_row['sma_fast'] < last_row['sma_slow']:
        vote('SELL_SMA_CROSS', "SMA Crossover (Fast < Slow)")

    # 2. SMA Position
    if last_row['close'] > last_row['sma_slow']:
        vote('BUY_SMA_POS', "Price > SMA Slow")
    elif last_row['close'] < last_row['sma_slow']:
        vote('SELL_SMA_POS', "Price < SMA Slow")

    # 3. MACD Crossover
    if prev_row['macd'] < prev_row['macd_signal'] and last_row['macd'] > last_row['macd_signal']:
        vote('BUY_MACD_CROSS', "MACD > Signal Line")
    elif prev_row['macd'] > prev_row['macd_signal'] and last_row['macd'] < last_row['macd_signal']:
        vote('SELL_MACD_CROSS', "MACD < Signal Line")

    # 4. MACD Histogram
    if last_row['macd_hist'] > 0:
        vote('BUY_MACD_HIST', "MACD Histogram > 0")
    elif last_row['macd_hist'] < 0:
        vote('SELL_MACD_HIST', "MACD Histogram < 0")

    # 5. RSI Threshold
    # NOTE(review): with the shipped defaults (buy=70, sell=30) the BUY branch
    # fires for any RSI below 70 — the thresholds look swapped; confirm intent.
    if last_row['rsi'] < params['rsi_buy_threshold']:
        vote('BUY_RSI_THRESH', f"RSI < {params['rsi_buy_threshold']}")
    elif last_row['rsi'] > params['rsi_sell_threshold']:
        vote('SELL_RSI_THRESH', f"RSI > {params['rsi_sell_threshold']}")

    # 6. RSI Trend
    if last_row['rsi'] > prev_row['rsi']:
        vote('BUY_RSI_TREND', "RSI Increasing")
    elif last_row['rsi'] < prev_row['rsi']:
        vote('SELL_RSI_TREND', "RSI Decreasing")

    # 7. ATR Threshold
    if last_row['atr'] > params['atr_threshold']:
        vote('BUY_SELL_ATR_THRESH', f"ATR > {params['atr_threshold']}")

    # 8. ATR Trend
    if last_row['atr'] > prev_row['atr']:
        vote('BUY_SELL_ATR_TREND', "ATR Increasing")

    # 9. Bollinger Bands Position
    if last_row['close'] < last_row['bb_lower']:
        vote('BUY_BB_POS', "Price < BB Lower")
    elif last_row['close'] > last_row['bb_upper']:
        vote('SELL_BB_POS', "Price > BB Upper")

    # 10. Bollinger Bands %B
    if last_row['bb_percent'] < 0.2:
        vote('BUY_BB_PERCENT', "BB %B < 0.2")
    elif last_row['bb_percent'] > 0.8:
        vote('SELL_BB_PERCENT', "BB %B > 0.8")

    # 11. Stochastic %K
    if last_row['stoch_k'] < 20:
        vote('BUY_STOCH_K', "Stochastic %K < 20")
    elif last_row['stoch_k'] > 80:
        vote('SELL_STOCH_K', "Stochastic %K > 80")

    # 12. Stochastic Crossover
    if last_row['stoch_k'] > last_row['stoch_d'] and prev_row['stoch_k'] <= prev_row['stoch_d']:
        vote('BUY_STOCH_CROSS', "Stochastic %K > %D")
    elif last_row['stoch_k'] < last_row['stoch_d'] and prev_row['stoch_k'] >= prev_row['stoch_d']:
        vote('SELL_STOCH_CROSS', "Stochastic %K < %D")

    # 'BUY_SELL_*' tags also start with 'BUY', so they must be excluded from
    # the pure-buy bucket or they would be counted twice for the BUY side.
    neutral_votes = sum(1 for s in signals if s.startswith('BUY_SELL'))
    buy_votes = sum(1 for s in signals if s.startswith('BUY')) - neutral_votes
    sell_votes = sum(1 for s in signals if s.startswith('SELL'))

    signal = None
    if buy_votes + neutral_votes >= SIGNAL_VOTE_THRESHOLD:
        signal = 'BUY'
        if log:
            log_status(f"BUY signal detected on {exchange_name} at price {last_row['close']}. "
                       f"Signals: {', '.join(signal_desc)}", Fore.GREEN)
    elif sell_votes + neutral_votes >= SIGNAL_VOTE_THRESHOLD:
        signal = 'SELL'
        if log:
            log_status(f"SELL signal detected on {exchange_name} at price {last_row['close']}. "
                       f"Signals: {', '.join(signal_desc)}", Fore.RED)
    elif log:
        log_status("No signal detected")
    return signal, signal_desc


def _plan_position(signal, entry_price, atr, fib_levels, params):
    """Return ``(position_type, sl_price, tp_price)`` for *signal*, or None."""
    if signal == 'BUY':
        sl_price = entry_price - (atr * params['sl_multiplier'])
        fib_target = fib_levels['61.8%']
        tp_price = fib_target if fib_target > entry_price else entry_price + (atr * params['tp_multiplier'])
        return 'Long', sl_price, tp_price
    if signal == 'SELL':
        sl_price = entry_price + (atr * params['sl_multiplier'])
        fib_target = fib_levels['38.2%']
        tp_price = fib_target if fib_target < entry_price else entry_price - (atr * params['tp_multiplier'])
        return 'Short', sl_price, tp_price
    return None


def _exit_triggered(position, price):
    """Return True when *price* has reached the position's stop loss or take profit."""
    if position['type'] == 'Long':
        return price <= position['sl_price'] or price >= position['tp_price']
    if position['type'] == 'Short':
        return price >= position['sl_price'] or price <= position['tp_price']
    return False


def _position_profit(position, price):
    """Return the profit of closing *position* at *price*."""
    if position['type'] == 'Long':
        return (price - position['entry_price']) * position['amount']
    return (position['entry_price'] - price) * position['amount']


# Backtest function
def backtest(df, params, symbol, notify=True):
    """Replay the strategy over *df* and return performance metrics.

    Args:
        df: Candle frame that already carries the indicator columns.
        params: Strategy parameters (multipliers, thresholds, investment share).
        symbol: Trading pair, forwarded to ``evaluate_signals``.
        notify: Whether the start/result messages go to Telegram.  Per-trade
            messages are always local to keep the replay fast.

    Returns:
        Dict with ``trades``, ``win_rate``, ``total_profit``, ``total_trades``,
        ``final_balance`` and ``max_drawdown``.  A position still open at the
        last candle is not counted.
    """
    log_status("Starting backtest...", notify=notify)
    trades = []
    position = None
    balance = BACKTEST_START_BALANCE
    investment_percentage = params['investment_percentage']

    for i in range(1, len(df)):
        last_row = df.iloc[i]
        current_price = last_row['close']
        atr = last_row['atr']

        # Calculate Fibonacci levels from the trailing window (up to 21 candles)
        window = slice(max(0, i - 20), i + 1)
        fib_levels = calculate_fibonacci_levels(df['high'].iloc[window].max(),
                                                df['low'].iloc[window].min())

        # Check signal (silently: one log line per candle would flood the output)
        signal, _ = evaluate_signals(df.iloc[:i + 1], params, 'backtest', symbol, log=False)

        # Open position
        if signal and not position:
            plan = _plan_position(signal, current_price, atr, fib_levels, params)
            if plan is not None:
                kind, sl_price, tp_price = plan
                amount = balance * investment_percentage / current_price
                position = {'type': kind, 'entry_price': current_price, 'sl_price': sl_price,
                            'tp_price': tp_price, 'amount': amount}
                log_status(f"Opened {kind} position: Entry={current_price}, SL={sl_price}, TP={tp_price}",
                           notify=False)

        # Check SL/TP
        if position and _exit_triggered(position, current_price):
            profit = _position_profit(position, current_price)
            trades.append({'type': position['type'], 'entry_price': position['entry_price'],
                           'exit_price': current_price, 'profit': profit})
            balance += profit
            log_status(f"Closed {position['type']} position: Price={current_price}, Profit={profit}",
                       Fore.YELLOW, notify=False)
            position = None

    # Calculate metrics
    total_trades = len(trades)
    wins = sum(1 for trade in trades if trade['profit'] > 0)
    win_rate = (wins / total_trades * 100) if total_trades > 0 else 0
    total_profit = sum(trade['profit'] for trade in trades)
    max_drawdown = calculate_max_drawdown(trades, BACKTEST_START_BALANCE)

    result_message = (f"=== Backtest Results ===\n"
                      f"Total Trades: {total_trades}\n"
                      f"Win Rate: {win_rate:.2f}%\n"
                      f"Total Profit: {total_profit:.2f} USDT\n"
                      f"Final Balance: {balance:.2f} USDT\n"
                      f"Max Drawdown: {max_drawdown:.2f}%")
    log_status(result_message, Fore.GREEN, notify=notify)

    return {
        'trades': trades,
        'win_rate': win_rate,
        'total_profit': total_profit,
        'total_trades': total_trades,
        'final_balance': balance,
        'max_drawdown': max_drawdown
    }


# Calculate max drawdown
def calculate_max_drawdown(trades, initial_balance):
    """Return the largest peak-to-trough equity drop, in percent.

    Equity starts at *initial_balance* and changes by each trade's ``profit``.
    Steps where the running peak is not positive are skipped, since a
    percentage of a non-positive peak is undefined.
    """
    balance = initial_balance
    peak = initial_balance
    max_drawdown = 0
    for trade in trades:
        balance += trade['profit']
        peak = max(peak, balance)
        if peak > 0:
            max_drawdown = max(max_drawdown, (peak - balance) / peak * 100)
    return max_drawdown


# Optimize parameters
async def optimize_parameters(exchange, symbol, timeframe, start_date, end_date, max_combinations=50):
    """Backtest the first *max_combinations* grid points and return the best parameters.

    "Best" means highest win rate, ties broken by total profit.  Every tested
    result is stored in the ``backtest_results`` table.

    Returns:
        The winning parameter dict, or None when no historical data could be
        loaded (or ``max_combinations`` is 0).
    """
    log_status("Optimizing parameters...")
    param_ranges = {
        'sma_fast_window': [5, 8, 10, 12, 15],
        'sma_slow_window': [20, 22, 25, 28, 30],
        'rsi_window': [10, 12, 14, 17],
        'rsi_buy_threshold': [60, 65, 70],
        'rsi_sell_threshold': [20, 25, 30],
        'macd_fast': [12, 14, 16],
        'macd_slow': [26, 28, 30],
        'macd_signal': [9, 12],
        'atr_window': [14, 20],
        'atr_threshold': [0.5, 1.0, 1.5],
        'sl_multiplier': [1.5, 2.0, 2.5],
        'tp_multiplier': [2.0, 3.0, 4.0],
        'bb_window': [20, 25],
        'bb_std': [2.0, 2.5],
        'stoch_k': [14, 20],
        'stoch_d': [3, 5],
        'stoch_smooth': [3, 5],
        'investment_percentage': [0.25]
    }

    # The candles do not depend on the parameters, so download them once.
    base_df = await fetch_historical_data(exchange, symbol, timeframe, start_date, end_date)
    if base_df is None or base_df.empty:
        log_status("No historical data available for optimization.", Fore.RED)
        return None

    # The full grid has tens of millions of points; iterate lazily instead of
    # materialising it.  NOTE(review): taking the *first* N points only varies
    # the trailing parameters of the grid.
    names = list(param_ranges)
    grid = islice(product(*param_ranges.values()), max_combinations)

    best_params = None
    best_win_rate = 0
    best_profit = 0
    results = []

    for combo in grid:
        params = dict(zip(names, combo))
        log_status(f"Testing parameters: {params}", notify=False)
        df = apply_indicators(base_df.copy(), params, notify=False)
        result = backtest(df, params, symbol, notify=False)
        results.append({
            'params': params,
            'win_rate': result['win_rate'],
            'total_profit': result['total_profit'],
            'total_trades': result['total_trades'],
            'max_drawdown': result['max_drawdown']
        })
        improved = (result['win_rate'] > best_win_rate
                    or (result['win_rate'] == best_win_rate and result['total_profit'] > best_profit))
        # The first result always seeds the best, so a run where nothing
        # trades profitably still yields a usable parameter set.
        if best_params is None or improved:
            best_win_rate = result['win_rate']
            best_profit = result['total_profit']
            best_params = params
            log_status(f"New best parameters found: Win Rate={best_win_rate:.2f}%, Profit={best_profit:.2f}")

    # Save results to SQLite
    run_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    rows = [(run_stamp, start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'),
             float(r['win_rate']), float(r['total_profit']), r['total_trades'], str(r['params']))
            for r in results]
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.executemany("INSERT INTO backtest_results VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

    log_status("Parameter optimization completed")
    return best_params


# Fetch OHLCV data
async def get_ohlcv(exchange, symbol, timeframe, params):
    """Fetch the latest 100 candles and return them with indicators, or None on error."""
    log_status(f"Loading live OHLCV data for {symbol}...")
    try:
        ohlcv = await exchange.fetch_ohlcv(symbol, timeframe, limit=100)
        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df = apply_indicators(df, params)
        log_status("Live OHLCV data loaded successfully")
        return df
    except ccxt_async.NetworkError as e:
        log_status(f"Network Error fetching OHLCV: {str(e)}", Fore.RED)
        return None
    except ccxt_async.InsufficientFunds as e:
        log_status(f"Insufficient Funds: {str(e)}", Fore.RED)
        return None
    except Exception as e:
        log_status(f"Unexpected Error fetching OHLCV: {str(e)}", Fore.RED)
        return None


# Check trading signal
def check_signal(df, exchange_name, symbol, params):
    """Return 'BUY', 'SELL' or None for *df* (None when *df* is None)."""
    if df is None:
        return None
    signal, _ = evaluate_signals(df, params, exchange_name, symbol)
    return signal


# Progress bar
def display_progress_bar():
    """Show a short cosmetic progress bar (about two seconds, blocking)."""
    for _ in tqdm(range(10), desc="Analyzing", ncols=50):
        time.sleep(0.2)
    log_status("Analysis completed")


# Update status function
async def update_status(exchanges, positions, config, last_prices):
    """Report price, USDT balance and unrealized PNL for every exchange.

    Also emits a warning when the price moved more than
    ``config['swing_threshold']`` percent since the last recorded price, and
    updates *last_prices* in place.
    """
    for exchange_name, exchange in exchanges.items():
        log_status(f"Updating status for {exchange_name}...")
        try:
            ticker = await exchange.fetch_ticker(config['symbol'])
            current_price = ticker['last']

            if exchange_name in last_prices:
                previous = last_prices[exchange_name]
                price_change = abs(current_price - previous) / previous * 100
                if price_change > config['swing_threshold']:
                    message = (f"⚠️ Price Swing Detected on {exchange_name}: "
                               f"{config['symbol']} changed by {price_change:.2f}% "
                               f"(from {previous:.2f} to {current_price:.2f})")
                    log_status(message, Fore.YELLOW)
            last_prices[exchange_name] = current_price

            balance = (await exchange.fetch_balance())['total']['USDT']
            unrealized_pnl = 0
            if exchange_name in positions:
                pos = positions[exchange_name]
                if pos['type'] in ('Long', 'Short'):
                    unrealized_pnl = _position_profit(pos, current_price)

            status_message = (f"📊 Status Update for {exchange_name}:\n"
                              f" Symbol: {config['symbol']}\n"
                              f" Current Price: {current_price:.2f} USDT\n"
                              f" Balance: {balance:.2f} USDT\n"
                              f" Unrealized PNL: {unrealized_pnl:.2f} USDT")
            log_status(status_message, Fore.CYAN)
        except Exception as e:
            log_status(f"Status Update Error on {exchange_name}: {str(e)}", Fore.RED)


# Select timeframe
def select_timeframe():
    """Prompt for a candle timeframe; invalid input falls back to '4h'."""
    # NOTE(review): '45m' does not appear to be a kline interval Binance
    # supports — confirm against exchange.timeframes before offering it.
    timeframes = {
        '1': '15m',
        '2': '30m',
        '3': '45m',
        '4': '1h',
        '5': '4h',
        '6': '8h',
        '7': '1d'
    }
    print(f"{Fore.CYAN}=== Select Timeframe ===")
    print("1. 15 minutes")
    print("2. 30 minutes")
    print("3. 45 minutes")
    print("4. 1 hour")
    print("5. 4 hours")
    print("6. 8 hours")
    print("7. 1 day")
    choice = input(f"{Fore.YELLOW}Select timeframe (1-7): ")
    timeframe = timeframes.get(choice, '4h')  # Default to 4h if invalid
    log_status(f"Selected timeframe: {timeframe}")
    return timeframe


# Select trading pair
def select_trading_pair():
    """Prompt for a trading pair; invalid input falls back to 'BNB/USDT'."""
    trading_pairs = {
        '1': 'BTC/USDT', '2': 'ETH/USDT', '3': 'BNB/USDT', '4': 'XRP/USDT',
        '5': 'ADA/USDT', '6': 'SOL/USDT', '7': 'DOGE/USDT', '8': 'DOT/USDT',
        '9': 'MATIC/USDT', '10': 'AVAX/USDT', '11': 'LINK/USDT', '12': 'LTC/USDT',
        '13': 'BCH/USDT', '14': 'ALGO/USDT', '15': 'ATOM/USDT', '16': 'XLM/USDT',
        '17': 'TRX/USDT', '18': 'ETC/USDT', '19': 'VET/USDT', '20': 'FTM/USDT'
    }
    print(f"{Fore.CYAN}=== Select Trading Pair ===")
    for key, value in trading_pairs.items():
        print(f"{key}. {value}")
    choice = input(f"{Fore.YELLOW}Select trading pair (1-20): ")
    symbol = trading_pairs.get(choice, 'BNB/USDT')  # Default to BNB/USDT if invalid
    log_status(f"Selected trading pair: {symbol}")
    return symbol


def _record_trade(exchange_name, symbol, position, exit_price, profit):
    """Insert one closed trade into the ``trades`` table."""
    row = (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), exchange_name, symbol,
           position['type'], float(position['entry_price']), float(exit_price),
           float(position['amount']), float(profit))
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.execute("INSERT INTO trades VALUES (?, ?, ?, ?, ?, ?, ?, ?)", row)


async def _open_position(exchange_name, exchange, config, signal, df, current_price, atr, positions):
    """Place the market order for *signal* and register it; return False on insufficient funds."""
    plan = _plan_position(
        signal, current_price, atr,
        calculate_fibonacci_levels(df['high'].iloc[-20:].max(), df['low'].iloc[-20:].min()),
        config)
    if plan is None:
        return True
    kind, sl_price, tp_price = plan

    balance = (await exchange.fetch_balance())['total']['USDT']
    amount = balance * config['investment_percentage'] / current_price
    place_order = exchange.create_market_buy_order if kind == 'Long' else exchange.create_market_sell_order
    try:
        log_status(f"Opening {kind} position on {exchange_name}...")
        order = await place_order(config['symbol'], amount)
    except ccxt_async.InsufficientFunds:
        log_status(f"Insufficient funds on {exchange_name}", Fore.RED)
        return False

    positions[exchange_name] = {
        'type': kind,
        'entry_price': current_price,
        'sl_price': sl_price,
        'tp_price': tp_price,
        'amount': amount,
        'order_id': order['id']
    }
    log_status(f"Opened {kind} Position on {exchange_name}: Entry={current_price}, SL={sl_price}, "
               f"TP={tp_price}, Amount={amount}", Fore.GREEN if kind == 'Long' else Fore.RED)
    return True


async def _close_position_if_triggered(exchange_name, exchange, config, positions, current_price):
    """Close and record the open position when its stop loss or take profit is hit."""
    pos = positions[exchange_name]
    print(f"{Fore.MAGENTA}Position Status: {pos['type']} | Entry: {pos['entry_price']} | "
          f"Current: {current_price} | SL: {pos['sl_price']} | TP: {pos['tp_price']}")
    if not _exit_triggered(pos, current_price):
        return

    kind = pos['type']
    log_status(f"Closing {kind} position on {exchange_name}...")
    # A Long is closed by selling, a Short by buying back.
    if kind == 'Long':
        await exchange.create_market_sell_order(config['symbol'], pos['amount'])
    else:
        await exchange.create_market_buy_order(config['symbol'], pos['amount'])
    profit = _position_profit(pos, current_price)
    _record_trade(exchange_name, config['symbol'], pos, current_price, profit)
    log_status(f"Closed {kind} Position on {exchange_name}: Price={current_price}, Profit={profit}",
               Fore.YELLOW)
    del positions[exchange_name]


async def _run_exchange_cycle(exchange_name, exchange, config, positions, last_prices):
    """Run one analyse / open / manage pass for a single exchange."""
    print(f"\n{Fore.CYAN}=== Checking {exchange_name} ===")
    display_progress_bar()

    df = await get_ohlcv(exchange, config['symbol'], config['timeframe'], config)
    if df is None:
        return

    current_price = df['close'].iloc[-1]
    atr = df['atr'].iloc[-1]
    last_prices[exchange_name] = current_price

    signal = check_signal(df, exchange_name, config['symbol'], config)
    if signal:
        print(f"{Fore.RED}🚨 Signal Detected: {signal} on {exchange_name}! 🚨")
        log_status(f"Signal Detected: {signal} on {exchange_name} at price {current_price}", Fore.RED)
        # Only touch the account when a new position can actually be opened,
        # so a balance error cannot skip SL/TP handling of an existing one.
        if exchange_name not in positions:
            opened = await _open_position(exchange_name, exchange, config, signal, df,
                                          current_price, atr, positions)
            if not opened:
                return

    if exchange_name in positions:
        await _close_position_if_triggered(exchange_name, exchange, config, positions, current_price)

    log_status(f"Waiting for next check on {exchange_name}...")


# Main trading bot
async def trading_bot(config, exchanges):
    """Run the live trading loop until cancelled.

    Prompts for pair and timeframe, then every 300 seconds analyses each
    exchange, opens positions on signals and closes them on SL/TP.  After an
    unexpected error the loop waits 60 seconds and continues.  Exchange
    connections are closed once, when the loop is left.
    """
    init_db()
    config['symbol'] = select_trading_pair()  # Select trading pair
    config['timeframe'] = select_timeframe()  # Select timeframe
    log_status(f"Starting live trading with {config['symbol']} on timeframe {config['timeframe']}...")
    positions = {}
    last_prices = {}
    last_status_update = time.time()

    try:
        while True:
            try:
                current_time = time.time()
                if current_time - last_status_update >= config['status_update_interval']:
                    await update_status(exchanges, positions, config, last_prices)
                    last_status_update = current_time

                for exchange_name, exchange in exchanges.items():
                    await _run_exchange_cycle(exchange_name, exchange, config, positions, last_prices)

                await asyncio.sleep(300)
            except Exception as e:
                log_status(f"Error: {str(e)}", Fore.RED)
                await asyncio.sleep(60)
    finally:
        for exchange in exchanges.values():
            await exchange.close()


# Show menu
def show_menu():
    """Print the main menu and return the raw choice string."""
    print(f"{Fore.CYAN}=== Trading Bot Menu ===")
    print("1. Live Trading")
    print("2. Backtest")
    print("3. Exit")
    choice = input(f"{Fore.YELLOW}Select an option (1-3): ")
    log_status(f"Menu option selected: {choice}")
    return choice


# Run backtest with user input
async def run_backtest(exchanges, config):
    """Interactively pick pair, timeframe and dates, optimise, and update *config*.

    Returns:
        *config*, updated with the best parameters when optimisation succeeds
        and left unchanged otherwise.
    """
    exchange = exchanges['binance']
    log_status("Starting backtest setup...")
    print(f"{Fore.CYAN}=== Backtest Setup ===")

    # Select trading pair
    config['symbol'] = select_trading_pair()

    # Select timeframe
    config['timeframe'] = select_timeframe()

    # Select date range
    print("1. Manual date range")
    print("2. Random date range (1 month in past year)")
    date_choice = input(f"{Fore.YELLOW}Select date range option (1-2): ")

    if date_choice == '1':
        start_text = input("Enter start date (YYYY-MM-DD): ")
        end_text = input("Enter end date (YYYY-MM-DD): ")
        try:
            start_date = datetime.strptime(start_text, '%Y-%m-%d')
            end_date = datetime.strptime(end_text, '%Y-%m-%d')
        except ValueError:
            log_status("Invalid date format. Using default range.", Fore.RED)
            start_date = datetime.now() - timedelta(days=365)
            end_date = datetime.now()
        else:
            if start_date >= end_date:
                log_status("Start date must be before end date. Using default range.", Fore.RED)
                start_date = datetime.now() - timedelta(days=365)
                end_date = datetime.now()
    else:
        end_date = datetime.now() - timedelta(days=random.randint(30, 365))
        start_date = end_date - timedelta(days=30)

    log_status(f"Backtesting {config['symbol']} from {start_date.strftime('%Y-%m-%d')} to "
               f"{end_date.strftime('%Y-%m-%d')} on timeframe {config['timeframe']}")

    best_params = await optimize_parameters(exchange, config['symbol'], config['timeframe'],
                                            start_date, end_date)
    if best_params is None:
        log_status("Parameter optimization produced no result; keeping current config.", Fore.RED)
        return config

    df = await fetch_historical_data(exchange, config['symbol'], config['timeframe'],
                                     start_date, end_date)
    if df is None:
        log_status("Failed to fetch historical data.", Fore.RED)
        return config

    df = apply_indicators(df, best_params)
    backtest(df, best_params, config['symbol'])

    # Update config
    config.update(best_params)
    log_status(f"Updated config with best parameters: {best_params}")
    return config


# Main program
async def main():
    """Initialise storage and exchanges, then serve the menu until the user exits."""
    init_db()
    exchanges = init_exchanges()
    config = {
        'symbol': 'BNB/USDT',
        'timeframe': '4h',
        'sma_fast_window': 10,
        'sma_slow_window': 20,
        'rsi_window': 14,
        'rsi_buy_threshold': 70,
        'rsi_sell_threshold': 30,
        'macd_fast': 12,
        'macd_slow': 26,
        'macd_signal': 9,
        'atr_window': 14,
        'atr_threshold': 1.0,
        'sl_multiplier': 2.0,
        'tp_multiplier': 3.0,
        'bb_window': 20,
        'bb_std': 2.0,
        'stoch_k': 14,
        'stoch_d': 3,
        'stoch_smooth': 3,
        'investment_percentage': 0.25,
        'swing_threshold': 2.0,
        'status_update_interval': 600
    }

    try:
        while True:
            choice = show_menu()
            if choice == '1':
                log_status("Starting Live Trading...")
                await trading_bot(config, exchanges)
            elif choice == '2':
                log_status("Starting Backtest...")
                config = await run_backtest(exchanges, config)
            elif choice == '3':
                log_status("Exiting...")
                break
            else:
                log_status("Invalid choice. Please select 1, 2, or 3.", Fore.RED)
    finally:
        # Release the HTTP sessions even when the menu loop is interrupted.
        for exchange in exchanges.values():
            await exchange.close()


# Run the program
if __name__ == "__main__":
    log_status("Starting Trading Bot...")
    asyncio.run(main())

💬 ความคิดเห็น 0

🌟 ยังไม่มีความคิดเห็น — คุณจะเป็นคนแรก!

🔑 เข้าสู่ระบบ เพื่อแสดงความคิดเห็น

🔙 Back Home