import ccxt.async_support as ccxt_async
import pandas as pd
import ta
from colorama import init, Fore, Style
import time
import logging
import sqlite3
import requests
import numpy as np
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv
from tqdm import tqdm
import asyncio
from itertools import product
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram import Update
# Load environment variables (API keys and Telegram credentials are read via os.getenv below)
load_dotenv()
# Setup logging: records at INFO and above are written to trading_bot.log
# as "<time> - <level> - <message>"
logging.basicConfig(filename='trading_bot.log', level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# Initialize colorama; autoreset=True is requested so colours do not carry over between prints
init(autoreset=True)
# Setup SQLite database
def init_db():
    """Create the bot's SQLite tables in ``trades.db`` if they do not exist yet.

    Three tables are created: ``trades`` (closed backtest trades),
    ``backtest_results`` (one summary row per backtest run) and
    ``simulated_trades`` (closed paper trades with the running balance).
    """
    # sqlite3's connection context manager only commits or rolls back the
    # transaction; it does not close the connection, so close it explicitly.
    conn = sqlite3.connect('trades.db')
    try:
        with conn:
            c = conn.cursor()
            c.execute('''CREATE TABLE IF NOT EXISTS trades
                         (timestamp TEXT, exchange TEXT, symbol TEXT, type TEXT,
                          entry_price REAL, exit_price REAL, amount REAL, profit REAL,
                          mode TEXT, open_time TEXT, close_time TEXT, duration REAL, signals TEXT)''')
            c.execute('''CREATE TABLE IF NOT EXISTS backtest_results
                         (timestamp TEXT, start_date TEXT, end_date TEXT, win_rate REAL,
                          total_profit REAL, total_trades INTEGER, wins INTEGER, losses INTEGER,
                          parameters TEXT, initial_capital REAL)''')
            c.execute('''CREATE TABLE IF NOT EXISTS simulated_trades
                         (timestamp TEXT, symbol TEXT, type TEXT, entry_price REAL, exit_price REAL,
                          amount REAL, profit REAL, balance REAL)''')
    finally:
        conn.close()
    log_status("🗄️ Database initialized successfully")
# Log status to console, log file, and Telegram with custom header
# Epoch seconds of the most recent Telegram message; send_telegram_alert
# reads and updates it to enforce its cooldown.
last_alert_time = 0


def log_status(message, color=Fore.CYAN):
    """Emit ``message`` on every channel the bot reports to.

    The message is prefixed with the bot banner, printed to the console in
    ``color``, written to the log file at INFO level and forwarded to
    ``send_telegram_alert``.
    """
    banner = "❤️❤️Pu@Bot💋💋Ally V.2.1"
    line = "{} | {}".format(banner, message)
    print("{}{}{}".format(color, line, Style.RESET_ALL))
    logging.info(line)
    send_telegram_alert(line)
# Telegram alert function with rate limiting
def send_telegram_alert(message):
    """Send ``message`` to the configured Telegram chat, at most once per 10 s.

    Messages that start with the bot banner followed by "Waiting for next
    check" or "No signal detected" are treated as non-critical and dropped.
    When ``TELEGRAM_BOT_TOKEN`` or ``TELEGRAM_CHAT_ID`` is not set the alert is
    skipped instead of posting to an invalid URL.

    HTTP 429 responses are retried up to three times with a 10-second pause;
    any other failure is reported once and the alert is abandoned.

    Returns:
        None. Delivery failures are reported to the console and the log file
        only; they are never raised to the caller.
    """
    global last_alert_time
    banner = "❤️❤️Pu@Bot💋💋Ally V.2.1 | "
    if message.startswith((banner + "Waiting for next check",
                           banner + "No signal detected")):
        return  # Skip non-critical messages

    token = os.getenv('TELEGRAM_BOT_TOKEN')
    chat_id = os.getenv('TELEGRAM_CHAT_ID')
    if not token or not chat_id:
        return  # Telegram is not configured; console/file logging still happened.

    def report(text, color):
        # Report locally only. Going through log_status here would call this
        # function again and recurse without bound while Telegram is down.
        # The request URL embedded in HTTP errors contains the bot token, so
        # it is redacted before it reaches the console or the log file.
        line = f"{banner}{text}".replace(token, '<redacted>')
        print(f"{color}{line}{Style.RESET_ALL}")
        logging.error(line)

    remaining = 10 - (time.time() - last_alert_time)  # 10-second cooldown
    if remaining > 0:
        time.sleep(remaining)

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {'chat_id': chat_id, 'text': message}
    retries = 3
    for attempt in range(retries):
        try:
            # Without a timeout a stalled connection would block the bot forever.
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                report(f"📡 Telegram Rate Limit Exceeded. Retrying in 10 seconds... (Attempt {attempt + 1}/{retries})", Fore.YELLOW)
                time.sleep(10)
                continue
            report(f"❌ Telegram Alert Error: {str(e)}", Fore.RED)
            return
        except Exception as e:
            report(f"❌ Telegram Alert Error: {str(e)}", Fore.RED)
            return
        last_alert_time = time.time()
        return
# Initialize exchanges
def init_exchanges():
    """Build the async exchange clients used by the bot, keyed by exchange name.

    Only Binance is configured; its credentials are read from the
    ``BINANCE_API_KEY`` and ``BINANCE_SECRET`` environment variables.
    """
    binance_options = {
        'apiKey': os.getenv('BINANCE_API_KEY'),
        'secret': os.getenv('BINANCE_SECRET'),
        'enableRateLimit': True,
    }
    clients = {}
    clients['binance'] = ccxt_async.binance(binance_options)
    log_status("🔗 Exchanges initialized successfully")
    return clients
# Validate timeframe
def validate_timeframe(timeframe):
    """Return True when ``timeframe`` is one of the candle intervals the bot accepts."""
    accepted = ('1m', '5m', '15m', '30m', '1h', '4h', '8h', '1d')
    for candidate in accepted:
        if candidate == timeframe:
            return True
    return False
# Update prices for all trading pairs
async def update_all_prices(exchanges):
    """Report the last traded price of every tracked pair once per hour, forever.

    Args:
        exchanges: mapping of exchange name to client; only ``exchanges['binance']``
            is queried.

    A failure on a single symbol is noted in the report for that symbol only,
    the same way the ``/price`` handler does it, so one bad pair no longer
    aborts the whole report. When every symbol fails (for example, the network
    is down) the report is retried after 60 seconds instead of an hour.
    """
    trading_pairs = [
        'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'XRP/USDT', 'ADA/USDT',
        'SOL/USDT', 'DOGE/USDT', 'DOT/USDT', 'MATIC/USDT', 'AVAX/USDT',
        'LINK/USDT', 'LTC/USDT', 'BCH/USDT', 'ALGO/USDT', 'ATOM/USDT',
        'XLM/USDT', 'TRX/USDT', 'ETC/USDT', 'VET/USDT', 'FTM/USDT'
    ]
    while True:
        try:
            parts = ["📈 Current Prices:\n"]
            failures = 0
            for symbol in trading_pairs:
                try:
                    ticker = await exchanges['binance'].fetch_ticker(symbol)
                    parts.append(f"{symbol}: {ticker['last']:.2f} USDT\n")
                except Exception as symbol_error:
                    failures += 1
                    logging.warning("Price fetch failed for %s: %s", symbol, symbol_error)
                    parts.append(f"{symbol}: Error fetching price\n")
            if failures == len(trading_pairs):
                raise RuntimeError("no ticker could be fetched")
            log_status("".join(parts), Fore.CYAN)
            await asyncio.sleep(3600)  # Update every 1 hour
        except Exception as e:
            log_status(f"❌ Error updating prices: {str(e)}", Fore.RED)
            await asyncio.sleep(60)
# Fetch historical data for backtesting
async def fetch_historical_data(exchange, symbol, timeframe, start_date, end_date):
    """Download OHLCV candles for ``symbol`` between ``start_date`` and ``end_date``.

    Args:
        exchange: client exposing an awaitable ``fetch_ohlcv``.
        symbol: market symbol, e.g. ``'BTC/USDT'``.
        timeframe: candle interval; must pass ``validate_timeframe``.
        start_date, end_date: ``datetime`` bounds of the requested range.

    Returns:
        A DataFrame with columns ``timestamp, open, high, low, close, volume``
        (``timestamp`` converted from epoch milliseconds), or ``None`` when the
        timeframe is invalid or the download fails.

    The exchange returns at most one page per request, so the range is walked
    page by page until ``end_date`` is reached; a single request would silently
    truncate the data on small timeframes.
    """
    if not validate_timeframe(timeframe):
        log_status(f"❌ Invalid timeframe: {timeframe}. Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d", Fore.RED)
        return None
    log_status(f"📊 Loading historical OHLCV data for {symbol} from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}...")
    page_size = 1000
    try:
        # Work in epoch milliseconds on both ends so the range check does not
        # depend on whether the datetimes are naive-local or timezone-aware.
        cursor = int(start_date.timestamp() * 1000)
        end_ms = int(end_date.timestamp() * 1000)
        candles = []
        while cursor <= end_ms:
            page = await exchange.fetch_ohlcv(symbol, timeframe, since=cursor, limit=page_size)
            if not page:
                break
            candles.extend(page)
            newest = page[-1][0]
            if newest < cursor:
                break  # the exchange is not advancing; avoid looping forever
            cursor = newest + 1
        candles = [row for row in candles if row[0] <= end_ms]
        df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        # Guard against overlapping pages from exchanges that round `since`.
        df = df.drop_duplicates(subset='timestamp').reset_index(drop=True)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        log_status(f"✅ Historical data loaded successfully: {len(df)} candles")
        return df
    except Exception as e:
        log_status(f"❌ Error fetching historical data: {str(e)}", Fore.RED)
        return None
# Apply indicators
def apply_indicators(df, params):
    """Add the indicator columns used by the signal rules to ``df`` and return it.

    Columns written, in order: ``sma_fast``, ``sma_slow``, ``rsi``, ``macd``,
    ``macd_signal``, ``macd_hist``, ``atr``, ``bb_upper``, ``bb_lower``,
    ``bb_mid``, ``bb_percent``, ``stoch_k``, ``stoch_d``. ``df`` is modified in
    place; window sizes come from ``params``.
    """
    log_status("🔍 Applying technical indicators...")
    close = df['close']
    high = df['high']
    low = df['low']

    # Moving averages and RSI
    fast_sma = ta.trend.SMAIndicator(close=close, window=params['sma_fast_window'])
    df['sma_fast'] = fast_sma.sma_indicator()
    slow_sma = ta.trend.SMAIndicator(close=close, window=params['sma_slow_window'])
    df['sma_slow'] = slow_sma.sma_indicator()
    rsi_calc = ta.momentum.RSIIndicator(close=close, window=params['rsi_window'])
    df['rsi'] = rsi_calc.rsi()

    # MACD line, signal line and histogram
    macd_calc = ta.trend.MACD(
        close=close,
        window_fast=params['macd_fast'],
        window_slow=params['macd_slow'],
        window_sign=params['macd_signal'],
    )
    df['macd'] = macd_calc.macd()
    df['macd_signal'] = macd_calc.macd_signal()
    df['macd_hist'] = macd_calc.macd_diff()

    # Average true range
    atr_calc = ta.volatility.AverageTrueRange(
        high=high, low=low, close=close, window=params['atr_window'])
    df['atr'] = atr_calc.average_true_range()

    # Bollinger bands and the position of the close inside them (%B)
    bands = ta.volatility.BollingerBands(
        close=close, window=params['bb_window'], window_dev=params['bb_std'])
    df['bb_upper'] = bands.bollinger_hband()
    df['bb_lower'] = bands.bollinger_lband()
    df['bb_mid'] = bands.bollinger_mavg()
    band_width = df['bb_upper'] - df['bb_lower']
    df['bb_percent'] = (df['close'] - df['bb_lower']) / band_width

    # Stochastic oscillator
    stoch_calc = ta.momentum.StochasticOscillator(
        high=high, low=low, close=close,
        window=params['stoch_k'], smooth_window=params['stoch_smooth'])
    df['stoch_k'] = stoch_calc.stoch()
    df['stoch_d'] = stoch_calc.stoch_signal()

    log_status("✅ Indicators applied successfully")
    return df
# Evaluate signals
def evaluate_signals(df, params, exchange_name, symbol):
    """Score the two most recent candles against twelve indicator rules.

    Each rule can cast a BUY vote, a SELL vote or (for the two ATR rules) a
    direction-neutral vote. A direction wins when its own votes plus the
    neutral votes reach six; BUY is checked before SELL.

    Args:
        df: DataFrame that already carries the columns written by
            ``apply_indicators``; only the last two rows are read.
        params: needs ``rsi_buy_threshold``, ``rsi_sell_threshold`` and
            ``atr_threshold``.
        exchange_name: label used in the log message.
        symbol: accepted for interface compatibility; not used.

    Returns:
        ``(signal, signal_desc)`` where ``signal`` is ``'BUY'``, ``'SELL'`` or
        ``None`` and ``signal_desc`` lists the descriptions of every rule that
        fired (both directions).

    Comparisons against NaN indicator values are False, so rules whose
    indicators are still warming up simply do not vote.
    """
    required_votes = 6
    if df is None or len(df) < 2:
        # Crossover and trend rules need a previous candle to compare against.
        log_status("No signal detected")
        return None, []

    last_row = df.iloc[-1]
    prev_row = df.iloc[-2]
    signals = []
    signal_desc = []

    def vote(tag, description):
        signals.append(tag)
        signal_desc.append(description)

    # 1. SMA Crossover
    if prev_row['sma_fast'] < prev_row['sma_slow'] and last_row['sma_fast'] > last_row['sma_slow']:
        vote('BUY_SMA_CROSS', "SMA Crossover (Fast > Slow)")
    elif prev_row['sma_fast'] > prev_row['sma_slow'] and last_row['sma_fast'] < last_row['sma_slow']:
        vote('SELL_SMA_CROSS', "SMA Crossover (Fast < Slow)")
    # 2. SMA Position
    if last_row['close'] > last_row['sma_slow']:
        vote('BUY_SMA_POS', "Price > SMA Slow")
    elif last_row['close'] < last_row['sma_slow']:
        vote('SELL_SMA_POS', "Price < SMA Slow")
    # 3. MACD Crossover
    if prev_row['macd'] < prev_row['macd_signal'] and last_row['macd'] > last_row['macd_signal']:
        vote('BUY_MACD_CROSS', "MACD > Signal Line")
    elif prev_row['macd'] > prev_row['macd_signal'] and last_row['macd'] < last_row['macd_signal']:
        vote('SELL_MACD_CROSS', "MACD < Signal Line")
    # 4. MACD Histogram
    if last_row['macd_hist'] > 0:
        vote('BUY_MACD_HIST', "MACD Histogram > 0")
    elif last_row['macd_hist'] < 0:
        vote('SELL_MACD_HIST', "MACD Histogram < 0")
    # 5. RSI Threshold
    if last_row['rsi'] < params['rsi_buy_threshold']:
        vote('BUY_RSI_THRESH', f"RSI < {params['rsi_buy_threshold']}")
    elif last_row['rsi'] > params['rsi_sell_threshold']:
        vote('SELL_RSI_THRESH', f"RSI > {params['rsi_sell_threshold']}")
    # 6. RSI Trend
    if last_row['rsi'] > prev_row['rsi']:
        vote('BUY_RSI_TREND', "RSI Increasing")
    elif last_row['rsi'] < prev_row['rsi']:
        vote('SELL_RSI_TREND', "RSI Decreasing")
    # 7. ATR Threshold (direction-neutral)
    if last_row['atr'] > params['atr_threshold']:
        vote('BUY_SELL_ATR_THRESH', f"ATR > {params['atr_threshold']}")
    # 8. ATR Trend (direction-neutral)
    if last_row['atr'] > prev_row['atr']:
        vote('BUY_SELL_ATR_TREND', "ATR Increasing")
    # 9. Bollinger Bands Position
    if last_row['close'] < last_row['bb_lower']:
        vote('BUY_BB_POS', "Price < BB Lower")
    elif last_row['close'] > last_row['bb_upper']:
        vote('SELL_BB_POS', "Price > BB Upper")
    # 10. Bollinger Bands %B
    if last_row['bb_percent'] < 0.2:
        vote('BUY_BB_PERCENT', "BB %B < 0.2")
    elif last_row['bb_percent'] > 0.8:
        vote('SELL_BB_PERCENT', "BB %B > 0.8")
    # 11. Stochastic %K
    if last_row['stoch_k'] < 20:
        vote('BUY_STOCH_K', "Stochastic %K < 20")
    elif last_row['stoch_k'] > 80:
        vote('SELL_STOCH_K', "Stochastic %K > 80")
    # 12. Stochastic Crossover
    if last_row['stoch_k'] > last_row['stoch_d'] and prev_row['stoch_k'] <= prev_row['stoch_d']:
        vote('BUY_STOCH_CROSS', "Stochastic %K > %D")
    elif last_row['stoch_k'] < last_row['stoch_d'] and prev_row['stoch_k'] >= prev_row['stoch_d']:
        vote('SELL_STOCH_CROSS', "Stochastic %K < %D")

    # 'BUY_SELL_*' tags also start with 'BUY'. Counting them as BUY votes and
    # then adding them again as neutral votes would weight them twice for BUY
    # but once for SELL, so they are removed from the BUY tally here.
    neutral_votes = sum(1 for tag in signals if tag.startswith('BUY_SELL'))
    buy_votes = sum(1 for tag in signals if tag.startswith('BUY')) - neutral_votes
    sell_votes = sum(1 for tag in signals if tag.startswith('SELL'))

    signal = None
    if buy_votes + neutral_votes >= required_votes:
        signal = 'BUY'
        log_status(f"🚀 BUY signal detected on {exchange_name} at price {last_row['close']:.2f}. Signals: {', '.join(signal_desc)}", Fore.GREEN)
    elif sell_votes + neutral_votes >= required_votes:
        signal = 'SELL'
        log_status(f"📉 SELL signal detected on {exchange_name} at price {last_row['close']:.2f}. Signals: {', '.join(signal_desc)}", Fore.RED)
    else:
        log_status("No signal detected")
    return signal, signal_desc
# Backtest function
def backtest(df, params, symbol, initial_capital):
    """Replay the strategy over ``df`` candle by candle and report the outcome.

    At most one position is open at a time. A position is opened at the close
    of the candle that produced a signal, sized as
    ``balance * params['investment_percentage']``, and closed at the first
    later close that crosses its stop-loss or take-profit price. A position
    still open on the last candle is not closed or counted.

    Args:
        df: candles with indicator columns already applied.
        params: strategy parameters. ``stop_loss_pct`` and ``take_profit_pct``
            are optional and default to 0.39 and 0.25.
        symbol: market symbol, used for logging and the DB rows.
        initial_capital: starting balance in USDT.

    Returns:
        dict with ``trades``, ``win_rate`` (percent), ``total_profit``,
        ``total_trades``, ``wins``, ``losses``, ``final_balance`` and
        ``max_drawdown`` (percent).

    Side effects: every closed trade is inserted into ``trades`` and one
    summary row into ``backtest_results`` in ``trades.db``.
    """
    log_status(f"📊 Starting backtest for {symbol}...")
    trades = []
    position = None
    balance = initial_capital
    investment_percentage = params['investment_percentage']
    stop_loss_pct = params.get('stop_loss_pct', 0.39)
    take_profit_pct = params.get('take_profit_pct', 0.25)
    sides = {'BUY': 'Long', 'SELL': 'Short'}

    # One connection for the whole run, closed in `finally`; the sqlite3
    # context manager alone would leave a connection open per trade.
    conn = sqlite3.connect('trades.db')
    try:
        for i in range(1, len(df)):
            current_price = df['close'].iloc[i]
            # Check signal
            signal, signal_desc = evaluate_signals(df.iloc[:i+1], params, 'backtest', symbol)

            # Open position
            side = sides.get(signal)
            if side and not position:
                investment = balance * investment_percentage
                amount = investment / current_price
                entry_price = current_price
                if side == 'Long':
                    sl_price = entry_price * (1 - stop_loss_pct)
                    tp_price = entry_price * (1 + take_profit_pct)
                else:
                    sl_price = entry_price * (1 + stop_loss_pct)
                    tp_price = entry_price * (1 - take_profit_pct)
                position = {
                    'type': side,
                    'entry_price': entry_price,
                    'sl_price': sl_price,
                    'tp_price': tp_price,
                    'amount': amount,
                    'open_time': df['timestamp'].iloc[i],
                    'signals': signal_desc
                }
                log_status(f"{'🚀' if side == 'Long' else '📉'} Opened {side} position: Entry={entry_price:.2f}, SL={sl_price:.2f}, TP={tp_price:.2f}, Signals: {', '.join(signal_desc)}",
                           Fore.GREEN if side == 'Long' else Fore.RED)

            # Check SL/TP
            if position:
                held = position['type']
                if held == 'Long':
                    hit = current_price <= position['sl_price'] or current_price >= position['tp_price']
                    profit = (current_price - position['entry_price']) * position['amount']
                else:
                    hit = current_price >= position['sl_price'] or current_price <= position['tp_price']
                    profit = (position['entry_price'] - current_price) * position['amount']
                if hit:
                    close_time = df['timestamp'].iloc[i]
                    duration = (close_time - position['open_time']).total_seconds() / 3600  # Duration in hours
                    trades.append({
                        'type': held,
                        'entry_price': position['entry_price'],
                        'exit_price': current_price,
                        'profit': profit,
                        'open_time': position['open_time'],
                        'close_time': close_time,
                        'duration': duration,
                        'signals': position['signals']
                    })
                    balance += profit
                    log_status(f"{'🟢' if profit > 0 else '🔴'} Closed {held} position: Price={current_price:.2f}, Profit={profit:.2f} USDT, Duration={duration:.2f} hours, Signals: {', '.join(position['signals'])}",
                               Fore.GREEN if profit > 0 else Fore.RED)
                    conn.execute("INSERT INTO trades VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                 (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'backtest', symbol, held,
                                  position['entry_price'], current_price, position['amount'], profit, 'backtest',
                                  position['open_time'].strftime('%Y-%m-%d %H:%M:%S'),
                                  close_time.strftime('%Y-%m-%d %H:%M:%S'), duration, ', '.join(position['signals'])))
                    conn.commit()
                    position = None

        # Calculate metrics
        total_trades = len(trades)
        wins = sum(1 for trade in trades if trade['profit'] > 0)
        losses = total_trades - wins
        win_rate = (wins / total_trades * 100) if total_trades > 0 else 0
        total_profit = sum(trade['profit'] for trade in trades)
        max_drawdown = calculate_max_drawdown(trades, initial_capital)

        # Detailed trade summary
        parts = [f"📊 Detailed Trade Summary for {symbol}:\n"]
        for number, trade in enumerate(trades, 1):
            parts.append(f"Trade {number} ({trade['type']}):\n"
                         f" 📅 Open: {trade['open_time'].strftime('%Y-%m-%d %H:%M:%S')}\n"
                         f" 📅 Close: {trade['close_time'].strftime('%Y-%m-%d %H:%M:%S')}\n"
                         f" ⏰ Duration: {trade['duration']:.2f} hours\n"
                         f" 💰 Entry Price: {trade['entry_price']:.2f} USDT\n"
                         f" 💸 Exit Price: {trade['exit_price']:.2f} USDT\n"
                         f" {'🟢' if trade['profit'] > 0 else '🔴'} Profit/Loss: {trade['profit']:.2f} USDT\n"
                         f" 📋 Signals: {', '.join(trade['signals'])}\n")
        parts.append(f"📊 Backtest Results:\n"
                     f" 💵 Initial Capital: {initial_capital:.2f} USDT\n"
                     f" 📈 Total Trades: {total_trades}\n"
                     f" 🏆 Wins: {wins} ({win_rate:.2f}%)\n"
                     f" 😔 Losses: {losses}\n"
                     f" 💰 Total Profit: {total_profit:.2f} USDT\n"
                     f" 💳 Final Balance: {balance:.2f} USDT\n"
                     f" 📉 Max Drawdown: {max_drawdown:.2f}%")
        log_status("".join(parts), Fore.GREEN)

        # An empty frame has no first/last candle; store NULL dates instead of crashing.
        has_rows = len(df) > 0
        first_day = df['timestamp'].iloc[0].strftime('%Y-%m-%d') if has_rows else None
        last_day = df['timestamp'].iloc[-1].strftime('%Y-%m-%d') if has_rows else None
        conn.execute("INSERT INTO backtest_results VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                     (datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                      first_day, last_day,
                      win_rate, total_profit, total_trades, wins, losses, str(params), initial_capital))
        conn.commit()
    finally:
        conn.close()

    return {
        'trades': trades,
        'win_rate': win_rate,
        'total_profit': total_profit,
        'total_trades': total_trades,
        'wins': wins,
        'losses': losses,
        'final_balance': balance,
        'max_drawdown': max_drawdown
    }
