527d27a5bfe7a2e2e85e1a340bf3b17e7ddebb47
[web/firmware-selector-openwrt-org.git] / misc / collect.py
1 #!/usr/bin/env python3
2 """
3 Tool to create overview.json files and update the config.js.
4 """
5
6 from pathlib import Path
7 import urllib.request
8 import tempfile
9 import datetime
10 import argparse
11 import email
12 import time
13 import json
14 import glob
15 import sys
16 import os
17 import re
18
19 SUPPORTED_METADATA_VERSION = 1
20 BUILD_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
21
22 assert sys.version_info >= (3, 5), "Python version too old. Python >=3.5.0 needed."
23
24
def add_profile(output, path, id, target, profile, code=None, build_date=None):
    """Insert one device profile into output["models"].

    output     -- overview dict holding a "models" mapping (mutated in place)
    path       -- source profiles.json path, only used in error messages
    id         -- profile identifier of the device
    target     -- target string (e.g. "ath79/generic"); if None, taken
                  from profile["target"]
    profile    -- profile object with "images" and "titles" entries
    code       -- per-device version code/commit, stored only if not None
    build_date -- build date string, stored only if not None
    """

    def get_title(title):
        # Prefer an explicit title; otherwise assemble it from
        # vendor/model/variant (vendor and variant are optional).
        if "title" in title:
            return title["title"]
        return "{} {} {}".format(
            title.get("vendor", ""), title["model"], title.get("variant", "")
        ).strip()

    images = [{"name": image["name"], "type": image["type"]} for image in profile["images"]]

    if target is None:
        target = profile["target"]

    for entry in profile["titles"]:
        title = get_title(entry)

        if not title:
            sys.stderr.write("Empty title. Skip title for {} in {}\n".format(id, path))
            continue

        # Some devices are in ar71xx and ath79, but use "TP-LINK" vs
        # "TP-Link" (e.g. `TP-LINK Archer C7 v5` and `TP-Link Archer C7 v5`).
        # Normalize the spelling so such duplicates can be detected.
        if title.startswith("TP-LINK "):
            title = "TP-Link {}".format(title[8:])

        # device is a duplicate, try to differentiate by target
        if title in output["models"]:
            title = "{} ({})".format(title, target)

        model = {"id": id, "target": target, "images": images}
        if build_date is not None:
            model["build_date"] = build_date
        if code is not None:
            model["code"] = code
        output["models"][title] = model
67
68
# accepts a list of {"file_path": ..., "file_content": ..., "last_modified": ...}
def merge_profiles(profiles, download_url):
    """Merge a list of profiles.json contents into one overview structure.

    profiles     -- list of dicts with "file_path", "file_content" and
                    "last_modified" keys
    download_url -- image download URL stored verbatim in the output

    Returns a dict with "version_code", "download_url" and "models" keys,
    or an empty dict if no usable profile was found. Aborts the process
    on profiles with missing mandatory keys.
    """
    # json output data
    output = {}

    for profile in profiles:
        file_path = profile["file_path"]

        # Bug fix: json.loads() used to run outside the try block below,
        # so the JSONDecodeError handler could never fire and a malformed
        # file crashed the whole run. Skip such files instead.
        try:
            obj = json.loads(profile["file_content"])
        except json.decoder.JSONDecodeError as e:
            sys.stderr.write("Skip {}\n {}\n".format(file_path, e))
            continue

        if obj["metadata_version"] != SUPPORTED_METADATA_VERSION:
            sys.stderr.write(
                "{} has unsupported metadata version: {} => skip\n".format(
                    file_path, obj["metadata_version"]
                )
            )
            continue

        # Fall back to the commit id when no version code exists (snapshots).
        code = obj.get("version_code", obj.get("version_commit"))
        build_date = profile["last_modified"]

        if "version_code" not in output:
            output = {"version_code": code, "download_url": download_url, "models": {}}

        # if we have mixed codes/commits, store in device object
        if output["version_code"] == code:
            code = None

        try:
            if "profiles" in obj:
                for profile_id in obj["profiles"]:
                    add_profile(
                        output,
                        file_path,
                        profile_id,
                        obj.get("target"),
                        obj["profiles"][profile_id],
                        code,
                        build_date,
                    )
            else:
                add_profile(
                    output, file_path, obj["id"], obj["target"], obj, code, build_date
                )
        except KeyError as e:
            sys.stderr.write("Abort on {}\n Missing key {}\n".format(file_path, e))
            exit(1)

    return output
