최종 개선
This commit is contained in:
@ -22,8 +22,6 @@ config = configparser.ConfigParser()
|
|||||||
config.read("config/settings.ini")
|
config.read("config/settings.ini")
|
||||||
|
|
||||||
MAX_RETRIES = config.getint("config", "MAX_RETRIES")
|
MAX_RETRIES = config.getint("config", "MAX_RETRIES")
|
||||||
SERVER_IP = config.get("server", "ip")
|
|
||||||
SERVER_PORT = config.get("server", "port")
|
|
||||||
|
|
||||||
# Create logger
|
# Create logger
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
@ -31,7 +29,7 @@ logger.setLevel(logging.DEBUG)
|
|||||||
|
|
||||||
# Create file handler
|
# Create file handler
|
||||||
log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log'
|
log_filename = datetime.now().strftime('logs/%Y-%m-%d_%H:%M:%S_damination') + '.log'
|
||||||
file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=100000, backupCount=3)
|
file_handler = logging.handlers.RotatingFileHandler(log_filename, maxBytes=10000000, backupCount=3)
|
||||||
file_handler.setLevel(logging.INFO)
|
file_handler.setLevel(logging.INFO)
|
||||||
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
||||||
file_handler.setFormatter(file_formatter)
|
file_handler.setFormatter(file_formatter)
|
||||||
@ -60,7 +58,7 @@ class Content(BaseModel):
|
|||||||
count : int | None
|
count : int | None
|
||||||
amount : int | None
|
amount : int | None
|
||||||
roulette : Roulette | None
|
roulette : Roulette | None
|
||||||
|
streamer : str
|
||||||
|
|
||||||
class Donate(BaseModel):
|
class Donate(BaseModel):
|
||||||
code : str
|
code : str
|
||||||
@ -94,15 +92,22 @@ class DaminationAPI():
|
|||||||
|
|
||||||
|
|
||||||
async def process_notification(self):
|
async def process_notification(self):
|
||||||
while True:
|
|
||||||
donate = await self.pop_notification_from_queue()
|
donate = await self.pop_notification_from_queue()
|
||||||
if donate is not None:
|
if len(donate):
|
||||||
|
try:
|
||||||
for widget_id, value in donate.items():
|
for widget_id, value in donate.items():
|
||||||
|
|
||||||
|
logger.debug(f"[process_notification] Raw Data : widget_id: {widget_id}, value: {value}")
|
||||||
await self.send_notifications_to_subscribers(widget_id, value)
|
await self.send_notifications_to_subscribers(widget_id, value)
|
||||||
|
|
||||||
logger.debug(f"[process_notification] QUEUE 추출: {widget_id, value}")
|
logger.debug(f"[process_notification] QUEUE 추출: {widget_id, value}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[process_notification] Raw Data : {donate}, {e}")
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error("[process_notification] QUEUE 추출 실패 - 저장된 QUEUE 없음")
|
logger.error("[process_notification] QUEUE 추출 실패 - 저장된 QUEUE 없음")
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
def get_subscribers(self, widget_id: str) -> list:
|
def get_subscribers(self, widget_id: str) -> list:
|
||||||
@ -150,6 +155,7 @@ class DaminationAPI():
|
|||||||
"""
|
"""
|
||||||
현재 설정된 모든 위젯 ID 목록을 반환합니다.
|
현재 설정된 모든 위젯 ID 목록을 반환합니다.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if self.WIDGET_IDS:
|
if self.WIDGET_IDS:
|
||||||
logger.info(f"[get_widget_ids] WIDGET ID 반환 {self.WIDGET_IDS}")
|
logger.info(f"[get_widget_ids] WIDGET ID 반환 {self.WIDGET_IDS}")
|
||||||
return self.WIDGET_IDS
|
return self.WIDGET_IDS
|
||||||
@ -164,6 +170,7 @@ class DaminationAPI():
|
|||||||
주어진 widget_id를 위젯 ID 목록에 추가합니다.
|
주어진 widget_id를 위젯 ID 목록에 추가합니다.
|
||||||
widget_id: 추가할 위젯 ID
|
widget_id: 추가할 위젯 ID
|
||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if widget_id not in self.WIDGET_IDS:
|
if widget_id not in self.WIDGET_IDS:
|
||||||
self.WIDGET_IDS.append(widget_id)
|
self.WIDGET_IDS.append(widget_id)
|
||||||
@ -211,7 +218,6 @@ class DaminationAPI():
|
|||||||
|
|
||||||
while retries < MAX_RETRIES:
|
while retries < MAX_RETRIES:
|
||||||
try:
|
try:
|
||||||
logger.info(f"retries: {retries}")
|
|
||||||
payload = self.get_websocket_payload(widget_id)
|
payload = self.get_websocket_payload(widget_id)
|
||||||
wss_url = f"wss://toon.at:8071/{payload}"
|
wss_url = f"wss://toon.at:8071/{payload}"
|
||||||
|
|
||||||
@ -224,13 +230,14 @@ class DaminationAPI():
|
|||||||
message = await websocket.recv()
|
message = await websocket.recv()
|
||||||
logger.debug(f"message: {message}")
|
logger.debug(f"message: {message}")
|
||||||
alert_data = json.loads(message)
|
alert_data = json.loads(message)
|
||||||
|
alert_data["content"]["streamer"] = token["twitch"]["account"]
|
||||||
|
logger.info(f"alert_data : {alert_data}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reformed_data = Donate(**alert_data)
|
reformed_data = Donate(**alert_data)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[fetch_notifications] Error: {e}, Raw data: {alert_data}")
|
logger.error(f"[fetch_notifications] Error: {e}, Raw data: {alert_data}")
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))})
|
await queue.put({widget_id: json.loads(reformed_data.json().encode('utf-8').decode('unicode_escape'))})
|
||||||
@ -246,7 +253,7 @@ class DaminationAPI():
|
|||||||
|
|
||||||
if retries > 2:
|
if retries > 2:
|
||||||
self.remove_widget_id(widget_id)
|
self.remove_widget_id(widget_id)
|
||||||
logger.info(f"[fetch_notifications] 연결 실패 : {widget_id}")
|
logger.error(f"[fetch_notifications] 연결 실패 : {widget_id}")
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -258,6 +265,7 @@ class DaminationAPI():
|
|||||||
|
|
||||||
res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}")
|
res = requests.get(f"https://toon.at/widget/alertbox/{widget_id}")
|
||||||
match = re.search(r"payload(.+)", res.text)
|
match = re.search(r"payload(.+)", res.text)
|
||||||
|
|
||||||
if not match:
|
if not match:
|
||||||
raise ValueError("Payload not found")
|
raise ValueError("Payload not found")
|
||||||
|
|
||||||
@ -288,6 +296,7 @@ class DaminationAPI():
|
|||||||
주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다.
|
주어진 widget_id를 위젯 ID 목록에서 제거하고 알림을 가져오는 작업을 중단합니다.
|
||||||
widget_id: 제거할 위젯 ID
|
widget_id: 제거할 위젯 ID
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if widget_id in self.WIDGET_IDS:
|
if widget_id in self.WIDGET_IDS:
|
||||||
index = self.WIDGET_IDS.index(widget_id)
|
index = self.WIDGET_IDS.index(widget_id)
|
||||||
self.remove_widget_id(widget_id)
|
self.remove_widget_id(widget_id)
|
||||||
@ -296,6 +305,7 @@ class DaminationAPI():
|
|||||||
await fetch_task
|
await fetch_task
|
||||||
logger.info(f"[remove_widget_and_stop_fetching_notifications] 투네이션 알림 수신 중단 : {widget_id}")
|
logger.info(f"[remove_widget_and_stop_fetching_notifications] 투네이션 알림 수신 중단 : {widget_id}")
|
||||||
return widget_id
|
return widget_id
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error(f"[remove_widget_and_stop_fetching_notifications] 수신중인 투네이션 알림을 찾을 수 없음 {widget_id}")
|
logger.error(f"[remove_widget_and_stop_fetching_notifications] 수신중인 투네이션 알림을 찾을 수 없음 {widget_id}")
|
||||||
return ''
|
return ''
|
||||||
@ -306,9 +316,11 @@ class DaminationAPI():
|
|||||||
queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다.
|
queue에서 가장 먼저 저장된 항목을 꺼내어 반환합니다.
|
||||||
queue가 비어있는 경우 None을 반환합니다.
|
queue가 비어있는 경우 None을 반환합니다.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self.queue.empty():
|
if not self.queue.empty():
|
||||||
item = await self.queue.get()
|
item = await self.queue.get()
|
||||||
return item
|
return item
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
@ -316,6 +328,7 @@ class DaminationAPI():
|
|||||||
async def get_connection(self, url: str):
|
async def get_connection(self, url: str):
|
||||||
if url not in self.connections:
|
if url not in self.connections:
|
||||||
self.connections[url] = aiohttp.ClientSession()
|
self.connections[url] = aiohttp.ClientSession()
|
||||||
|
|
||||||
return self.connections[url]
|
return self.connections[url]
|
||||||
|
|
||||||
|
|
||||||
@ -325,6 +338,7 @@ class DaminationAPI():
|
|||||||
widget_id: 알림을 보낼 구독자들의 위젯 ID
|
widget_id: 알림을 보낼 구독자들의 위젯 ID
|
||||||
donate: 전송할 알림 데이터
|
donate: 전송할 알림 데이터
|
||||||
"""
|
"""
|
||||||
|
|
||||||
subscribers = self.get_subscribers(widget_id)
|
subscribers = self.get_subscribers(widget_id)
|
||||||
tasks = [asyncio.create_task(self.send_notification_webhook(await self.get_connection(subscriber_url), subscriber_url, donate)) for subscriber_url in subscribers]
|
tasks = [asyncio.create_task(self.send_notification_webhook(await self.get_connection(subscriber_url), subscriber_url, donate)) for subscriber_url in subscribers]
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
@ -332,13 +346,14 @@ class DaminationAPI():
|
|||||||
|
|
||||||
async def send_notification_webhook(self, session, subscriber_url, donate):
|
async def send_notification_webhook(self, session, subscriber_url, donate):
|
||||||
payload = {
|
payload = {
|
||||||
"text": f"**{donate['content']['name']}({donate['content']['account']})**\n{donate}",
|
"text": f"**{donate['content']['name']}({donate['content']['account']}) to {donate['content']['streamer']}**\n{donate}",
|
||||||
"data": donate
|
"data": donate
|
||||||
}
|
}
|
||||||
|
|
||||||
async with session.post(subscriber_url, json=payload) as response:
|
async with session.post(subscriber_url, json=payload) as response:
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
logger.info(f"[send_notification_webhook] 알림 전송 {subscriber_url}, {payload}")
|
logger.info(f"[send_notification_webhook] 알림 전송 {subscriber_url}, {payload}")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error(f"[send_notification_webhook] 알림 전송 실패 {subscriber_url}, 응답: {response.status}")
|
logger.error(f"[send_notification_webhook] 알림 전송 실패 {subscriber_url}, 응답: {response.status}")
|
||||||
|
|
||||||
@ -351,6 +366,7 @@ class DaminationAPI():
|
|||||||
task = asyncio.create_task(self.process_notification())
|
task = asyncio.create_task(self.process_notification())
|
||||||
self.processing_tasks.append(task)
|
self.processing_tasks.append(task)
|
||||||
await task
|
await task
|
||||||
|
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -3,7 +3,3 @@ MAX_RETRIES=3
|
|||||||
|
|
||||||
[widget]
|
[widget]
|
||||||
ids=
|
ids=
|
||||||
|
|
||||||
[server]
|
|
||||||
ip=192.168.1.210
|
|
||||||
port=8113
|
|
||||||
|
|||||||
Reference in New Issue
Block a user