# Calculate max drawdown
def calculate_max_drawdown(trades, initial_balance):
    """Return the largest peak-to-trough drop of the balance curve, in percent.

    Args:
        trades: iterable of dicts, each with a numeric ``'profit'`` entry,
            in the order the trades were closed.
        initial_balance: balance before the first trade.

    Returns:
        The maximum of ``(peak - balance) / peak * 100`` over the running
        balance, or 0 when there are no trades or the balance never drops.
    """
    balance = initial_balance
    peak = initial_balance
    max_drawdown = 0
    for trade in trades:
        balance += trade['profit']
        peak = max(peak, balance)
        if peak <= 0:
            # A percentage of a zero (or negative) peak is undefined; skipping
            # avoids a ZeroDivisionError when starting from an empty account.
            continue
        drawdown = (peak - balance) / peak * 100
        max_drawdown = max(max_drawdown, drawdown)
    return max_drawdown
# Optimize parameters
async def optimize_parameters(exchange, symbol, timeframe, start_date, end_date):
    """Backtest a grid of parameter sets and return the best one.

    The first 50 combinations of the grid (in ``itertools.product`` order) are
    backtested with a capital of 10000. The winner is the combination with the
    highest win rate, ties broken by total profit.

    Args:
        exchange: client passed through to ``fetch_historical_data``.
        symbol: market symbol.
        timeframe: candle interval; must pass ``validate_timeframe``.
        start_date, end_date: ``datetime`` bounds of the history to test on.

    Returns:
        The best parameter dict, or ``None`` when the timeframe is invalid or
        no usable history could be loaded.

    Side effects: one row per tested combination is written to
    ``backtest_results``.
    """
    from itertools import islice  # the file only imports `product` from itertools

    if not validate_timeframe(timeframe):
        log_status(f"❌ Invalid timeframe: {timeframe}. Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d", Fore.RED)
        return None
    log_status("🔧 Optimizing parameters...")
    param_ranges = {
        'sma_fast_window': [5, 8, 10, 12, 15],
        'sma_slow_window': [20, 22, 25, 28, 30],
        'rsi_window': [10, 12, 14, 17],
        'rsi_buy_threshold': [60, 65, 70],
        'rsi_sell_threshold': [20, 25, 30],
        'macd_fast': [12, 14, 16],
        'macd_slow': [26, 28, 30],
        'macd_signal': [9, 12],
        'atr_window': [14, 20],
        'atr_threshold': [0.5, 1.0, 1.5],
        'bb_window': [20, 25],
        'bb_std': [2.0, 2.5],
        'stoch_k': [14, 20],
        'stoch_d': [3, 5],
        'stoch_smooth': [3, 5],
        'investment_percentage': [0.25]
    }
    max_combinations = 50
    optimization_capital = 10000  # Default capital for optimization
    names = list(param_ranges)
    # Take the first 50 combinations lazily; listing the full grid would build
    # millions of tuples only to discard them.
    # NOTE(review): in product order only the trailing parameters vary within
    # the first 50 combinations, so the SMA/RSI/MACD settings are never explored.
    candidates = islice(product(*param_ranges.values()), max_combinations)

    # The candles do not depend on the parameters, so download them once.
    history = await fetch_historical_data(exchange, symbol, timeframe, start_date, end_date)
    if history is None or history.empty:
        log_status("❌ No historical data available for optimization", Fore.RED)
        return None

    best_params = None
    best_win_rate = 0
    best_profit = 0
    results = []
    for combo in candidates:
        params = dict(zip(names, combo))
        log_status(f"🔍 Testing parameters: {params}")
        # apply_indicators writes columns in place; give each run its own copy.
        df = apply_indicators(history.copy(), params)
        result = backtest(df, params, symbol, optimization_capital)
        results.append({
            'params': params,
            'win_rate': result['win_rate'],
            'total_profit': result['total_profit'],
            'total_trades': result['total_trades'],
            'wins': result['wins'],
            'losses': result['losses'],
            'max_drawdown': result['max_drawdown']
        })
        # Always accept the first result so a run in which no combination
        # trades profitably still yields a parameter set instead of None.
        is_better = (
            best_params is None
            or result['win_rate'] > best_win_rate
            or (result['win_rate'] == best_win_rate and result['total_profit'] > best_profit)
        )
        if is_better:
            best_win_rate = result['win_rate']
            best_profit = result['total_profit']
            best_params = params
            log_status(f"🏆 New best parameters found: Win Rate={best_win_rate:.2f}%, Profit={best_profit:.2f}")

    # Save results to SQLite.
    # NOTE(review): backtest() also writes a backtest_results row per run, so
    # each combination ends up recorded twice — confirm whether that is wanted.
    conn = sqlite3.connect('trades.db')
    try:
        for result in results:
            # Column order is (..., total_trades, wins, losses, ...); the exact
            # integer counts from the backtest are stored.
            conn.execute("INSERT INTO backtest_results VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                         (datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                          start_date.strftime('%Y-%m-%d'),
                          end_date.strftime('%Y-%m-%d'),
                          result['win_rate'],
                          result['total_profit'],
                          result['total_trades'],
                          result['wins'],
                          result['losses'],
                          str(result['params']),
                          optimization_capital))
        conn.commit()
    finally:
        conn.close()
    log_status("✅ Parameter optimization completed")
    return best_params