119
120
def update_config(config_path, versions):
    """Rewrite the `versions: {...}` object inside config.js.

    config_path -- path to the config.js file
    versions    -- dict mapping release name to its overview.json path;
                   its repr() replaces the existing versions object
    """
    with open(str(config_path), "r", encoding="utf-8") as file:
        content = file.read()

    # Use a replacement function so backslashes in paths are not
    # interpreted as regex escape sequences by re.sub().
    content = re.sub(
        r"versions:\s*{[^}]*}",
        lambda match: "versions: {}".format(versions),
        content,
    )

    # Fix: write back with the same encoding used for reading (was
    # locale-dependent), and plain "w" instead of "w+".
    with open(str(config_path), "w", encoding="utf-8") as file:
        file.write(content)
129
130
131 """
132 Scrape profiles.json using links like https://downloads.openwrt.org/releases/19.07.3/targets/?json
133 Merge into overview.json files.
Update config.js.
135 """
136
137
def scrape(args):
    """Scrape args.domain for release pages via the '?json' directory
    listings, merge each release's profiles.json files into
    <www_path>/data/<release>/overview.json and update config.js.
    """
    url = args.domain
    www_path = args.www_path
    config_path = "{}/config.js".format(www_path)
    data_path = "{}/data".format(www_path)
    # release name => relative overview.json path, written to config.js
    versions = {}

    def handle_release(target):
        # Download every profiles.json listed by the '?json' index of a
        # targets/ directory; the HTTP Last-Modified header becomes the
        # build date.
        profiles = []
        with urllib.request.urlopen("{}/?json".format(target)) as file:
            array = json.loads(file.read().decode("utf-8"))
            for profile in filter(lambda x: x.endswith("/profiles.json"), array):
                with urllib.request.urlopen("{}/{}".format(target, profile)) as file:
                    # NOTE(review): parsedate() returns None when the
                    # Last-Modified header is absent, which would raise a
                    # TypeError here — confirm the server always sends it.
                    last_modified = datetime.datetime(
                        *email.utils.parsedate(file.headers.get("last-modified"))[:6]
                    ).strftime(BUILD_DATE_FORMAT)
                    profiles.append(
                        {
                            "file_path": "{}/{}".format(target, profile),
                            "file_content": file.read().decode("utf-8"),
                            "last_modified": last_modified,
                        }
                    )
        return profiles

    if not os.path.isfile(config_path):
        print("file not found: {}".format(config_path))
        exit(1)

    # fetch release URLs
    with urllib.request.urlopen(url) as infile:
        # str() on raw bytes yields a "b'...'" repr; findall still
        # extracts the href targets from it.
        for path in re.findall(r"href=[\"']?([^'\" >]+)", str(infile.read())):
            # relative links ending in "targets/" point at release pages
            if not path.startswith("/") and path.endswith("targets/"):
                release = path.strip("/").split("/")[-2]
                download_url = "{}/{}/{{target}}".format(url, path)

                profiles = handle_release("{}/{}".format(url, path))
                output = merge_profiles(profiles, download_url)
                if len(output) > 0:
                    os.makedirs("{}/{}".format(data_path, release), exist_ok=True)
                    # write overview.json
                    with open(
                        "{}/{}/overview.json".format(data_path, release), "w"
                    ) as outfile:
                        if args.formatted:
                            json.dump(output, outfile, indent="  ", sort_keys=True)
                        else:
                            json.dump(output, outfile, sort_keys=True)

                    versions[release] = "data/{}/overview.json".format(release)

    update_config(config_path, versions)
