# ❤️❤️Pu@Bot💋💋Ally V.2.0
# Author: puk
# Posted: 2025-09-02 00:12
import ccxt.async_support as ccxt_async
import pandas as pd
import ta
from colorama import init, Fore, Style
import time
import logging
import sqlite3
import requests
import numpy as np
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv
from tqdm import tqdm
import asyncio
import random
from itertools import product
# Load environment variables from a local .env file; the exchange API keys and
# Telegram credentials used below are read later with os.getenv.
load_dotenv()
# Setup logging: records at INFO level and above go to trading_bot.log, each
# prefixed with a timestamp and the level name.
logging.basicConfig(filename='trading_bot.log', level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize colorama; autoreset=True resets the console style after every
# print so a color set for one message does not leak into the next.
init(autoreset=True)
# Setup SQLite database
def init_db():
    """Create the SQLite tables used by the bot if they do not exist yet.

    Two tables are created in ``trades.db``: ``trades`` (one row per closed
    live trade) and ``backtest_results`` (one row per tested parameter set).
    A status message is logged once the schema is in place.
    """
    conn = sqlite3.connect('trades.db')
    try:
        # Using the connection as a context manager only commits (or rolls
        # back) the transaction; the connection itself must be closed
        # explicitly, hence the surrounding try/finally.
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS trades
                (timestamp TEXT, exchange TEXT, symbol TEXT, type TEXT,
                entry_price REAL, exit_price REAL, amount REAL, profit REAL)''')
            conn.execute('''CREATE TABLE IF NOT EXISTS backtest_results
                (timestamp TEXT, start_date TEXT, end_date TEXT, win_rate REAL,
                total_profit REAL, total_trades INTEGER, parameters TEXT)''')
    finally:
        conn.close()
    log_status("Database initialized successfully")
# Log status to console, log file, and Telegram with custom header
# Time (as returned by time.time()) of the last Telegram message that was
# delivered; send_telegram_alert reads and updates it for its cooldown.
last_alert_time = 0


def log_status(message, color=Fore.CYAN):
    """Report a status message on every output channel of the bot.

    The message is prefixed with the bot header, printed to the console in
    ``color``, written to the log file at INFO level and finally handed to
    ``send_telegram_alert``.

    Args:
        message: Text to report.
        color: colorama foreground code used for the console line.
    """
    full_message = "❤️❤️Pu@Bot💋💋Ally V.2.1" + f" | {message}"
    console_line = f"{color}{full_message}{Style.RESET_ALL}"
    print(console_line)
    logging.info(full_message)
    send_telegram_alert(full_message)
# Telegram alert function with rate limiting
def send_telegram_alert(message):
    """Send ``message`` to the configured Telegram chat, with a 10 s cooldown.

    The bot token and chat id are read from the ``TELEGRAM_BOT_TOKEN`` and
    ``TELEGRAM_CHAT_ID`` environment variables. When either is missing the
    alert is skipped silently, so the bot can run without Telegram.

    Delivery is attempted up to three times; only HTTP 429 (rate limit)
    responses are retried. Failures are reported on the console and in the
    log file only and are never raised to the caller.

    Args:
        message: Full text to send. "Waiting for next check" status lines are
            treated as non-critical and never sent.
    """
    global last_alert_time
    if message.startswith("❤️❤️Pu@Bot💋💋Ally V.2.1 | Waiting for next check"):
        return  # Skip non-critical messages

    token = os.getenv('TELEGRAM_BOT_TOKEN')
    chat_id = os.getenv('TELEGRAM_CHAT_ID')
    if not token or not chat_id:
        # Without credentials every request would fail; treat Telegram as
        # an optional channel instead.
        return

    def report(text, color):
        # Failures must NOT go through log_status: it calls this function
        # again, which would recurse without bound while Telegram is
        # unreachable or rate-limited. The token is masked because request
        # errors embed the full URL.
        safe_text = f"❤️❤️Pu@Bot💋💋Ally V.2.1 | {text}".replace(token, '***')
        print(f"{color}{safe_text}{Style.RESET_ALL}")
        logging.warning(safe_text)

    # Honor the cooldown since the last successfully delivered message.
    elapsed = time.time() - last_alert_time
    if elapsed < 10:
        time.sleep(10 - elapsed)

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {'chat_id': chat_id, 'text': message}
    retries = 3
    for attempt in range(retries):
        try:
            # A timeout keeps a stalled connection from freezing the bot.
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            last_alert_time = time.time()
            return
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                report(f"Telegram Rate Limit Exceeded. Retrying in 10 seconds... (Attempt {attempt + 1}/{retries})", Fore.YELLOW)
                time.sleep(10)
            else:
                report(f"Telegram Alert Error: {str(e)}", Fore.RED)
                break
        except Exception as e:
            report(f"Telegram Alert Error: {str(e)}", Fore.RED)
            break
# Calculate Fibonacci levels
def calculate_fibonacci_levels(high, low):
    """Return Fibonacci retracement and extension prices for a price range.

    Args:
        high: Upper bound of the range.
        low: Lower bound of the range.

    Returns:
        dict mapping percentage labels ('0.0%' ... '161.8%') to prices.
        Levels up to '100.0%' are measured upward from ``low``; the two
        extension levels are measured upward from ``high``.
    """
    span = high - low
    retracements = (
        ('23.6%', 0.236),
        ('38.2%', 0.382),
        ('50.0%', 0.50),
        ('61.8%', 0.618),
        ('78.6%', 0.786),
    )
    extensions = (
        ('127.2%', 0.272),
        ('161.8%', 0.618),
    )
    levels = {'0.0%': low}
    for label, ratio in retracements:
        levels[label] = low + span * ratio
    levels['100.0%'] = high
    for label, ratio in extensions:
        levels[label] = high + span * ratio
    return levels
# Initialize exchanges
def init_exchanges():
    """Build the async exchange clients used by the bot.

    Returns:
        dict mapping an exchange name to its ccxt async client. Only
        'binance' is created; its credentials come from the
        ``BINANCE_API_KEY`` and ``BINANCE_SECRET`` environment variables.
    """
    binance_settings = {
        'apiKey': os.getenv('BINANCE_API_KEY'),
        'secret': os.getenv('BINANCE_SECRET'),
        'enableRateLimit': True,
    }
    exchanges = {'binance': ccxt_async.binance(binance_settings)}
    log_status("Exchanges initialized successfully")
    return exchanges
# Fetch historical data for backtesting
async def fetch_historical_data(exchange, symbol, timeframe, start_date, end_date):
    """Download OHLCV candles between ``start_date`` and ``end_date``.

    Candles are requested in pages of up to 1000 and the request window is
    advanced past the last candle received, so ranges longer than a single
    page are covered completely instead of being cut off after the first
    1000 candles.

    Args:
        exchange: ccxt async exchange client.
        symbol: Market symbol, e.g. 'BTC/USDT'.
        timeframe: Candle size passed through to ``fetch_ohlcv``.
        start_date: datetime of the first candle to request.
        end_date: datetime after which candles are discarded.

    Returns:
        DataFrame with columns timestamp, open, high, low, close, volume,
        or None when the download fails.
    """
    log_status(f"Loading historical OHLCV data for {symbol} from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}...")
    page_limit = 1000
    try:
        since = int(start_date.timestamp() * 1000)
        candles = []
        while True:
            batch = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=page_limit)
            if not batch:
                break
            candles.extend(batch)
            last_ts = batch[-1][0]
            # Stop when the exchange made no progress (guards against an
            # endless loop) or the requested window is already covered.
            if last_ts < since or pd.to_datetime(last_ts, unit='ms') > end_date:
                break
            since = last_ts + 1
        df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df = df[df['timestamp'] <= end_date]
        # Pages may overlap on some exchanges; keep one row per candle and
        # return an independent frame that is safe to add columns to.
        df = df.drop_duplicates(subset='timestamp').reset_index(drop=True)
        log_status(f"Historical data loaded successfully: {len(df)} candles")
        return df
    except Exception as e:
        log_status(f"Error fetching historical data: {str(e)}", Fore.RED)
        return None
# Apply indicators
def apply_indicators(df, params):
    """Add the technical-indicator columns used by the strategy to ``df``.

    The frame is modified in place and also returned. Columns written:
    sma_fast, sma_slow, rsi, macd, macd_signal, macd_hist, atr, bb_upper,
    bb_lower, bb_mid, bb_percent, stoch_k and stoch_d.

    Args:
        df: OHLCV DataFrame with 'high', 'low' and 'close' columns.
        params: dict providing the window sizes and settings read below.

    Returns:
        The same DataFrame with the indicator columns added.
    """
    log_status("Applying technical indicators...")
    close = df['close']
    high = df['high']
    low = df['low']

    # Moving averages
    for column, window_key in (('sma_fast', 'sma_fast_window'), ('sma_slow', 'sma_slow_window')):
        df[column] = ta.trend.SMAIndicator(close=close, window=params[window_key]).sma_indicator()

    # Momentum
    rsi_indicator = ta.momentum.RSIIndicator(close=close, window=params['rsi_window'])
    df['rsi'] = rsi_indicator.rsi()

    macd_indicator = ta.trend.MACD(
        close=close,
        window_fast=params['macd_fast'],
        window_slow=params['macd_slow'],
        window_sign=params['macd_signal'],
    )
    df['macd'] = macd_indicator.macd()
    df['macd_signal'] = macd_indicator.macd_signal()
    df['macd_hist'] = macd_indicator.macd_diff()

    # Volatility
    atr_indicator = ta.volatility.AverageTrueRange(
        high=high, low=low, close=close, window=params['atr_window'])
    df['atr'] = atr_indicator.average_true_range()

    bands = ta.volatility.BollingerBands(
        close=close, window=params['bb_window'], window_dev=params['bb_std'])
    df['bb_upper'] = bands.bollinger_hband()
    df['bb_lower'] = bands.bollinger_lband()
    df['bb_mid'] = bands.bollinger_mavg()
    # %B: position of the close inside the band (0 = lower, 1 = upper)
    band_width = df['bb_upper'] - df['bb_lower']
    df['bb_percent'] = (df['close'] - df['bb_lower']) / band_width

    # Stochastic oscillator
    stochastic = ta.momentum.StochasticOscillator(
        high=high, low=low, close=close,
        window=params['stoch_k'], smooth_window=params['stoch_smooth'])
    df['stoch_k'] = stochastic.stoch()
    df['stoch_d'] = stochastic.stoch_signal()

    log_status("Indicators applied successfully")
    return df
# Evaluate signals
def evaluate_signals(df, params, exchange_name, symbol):
    """Combine twelve indicator checks on the latest candles into one signal.

    Each check may vote BUY, SELL or, for the two ATR checks, a
    direction-neutral confirmation (codes prefixed ``BUY_SELL``). A BUY
    (or SELL) is emitted when the directional votes for that side plus the
    neutral confirmations reach six. Neutral confirmations are counted once
    for either side; previously they also matched the ``BUY`` prefix and so
    were counted twice toward BUY.

    Args:
        df: DataFrame carrying 'close' and the columns written by
            ``apply_indicators``; the last two rows are inspected.
        params: dict providing 'rsi_buy_threshold', 'rsi_sell_threshold'
            and 'atr_threshold'.
        exchange_name: Name used in the log messages.
        symbol: Market symbol (not used by the checks themselves).

    Returns:
        Tuple ``(signal, signal_desc)`` where ``signal`` is 'BUY', 'SELL'
        or None and ``signal_desc`` lists a description of every check that
        fired. With fewer than two rows the result is ``(None, [])``.
    """
    # Crossover and trend checks need the previous candle as well.
    if len(df) < 2:
        return None, []
    last_row = df.iloc[-1]
    prev_row = df.iloc[-2]
    signals = []
    signal_desc = []

    def add(code, description):
        signals.append(code)
        signal_desc.append(description)

    # 1. SMA Crossover
    if prev_row['sma_fast'] < prev_row['sma_slow'] and last_row['sma_fast'] > last_row['sma_slow']:
        add('BUY_SMA_CROSS', "SMA Crossover (Fast > Slow)")
    elif prev_row['sma_fast'] > prev_row['sma_slow'] and last_row['sma_fast'] < last_row['sma_slow']:
        add('SELL_SMA_CROSS', "SMA Crossover (Fast < Slow)")
    # 2. SMA Position
    if last_row['close'] > last_row['sma_slow']:
        add('BUY_SMA_POS', "Price > SMA Slow")
    elif last_row['close'] < last_row['sma_slow']:
        add('SELL_SMA_POS', "Price < SMA Slow")
    # 3. MACD Crossover
    if prev_row['macd'] < prev_row['macd_signal'] and last_row['macd'] > last_row['macd_signal']:
        add('BUY_MACD_CROSS', "MACD > Signal Line")
    elif prev_row['macd'] > prev_row['macd_signal'] and last_row['macd'] < last_row['macd_signal']:
        add('SELL_MACD_CROSS', "MACD < Signal Line")
    # 4. MACD Histogram
    if last_row['macd_hist'] > 0:
        add('BUY_MACD_HIST', "MACD Histogram > 0")
    elif last_row['macd_hist'] < 0:
        add('SELL_MACD_HIST', "MACD Histogram < 0")
    # 5. RSI Threshold
    # NOTE(review): with the defaults in main (buy 70 / sell 30) any RSI
    # below 70 votes BUY and SELL needs RSI >= 70 — confirm this is intended.
    if last_row['rsi'] < params['rsi_buy_threshold']:
        add('BUY_RSI_THRESH', f"RSI < {params['rsi_buy_threshold']}")
    elif last_row['rsi'] > params['rsi_sell_threshold']:
        add('SELL_RSI_THRESH', f"RSI > {params['rsi_sell_threshold']}")
    # 6. RSI Trend
    if last_row['rsi'] > prev_row['rsi']:
        add('BUY_RSI_TREND', "RSI Increasing")
    elif last_row['rsi'] < prev_row['rsi']:
        add('SELL_RSI_TREND', "RSI Decreasing")
    # 7. ATR Threshold (direction-neutral)
    if last_row['atr'] > params['atr_threshold']:
        add('BUY_SELL_ATR_THRESH', f"ATR > {params['atr_threshold']}")
    # 8. ATR Trend (direction-neutral)
    if last_row['atr'] > prev_row['atr']:
        add('BUY_SELL_ATR_TREND', "ATR Increasing")
    # 9. Bollinger Bands Position
    if last_row['close'] < last_row['bb_lower']:
        add('BUY_BB_POS', "Price < BB Lower")
    elif last_row['close'] > last_row['bb_upper']:
        add('SELL_BB_POS', "Price > BB Upper")
    # 10. Bollinger Bands %B
    if last_row['bb_percent'] < 0.2:
        add('BUY_BB_PERCENT', "BB %B < 0.2")
    elif last_row['bb_percent'] > 0.8:
        add('SELL_BB_PERCENT', "BB %B > 0.8")
    # 11. Stochastic %K
    if last_row['stoch_k'] < 20:
        add('BUY_STOCH_K', "Stochastic %K < 20")
    elif last_row['stoch_k'] > 80:
        add('SELL_STOCH_K', "Stochastic %K > 80")
    # 12. Stochastic Crossover
    if last_row['stoch_k'] > last_row['stoch_d'] and prev_row['stoch_k'] <= prev_row['stoch_d']:
        add('BUY_STOCH_CROSS', "Stochastic %K > %D")
    elif last_row['stoch_k'] < last_row['stoch_d'] and prev_row['stoch_k'] >= prev_row['stoch_d']:
        add('SELL_STOCH_CROSS', "Stochastic %K < %D")

    # 'BUY_SELL_*' codes also start with 'BUY', so they are excluded from the
    # directional BUY list and added to each side exactly once below.
    buy_sell_signals = [s for s in signals if s.startswith('BUY_SELL')]
    buy_signals = [s for s in signals if s.startswith('BUY') and not s.startswith('BUY_SELL')]
    sell_signals = [s for s in signals if s.startswith('SELL')]

    signal = None
    if len(buy_signals) + len(buy_sell_signals) >= 6:
        signal = 'BUY'
        log_status(f"BUY signal detected on {exchange_name} at price {last_row['close']}. Signals: {', '.join(signal_desc)}", Fore.GREEN)
    elif len(sell_signals) + len(buy_sell_signals) >= 6:
        signal = 'SELL'
        log_status(f"SELL signal detected on {exchange_name} at price {last_row['close']}. Signals: {', '.join(signal_desc)}", Fore.RED)
    else:
        log_status("No signal detected")
    return signal, signal_desc
# Backtest function
def backtest(df, params, symbol):
    """Replay the strategy candle by candle and summarize the trades.

    For every candle after the first, the signal is evaluated on the data
    seen so far. At most one position is held at a time; it is closed when
    the close price crosses its stop-loss or take-profit. A position that
    is still open on the last candle is not included in the statistics.

    Args:
        df: OHLCV DataFrame that already carries the indicator columns.
        params: Strategy parameters; 'investment_percentage',
            'sl_multiplier' and 'tp_multiplier' are read here, the rest by
            ``evaluate_signals``.
        symbol: Market symbol, passed through to ``evaluate_signals``.

    Returns:
        dict with 'trades', 'win_rate', 'total_profit', 'total_trades',
        'final_balance' and 'max_drawdown'.
    """
    log_status("Starting backtest...")
    initial_balance = 10000  # Starting balance in USDT
    trades = []
    position = None
    balance = initial_balance
    investment_percentage = params['investment_percentage']
    for i in range(1, len(df)):
        last_row = df.iloc[i]
        current_price = last_row['close']
        atr = last_row['atr']
        # Fibonacci levels over the current candle and up to 20 before it
        lookback = slice(max(0, i - 20), i + 1)
        high = df['high'].iloc[lookback].max()
        low = df['low'].iloc[lookback].min()
        fib_levels = calculate_fibonacci_levels(high, low)
        # Check signal
        signal, _ = evaluate_signals(df.iloc[:i + 1], params, 'backtest', symbol)
        # Open position. A missing or zero ATR would give a stop-loss equal
        # to the entry price (instant exit) or NaN levels that can never be
        # hit, so no position is opened until the ATR is usable.
        atr_usable = pd.notna(atr) and atr > 0
        if signal and position is None and atr_usable:
            investment = balance * investment_percentage
            amount = investment / current_price
            entry_price = current_price
            if signal == 'BUY':
                sl_price = entry_price - (atr * params['sl_multiplier'])
                tp_price = fib_levels['61.8%'] if fib_levels['61.8%'] > entry_price else entry_price + (atr * params['tp_multiplier'])
                position = {'type': 'Long', 'entry_price': entry_price, 'sl_price': sl_price, 'tp_price': tp_price, 'amount': amount}
                log_status(f"Opened Long position: Entry={entry_price}, SL={sl_price}, TP={tp_price}")
            elif signal == 'SELL':
                sl_price = entry_price + (atr * params['sl_multiplier'])
                tp_price = fib_levels['38.2%'] if fib_levels['38.2%'] < entry_price else entry_price - (atr * params['tp_multiplier'])
                position = {'type': 'Short', 'entry_price': entry_price, 'sl_price': sl_price, 'tp_price': tp_price, 'amount': amount}
                log_status(f"Opened Short position: Entry={entry_price}, SL={sl_price}, TP={tp_price}")
        # Check SL/TP
        if position is not None:
            if position['type'] == 'Long' and (current_price <= position['sl_price'] or current_price >= position['tp_price']):
                profit = (current_price - position['entry_price']) * position['amount']
                trades.append({'type': 'Long', 'entry_price': position['entry_price'], 'exit_price': current_price, 'profit': profit})
                balance += profit
                log_status(f"Closed Long position: Price={current_price}, Profit={profit}", Fore.YELLOW)
                position = None
            elif position['type'] == 'Short' and (current_price >= position['sl_price'] or current_price <= position['tp_price']):
                profit = (position['entry_price'] - current_price) * position['amount']
                trades.append({'type': 'Short', 'entry_price': position['entry_price'], 'exit_price': current_price, 'profit': profit})
                balance += profit
                log_status(f"Closed Short position: Price={current_price}, Profit={profit}", Fore.YELLOW)
                position = None
    # Calculate metrics
    total_trades = len(trades)
    wins = sum(1 for trade in trades if trade['profit'] > 0)
    win_rate = (wins / total_trades * 100) if total_trades > 0 else 0
    total_profit = sum(trade['profit'] for trade in trades)
    max_drawdown = calculate_max_drawdown(trades, initial_balance)
    result_message = (f"=== Backtest Results ===\n"
                      f"Total Trades: {total_trades}\n"
                      f"Win Rate: {win_rate:.2f}%\n"
                      f"Total Profit: {total_profit:.2f} USDT\n"
                      f"Final Balance: {balance:.2f} USDT\n"
                      f"Max Drawdown: {max_drawdown:.2f}%")
    log_status(result_message, Fore.GREEN)
    return {
        'trades': trades,
        'win_rate': win_rate,
        'total_profit': total_profit,
        'total_trades': total_trades,
        'final_balance': balance,
        'max_drawdown': max_drawdown
    }
# Calculate max drawdown
def calculate_max_drawdown(trades, initial_balance):
    """Return the largest peak-to-trough equity drop, in percent.

    The equity curve starts at ``initial_balance`` and is updated with the
    'profit' of each trade in order.

    Args:
        trades: Iterable of dicts that each have a 'profit' entry.
        initial_balance: Equity before the first trade.

    Returns:
        The maximum drawdown as a percentage of the running peak (0 when
        equity never falls below a previous peak).
    """
    equity = high_water = initial_balance
    worst = 0
    for profit in (trade['profit'] for trade in trades):
        equity += profit
        if equity > high_water:
            high_water = equity
        drop_pct = (high_water - equity) / high_water * 100
        if drop_pct > worst:
            worst = drop_pct
    return worst
# Optimize parameters
async def optimize_parameters(exchange, symbol, timeframe, start_date, end_date):
    """Backtest a limited grid of parameter sets and return the best one.

    The historical candles are downloaded once and reused for every
    parameter set. Sets are ranked by win rate, with total profit as the
    tie-breaker; the first set tested is always accepted as the initial
    best, so a result is returned whenever at least one backtest ran.
    All results are stored in the ``backtest_results`` table.

    Args:
        exchange: ccxt async exchange client.
        symbol: Market symbol to backtest.
        timeframe: Candle size.
        start_date: datetime at which the backtest window starts.
        end_date: datetime at which the backtest window ends.

    Returns:
        The best parameter dict, or None when no backtest could be run
        (historical data unavailable).
    """
    from itertools import islice

    log_status("Optimizing parameters...")
    param_ranges = {
        'sma_fast_window': [5, 8, 10, 12, 15],
        'sma_slow_window': [20, 22, 25, 28, 30],
        'rsi_window': [10, 12, 14, 17],
        'rsi_buy_threshold': [60, 65, 70],
        'rsi_sell_threshold': [20, 25, 30],
        'macd_fast': [12, 14, 16],
        'macd_slow': [26, 28, 30],
        'macd_signal': [9, 12],
        'atr_window': [14, 20],
        'atr_threshold': [0.5, 1.0, 1.5],
        'sl_multiplier': [1.5, 2.0, 2.5],
        'tp_multiplier': [2.0, 3.0, 4.0],
        'bb_window': [20, 25],
        'bb_std': [2.0, 2.5],
        'stoch_k': [14, 20],
        'stoch_d': [3, 5],
        'stoch_smooth': [3, 5],
        'investment_percentage': [0.25],
    }
    max_combinations = 50  # Limit to 50 combinations
    param_names = list(param_ranges)
    # The full grid has millions of entries; take the first few lazily
    # instead of materializing all of them in a list.
    # NOTE(review): the first 50 products only vary the trailing parameters
    # (tp_multiplier onward) — consider sampling if wider coverage is wanted.
    combinations = islice(product(*param_ranges.values()), max_combinations)

    best_params = None
    best_win_rate = 0
    best_profit = 0
    results = []

    # The candles do not depend on the parameters, so fetch them only once.
    base_df = await fetch_historical_data(exchange, symbol, timeframe, start_date, end_date)
    if base_df is not None:
        for combo in combinations:
            params = dict(zip(param_names, combo))
            log_status(f"Testing parameters: {params}")
            # apply_indicators writes columns in place; work on a copy so
            # every run starts from the raw candles.
            df = apply_indicators(base_df.copy(), params)
            result = backtest(df, params, symbol)
            results.append({
                'params': params,
                'win_rate': result['win_rate'],
                'total_profit': result['total_profit'],
                'total_trades': result['total_trades'],
                'max_drawdown': result['max_drawdown']
            })
            is_better = (
                best_params is None
                or result['win_rate'] > best_win_rate
                or (result['win_rate'] == best_win_rate and result['total_profit'] > best_profit)
            )
            if is_better:
                best_win_rate = result['win_rate']
                best_profit = result['total_profit']
                best_params = params
                log_status(f"New best parameters found: Win Rate={best_win_rate:.2f}%, Profit={best_profit:.2f}")

    # Save results to SQLite
    saved_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    rows = [
        (saved_at,
         start_date.strftime('%Y-%m-%d'),
         end_date.strftime('%Y-%m-%d'),
         result['win_rate'],
         result['total_profit'],
         result['total_trades'],
         str(result['params']))
        for result in results
    ]
    conn = sqlite3.connect('trades.db')
    try:
        # The context manager commits; closing is done explicitly below.
        with conn:
            conn.executemany("INSERT INTO backtest_results VALUES (?, ?, ?, ?, ?, ?, ?)", rows)
    finally:
        conn.close()
    log_status("Parameter optimization completed")
    return best_params
# Fetch OHLCV data
async def get_ohlcv(exchange, symbol, timeframe, params):
    """Fetch the latest 100 candles and attach the indicator columns.

    Args:
        exchange: ccxt async exchange client.
        symbol: Market symbol.
        timeframe: Candle size.
        params: Indicator settings forwarded to ``apply_indicators``.

    Returns:
        The prepared DataFrame, or None when fetching or processing fails
        (the error is logged).
    """
    log_status(f"Loading live OHLCV data for {symbol}...")
    prepared = None
    try:
        raw_candles = await exchange.fetch_ohlcv(symbol, timeframe, limit=100)
        frame = pd.DataFrame(raw_candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='ms')
        frame = apply_indicators(frame, params)
        log_status("Live OHLCV data loaded successfully")
        prepared = frame
    except ccxt_async.NetworkError as e:
        log_status(f"Network Error fetching OHLCV: {str(e)}", Fore.RED)
    except ccxt_async.InsufficientFunds as e:
        log_status(f"Insufficient Funds: {str(e)}", Fore.RED)
    except Exception as e:
        log_status(f"Unexpected Error fetching OHLCV: {str(e)}", Fore.RED)
    return prepared
# Check trading signal
def check_signal(df, exchange_name, symbol, params):
    """Return the trade signal for ``df``, or None when no data is available.

    Args:
        df: Indicator DataFrame, or None if loading failed.
        exchange_name: Name used in log messages.
        symbol: Market symbol.
        params: Strategy parameters forwarded to ``evaluate_signals``.

    Returns:
        The signal produced by ``evaluate_signals`` ('BUY', 'SELL' or None).
    """
    if df is not None:
        signal, _descriptions = evaluate_signals(df, params, exchange_name, symbol)
        return signal
    return None
# Progress bar
def display_progress_bar():
    """Show a ten-step "Analyzing" progress bar, then log completion.

    Each step pauses for 0.2 seconds; the bar is purely cosmetic.
    """
    steps = tqdm(range(10), desc="Analyzing", ncols=50)
    for _step in steps:
        time.sleep(0.2)
    log_status("Analysis completed")
# Update status function
async def update_status(exchanges, positions, config, last_prices):
    """Log price, balance and unrealized PNL for every exchange.

    A price-swing warning is logged when the price moved by more than
    ``config['swing_threshold']`` percent since the price stored in
    ``last_prices``, which is then updated. Errors for one exchange are
    logged and do not stop the others.

    Args:
        exchanges: dict of exchange name to ccxt async client.
        positions: dict of exchange name to the open position dict.
        config: Bot configuration; 'symbol' and 'swing_threshold' are read.
        last_prices: dict of exchange name to last seen price (updated).
    """
    for exchange_name, exchange in exchanges.items():
        log_status(f"Updating status for {exchange_name}...")
        try:
            ticker = await exchange.fetch_ticker(config['symbol'])
            current_price = ticker['last']

            # Compare with the previously recorded price, if there is one
            if exchange_name in last_prices:
                price_change = abs(current_price - last_prices[exchange_name]) / last_prices[exchange_name] * 100
                if price_change > config['swing_threshold']:
                    swing_parts = (
                        f"⚠️ Price Swing Detected on {exchange_name}: ",
                        f"{config['symbol']} changed by {price_change:.2f}% ",
                        f"(from {last_prices[exchange_name]:.2f} to {current_price:.2f})",
                    )
                    log_status("".join(swing_parts), Fore.YELLOW)
            last_prices[exchange_name] = current_price

            account = await exchange.fetch_balance()
            balance = account['total']['USDT']

            unrealized_pnl = 0
            if exchange_name in positions:
                pos = positions[exchange_name]
                if pos['type'] == 'Long':
                    unrealized_pnl = (current_price - pos['entry_price']) * pos['amount']
                elif pos['type'] == 'Short':
                    unrealized_pnl = (pos['entry_price'] - current_price) * pos['amount']

            status_lines = [
                f"📊 Status Update for {exchange_name}:",
                f" Symbol: {config['symbol']}",
                f" Current Price: {current_price:.2f} USDT",
                f" Balance: {balance:.2f} USDT",
                f" Unrealized PNL: {unrealized_pnl:.2f} USDT",
            ]
            log_status("\n".join(status_lines), Fore.CYAN)
        except Exception as e:
            log_status(f"Status Update Error on {exchange_name}: {str(e)}", Fore.RED)
# Select timeframe
def select_timeframe():
    """Ask the user for a candle timeframe on the console.

    Returns:
        The timeframe string for the chosen menu entry; '4h' when the
        input does not match any entry.
    """
    # (menu key, timeframe code, label shown to the user)
    # NOTE(review): '45m' may not be offered by every exchange — confirm it
    # against the exchange's supported timeframes.
    options = (
        ('1', '15m', '15 minutes'),
        ('2', '30m', '30 minutes'),
        ('3', '45m', '45 minutes'),
        ('4', '1h', '1 hour'),
        ('5', '4h', '4 hours'),
        ('6', '8h', '8 hours'),
        ('7', '1d', '1 day'),
    )
    print(f"{Fore.CYAN}=== Select Timeframe ===")
    for key, _code, label in options:
        print(f"{key}. {label}")
    choice = input(f"{Fore.YELLOW}Select timeframe (1-7): ")
    by_key = {key: code for key, code, _label in options}
    timeframe = by_key.get(choice, '4h')  # Default to 4h if invalid
    log_status(f"Selected timeframe: {timeframe}")
    return timeframe
# Select trading pair
def select_trading_pair():
    """Ask the user for a trading pair on the console.

    Returns:
        The symbol for the chosen menu entry; 'BNB/USDT' when the input
        does not match any entry.
    """
    symbols = (
        'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'XRP/USDT', 'ADA/USDT',
        'SOL/USDT', 'DOGE/USDT', 'DOT/USDT', 'MATIC/USDT', 'AVAX/USDT',
        'LINK/USDT', 'LTC/USDT', 'BCH/USDT', 'ALGO/USDT', 'ATOM/USDT',
        'XLM/USDT', 'TRX/USDT', 'ETC/USDT', 'VET/USDT', 'FTM/USDT',
    )
    # Menu keys are the 1-based positions, as strings
    trading_pairs = {str(number): pair for number, pair in enumerate(symbols, start=1)}
    print(f"{Fore.CYAN}=== Select Trading Pair ===")
    for number, pair in trading_pairs.items():
        print(f"{number}. {pair}")
    choice = input(f"{Fore.YELLOW}Select trading pair (1-20): ")
    symbol = trading_pairs.get(choice, 'BNB/USDT')  # Default to BNB/USDT if invalid
    log_status(f"Selected trading pair: {symbol}")
    return symbol
# Main trading bot
def _plan_position(signal, entry_price, atr, fib_levels, config, amount):
    """Build the position dict (without order id) for a BUY or SELL signal."""
    if signal == 'BUY':
        side = 'Long'
        sl_price = entry_price - (atr * config['sl_multiplier'])
        tp_price = fib_levels['61.8%'] if fib_levels['61.8%'] > entry_price else entry_price + (atr * config['tp_multiplier'])
    else:
        side = 'Short'
        sl_price = entry_price + (atr * config['sl_multiplier'])
        tp_price = fib_levels['38.2%'] if fib_levels['38.2%'] < entry_price else entry_price - (atr * config['tp_multiplier'])
    return {
        'type': side,
        'entry_price': entry_price,
        'sl_price': sl_price,
        'tp_price': tp_price,
        'amount': amount,
    }


async def _close_position_if_triggered(exchange_name, exchange, config, positions, current_price):
    """Close the open position when its SL or TP is crossed and record the trade."""
    pos = positions[exchange_name]
    print(f"{Fore.MAGENTA}Position Status: {pos['type']} | Entry: {pos['entry_price']} | Current: {current_price} | SL: {pos['sl_price']} | TP: {pos['tp_price']}")
    if pos['type'] == 'Long':
        triggered = current_price <= pos['sl_price'] or current_price >= pos['tp_price']
    elif pos['type'] == 'Short':
        triggered = current_price >= pos['sl_price'] or current_price <= pos['tp_price']
    else:
        triggered = False
    if not triggered:
        return

    log_status(f"Closing {pos['type']} position on {exchange_name}...")
    if pos['type'] == 'Long':
        await exchange.create_market_sell_order(config['symbol'], pos['amount'])
        profit = (current_price - pos['entry_price']) * pos['amount']
    else:
        await exchange.create_market_buy_order(config['symbol'], pos['amount'])
        profit = (pos['entry_price'] - current_price) * pos['amount']

    # Only touch the database when a trade was actually closed, and close
    # the connection explicitly (the context manager only commits).
    conn = sqlite3.connect('trades.db')
    try:
        with conn:
            conn.execute("INSERT INTO trades VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                         (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), exchange_name, config['symbol'], pos['type'],
                          pos['entry_price'], current_price, pos['amount'], profit))
    finally:
        conn.close()
    log_status(f"Closed {pos['type']} Position on {exchange_name}: Price={current_price}, Profit={profit}", Fore.YELLOW)
    del positions[exchange_name]


async def _process_exchange(exchange_name, exchange, config, positions, last_prices):
    """Run one analysis cycle for a single exchange: signal, entry, exit."""
    print(f"\n{Fore.CYAN}=== Checking {exchange_name} ===")
    display_progress_bar()
    df = await get_ohlcv(exchange, config['symbol'], config['timeframe'], config)
    if df is None:
        return
    current_price = df['close'].iloc[-1]
    atr = df['atr'].iloc[-1]
    last_prices[exchange_name] = current_price
    signal = check_signal(df, exchange_name, config['symbol'], config)
    if signal:
        print(f"{Fore.RED}🚨 Signal Detected: {signal} on {exchange_name}! 🚨")
        log_status(f"Signal Detected: {signal} on {exchange_name} at price {current_price}", Fore.RED)
        if exchange_name not in positions:
            if pd.isna(atr) or atr <= 0:
                # Without a usable ATR the SL/TP levels would be NaN or
                # equal to the entry price; do not trade on them.
                log_status(f"Skipping {signal} on {exchange_name}: ATR not available", Fore.YELLOW)
            else:
                high = df['high'].iloc[-20:].max()
                low = df['low'].iloc[-20:].min()
                fib_levels = calculate_fibonacci_levels(high, low)
                balance = (await exchange.fetch_balance())['total']['USDT']
                investment = balance * config['investment_percentage']
                amount = investment / current_price
                plan = _plan_position(signal, current_price, atr, fib_levels, config, amount)
                try:
                    log_status(f"Opening {plan['type']} position on {exchange_name}...")
                    # NOTE(review): a market sell only opens a short on an
                    # account type that allows it — confirm the exchange
                    # configuration supports this.
                    if signal == 'BUY':
                        order = await exchange.create_market_buy_order(config['symbol'], amount)
                    else:
                        order = await exchange.create_market_sell_order(config['symbol'], amount)
                    positions[exchange_name] = {**plan, 'order_id': order['id']}
                    log_status(
                        f"Opened {plan['type']} Position on {exchange_name}: Entry={plan['entry_price']}, SL={plan['sl_price']}, TP={plan['tp_price']}, Amount={amount}",
                        Fore.GREEN if signal == 'BUY' else Fore.RED)
                except ccxt_async.InsufficientFunds:
                    log_status(f"Insufficient funds on {exchange_name}", Fore.RED)
                    return
    if exchange_name in positions:
        await _close_position_if_triggered(exchange_name, exchange, config, positions, current_price)
    log_status(f"Waiting for next check on {exchange_name}...")


async def trading_bot(config, exchanges):
    """Run the live trading loop until it is interrupted.

    The user first picks the trading pair and timeframe. Then, every five
    minutes, each exchange is analyzed: a BUY/SELL signal opens a position
    if none is held, and an open position is closed once the price crosses
    its stop-loss or take-profit. A periodic status report is produced
    every ``config['status_update_interval']`` seconds. Errors inside a
    cycle are logged and the loop resumes after one minute.

    Args:
        config: Bot configuration dict; 'symbol' and 'timeframe' are
            overwritten with the user's choice.
        exchanges: dict of exchange name to ccxt async client. The clients
            are closed when the loop ends.
    """
    init_db()
    config['symbol'] = select_trading_pair()  # Select trading pair
    config['timeframe'] = select_timeframe()  # Select timeframe
    log_status(f"Starting live trading with {config['symbol']} on timeframe {config['timeframe']}...")
    positions = {}
    last_prices = {}
    last_status_update = time.time()
    try:
        while True:
            try:
                current_time = time.time()
                if current_time - last_status_update >= config['status_update_interval']:
                    await update_status(exchanges, positions, config, last_prices)
                    last_status_update = current_time
                for exchange_name, exchange in exchanges.items():
                    await _process_exchange(exchange_name, exchange, config, positions, last_prices)
                await asyncio.sleep(300)
            except Exception as e:
                log_status(f"Error: {str(e)}", Fore.RED)
                await asyncio.sleep(60)
    finally:
        # Release the exchange connections once, when the loop is left
        # (cancellation / Ctrl+C), not after every cycle.
        for exchange in exchanges.values():
            await exchange.close()
# Show menu
def show_menu():
    """Print the main menu and return the option typed by the user.

    Returns:
        The raw input string (expected '1', '2' or '3'); it is not
        validated here.
    """
    menu_lines = (
        f"{Fore.CYAN}=== Trading Bot Menu ===",
        "1. Live Trading",
        "2. Backtest",
        "3. Exit",
    )
    for line in menu_lines:
        print(line)
    choice = input(f"{Fore.YELLOW}Select an option (1-3): ")
    log_status(f"Menu option selected: {choice}")
    return choice
# Run backtest with user input
async def run_backtest(exchanges, config):
    """Interactively set up a backtest, optimize parameters and apply them.

    The user selects the trading pair, timeframe and date range (manual or
    a random 30-day window within the past year). The parameters are then
    optimized, a final backtest is run with the best set, and that set is
    merged into ``config``.

    Args:
        exchanges: dict of exchange clients; the 'binance' client is used.
        config: Bot configuration dict; updated in place.

    Returns:
        ``config``, updated with the best parameters when the optimization
        and the final data download succeed, otherwise with only the
        symbol and timeframe selection changed.
    """
    exchange = exchanges['binance']
    log_status("Starting backtest setup...")
    print(f"{Fore.CYAN}=== Backtest Setup ===")
    # Select trading pair
    config['symbol'] = select_trading_pair()
    # Select timeframe
    config['timeframe'] = select_timeframe()
    # Select date range
    print("1. Manual date range")
    print("2. Random date range (1 month in past year)")
    date_choice = input(f"{Fore.YELLOW}Select date range option (1-2): ")
    if date_choice == '1':
        start_date = input("Enter start date (YYYY-MM-DD): ")
        end_date = input("Enter end date (YYYY-MM-DD): ")
        try:
            start_date = datetime.strptime(start_date, '%Y-%m-%d')
            end_date = datetime.strptime(end_date, '%Y-%m-%d')
        except ValueError:
            log_status("Invalid date format. Using default range.", Fore.RED)
            start_date = datetime.now() - timedelta(days=365)
            end_date = datetime.now()
        else:
            # A reversed or empty range would yield no candles at all.
            if start_date >= end_date:
                log_status("Start date must be before end date. Using default range.", Fore.RED)
                start_date = datetime.now() - timedelta(days=365)
                end_date = datetime.now()
    else:
        end_date = datetime.now() - timedelta(days=random.randint(30, 365))
        start_date = end_date - timedelta(days=30)
    log_status(f"Backtesting {config['symbol']} from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')} on timeframe {config['timeframe']}")
    best_params = await optimize_parameters(exchange, config['symbol'], config['timeframe'], start_date, end_date)
    if best_params is None:
        # Nothing to apply; indexing into None below would raise TypeError.
        log_status("Parameter optimization found no usable parameters. Keeping current config.", Fore.RED)
        return config
    df = await fetch_historical_data(exchange, config['symbol'], config['timeframe'], start_date, end_date)
    if df is None:
        log_status("Failed to fetch historical data.", Fore.RED)
        return config
    df = apply_indicators(df, best_params)
    # Final run with the winning parameters; its summary is logged by backtest.
    backtest(df, best_params, config['symbol'])
    # Update config
    config.update(best_params)
    log_status(f"Updated config with best parameters: {best_params}")
    return config
# Main program
async def main():
    """Initialize the bot and run the interactive menu loop.

    The database and exchange clients are created, then the menu is shown
    until the user chooses to exit. Option 1 starts live trading, option 2
    runs a backtest and replaces the configuration with its result. The
    exchange clients are closed when the loop ends for any reason.
    """
    init_db()
    exchanges = init_exchanges()
    config = {
        'symbol': 'BNB/USDT',
        'timeframe': '4h',
        'sma_fast_window': 10,
        'sma_slow_window': 20,
        'rsi_window': 14,
        'rsi_buy_threshold': 70,
        'rsi_sell_threshold': 30,
        'macd_fast': 12,
        'macd_slow': 26,
        'macd_signal': 9,
        'atr_window': 14,
        'atr_threshold': 1.0,
        'sl_multiplier': 2.0,
        'tp_multiplier': 3.0,
        'bb_window': 20,
        'bb_std': 2.0,
        'stoch_k': 14,
        'stoch_d': 3,
        'stoch_smooth': 3,
        'investment_percentage': 0.25,
        'swing_threshold': 2.0,
        'status_update_interval': 600
    }
    try:
        while True:
            choice = show_menu()
            if choice == '1':
                log_status("Starting Live Trading...")
                await trading_bot(config, exchanges)
            elif choice == '2':
                log_status("Starting Backtest...")
                config = await run_backtest(exchanges, config)
            elif choice == '3':
                log_status("Exiting...")
                break
            else:
                log_status("Invalid choice. Please select 1, 2, or 3.", Fore.RED)
    finally:
        # Runs on normal exit as well as on errors or cancellation, so the
        # exchange connections are always released.
        for exchange in exchanges.values():
            await exchange.close()
# Run the program
if __name__ == "__main__":
    log_status("Starting Trading Bot...")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The live trading loop never ends on its own; Ctrl+C is the normal
        # way to stop it, so exit with a message instead of a traceback.
        log_status("Trading Bot stopped by user", Fore.YELLOW)