# Fetch OHLCV data
async def get_ohlcv(exchange, symbol, timeframe, params):
    """Fetch the latest 100 candles for ``symbol`` and attach the indicators.

    Returns the enriched DataFrame, or ``None`` when the timeframe is not
    supported or the fetch/indicator step raises; the error is logged.
    """
    supported = validate_timeframe(timeframe)
    if not supported:
        log_status(f"❌ Invalid timeframe: {timeframe}. Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d", Fore.RED)
        return None

    log_status(f"📊 Loading live OHLCV data for {symbol}...")
    candles_df = None
    try:
        raw_candles = await exchange.fetch_ohlcv(symbol, timeframe, limit=100)
        frame = pd.DataFrame(
            raw_candles,
            columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
        )
        frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='ms')
        frame = apply_indicators(frame, params)
        log_status("✅ Live OHLCV data loaded successfully")
        candles_df = frame
    except ccxt_async.NetworkError as err:
        log_status(f"❌ Network Error fetching OHLCV: {str(err)}", Fore.RED)
    except ccxt_async.InsufficientFunds as err:
        log_status(f"❌ Insufficient Funds: {str(err)}", Fore.RED)
    except Exception as err:
        log_status(f"❌ Unexpected Error fetching OHLCV: {str(err)}", Fore.RED)
    return candles_df
