fix: _on_candle_closed async 콜백 구조 수정 — asyncio.create_task 제거
동기 콜백 내부에서 asyncio.create_task()를 호출하면 이벤트 루프 컨텍스트 밖에서 실패하여 캔들 처리가 전혀 이루어지지 않는 버그 수정. - _on_candle_closed: 동기 → async, create_task → await - handle_message (KlineStream/MultiSymbolStream): 동기 → async, on_candle await - test_callback_called_on_closed_candle: AsyncMock + await handle_message로 수정 Made-with: Cursor
This commit is contained in:
@@ -1,4 +1,3 @@
|
|||||||
import asyncio
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from src.config import Config
|
from src.config import Config
|
||||||
@@ -25,12 +24,12 @@ class TradingBot:
|
|||||||
on_candle=self._on_candle_closed,
|
on_candle=self._on_candle_closed,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _on_candle_closed(self, candle: dict):
|
async def _on_candle_closed(self, candle: dict):
|
||||||
xrp_df = self.stream.get_dataframe(self.config.symbol)
|
xrp_df = self.stream.get_dataframe(self.config.symbol)
|
||||||
btc_df = self.stream.get_dataframe("BTCUSDT")
|
btc_df = self.stream.get_dataframe("BTCUSDT")
|
||||||
eth_df = self.stream.get_dataframe("ETHUSDT")
|
eth_df = self.stream.get_dataframe("ETHUSDT")
|
||||||
if xrp_df is not None:
|
if xrp_df is not None:
|
||||||
asyncio.create_task(self.process_candle(xrp_df, btc_df=btc_df, eth_df=eth_df))
|
await self.process_candle(xrp_df, btc_df=btc_df, eth_df=eth_df)
|
||||||
|
|
||||||
async def _recover_position(self) -> None:
|
async def _recover_position(self) -> None:
|
||||||
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구."""
|
"""재시작 시 바이낸스에서 현재 포지션을 조회하여 상태 복구."""
|
||||||
|
|||||||
@@ -40,12 +40,12 @@ class KlineStream:
|
|||||||
"is_closed": k["x"],
|
"is_closed": k["x"],
|
||||||
}
|
}
|
||||||
|
|
||||||
def handle_message(self, msg: dict):
|
async def handle_message(self, msg: dict):
|
||||||
candle = self.parse_kline(msg)
|
candle = self.parse_kline(msg)
|
||||||
if candle["is_closed"]:
|
if candle["is_closed"]:
|
||||||
self.buffer.append(candle)
|
self.buffer.append(candle)
|
||||||
if self.on_candle:
|
if self.on_candle:
|
||||||
self.on_candle(candle)
|
await self.on_candle(candle)
|
||||||
|
|
||||||
def get_dataframe(self) -> pd.DataFrame | None:
|
def get_dataframe(self) -> pd.DataFrame | None:
|
||||||
if len(self.buffer) < _MIN_CANDLES_FOR_SIGNAL:
|
if len(self.buffer) < _MIN_CANDLES_FOR_SIGNAL:
|
||||||
@@ -90,7 +90,7 @@ class KlineStream:
|
|||||||
) as stream:
|
) as stream:
|
||||||
while True:
|
while True:
|
||||||
msg = await stream.recv()
|
msg = await stream.recv()
|
||||||
self.handle_message(msg)
|
await self.handle_message(msg)
|
||||||
finally:
|
finally:
|
||||||
await client.close_connection()
|
await client.close_connection()
|
||||||
|
|
||||||
@@ -129,7 +129,7 @@ class MultiSymbolStream:
|
|||||||
"is_closed": k["x"],
|
"is_closed": k["x"],
|
||||||
}
|
}
|
||||||
|
|
||||||
def handle_message(self, msg: dict):
|
async def handle_message(self, msg: dict):
|
||||||
# Combined stream 메시지는 {"stream": "...", "data": {...}} 형태
|
# Combined stream 메시지는 {"stream": "...", "data": {...}} 형태
|
||||||
if "stream" in msg:
|
if "stream" in msg:
|
||||||
data = msg["data"]
|
data = msg["data"]
|
||||||
@@ -145,7 +145,7 @@ class MultiSymbolStream:
|
|||||||
if candle["is_closed"] and symbol in self.buffers:
|
if candle["is_closed"] and symbol in self.buffers:
|
||||||
self.buffers[symbol].append(candle)
|
self.buffers[symbol].append(candle)
|
||||||
if symbol == self.primary_symbol and self.on_candle:
|
if symbol == self.primary_symbol and self.on_candle:
|
||||||
self.on_candle(candle)
|
await self.on_candle(candle)
|
||||||
|
|
||||||
def get_dataframe(self, symbol: str) -> pd.DataFrame | None:
|
def get_dataframe(self, symbol: str) -> pd.DataFrame | None:
|
||||||
key = symbol.lower()
|
key = symbol.lower()
|
||||||
@@ -192,6 +192,6 @@ class MultiSymbolStream:
|
|||||||
async with bm.futures_multiplex_socket(streams) as stream:
|
async with bm.futures_multiplex_socket(streams) as stream:
|
||||||
while True:
|
while True:
|
||||||
msg = await stream.recv()
|
msg = await stream.recv()
|
||||||
self.handle_message(msg)
|
await self.handle_message(msg)
|
||||||
finally:
|
finally:
|
||||||
await client.close_connection()
|
await client.close_connection()
|
||||||
|
|||||||
@@ -63,11 +63,11 @@ async def test_kline_stream_parses_message():
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_callback_called_on_closed_candle():
|
async def test_callback_called_on_closed_candle():
|
||||||
received = []
|
callback = AsyncMock()
|
||||||
stream = KlineStream(
|
stream = KlineStream(
|
||||||
symbol="XRPUSDT",
|
symbol="XRPUSDT",
|
||||||
interval="1m",
|
interval="1m",
|
||||||
on_candle=lambda c: received.append(c),
|
on_candle=callback,
|
||||||
)
|
)
|
||||||
raw_msg = {
|
raw_msg = {
|
||||||
"k": {
|
"k": {
|
||||||
@@ -80,8 +80,8 @@ async def test_callback_called_on_closed_candle():
|
|||||||
"x": True,
|
"x": True,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stream.handle_message(raw_msg)
|
await stream.handle_message(raw_msg)
|
||||||
assert len(received) == 1
|
assert callback.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
Reference in New Issue
Block a user