From 4940de16fcbfecdd7f31abb39ce23f98b906558e Mon Sep 17 00:00:00 2001 From: 21in7 Date: Sun, 1 Mar 2026 13:02:42 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EB=B4=87=20=EC=8B=9C=EC=9E=91=20?= =?UTF-8?q?=EC=8B=9C=20=EA=B3=BC=EA=B1=B0=20=EC=BA=94=EB=93=A4=20200?= =?UTF-8?q?=EA=B0=9C=20=ED=94=84=EB=A6=AC=EB=A1=9C=EB=93=9C=20(=EC=A6=89?= =?UTF-8?q?=EC=8B=9C=20=EC=8B=A0=ED=98=B8=20=EA=B3=84=EC=82=B0=20=EA=B0=80?= =?UTF-8?q?=EB=8A=A5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Made-with: Cursor --- src/data_stream.py | 26 ++++++++++++++++++++++++++ tests/test_data_stream.py | 20 ++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/data_stream.py b/src/data_stream.py index 256ee79..46117ae 100644 --- a/src/data_stream.py +++ b/src/data_stream.py @@ -45,11 +45,37 @@ class KlineStream: df.set_index("timestamp", inplace=True) return df + async def _preload_history(self, client: AsyncClient, limit: int = 200): + """REST API로 과거 캔들 데이터를 버퍼에 미리 채운다.""" + logger.info(f"과거 캔들 {limit}개 로드 중...") + loop = asyncio.get_event_loop() + klines = await loop.run_in_executor( + None, + lambda: client.futures_klines( + symbol=self.symbol.upper(), + interval=self.interval, + limit=limit, + ), + ) + # 마지막 캔들은 아직 닫히지 않았을 수 있으므로 제외 + for k in klines[:-1]: + self.buffer.append({ + "timestamp": k[0], + "open": float(k[1]), + "high": float(k[2]), + "low": float(k[3]), + "close": float(k[4]), + "volume": float(k[5]), + "is_closed": True, + }) + logger.info(f"과거 캔들 {len(self.buffer)}개 로드 완료 — 즉시 신호 계산 가능") + async def start(self, api_key: str, api_secret: str): client = await AsyncClient.create( api_key=api_key, api_secret=api_secret, ) + await self._preload_history(client) bm = BinanceSocketManager(client) stream_name = f"{self.symbol}@kline_{self.interval}" logger.info(f"WebSocket 스트림 시작: {stream_name}") diff --git a/tests/test_data_stream.py b/tests/test_data_stream.py index 8b92a66..82a6fa0 100644 --- a/tests/test_data_stream.py +++ b/tests/test_data_stream.py @@ -44,3 +44,23 @@ async def test_callback_called_on_closed_candle(): } stream.handle_message(raw_msg) assert len(received) == 1 + + +@pytest.mark.asyncio +async def test_preload_history_fills_buffer(): + stream = KlineStream(symbol="XRPUSDT", interval="1m", buffer_size=200) + + # REST API 응답 형식: [open_time, open, high, low, close, volume, ...] + fake_klines = [ + [1700000000000 + i * 60000, "0.5", "0.51", "0.49", "0.505", "100000", + 0, "0", "0", "0", "0", "0"] + for i in range(201) # 201개 반환 → 마지막 1개 제외 → 200개 버퍼 + ] + + mock_client = MagicMock() + mock_client.futures_klines.return_value = fake_klines + + await stream._preload_history(mock_client, limit=200) + + assert len(stream.buffer) == 200 + assert stream.get_dataframe() is not None