# Check trading signal
def check_signal(df, exchange_name, symbol, params):
    """Evaluate the signal rules on ``df``; a missing frame yields ``(None, [])``."""
    if df is not None:
        decision, reasons = evaluate_signals(df, params, exchange_name, symbol)
        return decision, reasons
    return None, []
# Progress bar
def display_progress_bar():
    """Show a ten-step "Analyzing" progress bar (0.2 s per step), then log completion."""
    step_delay = 0.2
    bar = tqdm(range(10), desc="Analyzing", ncols=50)
    for _step in bar:
        time.sleep(step_delay)
    log_status("✅ Analysis completed")
# Update status function
async def update_status(exchanges, positions, config, last_prices, balance, mode='live'):
    """Log a status report for every exchange and return ``balance`` unchanged.

    For each exchange the current ticker price is fetched, a price-swing
    warning is logged when the move since the previously recorded price
    exceeds ``config['swing_threshold']`` percent, ``last_prices`` is updated,
    and the unrealized PNL of any open position is reported. Errors are logged
    per exchange and do not stop the remaining exchanges.
    """
    for exchange_name, exchange in exchanges.items():
        log_status(f"🔄 Updating status for {exchange_name}...")
        try:
            ticker = await exchange.fetch_ticker(config['symbol'])
            current_price = ticker['last']

            # Price swing relative to the previously recorded price
            if exchange_name in last_prices:
                price_change = abs(current_price - last_prices[exchange_name]) / last_prices[exchange_name] * 100
                if price_change > config['swing_threshold']:
                    log_status(
                        f"⚠️ Price Swing Detected on {exchange_name}: "
                        f"{config['symbol']} changed by {price_change:.2f}% "
                        f"(from {last_prices[exchange_name]:.2f} to {current_price:.2f})",
                        Fore.YELLOW,
                    )
            last_prices[exchange_name] = current_price

            # Mark-to-market of the open position, if any
            unrealized_pnl = 0
            if exchange_name in positions:
                held = positions[exchange_name]
                side = held['type']
                if side == 'Long':
                    unrealized_pnl = (current_price - held['entry_price']) * held['amount']
                elif side == 'Short':
                    unrealized_pnl = (held['entry_price'] - current_price) * held['amount']

            report = (f"📊 Status Update for {exchange_name} ({mode}):\n"
                      f" Symbol: {config['symbol']}\n"
                      f" Current Price: {current_price:.2f} USDT\n"
                      f" Balance: {balance:.2f} USDT\n"
                      f" Unrealized PNL: {unrealized_pnl:.2f} USDT")
            log_status(report, Fore.CYAN)
        except Exception as err:
            log_status(f"❌ Status Update Error on {exchange_name}: {str(err)}", Fore.RED)
    return balance
