diff --git a/app/damination.py b/app/damination.py index 371ed7e..2bfd89a 100644 --- a/app/damination.py +++ b/app/damination.py @@ -2,7 +2,6 @@ import asyncio import json import re import requests -from aiohttp import web from websockets import connect import logging.handlers import configparser @@ -23,13 +22,6 @@ config = configparser.ConfigParser() config.read("config/settings.ini") -if config.has_option("widget", "ids"): - WIDGET_IDS = config.get("widget", "ids").split(",") - if not WIDGET_IDS[0]: - WIDGET_IDS = [] -else: - WIDGET_IDS = [] - MAX_RETRIES = config.getint("config", "MAX_RETRIES") SERVER_IP = config.get("server", "ip") @@ -79,13 +71,27 @@ class Donate(BaseModel): class DaminationAPI(): def __init__(self): - self.WIDGET_IDS = WIDGET_IDS + self.WIDGET_IDS = [] self.queue = asyncio.Queue() self.fetch_tasks = [] - self.stop_event = asyncio.Event() self.subscribers = {} self.session = aiohttp.ClientSession() + self.connections = {} + self.processing_tasks = [] + async def init_widget(self): + if config.has_option("widget", "ids"): + for widget in config.get("widget", "ids").split(","): + await self.add_widget_id_and_start_fetching(widget) + + async def process_notification(self): + while True: + donate = await self.pop_from_queue() + if donate is not None: + for widget_id, value in donate.items(): + await self.notify_subscribers(widget_id, value) + else: + break def add_subscriber(self, widget_id: str, url: str) -> bool: if widget_id not in self.subscribers: @@ -167,8 +173,6 @@ class DaminationAPI(): logger.debug(f"token : {token}") logger.info(f"WebSocket connection established for widget_id: {widget_id}") while True: - if self.stop_event.is_set(): # 추가: 중단 이벤트가 설정된 경우, 작업을 중단 - break message = await websocket.recv() logger.debug(f"message: {message}") @@ -212,30 +216,7 @@ class DaminationAPI(): return payload - async def start_fetching_alerts(self): - """ - 설정된 모든 위젯 ID에 대해 알림을 가져오는 작업을 시작합니다. - """ - - self.stop_event.clear() # 추가: 중단 이벤트를 초기화합니다. - - for widget_id in self.WIDGET_IDS: - task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id)) - self.fetch_tasks.append(task) - - await asyncio.gather(*self.fetch_tasks) - - - async def stop_fetching_alerts(self): - """ - 설정된 모든 위젯 ID에 대한 알림 가져오기 작업을 중단합니다. - 중단 이벤트를 설정하고 모든 fetch_tasks를 기다립니다. - """ - - self.stop_event.set() - await asyncio.gather(*self.fetch_tasks) - - async def add_widget_id_and_start_fetching(self, widget_id: str): + async def add_widget_id_and_start_fetching(self, widget_id: str) -> bool: """ 주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다. widget_id: 추가할 위젯 ID @@ -245,9 +226,11 @@ class DaminationAPI(): self.add_widget_id(widget_id) task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id)) self.fetch_tasks.append(task) + return True + else: logger.warning(f"Widget ID {widget_id} already exists in the list.") - + return False async def remove_widget_id_and_stop_fetching(self, widget_id: str): """ @@ -275,6 +258,11 @@ class DaminationAPI(): else: return None + async def get_connection(self, url: str): + if url not in self.connections: + self.connections[url] = aiohttp.ClientSession() + return self.connections[url] + def get_subscribers(self, widget_id: str) -> list: """ 주어진 widget_id에 해당하는 구독자 목록을 반환합니다. @@ -286,37 +274,39 @@ class DaminationAPI(): else: return [] - async def notify_subscribers(self, widget_id, donate): + async def notify_subscribers(self, widget_id: str, donate): """ 주어진 widget_id에 해당하는 모든 구독자들에게 알림을 전송합니다. widget_id: 알림을 보낼 구독자들의 위젯 ID donate: 전송할 알림 데이터 """ subscribers = self.get_subscribers(widget_id) - tasks = [self.notify_subscriber(self.session, subscriber_url, donate) for subscriber_url in subscribers] # 변경: 기존 클래스 세션 사용 + tasks = [asyncio.create_task(self.send_webhook(await self.get_connection(subscriber_url), subscriber_url, donate)) for subscriber_url in subscribers] # 변경: self.get_connection() 사용 await asyncio.gather(*tasks) - async def notify_subscriber(self, session, subscriber_url, donate): - """ - 주어진 구독자에게 알림을 전송합니다. - session: aiohttp ClientSession 객체 - subscriber_url: 알림을 받을 구독자의 URL - donate: 전송할 알림 데이터 - """ - async with session.post(subscriber_url, json=donate) as response: - if response.status // 100 == 2: - logger.info(f"Successfully sent notification to {subscriber_url}.") + + async def send_webhook(self, session, subscriber_url, donate): + payload = { + "text": f"**{donate['content']['name']}({donate['content']['account']})**\n{donate}", + "data": donate + } + + async with session.post(subscriber_url, json=payload) as response: + if response.status == 200: + logger.info(f"Successfully sent Webhook to {subscriber_url}, {payload}") else: logger.error(f"Failed to send notification to {subscriber_url} with status code {response.status}.") - async def run(self): - await self.start_fetching_alerts() - + await self.init_widget() + while True: if not self.queue.empty(): - donate = await self.pop_from_queue() - for widget_id, value in donate.items(): - await self.notify_subscribers(widget_id, value) - await asyncio.sleep(2) + task = asyncio.create_task(self.process_notification()) + self.processing_tasks.append(task) + await task + await asyncio.sleep(0.1) + async def close(self): + for session in self.connections.values(): + await session.close() \ No newline at end of file diff --git a/app/main.py b/app/main.py index 5a5f2df..78367ee 100644 --- a/app/main.py +++ b/app/main.py @@ -58,8 +58,11 @@ async def add_widget(request): widget_id: 추가할 위젯 ID """ widget_id = request.match_info['widget_id'] - await toonat.add_widget_id_and_start_fetching(widget_id) - return web.Response(text=f"Widget ID {widget_id} added and fetching started.") + result = await toonat.add_widget_id_and_start_fetching(widget_id) + if result: + return web.Response(text=f"Widget ID {widget_id} added and fetching started.") + else: + return web.Response(text=f"Widget ID {widget_id} already exists in the list.") @routes.get('/remove_widget/{widget_id}') async def remove_widget(request):