forked from RealmTeam/slack-sounds
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsounds.py
executable file
·222 lines (184 loc) · 6.85 KB
/
sounds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/usr/bin/env python
import time
import subprocess
from datetime import datetime
from distutils.util import strtobool
import os
import re
import json
import urllib
from slackclient import SlackClient
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
SOUNDS_DIR = os.path.join(BASE_DIR, 'sounds')
CONFIG_FILE = os.path.join(BASE_DIR, 'config.json')
PLAYER = 'afplay'
FILETYPE = 'mp3'
DEFAULT_OPTIONS = {
"_token": None,
"throttling": True,
"throttling_reset": 10 * 60,
"throttling_count": 5
}
PLAY_REGEX = re.compile("play\s([a-z0-9_' -]+)", re.IGNORECASE)
REMOVE_REGEX = re.compile("remove\s([a-z0-9_' -]+)", re.IGNORECASE)
UPDATE_CONF_REGEX = re.compile("^set ([A-Z0-9_]+) to ([A-Z0-9_]+)$", re.IGNORECASE)
SHOW_CONF_REGEX = re.compile("^show conf$", re.IGNORECASE)
LIST_SOUNDS_REGEX = re.compile("list\ssounds", re.IGNORECASE)
def load_config():
config = {}
with open(CONFIG_FILE, 'r') as f:
config = json.loads(f.read())
for key, value in DEFAULT_OPTIONS.iteritems():
config.setdefault(key, value)
return config
def write_config(config):
with open(CONFIG_FILE, 'w') as f:
f.write(json.dumps(config))
def find_sound(sound_name):
directories = (file_ for file_ in os.listdir(SOUNDS_DIR)
if os.path.isdir(os.path.join(SOUNDS_DIR, file_)))
for d in directories:
path = os.path.join(SOUNDS_DIR, d, '{}.{}'.format(sound_name.replace(' ', '_'), FILETYPE))
if os.path.isfile(path):
return path
def play_action(match, user, config):
sound_name = match.group(1).strip()
sound_file = find_sound(sound_name)
def throttle():
if not config["throttling"] or user["is_admin"]:
return False, None
record = throttling_record.get(user["name"], {"time": time.time(), "count": 0})
if (time.time() - record["time"]) < config["throttling_reset"]:
record["count"] += 1
else:
record["count"] = 1
record["time"] = time.time()
throttling_record[user["name"]] = record
return record["count"] > config["throttling_count"], record
if sound_file:
throttled, record = throttle()
if throttled:
message = 'You reached your throttling limit. Try again later.'
else:
message = 'Playing ' + sound_name
subprocess.Popen([PLAYER, "{}".format(sound_file)])
if record:
message += '\n {} plays left. Reset at {}.'.format(
config["throttling_count"] - record["count"],
datetime.fromtimestamp(record["time"] + config["throttling_reset"]).strftime('%H:%M:%S')
)
else:
message = 'No sound matching ' + sound_name
return message
def remove_action(match, user, config):
if not user["is_admin"]:
return
sound_name = match.group(1).strip()
sound_file = find_sound(sound_name)
if sound_file:
os.remove(sound_file)
message = 'Removed ' + sound_name
else:
message = 'No sound matching ' + sound_name
return message
def list_sounds_action(match, user, config):
message = '```\nAvailable sounds are :\n'
directories = sorted(file_ for file_ in os.listdir(SOUNDS_DIR)
if os.path.isdir(os.path.join(SOUNDS_DIR, file_)))
def split_by_cols(l, n=4):
output = ''
for row in (l[i:i + n] for i in xrange(0, len(l), n)):
fmt = "| {:<30s} " * len(row)
output += fmt.format(*row) + '\n'
return output
for directory in directories:
message += '\n' + directory.upper() + ':\n'
sounds = sorted(s.split('.')[0].replace('_', ' ') for s in os.listdir(os.path.join(SOUNDS_DIR, directory)))
message += split_by_cols(sounds)
message += '```'
return message
def show_conf_action(match, user, config):
if not user["is_admin"]:
return
message = ''
for key, value in config.iteritems():
message += '{}: {}\n'.format(key, value)
return message
def update_conf_action(match, user, config):
if not user["is_admin"]:
return
key = match.group(1)
value = match.group(2)
if key.startswith('_'):
return "Can't set private variables"
try:
value = int(value)
except ValueError:
try:
value = bool(strtobool(value))
except ValueError:
pass
config[key] = value
write_config(config)
return "Config set"
ACTIONS = {
PLAY_REGEX: play_action,
REMOVE_REGEX: remove_action,
UPDATE_CONF_REGEX: update_conf_action,
SHOW_CONF_REGEX: show_conf_action,
LIST_SOUNDS_REGEX: list_sounds_action,
}
def add_sound(file_id):
info = sc.api_call("files.info", file=file_id)
file_url = info.get("file").get("url_private_download") if info["ok"] else ''
filename = info.get("file").get("title") if info["ok"] else ''
if filename.endswith('.mp3') and file_url.endswith('.mp3'):
folder = 'misc'
if ':' in filename:
folder, filename = filename.split(':')
try:
os.makedirs(os.path.join(SOUNDS_DIR, folder.strip()))
except OSError:
pass
urllib.urlretrieve(file_url, os.path.join(SOUNDS_DIR, folder.strip(), filename.strip()))
def load_users():
user_list = sc.api_call("users.list")
users = {}
for user in user_list["members"]:
users[user["id"]] = {
"name": user["name"],
"is_admin": user.get("is_admin", False),
"id": user["id"]
}
return users
if __name__ == '__main__':
throttling_record = {}
config = load_config()
sc = SlackClient(config["_token"])
if sc.rtm_connect():
bot_id = sc.api_call("auth.test")["user_id"]
users = load_users()
while True:
for event in sc.rtm_read():
event_type = event.get('type', None)
if event_type == 'message':
text = event.get('text', '')
user = users.get(event.get('user', None), None)
channel = event.get('channel', None)
if not user or not text or not channel:
continue
message = None
for regex, action in ACTIONS.iteritems():
match = regex.match(text)
if match:
message = action(match, user, config)
break
if message:
sc.api_call("chat.postEphemeral", channel=channel, text=message, user=user["id"])
elif event_type == 'file_created' or event_type == 'file_shared':
file_id = event.get('file', {}).get('id', None)
if file_id:
add_sound(file_id)
time.sleep(1);
else:
print 'Connection failed, invalid token?'