# Simulated trading
async def simulate_trading(config, exchanges, initial_capital):
    """Paper-trade ``config['symbol']`` on every exchange until cancelled.

    Every cycle the latest candles are fetched, the signal rules are
    evaluated, a simulated position is opened when a BUY/SELL signal fires and
    none is open on that exchange, and an open position is closed once the
    latest close crosses its stop-loss or take-profit price. Closed trades are
    written to ``simulated_trades`` in ``trades.db``.

    Args:
        config: strategy settings. ``symbol`` and ``timeframe`` get defaults
            (``'BNB/USDT'`` / ``'4h'``) written back into the dict;
            ``stop_loss_pct`` / ``take_profit_pct`` are optional and default
            to 0.39 / 0.25.
        exchanges: mapping of exchange name to client.
        initial_capital: starting balance in USDT.

    Returns:
        None. Returns early only when the timeframe is invalid; otherwise the
        loop runs until the task is cancelled. Any other exception is logged
        and the cycle is retried after 60 seconds.
    """
    from datetime import timezone  # the file only imports datetime/timedelta

    init_db()
    config['symbol'] = config.get('symbol', 'BNB/USDT')
    config['timeframe'] = config.get('timeframe', '4h')
    if not validate_timeframe(config['timeframe']):
        log_status(f"❌ Invalid timeframe: {config['timeframe']}. Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d", Fore.RED)
        return
    log_status(f"🚀 Starting simulated trading with {config['symbol']} on timeframe {config['timeframe']} with initial capital {initial_capital} USDT...")
    stop_loss_pct = config.get('stop_loss_pct', 0.39)
    take_profit_pct = config.get('take_profit_pct', 0.25)
    sides = {'BUY': 'Long', 'SELL': 'Short'}
    positions = {}
    last_prices = {}
    balance = initial_capital
    last_status_update = time.time()
    while True:
        try:
            current_time = time.time()
            if current_time - last_status_update >= config['status_update_interval']:
                balance = await update_status(exchanges, positions, config, last_prices, balance, mode='simulated')
                last_status_update = current_time
            for exchange_name, exchange in exchanges.items():
                print(f"\n{Fore.CYAN}=== Checking {exchange_name} (Simulated) ===")
                display_progress_bar()
                df = await get_ohlcv(exchange, config['symbol'], config['timeframe'], config)
                if df is None:
                    continue
                current_price = df['close'].iloc[-1]
                last_prices[exchange_name] = current_price
                signal, signal_desc = check_signal(df, exchange_name, config['symbol'], config)
                if signal:
                    print(f"{Fore.RED}🚨 Signal Detected: {signal} on {exchange_name}! 🚨")
                    log_status(f"🚨 Signal Detected: {signal} on {exchange_name} at price {current_price:.2f}. Signals: {', '.join(signal_desc)}", Fore.RED)
                    side = sides.get(signal)
                    if side and exchange_name not in positions:
                        investment = balance * config['investment_percentage']
                        amount = investment / current_price
                        entry_price = current_price
                        if side == 'Long':
                            sl_price = entry_price * (1 - stop_loss_pct)
                            tp_price = entry_price * (1 + take_profit_pct)
                        else:
                            sl_price = entry_price * (1 + stop_loss_pct)
                            tp_price = entry_price * (1 - take_profit_pct)
                        positions[exchange_name] = {
                            'type': side,
                            'entry_price': entry_price,
                            'sl_price': sl_price,
                            'tp_price': tp_price,
                            'amount': amount,
                            'open_time': df['timestamp'].iloc[-1],
                            'signals': signal_desc
                        }
                        log_status(f"{'🚀' if side == 'Long' else '📉'} Opened Simulated {side} Position on {exchange_name}: Entry={entry_price:.2f}, SL={sl_price:.2f}, TP={tp_price:.2f}, Amount={amount:.4f}, Signals: {', '.join(signal_desc)}",
                                   Fore.GREEN if side == 'Long' else Fore.RED)
                if exchange_name in positions:
                    pos = positions[exchange_name]
                    print(f"{Fore.MAGENTA}Position Status: {pos['type']} | Entry: {pos['entry_price']:.2f} | Current: {current_price:.2f} | SL: {pos['sl_price']:.2f} | TP: {pos['tp_price']:.2f}")
                    held = pos['type']
                    if held == 'Long':
                        hit = current_price <= pos['sl_price'] or current_price >= pos['tp_price']
                        profit = (current_price - pos['entry_price']) * pos['amount']
                    else:
                        hit = current_price >= pos['sl_price'] or current_price <= pos['tp_price']
                        profit = (pos['entry_price'] - current_price) * pos['amount']
                    if hit:
                        # 'open_time' is a candle timestamp built from epoch
                        # milliseconds (UTC wall time, no tzinfo), so the close
                        # time must be UTC as well; local time would skew the
                        # duration by the machine's UTC offset.
                        close_time = pd.Timestamp(datetime.now(timezone.utc).replace(tzinfo=None))
                        duration = (close_time - pos['open_time']).total_seconds() / 3600
                        balance += profit
                        # Connect only when there is a trade to record, and close afterwards.
                        conn = sqlite3.connect('trades.db')
                        try:
                            conn.execute("INSERT INTO simulated_trades VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                                         (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), config['symbol'], held,
                                          pos['entry_price'], current_price, pos['amount'], profit, balance))
                            conn.commit()
                        finally:
                            conn.close()
                        log_status(f"{'🟢' if profit > 0 else '🔴'} Closed Simulated {held} Position on {exchange_name}: Price={current_price:.2f}, Profit={profit:.2f}, Balance={balance:.2f}, Duration={duration:.2f} hours, Signals: {', '.join(pos['signals'])}",
                                   Fore.GREEN if profit > 0 else Fore.RED)
                        del positions[exchange_name]
                log_status(f"⏳ Waiting for next check on {exchange_name}...")
            # Sleep once per cycle, after all exchanges, so a failed fetch
            # (`continue` above) can never turn the loop into a busy loop.
            await asyncio.sleep(300)
        except Exception as e:
            log_status(f"❌ Error: {str(e)}", Fore.RED)
            await asyncio.sleep(60)
