import asyncio import json import re import requests from aiohttp import web from websockets import connect import logging.handlers import configparser from pydantic import BaseModel from datetime import datetime from aiohttp import ClientSession class StatusEndpointFilter(logging.Filter): def filter(self, record): if 'GET /status' in record.getMessage(): return False return True # Config Parser config = configparser.ConfigParser() config.read("config/settings.ini") if config.has_option("widget", "ids"): WIDGET_IDS = config.get("widget", "ids").split(",") if not WIDGET_IDS[0]: WIDGET_IDS = [] else: WIDGET_IDS = [] MAX_RETRIES = config.getint("config", "MAX_RETRIES") SERVER_IP = config.get("server", "ip") SERVER_PORT = config.get("server", "port") # Create logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Create file handler log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log' file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=100000, backupCount=3) file_handler.setLevel(logging.INFO) file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) file_handler.addFilter(StatusEndpointFilter()) # Create stream handler stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.DEBUG) stream_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') stream_handler.setFormatter(stream_formatter) # Add handlers to logger logger.addHandler(file_handler) logger.addHandler(stream_handler) class Roulette(BaseModel): name : str conf : dict class Content(BaseModel): account : str name : str message : str | None count : int | None amount : int | None roulette : Roulette | None class Donate(BaseModel): code : str content : Content class DaminationAPI(): def __init__(self): self.queue = asyncio.Queue() self.fetch_tasks = [] self.stop_event = asyncio.Event() self.subscribers = {} def add_subscriber(self, widget_id: str, url: str) -> bool: if widget_id not in self.subscribers: self.subscribers[widget_id] = [] if url not in self.subscribers[widget_id]: self.subscribers[widget_id].append(url) return True else: return False def remove_subscriber(self, widget_id: str, url: str) -> bool: if widget_id in self.subscribers and url in self.subscribers[widget_id]: self.subscribers[widget_id].remove(url) return True else: return False def get_widget_ids(self) -> list: """ 현재 설정된 모든 위젯 ID 목록을 반환합니다. """ logger.debug(f"get widget {WIDGET_IDS}") return WIDGET_IDS def add_widget_id(self, widget_id:str): """ 주어진 widget_id를 위젯 ID 목록에 추가합니다. widget_id: 추가할 위젯 ID """ try: WIDGET_IDS.append(widget_id) logger.debug(f"add widget {widget_id}") except Exception as e: logger.error(e) def remove_widget_id(self, widget_id: str) -> str: """ 주어진 widget_id를 위젯 ID 목록에서 제거합니다. 제거한 widget_id를 반환하며, widget_id가 목록에 없는 경우 빈 문자열을 반환합니다. widget_id: 제거할 위젯 ID """ try: if widget_id in WIDGET_IDS: widget = WIDGET_IDS.pop(WIDGET_IDS.index(widget_id)) logger.debug(f"delete widget {widget}") return widget else: return '' except Exception as e: logger.error(e) async def fetch_alerts(self, queue, widget_id: str): """ 주어진 widget_id에 대한 알림을 가져와 queue에 저장합니다. 연결이 끊어지면 최대 MAX_RETRIES 횟수만큼 재시도합니다. queue: 알림 데이터를 저장할 asyncio 큐 widget_id: 알림을 가져올 위젯 ID """ retries = 0 while retries < MAX_RETRIES: try: logger.info(f"retries: {retries}") payload = self.parse_payload(widget_id) wss_url = f"wss://toon.at:8071/{payload}" async with connect(wss_url, ping_interval=12) as websocket: token = json.loads(await websocket.recv())['token'] logger.debug(f"token : {token}") logger.info(f"WebSocket connection established for widget_id: {widget_id}") while True: if self.stop_event.is_set(): # 추가: 중단 이벤트가 설정된 경우, 작업을 중단 break message = await websocket.recv() logger.debug(f"message: {message}") alert_data = json.loads(message) try: reformed_data = Donate(**alert_data) except Exception as e: logger.error(f"Error: {e}, Raw data: {alert_data}") pass await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))}) except asyncio.CancelledError: # 추가: 취소 예외 처리 logger.info(f"Widget {widget_id} fetching task was cancelled.") break except Exception as e: logger.error(f"Error: {e}") retries += 1 await asyncio.sleep(2 ** retries) self.remove_widget_id(widget_id) logger.info(f"Connection failed for widget_id: {widget_id}") @staticmethod def parse_payload(widget_id:str) -> str: """ 주어진 widget_id에 대한 WebSocket 연결 페이로드를 반환합니다. widget_id: 페이로드를 얻을 위젯 ID """ res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}") match = re.search(r"payload(.+)", res.text) if not match: raise ValueError("Payload not found") payload = json.loads(match.group()[10:-1])["payload"] return payload async def start_fetching_alerts(self): """ 설정된 모든 위젯 ID에 대해 알림을 가져오는 작업을 시작합니다. """ self.stop_event.clear() # 추가: 중단 이벤트를 초기화합니다. for widget_id in WIDGET_IDS: task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id)) self.fetch_tasks.append(task) await asyncio.gather(*self.fetch_tasks) async def stop_fetching_alerts(self): """ 설정된 모든 위젯 ID에 대한 알림 가져오기 작업을 중단합니다. 중단 이벤트를 설정하고 모든 fetch_tasks를 기다립니다. """ self.stop_event.set() await asyncio.gather(*self.fetch_tasks) async def add_widget_id_and_start_fetching(self, widget_id: str): """ 주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다. widget_id: 추가할 위젯 ID """ if widget_id not in WIDGET_IDS: self.add_widget_id(widget_id) task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id)) self.fetch_tasks.append(task) else: logger.warning(f"Widget ID {widget_id} already exists in the list.") async def remove_widget_id_and_stop_fetching(self, widget_id: str): """ 주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다. widget_id: 제거할 위젯 ID """ if widget_id in WIDGET_IDS: index = WIDGET_IDS.index(widget_id) self.remove_widget_id(widget_id) fetch_task = self.fetch_tasks.pop(index) fetch_task.cancel() await fetch_task else: logger.warning(f"Widget ID {widget_id} not found.") async def pop_from_queue(self): """ queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다. queue가 비어있는 경우 None을 반환합니다. """ if not self.queue.empty(): item = await self.queue.get() return item else: return None async def notify_subscribers(self, widget_id: str, donate): if widget_id not in self.subscribers: return for subscriber_url in self.subscribers[widget_id]: async with ClientSession() as session: await session.post(subscriber_url, json=donate) logger.info(f"send to subscriber({widget_id}: {donate})") async def send_data_to_service(self, widget_id: str, donate): if widget_id in self.subscribers: for subscriber in self.subscribers[widget_id]: async with ClientSession() as session: await session.post(subscriber, json=donate) logger.info(f"send to subscriber({widget_id}: {donate})") ''' async def send_data_to_service(self, alert_data): """ 외부 서비스로 데이터를 전송하는 비동기 함수입니다. """ url = f"http://{SERVER_IP}:{SERVER_PORT}/donate" # 외부 서비스의 API 엔드포인트를 설정합니다. headers = { "Content-Type": "application/json" } async with ClientSession() as session: async with session.post(url, data=json.dumps(alert_data), headers=headers) as response: if response.status == 200: logger.info(f"Data sent successfully: {alert_data}") else: logger.error(f"Failed to send data: {alert_data}. Status: {response.status}") ''' async def run(self): await self.start_fetching_alerts() while True: if not self.queue.empty(): donate = await self.pop_from_queue() for widget_id, value in donate.items(): await self.notify_subscribers(widget_id, value) await self.send_data_to_service(widget_id, value) await asyncio.sleep(2) from aiohttp import web routes = web.RouteTableDef() toonat = DaminationAPI() app = web.Application() @routes.get('/subscribe/{widget_id}/{url}') async def subscribe(request): widget_id = request.match_info['widget_id'] url = request.match_info['url'] success = toonat.add_subscriber(widget_id, url) if success: return web.Response(text=f"Subscribed to widget ID {widget_id} with URL {url}.") else: return web.Response(text=f"Already subscribed to widget ID {widget_id} with URL {url}.") @routes.get('/unsubscribe/{widget_id}/{url}') async def unsubscribe(request): widget_id = request.match_info['widget_id'] url = request.match_info['url'] success = toonat.remove_subscriber(widget_id, url) if success: return web.Response(text=f"Unsubscribed from widget ID {widget_id} with URL {url}.") else: return web.Response(text=f"Not subscribed to widget ID {widget_id} with URL {url}.") @routes.get('/init_widget_id') async def init_widget_id(request): if not WIDGET_IDS: widget_id = request.query.get("widget_id") if widget_id: WIDGET_IDS.append(widget_id) return web.Response(text=f"Widget ID {widget_id} added.") else: return web.Response(text="Invalid widget ID. Please try again.") else: return web.Response(text="Widget ID already exists.") @routes.get('/get_widgets') async def get_widgets(request): """ 현재 설정된 모든 위젯 ID 목록을 반환하는 웹 API 엔드포인트입니다. """ widget_ids = toonat.get_widget_ids() return web.json_response(widget_ids) @routes.get('/add_widget/{widget_id}') async def add_widget(request): """ 주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작하는 웹 API 엔드포인트입니다. widget_id: 추가할 위젯 ID """ widget_id = request.match_info['widget_id'] await toonat.add_widget_id_and_start_fetching(widget_id) return web.Response(text=f"Widget ID {widget_id} added and fetching started.") @routes.get('/remove_widget/{widget_id}') async def remove_widget(request): """ 주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단하는 웹 API 엔드포인트입니다. widget_id: 제거할 위젯 ID """ widget_id = request.match_info['widget_id'] await toonat.remove_widget_id_and_stop_fetching(widget_id) return web.Response(text=f"Widget ID {widget_id} removed and fetching stopped.") @routes.get('/status') async def status(request): """ 서버 상태를 확인하는 웹 API 엔드포인트입니다. 이 엔드포인트는 서버가 정상적으로 작동 중임을 확인하는 데 사용됩니다. """ return web.HTTPOk() async def web_server(app): """ aiohttp 웹 서버를 설정하고 시작하는 비동기 함수입니다. """ app.add_routes(routes) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, '0.0.0.0', 80) await site.start() async def main(): # DaminationAPI를 실행합니다. api_task = asyncio.create_task(toonat.run()) # 웹 서버를 실행합니다. web_server_task = asyncio.create_task(web_server(app)) # 두 작업을 병렬로 실행하지 않고 순서대로 처리합니다. await api_task await web_server_task if __name__ == "__main__": if not WIDGET_IDS: print("No widget ID found in settings.ini.") print("Please enter the widget ID via web endpoint (http://localhost/init_widget_id?widget_id=YOUR_WIDGET_ID)") asyncio.run(main())