-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpacker.py
292 lines (265 loc) · 11.4 KB
/
packer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import codecs
import glob
import json
import os
import re
import subprocess
import sys
import time
import zipfile
from datetime import datetime
# Matches a versioned mods folder inside an archive path: 'mods/', then any run of
# digits and dots, an optional ' Common Test' suffix, and a closing slash.
folder_ix_all = re.compile(r'mods/[.\d]*( Common Test)?/')
# Set by ch_print() once the 'Checking <path> ...' header has been printed for the
# current archive; pack_file() resets it at the start of every config.
printed = False
def get_git_date(path):
    """Return the commit timestamp of the last git commit touching *path*.

    Runs ``git log -n 1 --format="%ct" -- <path>`` and returns the Unix
    timestamp as the text git printed, without the quotes and line ending.

    Returns an empty string when git exits with an error, when git cannot be
    launched at all, or when git prints nothing for the path.
    """
    command = 'git log -n 1 --format="%ct" --'.split() + [path]
    try:
        output = subprocess.check_output(command)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: git returned a non-zero exit status.
        # OSError: the git executable could not be started.
        return ''
    # No shell is involved, so the double quotes from the format string are
    # echoed literally. Strip the line ending first (whatever its length),
    # then drop the surrounding quotes.
    return output.strip()[1:-1]
def pack_dir(path, o_dir, max_levels=10, v_str=None, v_date=None, force=False, quiet=False):
if not quiet:
print 'Listing', path, '...'
try:
names = os.listdir(path)
except os.error:
print "Can't list", path
names = []
success = True
for name in sorted(names):
f_name = os.path.join(path, name).replace(os.sep, '/')
o_file = os.path.join(o_dir, name).replace(os.sep, '/')
if not os.path.isdir(f_name):
success &= pack_file(f_name, o_file, v_str, v_date, force, quiet)
elif max_levels > 0 and name not in (os.curdir, os.pardir) and os.path.isdir(f_name) and not os.path.islink(f_name):
success &= pack_dir(f_name, o_file, max_levels - 1, v_str, v_date, force, quiet)
return success
def ch_print(path, quiet, *args):
global printed
if quiet and not printed:
print 'Checking', path, '...'
printed = True
if args:
print ' '.join(args)
def pack_file(conf_name, out_path, v_str=None, v_date=None, force=False, quiet=False):
    """Build one archive from a JSON build config.

    The config is read as UTF-8 (an optional BOM is accepted) and must provide:
      'enabled' - a false value skips the archive;
      'ext'     - '.zip' (deflated) or '.wotmod' (stored);
      'files'   - mapping of archive path to source path/glob pattern.

    An entry whose key ends with '**' is replaced by one entry per file found
    by walking the source directory.

    The archive is written next to *out_path* with the extension replaced by
    'ext'. An existing archive is rebuilt only if *force* is set or
    check_identical() reports a difference.

    Returns True when the archive is disabled, up to date or packed, and False
    for an unknown extension. Exits the process with status 2 if a path uses
    the {GAME_VERSION} macro while *v_str* is None.
    """
    global printed
    printed = False  # a new archive starts: the 'Checking' header may be printed again
    if not quiet:
        ch_print(conf_name, True)  # verbose mode: always print the header up front
    with codecs.open(conf_name, 'r', 'utf-8-sig') as fp:
        data = json.load(fp, 'utf-8-sig')
    if not data['enabled']:
        ch_print(conf_name, quiet, 'Archive disabled.')
        return True
    if any('{GAME_VERSION}' in path for path in data['files']) and v_str is None:
        ch_print(conf_name, quiet, 'Encountered an archive which requires game version data, but it was not provided.')
        sys.exit(2)
    new_ext = data['ext']
    if new_ext == '.zip':
        mode = zipfile.ZIP_DEFLATED
    elif new_ext == '.wotmod':
        mode = zipfile.ZIP_STORED
    else:
        ch_print(conf_name, quiet, 'Unknown extension:', new_ext)
        return False
    # items() is a list in Python 2, so the dict can be modified while looping.
    for path_key, path in data['files'].items():
        if not path_key.endswith('**'):
            continue
        data['files'].pop(path_key)
        # Drop the last three characters of both sides (presumably a trailing
        # '/**' - only the '**' suffix of the key is actually checked).
        path_key = path_key[:-3]
        path = path[:-3]
        for root_dir, _, f_names in os.walk(path):
            for f_name in f_names:
                full_path = os.path.join(root_dir, f_name)
                # os.walk() roots always start with `path`, so swap only that
                # leading prefix. Replacing every occurrence would corrupt
                # names that merely contain the source directory name again.
                arc_name = path_key + full_path[len(path):].replace(os.sep, '/')
                data['files'][arc_name] = full_path.replace(os.sep, '/')
    out_path = os.path.splitext(out_path)[0].decode('cp1251') + new_ext
    if os.path.isfile(out_path):
        if not force and check_identical(out_path, data['files'], v_str, quiet):
            return True
    else:
        ch_print(out_path, quiet, 'Output file not found, creating a new one...')
    return do_pack(out_path, data['files'], mode, v_str, v_date)
