import asyncio import json import re import requests from websockets import connect import logging.handlers import configparser from pydantic import BaseModel from datetime import datetime import aiohttp class StatusEndpointFilter(logging.Filter): def filter(self, record): if 'GET /status' in record.getMessage(): return False return True # Config Parser config = configparser.ConfigParser() config.read("config/settings.ini") MAX_RETRIES = config.getint("config", "MAX_RETRIES") SERVER_IP = config.get("server", "ip") SERVER_PORT = config.get("server", "port") # Create logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Create file handler log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log' file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=100000, backupCount=3) file_handler.setLevel(logging.INFO) file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) file_handler.addFilter(StatusEndpointFilter()) # Create stream handler stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.DEBUG) stream_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') stream_handler.setFormatter(stream_formatter) # Add handlers to logger logger.addHandler(file_handler) logger.addHandler(stream_handler) class Roulette(BaseModel): name : str conf : dict class Content(BaseModel): account : str name : str message : str | None count : int | None amount : int | None roulette : Roulette | None class Donate(BaseModel): code : str content : Content class DaminationAPI(): def __init__(self): self.WIDGET_IDS = [] self.queue = asyncio.Queue() self.fetch_tasks = [] self.subscribers = {} self.session = aiohttp.ClientSession() self.connections = {} self.processing_tasks = [] async def initialize_widgets(self): if config.has_option("widget", "ids"): widgets = config.get("widget", "ids").split(",") if widgets[0]: for widget in widgets: await self.add_widget_and_start_fetching_notifications(widget) logger.info(f"initialize wigets : {widgets}") else: logger.error(f"widgets empty") async def process_notification(self): while True: donate = await self.pop_notification_from_queue() if donate is not None: for widget_id, value in donate.items(): await self.send_notifications_to_subscribers(widget_id, value) else: logger.error("No notifications in the queue, breaking loop.") break def get_subscribers(self, widget_id: str) -> list: """ 주어진 widget_id에 해당하는 구독자 목록을 반환합니다. widget_id: 구독자 목록을 얻을 위젯 ID """ if widget_id in self.subscribers: subscribers = self.subscribers[widget_id] logger.info(f"get {widget_id} subscribers : {subscribers}") return subscribers else: logger.error(f"no subscribers") return [] def add_subscriber(self, widget_id: str, url: str) -> str: if widget_id not in self.subscribers: self.subscribers[widget_id] = [] if url not in self.subscribers[widget_id]: self.subscribers[widget_id].append(url) logger.info(f"Added subscriber: widget_id={widget_id}, url={url}") return widget_id else: logger.error(f"Subscriber already exists: widget_id={widget_id}, url={url}") return '' def remove_subscriber(self, widget_id: str, url: str) -> str: if widget_id in self.subscribers and url in self.subscribers[widget_id]: self.subscribers[widget_id].remove(url) logger.info(f"Removed subscriber: widget_id={widget_id}, url={url}") return widget_id else: logger.error(f"Subscriber not found: widget_id={widget_id}, url={url}") return '' def get_widget_ids(self) -> list: """ 현재 설정된 모든 위젯 ID 목록을 반환합니다. """ if self.WIDGET_IDS: logger.info(f"get widget {self.WIDGET_IDS}") return self.WIDGET_IDS else: logger.error(f"widget id empty") return [] def add_widget_id(self, widget_id:str) -> str: """ 주어진 widget_id를 위젯 ID 목록에 추가합니다. widget_id: 추가할 위젯 ID """ try: if widget_id not in self.WIDGET_IDS: self.WIDGET_IDS.append(widget_id) logger.info(f"add widget {widget_id}") return widget_id else: logger.error(f"{widget_id} already exists in the list.") return '' except Exception as e: logger.error(e) def remove_widget_id(self, widget_id: str) -> str: """ 주어진 widget_id를 위젯 ID 목록에서 제거합니다. 제거한 widget_id를 반환하며, widget_id가 목록에 없는 경우 빈 문자열을 반환합니다. widget_id: 제거할 위젯 ID """ try: if widget_id in self.WIDGET_IDS: widget = self.WIDGET_IDS.pop(self.WIDGET_IDS.index(widget_id)) logger.info(f"remove widget {widget}") return widget else: logger.error(f"not in widget") return '' except Exception as e: logger.error(e) async def fetch_notifications(self, queue, widget_id: str): """ 주어진 widget_id에 대한 알림을 가져와 queue에 저장합니다. 연결이 끊어지면 최대 MAX_RETRIES 횟수만큼 재시도합니다. queue: 알림 데이터를 저장할 asyncio 큐 widget_id: 알림을 가져올 위젯 ID """ retries = 0 while retries < MAX_RETRIES: try: logger.info(f"retries: {retries}") payload = self.get_websocket_payload(widget_id) wss_url = f"wss://toon.at:8071/{payload}" async with connect(wss_url, ping_interval=12) as websocket: token = json.loads(await websocket.recv())['token'] logger.debug(f"token : {token}") logger.info(f"WebSocket connection established for widget_id: {widget_id}") while True: message = await websocket.recv() logger.debug(f"message: {message}") alert_data = json.loads(message) try: reformed_data = Donate(**alert_data) except Exception as e: logger.error(f"Error: {e}, Raw data: {alert_data}") pass await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))}) except asyncio.CancelledError: logger.info(f"Widget {widget_id} fetching task was cancelled.") break except Exception as e: logger.error(f"Error: {e}") retries += 1 await asyncio.sleep(2 ** retries) self.remove_widget_id(widget_id) logger.info(f"Connection failed for widget_id: {widget_id}") @staticmethod def get_websocket_payload(widget_id:str) -> str: """ 주어진 widget_id에 대한 WebSocket 연결 페이로드를 반환합니다. widget_id: 페이로드를 얻을 위젯 ID """ res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}") match = re.search(r"payload(.+)", res.text) if not match: raise ValueError("Payload not found") payload = json.loads(match.group()[10:-1])["payload"] return payload async def add_widget_and_start_fetching_notifications(self, widget_id: str) -> str: """ 주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다. widget_id: 추가할 위젯 ID """ if widget_id not in self.WIDGET_IDS: self.add_widget_id(widget_id) task = asyncio.create_task(self.fetch_notifications(self.queue, widget_id)) self.fetch_tasks.append(task) return widget_id else: logger.error(f"Widget ID {widget_id} already exists in the list.") return '' async def remove_widget_and_stop_fetching_notifications(self, widget_id: str) -> str: """ 주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다. widget_id: 제거할 위젯 ID """ if widget_id in self.WIDGET_IDS: index = self.WIDGET_IDS.index(widget_id) self.remove_widget_id(widget_id) fetch_task = self.fetch_tasks.pop(index) fetch_task.cancel() await fetch_task return widget_id else: logger.error(f"Widget ID {widget_id} not found.") return '' async def pop_notification_from_queue(self): """ queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다. queue가 비어있는 경우 None을 반환합니다. """ if not self.queue.empty(): item = await self.queue.get() return item else: return None async def get_connection(self, url: str): if url not in self.connections: self.connections[url] = aiohttp.ClientSession() return self.connections[url] async def send_notifications_to_subscribers(self, widget_id: str, donate): """ 주어진 widget_id에 해당하는 모든 구독자들에게 알림을 전송합니다. widget_id: 알림을 보낼 구독자들의 위젯 ID donate: 전송할 알림 데이터 """ subscribers = self.get_subscribers(widget_id) tasks = [asyncio.create_task(self.send_notification_webhook(await self.get_connection(subscriber_url), subscriber_url, donate)) for subscriber_url in subscribers] await asyncio.gather(*tasks) async def send_notification_webhook(self, session, subscriber_url, donate): payload = { "text": f"**{donate['content']['name']}({donate['content']['account']})**\n{donate}", "data": donate } async with session.post(subscriber_url, json=payload) as response: if response.status == 200: logger.info(f"Successfully sent Webhook to {subscriber_url}, {payload}") else: logger.error(f"Failed to send notification to {subscriber_url} with status code {response.status}.") async def run(self): await self.initialize_widgets() while True: if not self.queue.empty(): task = asyncio.create_task(self.process_notification()) self.processing_tasks.append(task) await task await asyncio.sleep(0.1) async def close(self): for session in self.connections.values(): await session.close()