From 69b5675bfd4349a066d7f8b82264e0c1cfcfc7c6 Mon Sep 17 00:00:00 2001 From: 21in7 Date: Sun, 1 Mar 2026 12:52:40 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20WebSocket=20=EC=8B=A4=EC=8B=9C=EA=B0=84?= =?UTF-8?q?=20=EC=BA=94=EB=93=A4=20=EC=8A=A4=ED=8A=B8=EB=A6=BC=20=EA=B5=AC?= =?UTF-8?q?=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Made-with: Cursor --- src/data_stream.py | 64 +++++++++++++++++++++++++++++++++++++++ tests/test_data_stream.py | 46 ++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 src/data_stream.py create mode 100644 tests/test_data_stream.py diff --git a/src/data_stream.py b/src/data_stream.py new file mode 100644 index 0000000..336386f --- /dev/null +++ b/src/data_stream.py @@ -0,0 +1,64 @@ +import asyncio +from collections import deque +from typing import Callable +import pandas as pd +from binance import AsyncClient, BinanceSocketManager +from loguru import logger + + +class KlineStream: + def __init__( + self, + symbol: str, + interval: str = "1m", + buffer_size: int = 200, + on_candle: Callable = None, + ): + self.symbol = symbol.lower() + self.interval = interval + self.buffer: deque = deque(maxlen=buffer_size) + self.on_candle = on_candle + + def parse_kline(self, msg: dict) -> dict: + k = msg["k"] + return { + "timestamp": k["t"], + "open": float(k["o"]), + "high": float(k["h"]), + "low": float(k["l"]), + "close": float(k["c"]), + "volume": float(k["v"]), + "is_closed": k["x"], + } + + def handle_message(self, msg: dict): + candle = self.parse_kline(msg) + if candle["is_closed"]: + self.buffer.append(candle) + if self.on_candle: + self.on_candle(candle) + + def get_dataframe(self) -> pd.DataFrame | None: + if len(self.buffer) < 50: + return None + df = pd.DataFrame(list(self.buffer)) + df.set_index("timestamp", inplace=True) + return df + + async def start(self, api_key: str, api_secret: str): + client = await AsyncClient.create( + api_key=api_key, + api_secret=api_secret, + ) + bm = BinanceSocketManager(client) + stream_name = f"{self.symbol}@kline_{self.interval}" + logger.info(f"WebSocket 스트림 시작: {stream_name}") + try: + async with bm.futures_kline_socket( + symbol=self.symbol.upper(), interval=self.interval + ) as stream: + while True: + msg = await stream.recv() + self.handle_message(msg) + finally: + await client.close_connection() diff --git a/tests/test_data_stream.py b/tests/test_data_stream.py new file mode 100644 index 0000000..8b92a66 --- /dev/null +++ b/tests/test_data_stream.py @@ -0,0 +1,46 @@ +import pytest +import asyncio +from unittest.mock import AsyncMock, patch, MagicMock +from src.data_stream import KlineStream + + +@pytest.mark.asyncio +async def test_kline_stream_parses_message(): + stream = KlineStream(symbol="XRPUSDT", interval="1m") + raw_msg = { + "k": { + "t": 1700000000000, + "o": "0.5000", + "h": "0.5100", + "l": "0.4900", + "c": "0.5050", + "v": "100000", + "x": True, + } + } + candle = stream.parse_kline(raw_msg) + assert candle["close"] == 0.5050 + assert candle["is_closed"] is True + + +@pytest.mark.asyncio +async def test_callback_called_on_closed_candle(): + received = [] + stream = KlineStream( + symbol="XRPUSDT", + interval="1m", + on_candle=lambda c: received.append(c), + ) + raw_msg = { + "k": { + "t": 1700000000000, + "o": "0.5", + "h": "0.51", + "l": "0.49", + "c": "0.505", + "v": "100000", + "x": True, + } + } + stream.handle_message(raw_msg) + assert len(received) == 1