# Mirror of https://github.com/DanilaFe/AdventBot.git (170 lines, 5.7 KiB, Python)
import discord
|
|
from discord.ext import tasks
|
|
import os
|
|
import sqlite3
|
|
from collections import defaultdict
|
|
from aiohttp import ClientSession
|
|
import json
|
|
|
|
# Discord client that receives events and sends every announcement.
client = discord.Client()

# Shared SQLite connection; the `guild` table created by setup() lives here.
conn = sqlite3.connect('data.db')

# Keys found under each day of 'completion_day_level', mapped to the
# star colour used in announcements.
star_names = {
    '1': 'silver',
    '2': 'gold',
}

# Secrets are taken from the environment; either is None when unset.
aoc_session = os.environ.get('AOC_SESSION')
bot_token = os.environ.get('AOC_BOT_TOKEN')
|
|
|
|
@client.event
async def on_ready():
    '''
    Print which user the client is logged in as.
    '''

    banner = 'We have logged in as {0.user}'
    print(banner.format(client))
|
|
|
|
@client.event
async def on_message(message):
    '''
    Handle the `!setup <leaderboard id>` command.

    Stores (or replaces) the channel and leaderboard used for the
    guild the message was sent in, then confirms in that channel.
    Every other message is ignored. Malformed commands get a usage
    hint instead of raising inside the event handler.
    '''

    words = message.content.split()
    # Match the command word exactly; a bare prefix test would also
    # accept things like "!setup123" and then mis-slice the argument.
    if not words or words[0] != "!setup":
        return

    # Direct messages have no guild to attach the configuration to.
    if message.guild is None:
        await message.channel.send('`!setup` can only be used inside a server.')
        return

    try:
        if len(words) != 2:
            raise ValueError('expected exactly one argument')
        leaderboard = int(words[1])
    except ValueError:
        await message.channel.send('Usage: `!setup <leaderboard id>`')
        return

    # The connection context manager commits on success and rolls
    # back if the statement fails. last_sync is deliberately left
    # untouched when the row already exists.
    with conn:
        conn.execute('''
            insert into guild(id, channel, leaderboard, last_sync)
            values (?, ?, ?, ?) on conflict(id) do
            update set channel=excluded.channel, leaderboard=excluded.leaderboard''',
            (message.guild.id, message.channel.id, leaderboard, None))

    await message.channel.send(
        'Updated {0.guild} to use channel {0.channel} and leaderboard {1}!'
        .format(message, leaderboard))
|
|
|
|
def setup():
    '''
    Make sure the `guild` table exists.

    One row per guild: its id, the announcement channel, the
    leaderboard number and the text of the last synced response.
    '''

    schema = '''
        CREATE TABLE IF NOT EXISTS
        guild(id int primary key, channel int, leaderboard int, last_sync text)
    '''
    conn.execute(schema)
|
|
|
|
def member_stars(member):
    '''
    List every star held by one entry of the 'members' dict.

    Each star is a tuple (member name, day, star colour, timestamp),
    where the colour comes from `star_names` and the timestamp is the
    star's 'get_star_ts' value.
    '''

    completion = member['completion_day_level']
    return [
        (member['name'], day, colour, parts[key]['get_star_ts'])
        for (day, parts) in completion.items()
        for (key, colour) in star_names.items()
        if key in parts
    ]
|
|
|
|
def all_stars(json):
    '''
    Collect the stars of every member in a parsed response.

    The result is a flat list of `member_stars` tuples in no
    particular order.
    '''

    collected = []
    for entry in json['members'].values():
        collected.extend(member_stars(entry))
    return collected
|
|
|
|
def stars_by_day(stars):
    '''
    Bucket star tuples by day and then by colour (silver/gold).

    Returns a nested defaultdict: grouped[day][colour] is the list
    of stars for that puzzle part, in input order. Used to work out
    who was first/second/third on a given puzzle.
    '''

    grouped = defaultdict(lambda: defaultdict(list))
    for entry in stars:
        day, colour = entry[1], entry[2]
        grouped[day][colour].append(entry)
    return grouped
|
|
|
|
def read_json(s):
    '''
    Parse a stored or fetched JSON response.

    A fresh database has no previous response, so None is accepted
    and turned into a minimal stand-in with an empty 'members' dict,
    which is all the update algorithms need.
    '''

    if s is not None:
        return json.loads(s)
    return {'members': {}}
|
|
|
|
def detect_early_stars(stars):
    '''
    Pick out the 'first', 'second' and 'third' star of every puzzle part.

    Stars are grouped per day and colour, ordered by timestamp, and
    the earliest (up to three) are returned as (place, star) pairs.
    '''

    places = ['first', 'second', 'third']
    ranked = []
    for colours in stars_by_day(stars).values():
        for winners in colours.values():
            ordered = sorted(winners, key=lambda entry: int(entry[3]))
            # zip stops at the shorter input, so at most three per part.
            ranked.extend(zip(places, ordered))
    return ranked
