#!/usr/bin/env python3
"""
Tool to create overview.json files and update the config.js.
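
Example invocations (URLs and paths are illustrative):

    ./collect.py merge my/json/files/
    ./collect.py scrape https://downloads.openwrt.org /var/www
    ./collect.py scan https://downloads.openwrt.org bin/ /var/www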
4 """
5
6 from pathlib import Path
7 import urllib.request
8 import tempfile
9 import datetime
10 import argparse
11 import email
12 import time
13 import json
14 import glob
15 import sys
16 import os
17 import re
18
19 SUPPORTED_METADATA_VERSION = 1
20 BUILD_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
21
22 assert sys.version_info >= (3, 5), "Python version too old. Python >=3.5.0 needed."
23
24
25 def add_profile(output, path, id, target, profile, code=None, build_date=None):
26 def get_title(title):
27 if "title" in title:
28 return title["title"]
29 else:
30 return "{} {} {}".format(
31 title.get("vendor", ""), title["model"], title.get("variant", "")
32 ).strip()
33
34 images = []
35 for image in profile["images"]:
36 images.append({"name": image["name"], "type": image["type"]})
37
38 if target is None:
39 target = profile["target"]
40
41 for entry in profile["titles"]:
42 title = get_title(entry)
43
44 if len(title) == 0:
45 sys.stderr.write("Empty title. Skip title for {} in {}\n".format(id, path))
46 continue
47
48 """
49 Some devices are in ar71xx and ath79. But use TP-LINK" and "TP-Link".
50 E.g: `TP-LINK Archer C7 v5` and `TP-Link Archer C7 v5`
51 To be able to detect this, we need to make "same" titles identical.
52 """
        if title.startswith("TP-LINK "):
            title = "TP-Link {}".format(title[8:])

        # device is a duplicate, try to differentiate by target
        if title in output["models"]:
            title = "{} ({})".format(title, target)

        output["models"][title] = {"id": id, "target": target, "images": images}

        if build_date is not None:
            output["models"][title]["build_date"] = build_date

        if code is not None:
            output["models"][title]["code"] = code

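# After add_profile() runs, output["models"] holds entries shaped like this
# (all values illustrative):
#   "TP-Link Archer C7 v5": {
#       "id": "tplink_archer-c7-v5",
#       "target": "ath79/generic",
#       "images": [{"name": "...-squashfs-sysupgrade.bin", "type": "sysupgrade"}],
#   }
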
# accepts a list of {"file_path": ..., "file_content": ..., "last_modified": ...} dicts
def merge_profiles(profiles, download_url):
    # json output data
    output = {}
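    # Example of the merged result (all values illustrative):
    #   {
    #       "version_code": "r13342-a8f053d8ba",
    #       "download_url": "https://.../targets/{target}",
    #       "models": {"<title>": {"id": ..., "target": ..., "images": [...]}},
    #   }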

    for profile in profiles:
        try:
            obj = json.loads(profile["file_content"])
        except json.decoder.JSONDecodeError as e:
            sys.stderr.write("Skip {}\n {}\n".format(profile["file_path"], e))
            continue

        if obj["metadata_version"] != SUPPORTED_METADATA_VERSION:
            sys.stderr.write(
                "{} has unsupported metadata version: {} => skip\n".format(
                    profile["file_path"], obj["metadata_version"]
                )
            )
            continue

        code = obj.get("version_code", obj.get("version_commit"))
        file_path = profile["file_path"]
        build_date = profile["last_modified"]

        if "version_code" not in output:
            output = {"version_code": code, "download_url": download_url, "models": {}}

        # if we have mixed codes/commits, store the code in the device object
        if output["version_code"] == code:
            code = None

        try:
            if "profiles" in obj:
                for id in obj["profiles"]:
                    add_profile(
                        output,
                        file_path,
                        id,
                        obj.get("target"),
                        obj["profiles"][id],
                        code,
                        build_date,
                    )
            else:
                add_profile(
                    output, file_path, obj["id"], obj["target"], obj, code, build_date
                )
        except KeyError as e:
            sys.stderr.write("Abort on {}\n Missing key {}\n".format(file_path, e))
            exit(1)

    return output


def update_config(www_path, versions):
    config_path = "{}/config.js".format(www_path)
    content = ""
    with open(str(config_path), "r", encoding="utf-8") as file:
        content = file.read()

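    # Rewrite the "versions" object in config.js; a Python dict repr is close
    # enough to a JS object literal here, e.g. (illustrative):
    #   versions: {'19.07.3': 'data/19.07.3/overview.json'}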
    content = re.sub("versions:[\\s]*{[^}]*}", "versions: {}".format(versions), content)
    with open(str(config_path), "w", encoding="utf-8") as file:
        file.write(content)


"""
Scrape profiles.json using links like https://downloads.openwrt.org/releases/19.07.3/targets/?json
Merge into overview.json files.
Update config.js.
"""


def scrape(args):
    url = args.domain
    config_path = "{}/config.js".format(args.www_path)
    data_path = "{}/data".format(args.www_path)
    versions = {}

    def handle_release(target):
        profiles = []
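        # appending "?json" makes the downloads server return the directory
        # listing as a JSON array of relative file paths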
        with urllib.request.urlopen("{}/?json".format(target)) as file:
            array = json.loads(file.read().decode("utf-8"))
            for profile in filter(lambda x: x.endswith("/profiles.json"), array):
                with urllib.request.urlopen("{}/{}".format(target, profile)) as file:
                    last_modified = datetime.datetime(
                        *email.utils.parsedate(file.headers.get("last-modified"))[:6]
                    ).strftime(BUILD_DATE_FORMAT)
                    profiles.append(
                        {
                            "file_path": "{}/{}".format(target, profile),
                            "file_content": file.read().decode("utf-8"),
                            "last_modified": last_modified,
                        }
                    )
        return profiles

    if not os.path.isfile(config_path):
        print("file not found: {}".format(config_path))
        exit(1)

    # fetch release URLs
    with urllib.request.urlopen(url) as infile:
        for path in re.findall(r"href=[\"']?([^'\" >]+)", str(infile.read())):
            if not path.startswith("/") and path.endswith("targets/"):
                release = path.strip("/").split("/")[-2]
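                # e.g. path "releases/19.07.3/targets/" => release "19.07.3"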
                download_url = "{}/{}/{{target}}".format(url, path)

                profiles = handle_release("{}/{}".format(url, path))
                output = merge_profiles(profiles, download_url)
                if len(output) > 0:
                    os.makedirs("{}/{}".format(data_path, release), exist_ok=True)
                    # write overview.json
                    with open(
                        "{}/{}/overview.json".format(data_path, release), "w"
                    ) as outfile:
                        if args.formatted:
                            json.dump(output, outfile, indent="  ", sort_keys=True)
                        else:
                            json.dump(output, outfile, sort_keys=True)

                    versions[release] = "data/{}/overview.json".format(release)

    update_config(args.www_path, versions)


"""
Scrape profiles.json using wget (slower but more generic).
Merge into overview.json files.
Update config.js.
"""


def scrape_wget(args):
    url = args.domain
    data_path = "{}/data".format(args.www_path)
    versions = {}

    with tempfile.TemporaryDirectory() as tmp_dir:
        # download all profiles.json files
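        # wget flags: -c continue partial downloads, -r recurse, -P set the
        # download directory, -A only keep files named profiles.json,
        # --reject-regex skip kmods/ and packages/ subtrees, --no-parent do
        # not ascend above the start URL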
        os.system(
            "wget -c -r -P {} -A 'profiles.json' --reject-regex 'kmods|packages' --no-parent {}".format(
                tmp_dir, url
            )
        )

        # delete empty folders
        os.system("find {}/* -type d -empty -delete".format(tmp_dir))

        # create overview.json files
        for path in glob.glob("{}/*/snapshots".format(tmp_dir)) + glob.glob(
            "{}/*/releases/*".format(tmp_dir)
        ):
            release = os.path.basename(path)
            base = path[len(tmp_dir) + 1 :]
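            # e.g. path "<tmp>/downloads.openwrt.org/releases/19.07.3"
            #   => release "19.07.3", base "downloads.openwrt.org/releases/19.07.3"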

            profiles = []
            for ppath in Path(path).rglob("profiles.json"):
                with open(str(ppath), "r", encoding="utf-8") as file:
                    # interpret the mtime as UTC, like merge() and scan() do
                    last_modified = time.strftime(
                        BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(ppath)))
                    )
                    profiles.append(
                        {
                            "file_path": str(ppath),
                            "file_content": file.read(),
                            "last_modified": last_modified,
                        }
                    )

            if len(profiles) == 0:
                continue

            versions[release] = "data/{}/overview.json".format(release)

            output = merge_profiles(
                profiles, "https://{}/targets/{{target}}".format(base)
            )
            os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

            # write overview.json
            with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
                if args.formatted:
                    json.dump(output, outfile, indent="  ", sort_keys=True)
                else:
                    json.dump(output, outfile, sort_keys=True)

    update_config(args.www_path, versions)


"""
Find and merge json files for a single release.
"""


def merge(args):
    input_paths = args.input_path
    # OpenWrt JSON device files
    profiles = []

    def add_path(path):
        with open(str(path), "r", encoding="utf-8") as file:
            last_modified = time.strftime(
                BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(path)))
            )
            profiles.append(
                {
                    "file_path": str(path),
                    "file_content": file.read(),
                    "last_modified": last_modified,
                }
            )

    for path in input_paths:
        if os.path.isdir(path):
            for filepath in Path(path).rglob("*.json"):
                add_path(filepath)
        else:
            if not path.endswith(".json"):
                sys.stderr.write(
                    "Invalid input path (not a folder or .json file): {}\n".format(path)
                )
                exit(1)
            add_path(path)

    output = merge_profiles(profiles, args.download_url)

    if args.formatted:
        json.dump(output, sys.stdout, indent="  ", sort_keys=True)
    else:
        json.dump(output, sys.stdout, sort_keys=True)


"""
Scan local directory for releases with profiles.json.
Merge into overview.json files.
Update config.js.
"""

def scan(args):
    # the overview.json files are placed here
    data_path = "{}/data".format(args.www_path)
    versions = {}

    # map each release found under args.images_path to its profiles.json files
    releases = {}
    for path in Path(args.images_path).rglob("profiles.json"):
        with open(str(path), "r", encoding="utf-8") as file:
            content = file.read()
            obj = json.loads(content)
            release = obj["version_number"]
            last_modified = time.strftime(
                BUILD_DATE_FORMAT, time.gmtime(os.path.getmtime(str(path)))
            )
            releases.setdefault(release, []).append(
                {
                    "file_path": str(path),
                    "file_content": content,
                    "last_modified": last_modified,
                }
            )

    """
    Replace the {base} variable in the download URL with the part of the
    profiles.json path that is specific to the release. E.g.:
      ../tmp/releases/18.06.8/targets => base is releases/18.06.8/targets
      ../tmp/snapshots/targets => base is snapshots/targets
    """

    def replace_base(releases, target_release, download_url):
        if "{base}" in download_url:
            # release => base path (of profiles.json locations)
            paths = {}
            for release, profiles in releases.items():
                profile_paths = [profile["file_path"] for profile in profiles]
                paths[release] = os.path.commonpath(profile_paths)
            # base path common to all releases
            release_path_base = os.path.commonpath(paths.values())
            # keep only the release-specific part of the path
            base = str(paths[target_release])[len(release_path_base) + 1 :]
            return download_url.replace("{base}", base)
        else:
            return download_url

    for release, profiles in releases.items():
        download_url = replace_base(releases, release, args.download_url)
        output = merge_profiles(profiles, download_url)

        versions[release] = "data/{}/overview.json".format(release)
        os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

        # write overview.json
        with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
            if args.formatted:
                json.dump(output, outfile, indent="  ", sort_keys=True)
            else:
                json.dump(output, outfile, sort_keys=True)

    update_config(args.www_path, versions)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--formatted", action="store_true", help="Output formatted JSON data."
    )
    subparsers = parser.add_subparsers(dest="action")
    subparsers.required = True

    parser_merge = subparsers.add_parser(
        "merge", help="Search for profiles.json files and output an overview.json."
    )
    parser_merge.add_argument(
        "input_path",
        nargs="+",
        help="Input folder that is traversed for OpenWrt JSON device files.",
    )
    parser_merge.add_argument(
        "--download-url",
        action="store",
        default="",
        help="Link to get the image from. May contain {target} (replaced by e.g. ath79/generic), {version} (replaced by the version key from config.js) and {commit} (git commit in hex notation).",
    )

    parser_scrape = subparsers.add_parser("scrape", help="Scrape webpage for releases.")
    parser_scrape.add_argument(
        "domain", help="Domain to scrape. E.g. https://downloads.openwrt.org"
    )
    parser_scrape.add_argument("www_path", help="Path the config.js file is in.")
    parser_scrape.add_argument(
        "--use-wget", action="store_true", help="Use wget to scrape the site."
    )

    parser_scan = subparsers.add_parser("scan", help="Scan directory for releases.")
    parser_scan.add_argument(
        "download_url", help="Download URL for images. E.g. https://downloads.openwrt.org"
    )
    parser_scan.add_argument("images_path", help="Directory to scan for releases.")
    parser_scan.add_argument("www_path", help="Path the config.js file is in.")

    args = parser.parse_args()

    if args.action == "merge":
        merge(args)

    if args.action == "scan":
        scan(args)

    if args.action == "scrape":
        if args.use_wget:
            scrape_wget(args)
        else:
            scrape(args)


if __name__ == "__main__":
    main()