automatically set default_version
[web/firmware-selector-openwrt-org.git] / misc / collect.py
1 #!/usr/bin/env python3
2 """
3 Tool to create overview.json files and update the config.js.
4 """
5
6 from pathlib import Path
7 import urllib.request
8 import tempfile
9 import datetime
10 import argparse
11 import email
12 import time
13 import json
14 import glob
15 import sys
16 import os
17 import re
18 from distutils.version import StrictVersion
19
20 SUPPORTED_METADATA_VERSION = 1
21 BUILD_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
22
23 assert sys.version_info >= (3, 5), "Python version too old. Python >=3.5.0 needed."
24
25
def add_profile(output, path, id, target, profile, code=None, build_date=None):
    """Insert one device profile into output["models"], keyed by device title.

    path is only used for error messages; code/build_date are attached to
    the model entry when given.
    """

    def get_title(title):
        # An explicit "title" wins; otherwise compose vendor/model/variant.
        if "title" in title:
            return title["title"]
        return "{} {} {}".format(
            title.get("vendor", ""), title["model"], title.get("variant", "")
        ).strip()

    images = [{"name": image["name"], "type": image["type"]} for image in profile["images"]]

    if target is None:
        target = profile["target"]

    for entry in profile["titles"]:
        title = get_title(entry)

        if not title:
            sys.stderr.write("Empty title. Skip title for {} in {}\n".format(id, path))
            continue

        # Some devices are in ar71xx and ath79, but use "TP-LINK" vs "TP-Link"
        # (e.g. `TP-LINK Archer C7 v5` and `TP-Link Archer C7 v5`).
        # Normalize the vendor spelling so such duplicates can be detected.
        if title.startswith("TP-LINK "):
            title = "TP-Link {}".format(title[8:])

        # device is a duplicate, try to differentiate by target
        if title in output["models"]:
            title = "{} ({})".format(title, target)

        model = {"id": id, "target": target, "images": images}
        if build_date is not None:
            model["build_date"] = build_date
        if code is not None:
            model["code"] = code
        output["models"][title] = model
68
69
# accepts [{"file_path": ..., "file_content": ..., "last_modified": ...}]
def merge_profiles(profiles, download_url):
    """Merge a list of profiles.json contents into one overview structure.

    Returns {} when no valid profile was found, otherwise a dict with
    "version_code", "download_url" and "models" keys. Exits the process
    on profiles that are missing required keys.
    """
    # json output data
    output = {}

    for profile in profiles:
        file_path = profile["file_path"]

        # Parse inside a try block: previously json.loads() ran outside the
        # try, so the JSONDecodeError handler below it was unreachable and a
        # single malformed profiles.json crashed the whole merge.
        try:
            obj = json.loads(profile["file_content"])
        except json.decoder.JSONDecodeError as e:
            sys.stderr.write("Skip {}\n {}\n".format(file_path, e))
            continue

        if obj["metadata_version"] != SUPPORTED_METADATA_VERSION:
            sys.stderr.write(
                "{} has unsupported metadata version: {} => skip\n".format(
                    file_path, obj["metadata_version"]
                )
            )
            continue

        code = obj.get("version_code", obj.get("version_commit"))
        build_date = profile["last_modified"]

        # first valid profile defines the global version code
        if "version_code" not in output:
            output = {"version_code": code, "download_url": download_url, "models": {}}

        # if we have mixed codes/commits, store in device object
        if output["version_code"] == code:
            code = None

        try:
            if "profiles" in obj:
                for id in obj["profiles"]:
                    add_profile(
                        output,
                        file_path,
                        id,
                        obj.get("target"),
                        obj["profiles"][id],
                        code,
                        build_date,
                    )
            else:
                add_profile(
                    output, file_path, obj["id"], obj["target"], obj, code, build_date
                )
        except KeyError as e:
            sys.stderr.write("Abort on {}\n Missing key {}\n".format(file_path, e))
            exit(1)

    return output