|
|
|
|
def find_updates(old_json, new_json):
    '''
    Find interesting differences between the old JSON and the new JSON,
    both given as either strings or None.

    Returns a dict with three lists:

    'join'        -- names of members present only in the new JSON.
                     Their stars are never announced, since they were
                     not on the leaderboard when they won them.
    'early_stars' -- (place, star) pairs for newly won stars that are
                     first/second/third for their puzzle part.
    'ann_stars'   -- every other newly won star.

    Members are matched by their key in the 'members' mapping rather
    than by display name, so members who share a name, or who change
    it, neither hide each other's stars nor trigger repeats.
    '''

    old_json = read_json(old_json)
    new_json = read_json(new_json)

    old_members = old_json['members']
    new_members = new_json['members']

    join = [member['name'] for (key, member) in new_members.items()
            if key not in old_members]

    # Stars gained since the last sync by members who were already on
    # the leaderboard. A star counts as already known when the same
    # member held that (day, colour) before.
    fresh = set()
    for (key, member) in new_members.items():
        if key not in old_members:
            continue
        known = {(star[1], star[2]) for star in member_stars(old_members[key])}
        fresh.update(star for star in member_stars(member)
                     if (star[1], star[2]) not in known)

    # Placement is ranked over everybody's stars, newcomers included,
    # and only then narrowed down to the ones worth announcing.
    new_stars = all_stars(new_json)
    early_stars = [(place, star) for (place, star) in detect_early_stars(new_stars)
                   if star in fresh]
    skip_stars = {star for (place, star) in early_stars}
    ann_stars = [star for star in new_stars
                 if star in fresh and star not in skip_stars]
    return { 'join': join, 'early_stars': early_stars, 'ann_stars': ann_stars }
|
|
|
|
# Longest message the bot will send in one piece (Discord's content limit).
_MESSAGE_LIMIT = 2000


def _chunk_announcements(lines, limit=_MESSAGE_LIMIT):
    '''
    Pack lines into newline-joined chunks of at most `limit` characters.
    '''

    chunks = []
    current = ''
    for line in lines:
        # A single line longer than the limit is cut hard.
        while len(line) > limit:
            if current:
                chunks.append(current)
                current = ''
            chunks.append(line[:limit])
            line = line[limit:]
        candidate = line if not current else current + "\n" + line
        if len(candidate) > limit:
            chunks.append(current)
            current = line
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


async def send_updates(id, channel, diff):
    '''
    Send updates via the Discord client given a diff
    produced by `find_updates`.

    `channel` is the id of the channel to post in; `id` is accepted
    for the caller's convenience and not used here. Joins are listed
    first, then early stars, then all other stars. Nothing is sent
    when the diff is empty, and long batches are split over several
    messages so that no message exceeds the length limit.
    '''

    announcements = []
    announcements.extend(
        "User {0} joined the leaderboard!".format(user)
        for user in diff['join'])
    announcements.extend(
        "{0} won the {1} {2} star from day {3}!".format(user, place, star, day)
        for (place, (user, day, star, ts)) in diff['early_stars'])
    announcements.extend(
        "{0} won the {1} star from day {2}!".format(user, star, day)
        for (user, day, star, ts) in diff['ann_stars'])

    if not announcements:
        return

    channel = await client.fetch_channel(channel)
    for chunk in _chunk_announcements(announcements):
        await channel.send(chunk)
|
|
|
|
@tasks.loop(minutes=1.0)
async def update_aoc():
    '''
    Poll the leaderboard of every configured guild and announce changes.

    For each row of the `guild` table the leaderboard JSON is fetched,
    diffed against the stored `last_sync` text, announced through
    `send_updates`, and then stored as the new `last_sync`. A guild
    whose fetch, parse or announcement fails is reported and skipped
    without storing anything, so it is retried on the next run and
    does not stop the loop for the other guilds.
    '''

    # Announcements need a logged-in client; the task may be started
    # before the connection to Discord is up.
    await client.wait_until_ready()

    url_string = "https://adventofcode.com/2020/leaderboard/private/view/{0}.json"
    cookies = { 'session': aoc_session }

    # Read all rows up front: the loop below updates and commits on the
    # same connection, which must not happen under a live cursor.
    guilds = conn.execute(
        'select id, channel, leaderboard, last_sync from guild').fetchall()

    async with ClientSession(cookies=cookies) as session:
        for (guild_id, channel, leaderboard, last_sync) in guilds:
            try:
                async with session.get(url_string.format(leaderboard)) as resp:
                    if resp.status != 200:
                        print('Leaderboard {0} returned HTTP {1}; skipping guild {2}'
                              .format(leaderboard, resp.status, guild_id))
                        continue
                    new_json = await resp.text()
                diff = find_updates(last_sync, new_json)
                await send_updates(guild_id, channel, diff)
            except Exception as error:
                # Boundary of the background task: report and move on,
                # otherwise one bad guild would end updates for all.
                print('Failed to sync leaderboard {0} for guild {1}: {2!r}'
                      .format(leaderboard, guild_id, error))
                continue

            conn.execute('update guild set last_sync=? where id=?', (new_json, guild_id))
            conn.commit()
|
|
|
|
# Create the database table if it is missing.
setup()

# Start the once-a-minute leaderboard poll.
update_aoc.start()

# Run the Discord client with the token read from the environment.
client.run(bot_token)
|