def compute_names(fp, paths, v_str, quiet):
    """Translate the entries of an existing archive into build-config keys.

    *paths* must be the sorted list of names stored in the archive *fp*.
    Folder entries that are followed by their own contents are skipped, so
    only files and empty folders are reported.

    For every remaining entry:
      * a versioned 'mods/<version>/' folder is rewritten to
        'mods/{GAME_VERSION}/'; if the stored version differs from *v_str*
        the folder prefix is recorded (and reported once) as mismatched;
      * a file whose base name contains 'modssettingsapi', 'guiflash' or
        'modslistapi' is keyed as '<dir>/*<api>*';
      * anything else is keyed by its (possibly rewritten) path.

    Returns ``(file_names, mismatched)``: a dict mapping computed key to the
    real archive name, and the list of mismatched versioned folder prefixes.
    """
    file_names = {}
    mismatched = []
    last_idx = len(paths) - 1
    for idx, filename in enumerate(paths):
        if filename.endswith('/') and idx != last_idx and paths[idx + 1].startswith(filename):
            # Non-empty folder: in a sorted list its contents follow directly.
            # A prefix test is required here - a substring test would also
            # skip an empty folder whose name merely occurs inside the next path.
            continue
        new = ''
        search = folder_ix_all.search(filename)
        if search:
            ver_dir = search.group()
            file_ver = ver_dir.replace('mods', '').strip('/')
            if file_ver != v_str:
                mismatch = filename[:search.end()]
                if mismatch not in mismatched:
                    mismatched.append(mismatch)
                    ch_print(fp, quiet, 'Updating versioned folder:', mismatch)
            new = folder_ix_all.sub('mods/{GAME_VERSION}/', filename)
        for api_type in ('modssettingsapi', 'guiflash', 'modslistapi'):
            if api_type in os.path.basename(filename):
                file_names['%s/*%s*' % (os.path.dirname(new or filename), api_type)] = filename
                break
        else:
            # No API marker matched: key the entry by its own path.
            file_names[new or filename] = filename
    return file_names, mismatched