120
121
122 def update_config(www_path, versions):
123 config_path = "{}/config.js".format(www_path)
124
125 if os.path.isfile(config_path):
126 content = ""
127 with open(str(config_path), "r", encoding="utf-8") as file:
128 content = file.read()
129
130 latest_version = "0.0.0"
131 for version in versions.keys():
132 try:
133 if StrictVersion(version) > StrictVersion(latest_version):
134 latest_version = version
135 except ValueError:
136 print("Non numeric version: {}".format(version))
137 continue
138
139 content = re.sub(
140 "versions:[\\s]*{[^}]*}", "versions: {}".format(versions), content
141 )
142 content = re.sub(
143 "default_version:.*,",
144 'default_version: "{}",'.format(latest_version),
145 content,
146 )
147 with open(str(config_path), "w+") as file:
148 file.write(content)
149 else:
150 sys.stderr.write("Warning: File not found: {}\n".format(config_path))
151
152
153 """
154 Scrape profiles.json using links like https://downloads.openwrt.org/releases/19.07.3/targets/?json
155 Merge into overview.json files.
156 Update config.json.
157 """
158
159
def scrape(args):
    """Scrape a download server for releases via its JSON directory listings.

    Writes data/<release>/overview.json under args.www_path for every
    release found, then updates config.js.
    """
    url = args.domain
    data_path = "{}/data".format(args.www_path)
    versions = {}

    def handle_release(target):
        # Fetch the target listing, then download every profiles.json in it,
        # recording the server-side modification time of each file.
        found = []
        with urllib.request.urlopen("{}/?json".format(target)) as file:
            listing = json.loads(file.read().decode("utf-8"))
            for entry in listing:
                if not entry.endswith("/profiles.json"):
                    continue
                with urllib.request.urlopen("{}/{}".format(target, entry)) as file:
                    stamp = email.utils.parsedate(file.headers.get("last-modified"))
                    found.append(
                        {
                            "file_path": "{}/{}".format(target, entry),
                            "file_content": file.read().decode("utf-8"),
                            "last_modified": datetime.datetime(*stamp[:6]).strftime(
                                BUILD_DATE_FORMAT
                            ),
                        }
                    )
        return found

    # fetch release URLs from the index page
    with urllib.request.urlopen(url) as infile:
        index_html = str(infile.read())

    for path in re.findall(r"href=[\"']?([^'\" >]+)", index_html):
        # only relative links ending in "targets/" point at releases
        if path.startswith("/") or not path.endswith("targets/"):
            continue

        release = path.strip("/").split("/")[-2]
        download_url = "{}/{}/{{target}}".format(url, path)

        output = merge_profiles(handle_release("{}/{}".format(url, path)), download_url)
        if not output:
            continue

        os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

        # write overview.json
        with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
            if args.formatted:
                json.dump(output, outfile, indent=" ", sort_keys=True)
            else:
                json.dump(output, outfile, sort_keys=True)

        versions[release] = "data/{}/overview.json".format(release)

    update_config(args.www_path, versions)
206
207
208 """
209 Scrape profiles.json using wget (slower but more generic).
210 Merge into overview.json files.
211 Update config.json.
212 """
213
214
def scrape_wget(args):
    """Mirror profiles.json files with wget and merge them into overview.json.

    Slower than scrape() but works on servers without JSON directory
    listings. Updates config.js afterwards.
    """
    url = args.domain
    data_path = "{}/data".format(args.www_path)
    versions = {}

    with tempfile.TemporaryDirectory() as tmp_dir:
        # download all profiles.json files
        # NOTE: url comes straight from the CLI and is interpolated into a
        # shell command — acceptable for an operator-run tool, but not for
        # untrusted input.
        os.system(
            "wget -c -r -P {} -A 'profiles.json' --reject-regex 'kmods|packages' --no-parent {}".format(
                tmp_dir, url
            )
        )

        # delete empty folders
        os.system("find {}/* -type d -empty -delete".format(tmp_dir))

        # every snapshots/ dir and every releases/<version>/ dir is a release
        release_dirs = glob.glob("{}/*/snapshots".format(tmp_dir))
        release_dirs += glob.glob("{}/*/releases/*".format(tmp_dir))

        for path in release_dirs:
            release = os.path.basename(path)
            # path relative to the mirror root, used to rebuild the https URL
            base = path[len(tmp_dir) + 1 :]

            profiles = []
            for ppath in Path(path).rglob("profiles.json"):
                # we assume local timezone is UTC/GMT
                last_modified = datetime.datetime.fromtimestamp(
                    os.path.getmtime(ppath)
                ).strftime(BUILD_DATE_FORMAT)
                with open(str(ppath), "r", encoding="utf-8") as file:
                    profiles.append(
                        {
                            "file_path": str(ppath),
                            "file_content": file.read(),
                            "last_modified": last_modified,
                        }
                    )

            if not profiles:
                continue

            versions[release] = "data/{}/overview.json".format(release)

            output = merge_profiles(
                profiles, "https://{}/targets/{{target}}".format(base)
            )
            os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

            # write overview.json
            with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
                if args.formatted:
                    json.dump(output, outfile, indent=" ", sort_keys=True)
                else:
                    json.dump(output, outfile, sort_keys=True)

    update_config(args.www_path, versions)
271
272
273 """
274 Find and merge json files for a single release.
275 """
276
277
def merge(args):
    """Merge JSON device files from args.input_path into one overview.

    Each input path is either a directory (searched recursively for
    *.json files) or a single .json file. The merged overview is written
    to stdout. Exits with status 1 on an invalid path.
    """
    input_paths = args.input_path
    # OpenWrt JSON device files
    profiles = []

    def add_path(path):
        # record the file content together with its mtime (assumed UTC)
        with open(str(path), "r", encoding="utf-8") as file:
            last_modified = time.strftime(
                BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(path)))
            )
            profiles.append(
                {
                    "file_path": str(path),
                    "file_content": file.read(),
                    "last_modified": last_modified,
                }
            )

    for path in input_paths:
        if os.path.isdir(path):
            for filepath in Path(path).rglob("*.json"):
                add_path(filepath)
        elif path.endswith(".json") and os.path.isfile(path):
            add_path(path)
        else:
            # Previous message ("Folder does not exists") was ungrammatical
            # and misleading: the failing path is a missing or non-JSON file,
            # not a folder. Also, a ".json" path that did not exist used to
            # crash with a raw FileNotFoundError instead of this clean exit.
            sys.stderr.write("Invalid path (not a directory or .json file): {}\n".format(path))
            exit(1)

    output = merge_profiles(profiles, args.download_url)

    if args.formatted:
        json.dump(output, sys.stdout, indent=" ", sort_keys=True)
    else:
        json.dump(output, sys.stdout, sort_keys=True)