# Telegram command handlers
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: reply with the welcome banner and the list of commands."""
    help_lines = (
        "❤️❤️Pu@Bot💋💋Ally V.2.1 | Welcome! 🎉",
        "Available commands:",
        "/price - Get current prices of all trading pairs 📈",
        "/simulate <symbol> <timeframe> <capital> - Start simulated trading 🚀",
        "/backtest <symbol> <timeframe> <capital> - Run backtest 📊",
        "/stop - Stop the bot 🛑",
        "Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d",
    )
    welcome_text = "\n".join(help_lines)
    await update.message.reply_text(welcome_text)
async def price(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /price: reply with the last traded price of every tracked pair.

    A symbol whose ticker cannot be fetched is listed as
    "Error fetching price" and the cause is written to the log file. The
    exchange session opened for the request is always closed, even when
    logging or replying fails.
    """
    exchanges = init_exchanges()
    trading_pairs = [
        'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'XRP/USDT', 'ADA/USDT',
        'SOL/USDT', 'DOGE/USDT', 'DOT/USDT', 'MATIC/USDT', 'AVAX/USDT',
        'LINK/USDT', 'LTC/USDT', 'BCH/USDT', 'ALGO/USDT', 'ATOM/USDT',
        'XLM/USDT', 'TRX/USDT', 'ETC/USDT', 'VET/USDT', 'FTM/USDT'
    ]
    try:
        parts = ["📈 Current Prices:\n"]
        for symbol in trading_pairs:
            try:
                ticker = await exchanges['binance'].fetch_ticker(symbol)
                parts.append(f"{symbol}: {ticker['last']:.2f} USDT\n")
            except Exception as symbol_error:
                logging.warning("Price fetch failed for %s: %s", symbol, symbol_error)
                parts.append(f"{symbol}: Error fetching price\n")
        price_message = "".join(parts)
        log_status(price_message, Fore.CYAN)
        await update.message.reply_text(price_message)
    finally:
        await exchanges['binance'].close()
