Files
damination/app/damination.py

323 lines
10 KiB
Python

import asyncio
import json
import re
import requests
from aiohttp import web
from websockets import connect
import logging.handlers
import configparser
from pydantic import BaseModel
from datetime import datetime
import aiohttp
class StatusEndpointFilter(logging.Filter):
def filter(self, record):
if 'GET /status' in record.getMessage():
return False
return True
# Config Parser
config = configparser.ConfigParser()
config.read("config/settings.ini")
if config.has_option("widget", "ids"):
WIDGET_IDS = config.get("widget", "ids").split(",")
if not WIDGET_IDS[0]:
WIDGET_IDS = []
else:
WIDGET_IDS = []
MAX_RETRIES = config.getint("config", "MAX_RETRIES")
SERVER_IP = config.get("server", "ip")
SERVER_PORT = config.get("server", "port")
# Create logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Create file handler
log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log'
file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=100000, backupCount=3)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
file_handler.addFilter(StatusEndpointFilter())
# Create stream handler
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(stream_formatter)
# Add handlers to logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
class Roulette(BaseModel):
name : str
conf : dict
class Content(BaseModel):
account : str
name : str
message : str | None
count : int | None
amount : int | None
roulette : Roulette | None
class Donate(BaseModel):
code : str
content : Content
class DaminationAPI():
def __init__(self):
self.WIDGET_IDS = WIDGET_IDS
self.queue = asyncio.Queue()
self.fetch_tasks = []
self.stop_event = asyncio.Event()
self.subscribers = {}
self.session = aiohttp.ClientSession()
def add_subscriber(self, widget_id: str, url: str) -> bool:
if widget_id not in self.subscribers:
self.subscribers[widget_id] = []
if url not in self.subscribers[widget_id]:
self.subscribers[widget_id].append(url)
return True
else:
return False
def remove_subscriber(self, widget_id: str, url: str) -> bool:
if widget_id in self.subscribers and url in self.subscribers[widget_id]:
self.subscribers[widget_id].remove(url)
return True
else:
return False
def get_widget_ids(self) -> list:
"""
현재 설정된 모든 위젯 ID 목록을 반환합니다.
"""
logger.debug(f"get widget {self.WIDGET_IDS}")
return self.WIDGET_IDS
def add_widget_id(self, widget_id:str):
"""
주어진 widget_id를 위젯 ID 목록에 추가합니다.
widget_id: 추가할 위젯 ID
"""
try:
self.WIDGET_IDS.append(widget_id)
logger.debug(f"add widget {widget_id}")
except Exception as e:
logger.error(e)
def remove_widget_id(self, widget_id: str) -> str:
"""
주어진 widget_id를 위젯 ID 목록에서 제거합니다. 제거한 widget_id를 반환하며,
widget_id가 목록에 없는 경우 빈 문자열을 반환합니다.
widget_id: 제거할 위젯 ID
"""
try:
if widget_id in self.WIDGET_IDS:
widget = self.WIDGET_IDS.pop(self.WIDGET_IDS.index(widget_id))
logger.debug(f"delete widget {widget}")
return widget
else:
return ''
except Exception as e:
logger.error(e)
async def fetch_alerts(self, queue, widget_id: str):
"""
주어진 widget_id에 대한 알림을 가져와 queue에 저장합니다.
연결이 끊어지면 최대 MAX_RETRIES 횟수만큼 재시도합니다.
queue: 알림 데이터를 저장할 asyncio 큐
widget_id: 알림을 가져올 위젯 ID
"""
retries = 0
while retries < MAX_RETRIES:
try:
logger.info(f"retries: {retries}")
payload = self.get_websocket_payload(widget_id)
wss_url = f"wss://toon.at:8071/{payload}"
async with connect(wss_url, ping_interval=12) as websocket:
token = json.loads(await websocket.recv())['token']
logger.debug(f"token : {token}")
logger.info(f"WebSocket connection established for widget_id: {widget_id}")
while True:
if self.stop_event.is_set(): # 추가: 중단 이벤트가 설정된 경우, 작업을 중단
break
message = await websocket.recv()
logger.debug(f"message: {message}")
alert_data = json.loads(message)
try:
reformed_data = Donate(**alert_data)
except Exception as e:
logger.error(f"Error: {e}, Raw data: {alert_data}")
pass
await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))})
except asyncio.CancelledError: # 추가: 취소 예외 처리
logger.info(f"Widget {widget_id} fetching task was cancelled.")
break
except Exception as e:
logger.error(f"Error: {e}")
retries += 1
await asyncio.sleep(2 ** retries)
self.remove_widget_id(widget_id)
logger.info(f"Connection failed for widget_id: {widget_id}")
@staticmethod
def get_websocket_payload(widget_id:str) -> str:
"""
주어진 widget_id에 대한 WebSocket 연결 페이로드를 반환합니다.
widget_id: 페이로드를 얻을 위젯 ID
"""
res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}")
match = re.search(r"payload(.+)", res.text)
if not match:
raise ValueError("Payload not found")
payload = json.loads(match.group()[10:-1])["payload"]
return payload
async def start_fetching_alerts(self):
"""
설정된 모든 위젯 ID에 대해 알림을 가져오는 작업을 시작합니다.
"""
self.stop_event.clear() # 추가: 중단 이벤트를 초기화합니다.
for widget_id in self.WIDGET_IDS:
task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id))
self.fetch_tasks.append(task)
await asyncio.gather(*self.fetch_tasks)
async def stop_fetching_alerts(self):
"""
설정된 모든 위젯 ID에 대한 알림 가져오기 작업을 중단합니다.
중단 이벤트를 설정하고 모든 fetch_tasks를 기다립니다.
"""
self.stop_event.set()
await asyncio.gather(*self.fetch_tasks)
async def add_widget_id_and_start_fetching(self, widget_id: str):
"""
주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다.
widget_id: 추가할 위젯 ID
"""
if widget_id not in self.WIDGET_IDS:
self.add_widget_id(widget_id)
task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id))
self.fetch_tasks.append(task)
else:
logger.warning(f"Widget ID {widget_id} already exists in the list.")
async def remove_widget_id_and_stop_fetching(self, widget_id: str):
"""
주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다.
widget_id: 제거할 위젯 ID
"""
if widget_id in self.WIDGET_IDS:
index = self.WIDGET_IDS.index(widget_id)
self.remove_widget_id(widget_id)
fetch_task = self.fetch_tasks.pop(index)
fetch_task.cancel()
await fetch_task
else:
logger.warning(f"Widget ID {widget_id} not found.")
async def pop_from_queue(self):
"""
queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다.
queue가 비어있는 경우 None을 반환합니다.
"""
if not self.queue.empty():
item = await self.queue.get()
return item
else:
return None
def get_subscribers(self, widget_id: str) -> list:
"""
주어진 widget_id에 해당하는 구독자 목록을 반환합니다.
widget_id: 구독자 목록을 얻을 위젯 ID
"""
if widget_id in self.subscribers:
return self.subscribers[widget_id]
else:
return []
async def notify_subscribers(self, widget_id, donate):
"""
주어진 widget_id에 해당하는 모든 구독자들에게 알림을 전송합니다.
widget_id: 알림을 보낼 구독자들의 위젯 ID
donate: 전송할 알림 데이터
"""
subscribers = self.get_subscribers(widget_id)
tasks = [self.notify_subscriber(self.session, subscriber_url, donate) for subscriber_url in subscribers] # 변경: 기존 클래스 세션 사용
await asyncio.gather(*tasks)
async def notify_subscriber(self, session, subscriber_url, donate):
"""
주어진 구독자에게 알림을 전송합니다.
session: aiohttp ClientSession 객체
subscriber_url: 알림을 받을 구독자의 URL
donate: 전송할 알림 데이터
"""
async with session.post(subscriber_url, json=donate) as response:
if response.status // 100 == 2:
logger.info(f"Successfully sent notification to {subscriber_url}.")
else:
logger.error(f"Failed to send notification to {subscriber_url} with status code {response.status}.")
async def run(self):
await self.start_fetching_alerts()
while True:
if not self.queue.empty():
donate = await self.pop_from_queue()
for widget_id, value in donate.items():
await self.notify_subscribers(widget_id, value)
await asyncio.sleep(2)