Files
damination/app/damination.py

298 lines
9.3 KiB
Python

import asyncio
import json
import re
import requests
from aiohttp import web
from websockets import connect
import logging.handlers
import configparser
from pydantic import BaseModel
from datetime import datetime
from aiohttp import ClientSession
class StatusEndpointFilter(logging.Filter):
def filter(self, record):
if 'GET /status' in record.getMessage():
return False
return True
# Config Parser
config = configparser.ConfigParser()
config.read("config/settings.ini")
if config.has_option("widget", "ids"):
WIDGET_IDS = config.get("widget", "ids").split(",")
if not WIDGET_IDS[0]:
WIDGET_IDS = []
else:
WIDGET_IDS = []
MAX_RETRIES = config.getint("config", "MAX_RETRIES")
SERVER_IP = config.get("server", "ip")
SERVER_PORT = config.get("server", "port")
# Create logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Create file handler
log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log'
file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=100000, backupCount=3)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
file_handler.addFilter(StatusEndpointFilter())
# Create stream handler
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(stream_formatter)
# Add handlers to logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
class Roulette(BaseModel):
name : str
conf : dict
class Content(BaseModel):
account : str
name : str
message : str | None
count : int | None
amount : int | None
roulette : Roulette | None
class Donate(BaseModel):
code : str
content : Content
class DaminationAPI():
def __init__(self):
self.queue = asyncio.Queue()
self.fetch_tasks = []
self.stop_event = asyncio.Event()
self.subscribers = {}
def add_subscriber(self, widget_id: str, url: str) -> bool:
if widget_id not in self.subscribers:
self.subscribers[widget_id] = []
if url not in self.subscribers[widget_id]:
self.subscribers[widget_id].append(url)
return True
else:
return False
def remove_subscriber(self, widget_id: str, url: str) -> bool:
if widget_id in self.subscribers and url in self.subscribers[widget_id]:
self.subscribers[widget_id].remove(url)
return True
else:
return False
def get_widget_ids(self) -> list:
"""
현재 설정된 모든 위젯 ID 목록을 반환합니다.
"""
logger.debug(f"get widget {WIDGET_IDS}")
return WIDGET_IDS
def add_widget_id(self, widget_id:str):
"""
주어진 widget_id를 위젯 ID 목록에 추가합니다.
widget_id: 추가할 위젯 ID
"""
try:
WIDGET_IDS.append(widget_id)
logger.debug(f"add widget {widget_id}")
except Exception as e:
logger.error(e)
def remove_widget_id(self, widget_id: str) -> str:
"""
주어진 widget_id를 위젯 ID 목록에서 제거합니다. 제거한 widget_id를 반환하며,
widget_id가 목록에 없는 경우 빈 문자열을 반환합니다.
widget_id: 제거할 위젯 ID
"""
try:
if widget_id in WIDGET_IDS:
widget = WIDGET_IDS.pop(WIDGET_IDS.index(widget_id))
logger.debug(f"delete widget {widget}")
return widget
else:
return ''
except Exception as e:
logger.error(e)
async def fetch_alerts(self, queue, widget_id: str):
"""
주어진 widget_id에 대한 알림을 가져와 queue에 저장합니다.
연결이 끊어지면 최대 MAX_RETRIES 횟수만큼 재시도합니다.
queue: 알림 데이터를 저장할 asyncio 큐
widget_id: 알림을 가져올 위젯 ID
"""
retries = 0
while retries < MAX_RETRIES:
try:
logger.info(f"retries: {retries}")
payload = self.get_websocket_payload(widget_id)
wss_url = f"wss://toon.at:8071/{payload}"
async with connect(wss_url, ping_interval=12) as websocket:
token = json.loads(await websocket.recv())['token']
logger.debug(f"token : {token}")
logger.info(f"WebSocket connection established for widget_id: {widget_id}")
while True:
if self.stop_event.is_set(): # 추가: 중단 이벤트가 설정된 경우, 작업을 중단
break
message = await websocket.recv()
logger.debug(f"message: {message}")
alert_data = json.loads(message)
try:
reformed_data = Donate(**alert_data)
except Exception as e:
logger.error(f"Error: {e}, Raw data: {alert_data}")
pass
await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))})
except asyncio.CancelledError: # 추가: 취소 예외 처리
logger.info(f"Widget {widget_id} fetching task was cancelled.")
break
except Exception as e:
logger.error(f"Error: {e}")
retries += 1
await asyncio.sleep(2 ** retries)
self.remove_widget_id(widget_id)
logger.info(f"Connection failed for widget_id: {widget_id}")
@staticmethod
def get_websocket_payload(widget_id:str) -> str:
"""
주어진 widget_id에 대한 WebSocket 연결 페이로드를 반환합니다.
widget_id: 페이로드를 얻을 위젯 ID
"""
res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}")
match = re.search(r"payload(.+)", res.text)
if not match:
raise ValueError("Payload not found")
payload = json.loads(match.group()[10:-1])["payload"]
return payload
async def start_fetching_alerts(self):
"""
설정된 모든 위젯 ID에 대해 알림을 가져오는 작업을 시작합니다.
"""
self.stop_event.clear() # 추가: 중단 이벤트를 초기화합니다.
for widget_id in WIDGET_IDS:
task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id))
self.fetch_tasks.append(task)
await asyncio.gather(*self.fetch_tasks)
async def stop_fetching_alerts(self):
"""
설정된 모든 위젯 ID에 대한 알림 가져오기 작업을 중단합니다.
중단 이벤트를 설정하고 모든 fetch_tasks를 기다립니다.
"""
self.stop_event.set()
await asyncio.gather(*self.fetch_tasks)
async def add_widget_id_and_start_fetching(self, widget_id: str):
"""
주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다.
widget_id: 추가할 위젯 ID
"""
if widget_id not in WIDGET_IDS:
self.add_widget_id(widget_id)
task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id))
self.fetch_tasks.append(task)
else:
logger.warning(f"Widget ID {widget_id} already exists in the list.")
async def remove_widget_id_and_stop_fetching(self, widget_id: str):
"""
주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다.
widget_id: 제거할 위젯 ID
"""
if widget_id in WIDGET_IDS:
index = WIDGET_IDS.index(widget_id)
self.remove_widget_id(widget_id)
fetch_task = self.fetch_tasks.pop(index)
fetch_task.cancel()
await fetch_task
else:
logger.warning(f"Widget ID {widget_id} not found.")
async def pop_from_queue(self):
"""
queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다.
queue가 비어있는 경우 None을 반환합니다.
"""
if not self.queue.empty():
item = await self.queue.get()
return item
else:
return None
async def notify_subscribers(self, widget_id: str, donate):
if widget_id not in self.subscribers:
return
for subscriber_url in self.subscribers[widget_id]:
async with ClientSession() as session:
await session.post(subscriber_url, json=donate)
logger.info(f"send to subscriber({widget_id}: {donate})")
async def run(self):
await self.start_fetching_alerts()
while True:
if not self.queue.empty():
donate = await self.pop_from_queue()
for widget_id, value in donate.items():
await self.notify_subscribers(widget_id, value)
await asyncio.sleep(2)