def get_stat_size_time(path):
    """Return ``(size, (year, month, day, hour, minute, second))`` for *path*.

    The time is the file's modification time in local time, with the seconds
    rounded down to an even value. That matches the two-second precision of
    timestamps stored in ZIP archives, so the result can be compared directly
    with ``(ZipInfo.file_size, ZipInfo.date_time)``.
    """
    stat = os.stat(path)
    stat_date = datetime.fromtimestamp(stat.st_mtime)
    # Floor division keeps the value an int regardless of division semantics.
    stat_date = stat_date.replace(second=stat_date.second // 2 * 2)
    return stat.st_size, stat_date.timetuple()[:6]
def check_identical(fp, arc_data, v_str, quiet=False):
    """Tell whether the archive *fp* already matches the build config.

    *arc_data* maps archive path to source path/glob pattern. The archive is
    considered different when a versioned folder does not match *v_str*, when
    its set of entries differs from the keys of *arc_data*, or when the stored
    size/timestamp of a file differs from the file on disk. Every difference
    found is reported through ch_print().

    A source file that cannot be found on disk is reported but does not by
    itself mark the archive as different.

    Returns True if nothing needs to be rebuilt, False otherwise.
    """
    identical = True
    with zipfile.ZipFile(fp) as zf_orig:
        paths = sorted(x.decode('cp866') for x in zf_orig.namelist())
        act_data, mismatched = compute_names(fp, paths, v_str, quiet)
        if mismatched or sorted(act_data) != sorted(arc_data):
            identical = False
        for f_path in sorted(arc_data):
            if f_path.endswith('/'):  # empty folder
                if f_path not in act_data:
                    ch_print(fp, quiet, 'Adding missing folder:', f_path)
                continue
            if f_path not in act_data:
                ch_print(fp, quiet, 'Adding missing file:', f_path)
                continue
            act_path = act_data[f_path] or f_path
            info = zf_orig.getinfo(act_path.encode('cp866'))
            files = [p.replace(os.sep, '/') for p in glob.iglob(arc_data[f_path])]
            if not files:
                ch_print(fp, quiet, 'Could not find file:', arc_data[f_path])
                continue
            if len(files) > 1:
                # ch_print() joins its arguments as strings, so the remaining
                # matches are joined here rather than passed as a list.
                ch_print(fp, quiet, 'Ambiguous wildcard:', arc_data[f_path], 'picked file:', files[0],
                         'other matches:', ', '.join(files[1:]))
            if (info.file_size, info.date_time) != get_stat_size_time(files[0]):
                identical = False
                ch_print(fp, quiet, 'Updating file', info.filename)
        for f_path in sorted(act_data):
            if f_path not in arc_data:
                ch_print(fp, quiet, 'Deleting file:', f_path)
    return identical
def make_tree(paths):
    """Turn '/'-separated paths into a nested dict.

    Folders become dict values stored under '<name>/' keys; files become
    empty-string values stored under their own name. Empty path components
    (leading or doubled slashes) are ignored, and a path ending with '/'
    only creates folders.
    """
    root = {}
    for entry in paths:
        parts = entry.split('/')
        leaf = parts.pop()
        node = root
        for folder in parts:
            if not folder:
                continue
            node = node.setdefault(folder + '/', {})
        if leaf:
            node.setdefault(leaf, '')
    return root
def pack_directory(zf, mode, path, st_time, v_str, v_date):
    """Write an empty folder entry *path* into the open archive *zf*.

    Unicode paths are encoded as cp866. When *v_str* is given and the path
    contains the {GAME_VERSION} macro, the macro is replaced by *v_str*; if
    the path ends with the macro folder itself, the entry is stamped with
    *v_date* instead of *st_time*.

    Returns True if the macro was substituted, False otherwise.
    """
    if isinstance(path, unicode):
        path = path.encode('cp866')
    uses_version = v_str is not None and '{GAME_VERSION}' in path
    if uses_version:
        if path.endswith('{GAME_VERSION}/'):
            st_time = v_date.timetuple()[:6]
        path = path.replace('{GAME_VERSION}', v_str)
    entry = zipfile.ZipInfo(path, st_time)
    zf.writestr(entry, '', mode)
    return uses_version
def pack_stuff(zf_new, mode, tree, arc_data, v_str, v_date, v_was, cur_path):
    """Recursively write one level of a make_tree() tree into *zf_new*.

    Entries are written in this order: files, then non-empty folders, then
    empty folders, each group sorted by name. A file's source is the first
    glob match of ``arc_data[<archive path>]``; if the archive path contains
    '*', the stored name is the folder of the archive path plus the base name
    of the matched file. After its contents, the folder *cur_path* itself is
    written (unless it is the root, '') with the oldest timestamp seen.

    Returns ``(min_time, max_time, v_was)``: the oldest and newest datetimes
    seen at or below this level, and whether the {GAME_VERSION} macro was
    substituted anywhere so far.

    Raises ValueError when a source pattern matches no file.
    """
    def sort_key(item):
        name, node = item
        return isinstance(node, dict), not bool(node), name

    min_time = datetime.fromtimestamp(time.time())
    max_time = datetime(1970, 1, 1)
    for sub_name, sub_data in sorted(tree.iteritems(), key=sort_key):
        sub_path = cur_path + sub_name if cur_path else sub_name
        if not isinstance(sub_data, dict):  # strings are files, dicts are directories
            matches = glob.glob(arc_data[sub_path])
            if not matches:
                raise ValueError('file %s does not exist' % arc_data[sub_path])
            source = matches[0].replace(os.sep, '/')
            with open(source, 'rb') as f:
                st_time = get_stat_size_time(source)[1]
                stamp = datetime(*st_time)
                min_time = min(min_time, stamp)
                max_time = max(max_time, stamp)
                if '*' in sub_path:
                    # wildcard key: keep the real file name found on disk
                    arc_name = os.path.dirname(sub_path) + '/' + os.path.basename(source)
                else:
                    arc_name = sub_path
                if isinstance(arc_name, unicode):
                    arc_name = arc_name.encode('cp866')
                if v_str is not None and '{GAME_VERSION}' in arc_name:
                    v_was = True
                    arc_name = arc_name.replace('{GAME_VERSION}', v_str)
                zf_new.writestr(zipfile.ZipInfo(arc_name, st_time), f.read(), mode)
        elif sub_data:  # non-empty folder
            sub_min, sub_max, sub_v_was = pack_stuff(
                zf_new, mode, sub_data, arc_data, v_str, v_date, v_was, sub_path)
            min_time = min(min_time, sub_min)
            max_time = max(max_time, sub_max)
            v_was |= sub_v_was
        else:  # empty folder
            v_was |= pack_directory(zf_new, mode, sub_path, min_time.timetuple()[:6], v_str, v_date)
    if cur_path:
        v_was |= pack_directory(zf_new, mode, cur_path, min_time.timetuple()[:6], v_str, v_date)
    return min_time, max_time, v_was
def do_pack(fp, arc_data, mode, v_str, v_date):
    """Create the archive *fp* from *arc_data* and set its modification time.

    *arc_data* maps archive path to source path/glob pattern and *mode* is the
    zipfile compression constant to use. Missing parent directories of *fp*
    are created. After writing, the archive's modification time is set to the
    newest timestamp among the packed files, or to *v_date* if that is newer
    and the {GAME_VERSION} macro was substituted.

    Always returns True; errors propagate as exceptions.
    """
    fd = os.path.dirname(fp)
    # dirname is '' for a bare file name; os.makedirs('') would raise.
    if fd and not os.path.isdir(fd):
        os.makedirs(fd)
    tree = make_tree(sorted(arc_data))
    with zipfile.ZipFile(fp, 'w', mode) as zf_new:
        _, max_time, v_was = pack_stuff(zf_new, mode, tree, arc_data, v_str, v_date, False, '')
    if v_str is not None and v_was:
        max_time = max(max_time, v_date)
    # Must happen after the archive is closed, otherwise closing would touch the file again.
    os.utime(fp, (time.time(), time.mktime(max_time.timetuple())))
    return True
def main():
import getopt
try:
opts, args = getopt.getopt(sys.argv[1:], 'lfqv:')
except getopt.error, msg:
print msg
print "usage: python %s [-l] [-f] [-q] [-v version_file] configs_dir output_dir" % os.path.basename(sys.argv[0])
print ''' arguments:
configs_dir: directory to pick configs from
output_dir: directory to place built archives into
options:
-l: don't recurse into subdirectories
-f: force rebuild even if timestamps are up-to-date
-q: output only error messages and archive update reasons
-v version_file: if a {GAME_VERSION} macro is encountered and this path is not provided - build will fail'''
sys.exit(2)
max_levels = 10
force = False
quiet = False
version_file = None
for o, a in opts:
if o == '-l': max_levels = 0
if o == '-f': force = True
if o == '-q': quiet = True
if o == '-v': version_file = a
if len(args) != 2:
print 'packer only needs to know where to pick configs from and where to put the results to'
sys.exit(2)
success = True
version_str = None
version_date = None
if version_file:
try:
with open(version_file, 'r') as v_file:
version_str = v_file.read().strip()
assert version_str
timeStr = get_git_date(version_file)
version_date = datetime.fromtimestamp(long(timeStr) if timeStr else long(os.stat(version_file).st_mtime))
except IOError:
print 'WARNING: version file not found:', version_file
except AssertionError:
print 'WARNING: version was empty'
version_str = None
else:
print 'WARNING: version file not provided'
try:
if os.path.exists(args[0]):
if os.path.isdir(args[0]):
success &= pack_dir(args[0], args[1], max_levels, version_str, version_date, force, quiet)
else:
success &= pack_file(args[0], args[1], version_str, version_date, force, quiet)
else:
print 'Build config file/directory not found'
success = False
except KeyboardInterrupt:
print "\n[interrupted]"
success = False
return success
if __name__ == '__main__':
    # Exit status 0 when main() reports success, 1 otherwise.
    exit_code = 0 if main() else 1
    sys.exit(exit_code)