312
313
314 """
315 Scan local directory for releases with profiles.json.
316 Merge into overview.json files.
317 Update config.json.
318 """
319
320
def scan(args):
    """Scan args.images_path recursively for profiles.json files, group them
    by version_number, and write one data/<release>/overview.json per
    release under args.www_path. Finally updates config.js.
    """
    # the overview.json files are placed here
    data_path = "{}/data".format(args.www_path)
    versions = {}

    # args.images_path => args.releases_path
    # release name (version_number) => list of profile records
    releases = {}
    for path in Path(args.images_path).rglob("profiles.json"):
        with open(str(path), "r", encoding="utf-8") as file:
            content = file.read()
            obj = json.loads(content)
            # the release a profiles.json belongs to is taken from the
            # file itself, not from the directory layout
            release = obj["version_number"]
            # file mtime as build date; assumes mtimes are in UTC/GMT
            last_modified = time.strftime(
                BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(path)))
            )
            releases.setdefault(release, []).append(
                {
                    "file_path": str(path),
                    "file_content": content,
                    "last_modified": last_modified,
                }
            )

    """
    Replace {base} variable in download URL with the intersection
    of all profile.json paths. E.g.:
    ../tmp/releases/18.06.8/targets => base is releases/18.06.8/targets
    ../tmp/snapshots/targets => base in snapshots/targets
    """

    def replace_base(releases, target_release, download_url):
        # Returns download_url with "{base}" replaced by the part of
        # target_release's common profile path that is not shared with the
        # other releases; download_url is returned unchanged otherwise.
        if "{base}" in download_url:
            # release => base path (of profiles.json locations)
            paths = {}
            for release, profiles in releases.items():
                profile_paths = [profile["file_path"] for profile in profiles]
                paths[release] = os.path.commonpath(profile_paths)
            # base path of all releases
            release_path_base = os.path.commonpath(paths.values())
            # get path intersection
            # (strip the shared prefix plus its trailing path separator)
            base = str(paths[target_release])[len(release_path_base) + 1 :]
            return download_url.replace("{base}", base)
        else:
            return download_url

    for release, profiles in releases.items():
        download_url = replace_base(releases, release, args.download_url)
        output = merge_profiles(profiles, download_url)

        versions[release] = "data/{}/overview.json".format(release)
        os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

        # write overview.json
        with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
            if args.formatted:
                json.dump(output, outfile, indent=" ", sort_keys=True)
            else:
                json.dump(output, outfile, sort_keys=True)

    update_config(args.www_path, versions)
381
382
def main():
    """Build the command line interface and dispatch to the chosen action."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--formatted", action="store_true", help="Output formatted JSON data."
    )
    subparsers = parser.add_subparsers(dest="action")
    subparsers.required = True

    merge_parser = subparsers.add_parser(
        "merge", help="Search for profiles.json files and output an overview.json."
    )
    merge_parser.add_argument(
        "input_path",
        nargs="+",
        help="Input folder that is traversed for OpenWrt JSON device files.",
    )
    merge_parser.add_argument(
        "--download-url",
        action="store",
        default="",
        help="Link to get the image from. May contain {target} (replaced by e.g. ath79/generic), {version} (replace by the version key from config.js) and {commit} (git commit in hex notation).",
    )

    scrape_parser = subparsers.add_parser("scrape", help="Scrape webpage for releases.")
    scrape_parser.add_argument(
        "domain", help="Domain to scrape. E.g. https://downloads.openwrt.org"
    )
    scrape_parser.add_argument("www_path", help="Path the config.js file is in.")
    scrape_parser.add_argument(
        "--use-wget", action="store_true", help="Use wget to scrape the site."
    )

    scan_parser = subparsers.add_parser("scan", help="Scan directory for releases.")
    scan_parser.add_argument(
        "download_url", help="Download for images. E.g. https://downloads.openwrt.org"
    )
    scan_parser.add_argument("images_path", help="Directory to scan for releases.")
    scan_parser.add_argument("www_path", help="Path the config.js file is in.")

    args = parser.parse_args()

    # dispatch; args.action is exactly one of the sub-command names
    if args.action == "merge":
        merge(args)
    elif args.action == "scan":
        scan(args)
    elif args.action == "scrape":
        if args.use_wget:
            scrape_wget(args)
        else:
            scrape(args)


if __name__ == "__main__":
    main()