1차 수정

This commit is contained in:
2023-04-27 23:22:03 +09:00
parent 97a9ad6c72
commit 2f563e4324
2 changed files with 51 additions and 58 deletions

View File

@ -2,7 +2,6 @@ import asyncio
import json import json
import re import re
import requests import requests
from aiohttp import web
from websockets import connect from websockets import connect
import logging.handlers import logging.handlers
import configparser import configparser
@ -23,13 +22,6 @@ config = configparser.ConfigParser()
config.read("config/settings.ini") config.read("config/settings.ini")
if config.has_option("widget", "ids"):
WIDGET_IDS = config.get("widget", "ids").split(",")
if not WIDGET_IDS[0]:
WIDGET_IDS = []
else:
WIDGET_IDS = []
MAX_RETRIES = config.getint("config", "MAX_RETRIES") MAX_RETRIES = config.getint("config", "MAX_RETRIES")
SERVER_IP = config.get("server", "ip") SERVER_IP = config.get("server", "ip")
@ -79,13 +71,27 @@ class Donate(BaseModel):
class DaminationAPI(): class DaminationAPI():
def __init__(self): def __init__(self):
self.WIDGET_IDS = WIDGET_IDS self.WIDGET_IDS = []
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.fetch_tasks = [] self.fetch_tasks = []
self.stop_event = asyncio.Event()
self.subscribers = {} self.subscribers = {}
self.session = aiohttp.ClientSession() self.session = aiohttp.ClientSession()
self.connections = {}
self.processing_tasks = []
async def init_widget(self):
if config.has_option("widget", "ids"):
for widget in config.get("widget", "ids").split(","):
await self.add_widget_id_and_start_fetching(widget)
async def process_notification(self):
while True:
donate = await self.pop_from_queue()
if donate is not None:
for widget_id, value in donate.items():
await self.notify_subscribers(widget_id, value)
else:
break
def add_subscriber(self, widget_id: str, url: str) -> bool: def add_subscriber(self, widget_id: str, url: str) -> bool:
if widget_id not in self.subscribers: if widget_id not in self.subscribers:
@ -167,8 +173,6 @@ class DaminationAPI():
logger.debug(f"token : {token}") logger.debug(f"token : {token}")
logger.info(f"WebSocket connection established for widget_id: {widget_id}") logger.info(f"WebSocket connection established for widget_id: {widget_id}")
while True: while True:
if self.stop_event.is_set(): # 추가: 중단 이벤트가 설정된 경우, 작업을 중단
break
message = await websocket.recv() message = await websocket.recv()
logger.debug(f"message: {message}") logger.debug(f"message: {message}")
@ -212,30 +216,7 @@ class DaminationAPI():
return payload return payload
async def start_fetching_alerts(self): async def add_widget_id_and_start_fetching(self, widget_id: str) -> bool:
"""
설정된 모든 위젯 ID에 대해 알림을 가져오는 작업을 시작합니다.
"""
self.stop_event.clear() # 추가: 중단 이벤트를 초기화합니다.
for widget_id in self.WIDGET_IDS:
task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id))
self.fetch_tasks.append(task)
await asyncio.gather(*self.fetch_tasks)
async def stop_fetching_alerts(self):
"""
설정된 모든 위젯 ID에 대한 알림 가져오기 작업을 중단합니다.
중단 이벤트를 설정하고 모든 fetch_tasks를 기다립니다.
"""
self.stop_event.set()
await asyncio.gather(*self.fetch_tasks)
async def add_widget_id_and_start_fetching(self, widget_id: str):
""" """
주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다. 주어진 widget_id를 위젯 ID 목록에 추가하고 알림을 가져오는 작업을 시작합니다.
widget_id: 추가할 위젯 ID widget_id: 추가할 위젯 ID
@ -245,9 +226,11 @@ class DaminationAPI():
self.add_widget_id(widget_id) self.add_widget_id(widget_id)
task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id)) task = asyncio.create_task(self.fetch_alerts(self.queue, widget_id))
self.fetch_tasks.append(task) self.fetch_tasks.append(task)
return True
else: else:
logger.warning(f"Widget ID {widget_id} already exists in the list.") logger.warning(f"Widget ID {widget_id} already exists in the list.")
return False
async def remove_widget_id_and_stop_fetching(self, widget_id: str): async def remove_widget_id_and_stop_fetching(self, widget_id: str):
""" """
@ -275,6 +258,11 @@ class DaminationAPI():
else: else:
return None return None
async def get_connection(self, url: str):
if url not in self.connections:
self.connections[url] = aiohttp.ClientSession()
return self.connections[url]
def get_subscribers(self, widget_id: str) -> list: def get_subscribers(self, widget_id: str) -> list:
""" """
주어진 widget_id에 해당하는 구독자 목록을 반환합니다. 주어진 widget_id에 해당하는 구독자 목록을 반환합니다.
@ -286,37 +274,39 @@ class DaminationAPI():
else: else:
return [] return []
async def notify_subscribers(self, widget_id, donate): async def notify_subscribers(self, widget_id: str, donate):
""" """
주어진 widget_id에 해당하는 모든 구독자들에게 알림을 전송합니다. 주어진 widget_id에 해당하는 모든 구독자들에게 알림을 전송합니다.
widget_id: 알림을 보낼 구독자들의 위젯 ID widget_id: 알림을 보낼 구독자들의 위젯 ID
donate: 전송할 알림 데이터 donate: 전송할 알림 데이터
""" """
subscribers = self.get_subscribers(widget_id) subscribers = self.get_subscribers(widget_id)
tasks = [self.notify_subscriber(self.session, subscriber_url, donate) for subscriber_url in subscribers] # 변경: 기존 클래스 세션 사용 tasks = [asyncio.create_task(self.send_webhook(await self.get_connection(subscriber_url), subscriber_url, donate)) for subscriber_url in subscribers] # 변경: self.get_connection() 사용
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
async def notify_subscriber(self, session, subscriber_url, donate):
""" async def send_webhook(self, session, subscriber_url, donate):
주어진 구독자에게 알림을 전송합니다. payload = {
session: aiohttp ClientSession 객체 "text": f"**{donate['content']['name']}({donate['content']['account']})**\n{donate}",
subscriber_url: 알림을 받을 구독자의 URL "data": donate
donate: 전송할 알림 데이터 }
"""
async with session.post(subscriber_url, json=donate) as response: async with session.post(subscriber_url, json=payload) as response:
if response.status // 100 == 2: if response.status == 200:
logger.info(f"Successfully sent notification to {subscriber_url}.") logger.info(f"Successfully sent Webhook to {subscriber_url}, {payload}")
else: else:
logger.error(f"Failed to send notification to {subscriber_url} with status code {response.status}.") logger.error(f"Failed to send notification to {subscriber_url} with status code {response.status}.")
async def run(self): async def run(self):
await self.start_fetching_alerts() await self.init_widget()
while True: while True:
if not self.queue.empty(): if not self.queue.empty():
donate = await self.pop_from_queue() task = asyncio.create_task(self.process_notification())
for widget_id, value in donate.items(): self.processing_tasks.append(task)
await self.notify_subscribers(widget_id, value) await task
await asyncio.sleep(2) await asyncio.sleep(0.1)
async def close(self):
for session in self.connections.values():
await session.close()

View File

@ -58,8 +58,11 @@ async def add_widget(request):
widget_id: 추가할 위젯 ID widget_id: 추가할 위젯 ID
""" """
widget_id = request.match_info['widget_id'] widget_id = request.match_info['widget_id']
await toonat.add_widget_id_and_start_fetching(widget_id) result = await toonat.add_widget_id_and_start_fetching(widget_id)
if result:
return web.Response(text=f"Widget ID {widget_id} added and fetching started.") return web.Response(text=f"Widget ID {widget_id} added and fetching started.")
else:
return web.Response(text=f"Widget ID {widget_id} already exists in the list.")
@routes.get('/remove_widget/{widget_id}') @routes.get('/remove_widget/{widget_id}')
async def remove_widget(request): async def remove_widget(request):