190
191
192 """
193 Scrape profiles.json using wget (slower but more generic).
194 Merge into overview.json files.
Update config.js.
196 """
197
198
def scrape_wget(args):
    """Mirror profiles.json files from args.domain with wget (slower but
    more generic than the '?json' listings), merge them per release into
    <www_path>/data/<release>/overview.json and update config.js.
    """
    url = args.domain
    www_path = args.www_path
    config_path = "{}/config.js".format(www_path)
    data_path = "{}/data".format(www_path)
    # release name => relative overview.json path, written to config.js
    versions = {}

    with tempfile.TemporaryDirectory() as tmp_dir:
        # download all profiles.json files
        os.system(
            "wget -c -r -P {} -A 'profiles.json' --reject-regex 'kmods|packages' --no-parent {}".format(
                tmp_dir, url
            )
        )

        # delete empty folders
        os.system("find {}/* -type d -empty -delete".format(tmp_dir))

        # create overview.json files; a release directory is either
        # <domain>/snapshots or <domain>/releases/<version>
        for path in glob.glob("{}/*/snapshots".format(tmp_dir)) + glob.glob(
            "{}/*/releases/*".format(tmp_dir)
        ):
            release = os.path.basename(path)
            # path relative to tmp_dir, e.g. <domain>/releases/19.07.3
            base = path[len(tmp_dir) + 1 :]

            profiles = []
            for ppath in Path(path).rglob("profiles.json"):
                with open(str(ppath), "r", encoding="utf-8") as file:
                    # we assume local timezone is UTC/GMT
                    last_modified = datetime.datetime.fromtimestamp(
                        os.path.getmtime(ppath)
                    ).strftime(BUILD_DATE_FORMAT)
                    profiles.append(
                        {
                            "file_path": str(ppath),
                            "file_content": file.read(),
                            "last_modified": last_modified,
                        }
                    )

            # nothing mirrored for this directory => skip it
            if len(profiles) == 0:
                continue

            versions[release] = "data/{}/overview.json".format(release)

            output = merge_profiles(
                profiles, "https://{}/targets/{{target}}".format(base)
            )
            os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

            # write overview.json
            with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
                if args.formatted:
                    json.dump(output, outfile, indent="  ", sort_keys=True)
                else:
                    json.dump(output, outfile, sort_keys=True)

    update_config(config_path, versions)
257
258
259 """
260 Find and merge json files for a single release.
261 """
262
263
def merge(args):
    """Merge OpenWrt JSON device files (given as folders and/or .json
    file paths in args.input_path) into one overview.json on stdout.
    """
    input_paths = args.input_path
    # OpenWrt JSON device files
    profiles = []

    def add_path(path):
        # Read one JSON device file; its mtime (interpreted as UTC)
        # becomes the build date.
        with open(str(path), "r", encoding="utf-8") as file:
            last_modified = time.strftime(
                BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(path)))
            )
            profiles.append(
                {
                    "file_path": str(path),
                    "file_content": file.read(),
                    "last_modified": last_modified,
                }
            )

    for path in input_paths:
        if os.path.isdir(path):
            for filepath in Path(path).rglob("*.json"):
                add_path(filepath)
        else:
            # A non-directory argument must be a .json file.
            # (Fix: the old message misleadingly claimed
            # "Folder does not exists".)
            if not path.endswith(".json"):
                sys.stderr.write(
                    "Invalid path (neither a folder nor a .json file): {}\n".format(
                        path
                    )
                )
                exit(1)
            add_path(path)

    output = merge_profiles(profiles, args.download_url)

    if args.formatted:
        json.dump(output, sys.stdout, indent="  ", sort_keys=True)
    else:
        json.dump(output, sys.stdout, sort_keys=True)