async def simulate(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /simulate <symbol> <timeframe> <capital>: start paper trading.

    The arguments are validated and the user is told what is wrong with them.
    On success the endless ``simulate_trading`` loop is started as a
    background task, so this handler returns immediately. Awaiting the loop
    here would block the processing of every later update, including /stop.
    Only one simulation may run at a time.
    """
    import math  # local import: used only to reject NaN/inf capital

    args = context.args
    if len(args) != 3:
        await update.message.reply_text(
            "Usage: /simulate <symbol> <timeframe> <capital>\n"
            "Example: /simulate BTC/USDT 1h 10000\n"
            "Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d"
        )
        return
    symbol, timeframe, capital = args
    valid_pairs = [
        'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'XRP/USDT', 'ADA/USDT',
        'SOL/USDT', 'DOGE/USDT', 'DOT/USDT', 'MATIC/USDT', 'AVAX/USDT',
        'LINK/USDT', 'LTC/USDT', 'BCH/USDT', 'ALGO/USDT', 'ATOM/USDT',
        'XLM/USDT', 'TRX/USDT', 'ETC/USDT', 'VET/USDT', 'FTM/USDT'
    ]
    valid_timeframes = ['1m', '5m', '15m', '30m', '1h', '4h', '8h', '1d']

    # Only the conversion is guarded: a ValueError raised later (e.g. inside
    # pandas) must not be reported to the user as a bad capital value.
    try:
        capital = float(capital)
    except ValueError:
        await update.message.reply_text("Invalid capital. Please enter a number.")
        return
    if not math.isfinite(capital):  # float() accepts 'nan' and 'inf'
        await update.message.reply_text("Invalid capital. Please enter a number.")
        return
    if symbol not in valid_pairs:
        await update.message.reply_text(f"Invalid symbol. Choose from: {', '.join(valid_pairs)}")
        return
    if timeframe not in valid_timeframes:
        await update.message.reply_text(f"Invalid timeframe. Choose from: {', '.join(valid_timeframes)}")
        return
    if capital <= 0:
        await update.message.reply_text("Capital must be greater than 0")
        return

    running = context.application.bot_data.get('simulation_task')
    if running is not None and not running.done():
        await update.message.reply_text("A simulation is already running. Use /stop to stop the bot first.")
        return

    config = {
        'symbol': symbol,
        'timeframe': timeframe,
        'sma_fast_window': 10,
        'sma_slow_window': 20,
        'rsi_window': 14,
        'rsi_buy_threshold': 70,
        'rsi_sell_threshold': 30,
        'macd_fast': 12,
        'macd_slow': 26,
        'macd_signal': 9,
        'atr_window': 14,
        'atr_threshold': 1.0,
        'bb_window': 20,
        'bb_std': 2.0,
        'stoch_k': 14,
        'stoch_d': 3,
        'stoch_smooth': 3,
        'investment_percentage': 0.25,
        'swing_threshold': 2.0,
        'status_update_interval': 600
    }

    async def run_simulation():
        # The exchange session is owned by the task so it is closed however
        # the simulation ends (return, error or cancellation).
        exchanges = init_exchanges()
        try:
            await simulate_trading(config, exchanges, capital)
        finally:
            await exchanges['binance'].close()

    await update.message.reply_text(f"🚀 Starting simulated trading for {symbol} on {timeframe} with {capital} USDT")
    context.application.bot_data['simulation_task'] = context.application.create_task(
        run_simulation(), update=update)
async def backtest_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /backtest <symbol> <timeframe> <capital>.

    The arguments are validated, parameters are optimized over the last 30
    days of candles, the best set is backtested with the requested capital and
    the summary is sent back to the user. The exchange session opened for the
    run is always closed, including on the failure paths.
    """
    import math  # local import: used only to reject NaN/inf capital

    args = context.args
    if len(args) != 3:
        await update.message.reply_text(
            "Usage: /backtest <symbol> <timeframe> <capital>\n"
            "Example: /backtest BTC/USDT 1h 10000\n"
            "Supported timeframes: 1m, 5m, 15m, 30m, 1h, 4h, 8h, 1d"
        )
        return
    symbol, timeframe, capital = args
    valid_pairs = [
        'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'XRP/USDT', 'ADA/USDT',
        'SOL/USDT', 'DOGE/USDT', 'DOT/USDT', 'MATIC/USDT', 'AVAX/USDT',
        'LINK/USDT', 'LTC/USDT', 'BCH/USDT', 'ALGO/USDT', 'ATOM/USDT',
        'XLM/USDT', 'TRX/USDT', 'ETC/USDT', 'VET/USDT', 'FTM/USDT'
    ]
    valid_timeframes = ['1m', '5m', '15m', '30m', '1h', '4h', '8h', '1d']

    # Only the conversion is guarded: a ValueError raised deeper in the run
    # must not be reported to the user as a bad capital value.
    try:
        capital = float(capital)
    except ValueError:
        await update.message.reply_text("Invalid capital. Please enter a number.")
        return
    if not math.isfinite(capital):  # float() accepts 'nan' and 'inf'
        await update.message.reply_text("Invalid capital. Please enter a number.")
        return
    if symbol not in valid_pairs:
        await update.message.reply_text(f"Invalid symbol. Choose from: {', '.join(valid_pairs)}")
        return
    if timeframe not in valid_timeframes:
        await update.message.reply_text(f"Invalid timeframe. Choose from: {', '.join(valid_timeframes)}")
        return
    if capital <= 0:
        await update.message.reply_text("Capital must be greater than 0")
        return

    exchanges = init_exchanges()
    try:
        end_date = datetime.now()
        start_date = end_date - timedelta(days=30)
        log_status(f"📊 Running backtest for {symbol} on {timeframe} with {capital} USDT")
        best_params = await optimize_parameters(exchanges['binance'], symbol, timeframe, start_date, end_date)
        if best_params is None:
            # The timeframe was validated above, so None here means the
            # optimization itself produced nothing (no data / no candidate).
            await update.message.reply_text(
                f"Backtest failed: no parameters could be optimized for {symbol} on {timeframe}.")
            return
        df = await fetch_historical_data(exchanges['binance'], symbol, timeframe, start_date, end_date)
        if df is None:
            await update.message.reply_text("Failed to fetch historical data.")
            return
        df = apply_indicators(df, best_params)
        result = backtest(df, best_params, symbol, capital)
        result_message = (f"📊 Backtest Results for {symbol} on {timeframe}:\n"
                          f"💵 Initial Capital: {capital:.2f} USDT\n"
                          f"📈 Total Trades: {result['total_trades']}\n"
                          f"🏆 Wins: {result['wins']} ({result['win_rate']:.2f}%)\n"
                          f"😔 Losses: {result['losses']}\n"
                          f"💰 Total Profit: {result['total_profit']:.2f} USDT\n"
                          f"💳 Final Balance: {result['final_balance']:.2f} USDT\n"
                          f"📉 Max Drawdown: {result['max_drawdown']:.2f}%\n"
                          f"🔍 Detailed trade summary sent to Telegram and log.")
        await update.message.reply_text(result_message)
    finally:
        await exchanges['binance'].close()
async def stop(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Telegram command handler that terminates the whole bot process.

    Logs the shutdown through log_status, replies "Bot stopped." to the
    message that triggered the command, and then calls os._exit(0).

    Note: os._exit() ends the interpreter immediately; no ``finally``
    blocks or atexit handlers run after it.
    """
    log_status("🛑 Stopping bot...", Fore.RED)
    incoming = update.message
    await incoming.reply_text("Bot stopped.")
    os._exit(0)
# Select trading pair (for console)
def select_trading_pair():
    """Ask on the console which trading pair to use and return its symbol.

    Prints a numbered list of the 20 supported USDT pairs, reads one line
    from stdin, and maps it to a symbol. Any entry that is not one of the
    listed numbers falls back to 'BNB/USDT'. The resulting symbol is
    reported through log_status before being returned.
    """
    supported_pairs = (
        'BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'XRP/USDT', 'ADA/USDT',
        'SOL/USDT', 'DOGE/USDT', 'DOT/USDT', 'MATIC/USDT', 'AVAX/USDT',
        'LINK/USDT', 'LTC/USDT', 'BCH/USDT', 'ALGO/USDT', 'ATOM/USDT',
        'XLM/USDT', 'TRX/USDT', 'ETC/USDT', 'VET/USDT', 'FTM/USDT',
    )
    # Keys are the 1-based menu numbers as strings, i.e. exactly what input() yields.
    menu = {str(number): pair for number, pair in enumerate(supported_pairs, start=1)}

    print(f"{Fore.CYAN}=== Select Trading Pair ===")
    for number, pair in menu.items():
        print(f"{number}. {pair}")

    answer = input(f"{Fore.YELLOW}Select trading pair (1-20): ")
    if answer in menu:
        symbol = menu[answer]
    else:
        # Unrecognised entry: fall back to the default pair.
        symbol = 'BNB/USDT'

    log_status(f"✅ Selected trading pair: {symbol}")
    return symbol
# Select timeframe (for console)
def select_timeframe():
    """Ask on the console which candle timeframe to use and return its code.

    Shows the six available options, reads one line from stdin, and maps
    the entered number to a timeframe code ('15m', '30m', '1h', '4h',
    '8h', '1d'). Any other entry falls back to '4h'. The resulting
    timeframe is reported through log_status before being returned.
    """
    print(f"{Fore.CYAN}=== Select Timeframe ===")
    print("1. 15 minutes\n2. 30 minutes\n3. 1 hour\n4. 4 hours\n5. 8 hours\n6. 1 day")
    answer = input(f"{Fore.YELLOW}Select timeframe (1-6): ")

    # Menu digit -> timeframe code, in the same order as the printed list.
    code_by_choice = dict(zip('123456', ('15m', '30m', '1h', '4h', '8h', '1d')))
    if answer in code_by_choice:
        timeframe = code_by_choice[answer]
    else:
        # Unrecognised entry: fall back to the default timeframe.
        timeframe = '4h'

    log_status(f"✅ Selected timeframe: {timeframe}")
    return timeframe
# Show menu (for console)
def show_menu():
    """Print the top-level console menu and return the user's raw entry.

    The entry is returned exactly as typed (no validation happens here);
    it is also reported through log_status.
    """
    title = f"{Fore.CYAN}=== Trading Bot Menu ==="
    print(title)
    print("1. Live Trading\n2. Backtest\n3. Simulated Trading\n4. Exit")
    selected = input(f"{Fore.YELLOW}Select an option (1-4): ")
    log_status(f"🔘 Menu option selected: {selected}")
    return selected
# Main program
def _default_config():
    """Return the baseline symbol/timeframe/indicator settings for the console menu."""
    return {
        'symbol': 'BNB/USDT',
        'timeframe': '4h',
        'sma_fast_window': 10,
        'sma_slow_window': 20,
        'rsi_window': 14,
        'rsi_buy_threshold': 70,
        'rsi_sell_threshold': 30,
        'macd_fast': 12,
        'macd_slow': 26,
        'macd_signal': 9,
        'atr_window': 14,
        'atr_threshold': 1.0,
        'bb_window': 20,
        'bb_std': 2.0,
        'stoch_k': 14,
        'stoch_d': 3,
        'stoch_smooth': 3,
        'investment_percentage': 0.25,
        'swing_threshold': 2.0,
        'status_update_interval': 600,
    }


def _build_telegram_application():
    """Create the Telegram Application and register the bot's command handlers."""
    application = Application.builder().token(os.getenv('TELEGRAM_BOT_TOKEN')).build()
    for command, handler in (
        ("start", start),
        ("price", price),
        ("simulate", simulate),
        ("backtest", backtest_command),
        ("stop", stop),
    ):
        application.add_handler(CommandHandler(command, handler))
    return application


async def _ask_console(prompt):
    """Run a blocking console prompt in a worker thread and return its result.

    input() blocks the calling thread. Calling it directly from a coroutine
    would freeze the event loop, and with it Telegram polling and the price
    update task, for as long as the user takes to answer.
    run_in_executor is used (rather than asyncio.to_thread) so this also
    works on Python versions older than 3.9.
    """
    return await asyncio.get_running_loop().run_in_executor(None, prompt)


def _prompt_initial_capital():
    """Read the starting capital from stdin.

    Returns the capital as a float, or None (after logging the reason) when
    the entry is not a number, is not finite, or is not greater than zero.
    """
    import math  # local import: only needed for the finiteness check below

    raw = input(f"{Fore.YELLOW}Enter initial capital (USDT): ")
    try:
        capital = float(raw)
    except ValueError:
        log_status("❌ Invalid capital. Please enter a number.", Fore.RED)
        return None
    # float() happily parses "nan" and "inf"; neither is a usable balance,
    # and nan would slip past the "<= 0" check because nan comparisons are False.
    if not math.isfinite(capital):
        log_status("❌ Invalid capital. Please enter a number.", Fore.RED)
        return None
    if capital <= 0:
        log_status("❌ Capital must be greater than 0", Fore.RED)
        return None
    return capital


async def _run_console_backtest(config, exchanges, initial_capital):
    """Optimize parameters over the last 30 days, backtest, and merge the best parameters into config."""
    end_date = datetime.now()
    start_date = end_date - timedelta(days=30)
    exchange = exchanges['binance']
    symbol = config['symbol']
    timeframe = config['timeframe']

    best_params = await optimize_parameters(exchange, symbol, timeframe, start_date, end_date)
    if best_params is None:
        log_status("❌ Backtest failed due to invalid timeframe.", Fore.RED)
        return
    df = await fetch_historical_data(exchange, symbol, timeframe, start_date, end_date)
    if df is None:
        log_status("❌ Failed to fetch historical data.", Fore.RED)
        return
    df = apply_indicators(df, best_params)
    # The returned summary is not used on the console path; presumably
    # backtest() reports its own results - TODO confirm.
    backtest(df, best_params, symbol, initial_capital)
    # Keep the optimized parameters so a later simulation reuses them.
    config.update(best_params)
    log_status("✅ Backtest completed. Returning to menu...", Fore.CYAN)


async def _run_console_menu(config, exchanges):
    """Serve the interactive console menu until the user picks Exit."""
    while True:
        choice = await _ask_console(show_menu)
        if choice == '1':
            log_status("🚀 Starting Live Trading...")
            config['symbol'] = await _ask_console(select_trading_pair)
            config['timeframe'] = await _ask_console(select_timeframe)
            log_status("❌ Live trading not implemented. Use /simulate or /backtest via Telegram.", Fore.RED)
        elif choice == '2':
            log_status("📊 Starting Backtest...")
            config['symbol'] = await _ask_console(select_trading_pair)
            config['timeframe'] = await _ask_console(select_timeframe)
            initial_capital = await _ask_console(_prompt_initial_capital)
            if initial_capital is None:
                continue
            await _run_console_backtest(config, exchanges, initial_capital)
        elif choice == '3':
            log_status("🚀 Starting Simulated Trading...")
            config['symbol'] = await _ask_console(select_trading_pair)
            config['timeframe'] = await _ask_console(select_timeframe)
            initial_capital = await _ask_console(_prompt_initial_capital)
            if initial_capital is None:
                continue
            await simulate_trading(config, exchanges, initial_capital)
        elif choice == '4':
            log_status("🛑 Exiting...")
            return
        else:
            log_status("❌ Invalid choice. Please select 1, 2, 3, or 4.", Fore.RED)


async def _shutdown(application, exchanges, price_task):
    """Cancel the price task, stop the Telegram bot, and close every exchange client."""
    try:
        if price_task is not None:
            price_task.cancel()
            # return_exceptions=True absorbs the CancelledError (or any error
            # the task already failed with) so cleanup can continue.
            await asyncio.gather(price_task, return_exceptions=True)
        if application is not None:
            # Documented teardown order: updater first (stop fetching updates),
            # then the application, then shutdown() to release its resources.
            # The running checks keep this safe when startup failed part-way.
            if application.updater.running:
                await application.updater.stop()
            if application.running:
                await application.stop()
            await application.shutdown()
    finally:
        for exchange in exchanges.values():
            await exchange.close()


async def main():
    """Program entry point: start the Telegram bot and serve the console menu.

    Initializes the database and exchange clients, starts Telegram polling
    and the background price-update task, then runs the interactive console
    menu until the user chooses Exit. Console prompts run in a worker thread
    so Telegram commands stay responsive while the menu waits for input.

    All resources (price task, Telegram application, exchange clients) are
    released in a ``finally`` block, so they are also cleaned up when the
    menu loop or startup raises.
    """
    init_db()
    exchanges = init_exchanges()
    application = None
    price_task = None
    try:
        application = _build_telegram_application()
        await application.initialize()
        await application.start()
        await application.updater.start_polling()
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        price_task = asyncio.create_task(update_all_prices(exchanges))
        await _run_console_menu(_default_config(), exchanges)
    finally:
        await _shutdown(application, exchanges, price_task)
# Run the program
if __name__ == "__main__":
    # Announce startup first, then hand control to the async entry point;
    # asyncio.run() creates the event loop and blocks until main() returns.
    startup_notice = "🚀 Starting Trading Bot..."
    log_status(startup_notice)
    asyncio.run(main())