#!/usr/bin/env python3
"""
Tool to create overview.json files and update the config.js.
"""

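# Illustrative invocations of the subcommands defined below. The local paths
# (./bin, /var/www/html) are placeholders, not part of this script:
#
#   ./collect.py merge ./bin --download-url "https://downloads.openwrt.org/releases/{version}/targets/{target}" > overview.json
#   ./collect.py --formatted scrape https://downloads.openwrt.org /var/www/html
#   ./collect.py scan https://downloads.openwrt.org ./bin /var/www/html
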
from pathlib import Path
import urllib.request
import tempfile
import argparse
import json
import glob
import sys
import os
import re

SUPPORTED_METADATA_VERSION = 1

assert sys.version_info >= (3, 5), "Python version too old. Python >=3.5.0 needed."


def add_profile(output, path, id, target, profile, code=None):
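    """
    Add one entry per title of the given profile to output["models"].

    Each entry maps a human readable device title to the profile id, the
    target and the list of available images. A per-device `code` is stored
    only if one is passed (i.e. it differs from the release-wide version).
    """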
    def get_title(title):
        if "title" in title:
            return title["title"]
        else:
            return "{} {} {}".format(
                title.get("vendor", ""), title["model"], title.get("variant", "")
            ).strip()

    images = []
    for image in profile["images"]:
        images.append({"name": image["name"], "type": image["type"]})

    if target is None:
        target = profile["target"]

    for entry in profile["titles"]:
        title = get_title(entry)

        if len(title) == 0:
            sys.stderr.write("Empty title. Skip title for {} in {}\n".format(id, path))
            continue

        """
        Some devices are in ar71xx and ath79, but use "TP-LINK" and "TP-Link".
        E.g.: `TP-LINK Archer C7 v5` and `TP-Link Archer C7 v5`
        To be able to detect such duplicates, we need to make "same" titles identical.
        """
        if title.startswith("TP-LINK "):
            title = "TP-Link {}".format(title[8:])

        # device is a duplicate, try to differentiate by target
        if title in output["models"]:
            title = "{} ({})".format(title, target)

        output["models"][title] = {"id": id, "target": target, "images": images}

        if code is not None:
            output["models"][title]["code"] = code


# accepts {<file-path>: <file-content>}
def merge_profiles(profiles, download_url):
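    """
    Merge the given profiles.json contents into one overview structure:

        {
            "version_code": "<version code or commit of the release>",
            "download_url": "<download_url argument>",
            "models": {
                "<device title>": {"id": ..., "target": ..., "images": [...]}
            }
        }

    A per-device "code" is added to a model only if it differs from the
    release-wide "version_code". Returns an empty dict if no supported
    profiles.json content was found.
    """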
    # json output data
    output = {}

    for path, content in profiles.items():
        try:
            obj = json.loads(content)
        except json.decoder.JSONDecodeError as e:
            sys.stderr.write("Skip {}\n {}\n".format(path, e))
            continue

        if obj["metadata_version"] != SUPPORTED_METADATA_VERSION:
            sys.stderr.write(
                "{} has unsupported metadata version: {} => skip\n".format(
                    path, obj["metadata_version"]
                )
            )
            continue

        code = obj.get("version_code", obj.get("version_commit"))

        if "version_code" not in output:
            output = {"version_code": code, "download_url": download_url, "models": {}}

        # if we have mixed codes/commits, store in device object
        if output["version_code"] == code:
            code = None

        try:
            if "profiles" in obj:
                for id in obj["profiles"]:
                    add_profile(
                        output, path, id, obj.get("target"), obj["profiles"][id], code
                    )
            else:
                add_profile(output, path, obj["id"], obj["target"], obj, code)
        except KeyError as e:
            sys.stderr.write("Abort on {}\n Missing key {}\n".format(path, e))
            exit(1)

    return output


def update_config(config_path, versions):
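    """
    Replace the `versions: {...}` object in config.js with the given mapping
    of release names to overview.json paths, e.g.
    versions: {'19.07.3': 'data/19.07.3/overview.json'}
    """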
    content = ""
    with open(str(config_path), "r", encoding="utf-8") as file:
        content = file.read()

    content = re.sub("versions:[\\s]*{[^}]*}", "versions: {}".format(versions), content)
    with open(str(config_path), "w+") as file:
        file.write(content)


"""
Scrape profiles.json using links like https://downloads.openwrt.org/releases/19.07.3/targets/?json
Merge into overview.json files.
Update config.js.
"""


def scrape(args):
    url = args.domain
    www_path = args.www_path
    config_path = "{}/config.js".format(www_path)
    data_path = "{}/data".format(www_path)
    versions = {}

    def handle_release(target):
        profiles = {}
        with urllib.request.urlopen("{}/?json".format(target)) as file:
            array = json.loads(file.read().decode("utf-8"))
            for profile in filter(lambda x: x.endswith("/profiles.json"), array):
                with urllib.request.urlopen("{}/{}".format(target, profile)) as file:
                    profiles["{}/{}".format(target, profile)] = file.read().decode(
                        "utf-8"
                    )
        return profiles

    if not os.path.isfile(config_path):
        print("file not found: {}".format(config_path))
        exit(1)

    # fetch release URLs
    with urllib.request.urlopen(url) as infile:
        for path in re.findall(r"href=[\"']?([^'\" >]+)", str(infile.read())):
            if not path.startswith("/") and path.endswith("targets/"):
                release = path.strip("/").split("/")[-2]
                download_url = "{}/{}/{{target}}".format(url, path)

                profiles = handle_release("{}/{}".format(url, path))
                output = merge_profiles(profiles, download_url)
                if len(output) > 0:
                    os.makedirs("{}/{}".format(data_path, release), exist_ok=True)
                    # write overview.json
                    with open(
                        "{}/{}/overview.json".format(data_path, release), "w"
                    ) as outfile:
                        if args.formatted:
                            json.dump(output, outfile, indent=" ", sort_keys=True)
                        else:
                            json.dump(output, outfile, sort_keys=True)

                    versions[release] = "data/{}/overview.json".format(release)


    update_config(config_path, versions)

"""
Scrape profiles.json using wget (slower but more generic).
Merge into overview.json files.
Update config.js.
"""


def scrape_wget(args):
    url = args.domain
    www_path = args.www_path
    config_path = "{}/config.js".format(www_path)
    data_path = "{}/data".format(www_path)
    versions = {}

    with tempfile.TemporaryDirectory() as tmp_dir:
        # download all profiles.json files
        os.system(
            "wget -c -r -P {} -A 'profiles.json' --reject-regex 'kmods|packages' --no-parent {}".format(
                tmp_dir, url
            )
        )

        # delete empty folders
        os.system("find {}/* -type d -empty -delete".format(tmp_dir))

        # create overview.json files
        for path in glob.glob("{}/*/snapshots".format(tmp_dir)) + glob.glob(
            "{}/*/releases/*".format(tmp_dir)
        ):
            release = os.path.basename(path)
            base = path[len(tmp_dir) + 1 :]

            profiles = {}
            for ppath in Path(path).rglob("profiles.json"):
                with open(str(ppath), "r", encoding="utf-8") as file:
                    profiles[ppath] = file.read()

            if len(profiles) == 0:
                continue

            versions[release] = "data/{}/overview.json".format(release)

            output = merge_profiles(
                profiles, "https://{}/targets/{{target}}".format(base)
            )
            os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

            # write overview.json
            with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
                if args.formatted:
                    json.dump(output, outfile, indent=" ", sort_keys=True)
                else:
                    json.dump(output, outfile, sort_keys=True)

    update_config(config_path, versions)


"""
Find and merge json files for a single release.
"""


def merge(args):
    input_paths = args.input_path
    # OpenWrt JSON device files
    profiles = {}

    def add_path(path):
        with open(str(path), "r", encoding="utf-8") as file:
            profiles[path] = file.read()

    for path in input_paths:
        if os.path.isdir(path):
            for filepath in Path(path).rglob("*.json"):
                add_path(filepath)
        else:
            if not path.endswith(".json"):
                sys.stderr.write("Not a folder and not a JSON file: {}\n".format(path))
                exit(1)
            add_path(path)

    output = merge_profiles(profiles, args.download_url)

    if args.formatted:
        json.dump(output, sys.stdout, indent=" ", sort_keys=True)
    else:
        json.dump(output, sys.stdout, sort_keys=True)


"""
Scan local directory for releases with profiles.json.
Merge into overview.json files.
Update config.js.
"""


def scan(args):
    # firmware selector config
    config_path = "{}/config.js".format(args.www_path)
    # the overview.json files are placed here
    data_path = "{}/data".format(args.www_path)
    versions = {}

    # group the profiles.json files under args.images_path by release
    releases = {}
    for path in Path(args.images_path).rglob("profiles.json"):
        with open(str(path), "r", encoding="utf-8") as file:
            content = file.read()
            obj = json.loads(content)
            release = obj["version_number"]
            releases.setdefault(release, {})[path] = content

    for release, profiles in releases.items():
        output = merge_profiles(profiles, args.download_url)

        versions[release] = "data/{}/overview.json".format(release)
        os.makedirs("{}/{}".format(data_path, release), exist_ok=True)

        # write overview.json
        with open("{}/{}/overview.json".format(data_path, release), "w") as outfile:
            if args.formatted:
                json.dump(output, outfile, indent=" ", sort_keys=True)
            else:
                json.dump(output, outfile, sort_keys=True)

    update_config(config_path, versions)


def main():
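    """
    Parse the command line and run the requested merge, scan or scrape action.
    """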
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--formatted", action="store_true", help="Output formatted JSON data."
    )
    subparsers = parser.add_subparsers(dest="action")
    subparsers.required = True

    parser_merge = subparsers.add_parser(
        "merge", help="Search for profiles.json files and output an overview.json."
    )
    parser_merge.add_argument(
        "input_path",
        nargs="+",
        help="Input folder that is traversed for OpenWrt JSON device files.",
    )
    parser_merge.add_argument(
        "--download-url",
        action="store",
        default="",
        help="Link to get the image from. May contain {target} (replaced by e.g. ath79/generic), {version} (replaced by the version key from config.js) and {commit} (git commit in hex notation).",
    )

    parser_scrape = subparsers.add_parser("scrape", help="Scrape webpage for releases.")
    parser_scrape.add_argument(
        "domain", help="Domain to scrape. E.g. https://downloads.openwrt.org"
    )
    parser_scrape.add_argument("www_path", help="Path the config.js file is in.")
    parser_scrape.add_argument(
        "--use-wget", action="store_true", help="Use wget to scrape the site."
    )

    parser_scan = subparsers.add_parser("scan", help="Scan directory for releases.")
    parser_scan.add_argument(
        "download_url", help="Download URL for the images. E.g. https://downloads.openwrt.org"
    )
    parser_scan.add_argument("images_path", help="Directory to scan for releases.")
    parser_scan.add_argument("www_path", help="Path the config.js file is in.")

    args = parser.parse_args()

    if args.action == "merge":
        merge(args)

    if args.action == "scan":
        scan(args)

    if args.action == "scrape":
        if args.use_wget:
            scrape_wget(args)
        else:
            scrape(args)


if __name__ == "__main__":
    main()