Files
cointrader/tests/test_data_stream.py
21in7 d1af736bfc feat: implement BTC/ETH correlation features for improved model accuracy
- Added a new design document outlining the integration of BTC/ETH candle data as additional features in the XRP ML filter, enhancing prediction accuracy.
- Introduced `MultiSymbolStream` for combined WebSocket data retrieval of XRP, BTC, and ETH.
- Expanded feature set from 13 to 21 by including 8 new BTC/ETH-related features.
- Updated various scripts and modules to support the new feature set and data handling.
- Enhanced training and deployment scripts to accommodate the new dataset structure.

This commit lays the groundwork for improved model performance by leveraging the correlation between BTC and ETH with XRP.
2026-03-01 19:30:17 +09:00

104 lines
2.9 KiB
Python

import pytest
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
from src.data_stream import KlineStream
from src.data_stream import MultiSymbolStream
def test_multi_symbol_stream_has_three_buffers():
stream = MultiSymbolStream(
symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"],
interval="1m",
)
assert "xrpusdt" in stream.buffers
assert "btcusdt" in stream.buffers
assert "ethusdt" in stream.buffers
def test_multi_symbol_stream_get_dataframe_returns_none_when_empty():
stream = MultiSymbolStream(
symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"],
interval="1m",
)
assert stream.get_dataframe("XRPUSDT") is None
def test_multi_symbol_stream_get_dataframe_returns_df_when_full():
import pandas as pd
stream = MultiSymbolStream(
symbols=["XRPUSDT", "BTCUSDT", "ETHUSDT"],
interval="1m",
buffer_size=200,
)
candle = {
"timestamp": 1000, "open": 1.0, "high": 1.1,
"low": 0.9, "close": 1.05, "volume": 100.0, "is_closed": True,
}
for i in range(50):
c = candle.copy()
c["timestamp"] = 1000 + i
stream.buffers["xrpusdt"].append(c)
df = stream.get_dataframe("XRPUSDT")
assert df is not None
assert len(df) == 50
@pytest.mark.asyncio
async def test_kline_stream_parses_message():
stream = KlineStream(symbol="XRPUSDT", interval="1m")
raw_msg = {
"k": {
"t": 1700000000000,
"o": "0.5000",
"h": "0.5100",
"l": "0.4900",
"c": "0.5050",
"v": "100000",
"x": True,
}
}
candle = stream.parse_kline(raw_msg)
assert candle["close"] == 0.5050
assert candle["is_closed"] is True
@pytest.mark.asyncio
async def test_callback_called_on_closed_candle():
received = []
stream = KlineStream(
symbol="XRPUSDT",
interval="1m",
on_candle=lambda c: received.append(c),
)
raw_msg = {
"k": {
"t": 1700000000000,
"o": "0.5",
"h": "0.51",
"l": "0.49",
"c": "0.505",
"v": "100000",
"x": True,
}
}
stream.handle_message(raw_msg)
assert len(received) == 1
@pytest.mark.asyncio
async def test_preload_history_fills_buffer():
stream = KlineStream(symbol="XRPUSDT", interval="1m", buffer_size=200)
# REST API 응답 형식: [open_time, open, high, low, close, volume, ...]
fake_klines = [
[1700000000000 + i * 60000, "0.5", "0.51", "0.49", "0.505", "100000",
0, "0", "0", "0", "0", "0"]
for i in range(201) # 201개 반환 → 마지막 1개 제외 → 200개 버퍼
]
mock_client = AsyncMock()
mock_client.futures_klines.return_value = fake_klines
await stream._preload_history(mock_client, limit=200)
assert len(stream.buffer) == 200
assert stream.get_dataframe() is not None