Files
damination/app/damination.py
2023-04-30 10:26:08 +09:00

359 lines
12 KiB
Python

import asyncio
import json
import re
import requests
from websockets import connect
import logging.handlers
import configparser
from pydantic import BaseModel
from datetime import datetime
import aiohttp
class StatusEndpointFilter(logging.Filter):
def filter(self, record):
if 'GET /status' in record.getMessage():
return False
return True
# Config Parser
config = configparser.ConfigParser()
config.read("config/settings.ini")
MAX_RETRIES = config.getint("config", "MAX_RETRIES")
SERVER_IP = config.get("server", "ip")
SERVER_PORT = config.get("server", "port")
# Create logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Create file handler
log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log'
file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=100000, backupCount=3)
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
file_handler.addFilter(StatusEndpointFilter())
# Create stream handler
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(stream_formatter)
# Add handlers to logger
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
class Roulette(BaseModel):
name : str
conf : dict
class Content(BaseModel):
account : str
name : str
message : str | None
count : int | None
amount : int | None
roulette : Roulette | None
class Donate(BaseModel):
code : str
content : Content
class DaminationAPI():
def __init__(self):
self.WIDGET_IDS = []
self.queue = asyncio.Queue()
self.fetch_tasks = []
self.subscribers = {}
self.session = aiohttp.ClientSession()
self.connections = {}
self.processing_tasks = []
async def initialize_widgets(self):
if config.has_option("widget", "ids"):
widgets = config.get("widget", "ids").split(",")
if widgets[0]:
for widget in widgets:
await self.add_widget_and_start_fetching_notifications(widget)
logger.info(f"[initialize_widgets] WIDGET ID 초기화 : {widgets}")
else:
logger.error(f"[initialize_widgets] WIDGET 초기화 실패 - 등록된 WIDGET ID 없음")
async def process_notification(self):
while True:
donate = await self.pop_notification_from_queue()
if donate is not None:
for widget_id, value in donate.items():
await self.send_notifications_to_subscribers(widget_id, value)
logger.debug(f"[process_notification] QUEUE 추출: {widget_id, value}")
else:
logger.error("[process_notification] QUEUE 추출 실패 - 저장된 QUEUE 없음")
break
def get_subscribers(self, widget_id: str) -> list:
"""
주어진 widget_id에 해당하는 구독자 목록을 반환합니다.
widget_id: 구독자 목록을 얻을 위젯 ID
"""
if widget_id in self.subscribers:
subscribers = self.subscribers[widget_id]
logger.info(f"[get_subscribers] 구독자 반환({widget_id}) : {subscribers}")
return subscribers
else:
logger.error(f"[get_subscribers] 구독자 없음({widget_id})")
return []
def add_subscriber(self, widget_id: str, url: str) -> str:
if widget_id not in self.subscribers:
self.subscribers[widget_id] = []
if url not in self.subscribers[widget_id]:
self.subscribers[widget_id].append(url)
logger.info(f"[add_subscriber] 구독자 추가 widget_id={widget_id}, url={url}")
return widget_id
else:
logger.error(f"[add_subscriber] 이미 추가된 구독자 widget_id={widget_id}, url={url}")
return ''
def remove_subscriber(self, widget_id: str, url: str) -> str:
if widget_id in self.subscribers and url in self.subscribers[widget_id]:
self.subscribers[widget_id].remove(url)
logger.info(f"[remove_subscriber] 구독자 제거: widget_id={widget_id}, url={url}")
return widget_id
else:
logger.error(f"[remove_subscriber] 구독자를 찾을 수 없음 widget_id={widget_id}, url={url}")
return ''
def get_widget_ids(self) -> list:
"""
현재 설정된 모든 위젯 ID 목록을 반환합니다.
"""
if self.WIDGET_IDS:
logger.info(f"[get_widget_ids] WIDGET ID 반환 {self.WIDGET_IDS}")
return self.WIDGET_IDS
else:
logger.error(f"[get_widget_ids] WIDGET ID 반환 실패 - WIDGET ID 없음")
return []
def add_widget_id(self, widget_id:str) -> str:
"""
주어진 widget_id를 위젯 ID 목록에 추가합니다.
widget_id: 추가할 위젯 ID
"""
try:
if widget_id not in self.WIDGET_IDS:
self.WIDGET_IDS.append(widget_id)
logger.info(f"[add_widget_id] WIDGET ID 추가 {widget_id}")
return widget_id
else:
logger.error(f"[add_widget_id] 이미 추가된 WIDGET ID {widget_id}")
return ''
except Exception as e:
logger.error(e)
def remove_widget_id(self, widget_id: str) -> str:
"""
주어진 widget_id를 위젯 ID 목록에서 제거합니다. 제거한 widget_id를 반환하며,
widget_id가 목록에 없는 경우 빈 문자열을 반환합니다.
widget_id: 제거할 위젯 ID
"""
try:
if widget_id in self.WIDGET_IDS:
widget = self.WIDGET_IDS.pop(self.WIDGET_IDS.index(widget_id))
logger.info(f"[remove_widget_id] WIDGET ID 제거 {widget}")
return widget
else:
logger.error(f"[remove_widget_id] WIDGET ID 제거 실패 - WIDGET ID를 찾을 수 없음")
return ''
except Exception as e:
logger.error(e)
async def fetch_notifications(self, queue, widget_id: str):
"""
주어진 widget_id에 대한 알림을 가져와 queue에 저장합니다.
연결이 끊어지면 최대 MAX_RETRIES 횟수만큼 재시도합니다.
queue: 알림 데이터를 저장할 asyncio 큐
widget_id: 알림을 가져올 위젯 ID
"""
retries = 0
while retries < MAX_RETRIES:
try:
logger.info(f"retries: {retries}")
payload = self.get_websocket_payload(widget_id)
wss_url = f"wss://toon.at:8071/{payload}"
async with connect(wss_url, ping_interval=12) as websocket:
token = json.loads(await websocket.recv())['token']
logger.debug(f"token : {token}")
logger.info(f"[fetch_notifications] 웹 소켓 연결 : {widget_id}")
while True:
message = await websocket.recv()
logger.debug(f"message: {message}")
alert_data = json.loads(message)
try:
reformed_data = Donate(**alert_data)
except Exception as e:
logger.error(f"[fetch_notifications] Error: {e}, Raw data: {alert_data}")
pass
await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))})
except asyncio.CancelledError:
logger.info(f"[fetch_notifications] 웹 소켓 연결 취소 : {widget_id}")
break
except Exception as e:
retries += 1
logger.error(f"[fetch_notifications] 웹 소켓 연결 실패 : {e}, 재시도 : {retries}")
await asyncio.sleep(2 ** retries)
if retries > 2:
self.remove_widget_id(widget_id)
logger.info(f"[fetch_notifications] 연결 실패 : {widget_id}")
@staticmethod
def get_websocket_payload(widget_id:str) -> str:
"""
주어진 widget_id에 대한 WebSocket 연결 페이로드를 반환합니다.
widget_id: 페이로드를 얻을 위젯 ID
"""
res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}")
match = re.search(r"payload(.+)", res.text)
if not match:
raise ValueError("Payload not found")
payload = json.loads(match.group()[10:-1])["payload"]
return payload
async def add_widget_and_start_fetching_notifications(self, widget_id: str) -> str:
"""
주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다.
widget_id: 추가할 위젯 ID
"""
if widget_id not in self.WIDGET_IDS:
self.add_widget_id(widget_id)
task = asyncio.create_task(self.fetch_notifications(self.queue, widget_id))
self.fetch_tasks.append(task)
logger.info(f"[add_widget_and_start_fetching_notifications] 투네이션 알림 수신 시작 : {widget_id}")
return widget_id
else:
logger.error(f"[add_widget_and_start_fetching_notifications] 이미 투네이션 알림 수신 중 : {widget_id}")
return ''
async def remove_widget_and_stop_fetching_notifications(self, widget_id: str) -> str:
"""
주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다.
widget_id: 제거할 위젯 ID
"""
if widget_id in self.WIDGET_IDS:
index = self.WIDGET_IDS.index(widget_id)
self.remove_widget_id(widget_id)
fetch_task = self.fetch_tasks.pop(index)
fetch_task.cancel()
await fetch_task
logger.info(f"[remove_widget_and_stop_fetching_notifications] 투네이션 알림 수신 중단 : {widget_id}")
return widget_id
else:
logger.error(f"[remove_widget_and_stop_fetching_notifications] 수신중인 투네이션 알림을 찾을 수 없음 {widget_id}")
return ''
async def pop_notification_from_queue(self) -> dict:
"""
queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다.
queue가 비어있는 경우 None을 반환합니다.
"""
if not self.queue.empty():
item = await self.queue.get()
return item
else:
return {}
async def get_connection(self, url: str):
if url not in self.connections:
self.connections[url] = aiohttp.ClientSession()
return self.connections[url]
async def send_notifications_to_subscribers(self, widget_id: str, donate):
"""
주어진 widget_id에 해당하는 모든 구독자들에게 알림을 전송합니다.
widget_id: 알림을 보낼 구독자들의 위젯 ID
donate: 전송할 알림 데이터
"""
subscribers = self.get_subscribers(widget_id)
tasks = [asyncio.create_task(self.send_notification_webhook(await self.get_connection(subscriber_url), subscriber_url, donate)) for subscriber_url in subscribers]
await asyncio.gather(*tasks)
async def send_notification_webhook(self, session, subscriber_url, donate):
payload = {
"text": f"**{donate['content']['name']}({donate['content']['account']})**\n{donate}",
"data": donate
}
async with session.post(subscriber_url, json=payload) as response:
if response.status == 200:
logger.info(f"[send_notification_webhook] 알림 전송 {subscriber_url}, {payload}")
else:
logger.error(f"[send_notification_webhook] 알림 전송 실패 {subscriber_url}, 응답: {response.status}")
async def run(self):
await self.initialize_widgets()
while True:
if not self.queue.empty():
task = asyncio.create_task(self.process_notification())
self.processing_tasks.append(task)
await task
await asyncio.sleep(0.1)
async def close(self):
for session in self.connections.values():
await session.close()