298
299
300 """
301 Scan local directory for releases with profiles.json.
302 Merge into overview.json files.
Update config.js.
304 """
305
306
def scan(args):
    """Scan args.images_path recursively for profiles.json files, group
    them by release (their "version_number"), write one overview.json per
    release under <www_path>/data/ and update config.js.
    """
    # firmware selector config
    config_path = "{}/config.js".format(args.www_path)
    # the overview.json files are placed here
    data_path = "{}/data".format(args.www_path)
    versions = {}

    # release name => list of profile dicts
    # ({"file_path": ..., "file_content": ..., "last_modified": ...})
    releases = {}
    for path in Path(args.images_path).rglob("profiles.json"):
        with open(str(path), "r", encoding="utf-8") as file:
            content = file.read()
            obj = json.loads(content)
            release = obj["version_number"]
            # mtime (interpreted as UTC) becomes the build date
            last_modified = time.strftime(
                BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(path)))
            )
            releases.setdefault(release, []).append(
                {
                    "file_path": str(path),
                    "file_content": content,
                    "last_modified": last_modified,
                }
            )

    def replace_base(releases, target_release, download_url):
        """Replace the {base} variable in the download URL with the
        intersection of all profiles.json paths. E.g.:
          ../tmp/releases/18.06.8/targets => base is releases/18.06.8/targets
          ../tmp/snapshots/targets        => base is snapshots/targets
        """
        if "{base}" in download_url:
            # release => base path (of the profiles.json locations)
            paths = {}
            for release, profiles in releases.items():
                # Bug fix: profiles is a LIST of profile dicts, not a
                # mapping — profiles.keys() raised AttributeError here.
                paths[release] = os.path.commonpath(
                    [profile["file_path"] for profile in profiles]
                )
            # base path shared by all releases
            release_path_base = os.path.commonpath(paths.values())
            # get path intersection (the release-specific suffix)
            base = str(paths[target_release])[len(release_path_base) + 1 :]
            return download_url.replace("{base}", base)
        else:
            return download_url

    for release, profiles in releases.items():
        download_url = replace_base(releases, release, args.download_url)
        output = merge_profiles(profiles, download_url)

        versions[release] = "data/{}/overview.json".format(release)
        os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

        # write overview.json
        with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
            if args.formatted:
                json.dump(output, outfile, indent="  ", sort_keys=True)
            else:
                json.dump(output, outfile, sort_keys=True)

    update_config(config_path, versions)
368
369
def main():
    """Parse the command line and dispatch to merge, scan or scrape."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--formatted", action="store_true", help="Output formatted JSON data."
    )
    subparsers = parser.add_subparsers(dest="action")
    subparsers.required = True

    merge_cmd = subparsers.add_parser(
        "merge", help="Search for profiles.json files and output an overview.json."
    )
    merge_cmd.add_argument(
        "input_path",
        nargs="+",
        help="Input folder that is traversed for OpenWrt JSON device files.",
    )
    merge_cmd.add_argument(
        "--download-url",
        action="store",
        default="",
        help="Link to get the image from. May contain {target} (replaced by e.g. ath79/generic), {version} (replace by the version key from config.js) and {commit} (git commit in hex notation).",
    )

    scrape_cmd = subparsers.add_parser("scrape", help="Scrape webpage for releases.")
    scrape_cmd.add_argument(
        "domain", help="Domain to scrape. E.g. https://downloads.openwrt.org"
    )
    scrape_cmd.add_argument("www_path", help="Path the config.js file is in.")
    scrape_cmd.add_argument(
        "--use-wget", action="store_true", help="Use wget to scrape the site."
    )

    scan_cmd = subparsers.add_parser("scan", help="Scan directory for releases.")
    scan_cmd.add_argument(
        "download_url", help="Download for images. E.g. https://downloads.openwrt.org"
    )
    scan_cmd.add_argument("images_path", help="Directory to scan for releases.")
    scan_cmd.add_argument("www_path", help="Path the config.js file is in.")

    args = parser.parse_args()

    # The subcommand is required, so exactly one branch runs.
    if args.action == "merge":
        merge(args)
    elif args.action == "scan":
        scan(args)
    elif args.action == "scrape":
        if args.use_wget:
            scrape_wget(args)
        else:
            scrape(args)
422
423
424 if __name__ == "__main__":
425 main()