factorio: reformat update.py with ruff and make lint clean

This commit is contained in:
Luke Granger-Brown 2024-10-22 19:39:11 +01:00
parent 8387c1626d
commit 0d3fab394f

View File

@ -6,7 +6,6 @@ import copy
from dataclasses import dataclass
import json
import os.path
import subprocess
from typing import Callable, Dict
from absl import app
@ -21,9 +20,17 @@ FACTORIO_HASHES = "https://factorio.com/download/sha256sums/"
FLAGS = flags.FLAGS
flags.DEFINE_string('out', '', 'Output path for versions.json.')
flags.DEFINE_list('release_type', '', 'If non-empty, a comma-separated list of release types to update (e.g. alpha).')
flags.DEFINE_list('release_channel', '', 'If non-empty, a comma-separated list of release channels to update (e.g. experimental).')
flags.DEFINE_string("out", "", "Output path for versions.json.")
flags.DEFINE_list(
"release_type",
"",
"If non-empty, a comma-separated list of release types to update (e.g. alpha).",
)
flags.DEFINE_list(
"release_channel",
"",
"If non-empty, a comma-separated list of release channels to update (e.g. experimental).",
)
@dataclass
@ -56,10 +63,24 @@ SYSTEMS = [
]
RELEASE_TYPES = [
ReleaseType("alpha", needs_auth=True, hash_filename_format=["factorio_linux_{version}.tar.xz"]),
ReleaseType(
"alpha",
needs_auth=True,
hash_filename_format=["factorio_linux_{version}.tar.xz"],
),
ReleaseType("demo", hash_filename_format=["factorio_demo_x64_{version}.tar.xz"]),
ReleaseType("headless", hash_filename_format=["factorio-headless_linux_{version}.tar.xz", "factorio_headless_x64_{version}.tar.xz"]),
ReleaseType("expansion", needs_auth=True, hash_filename_format=["factorio-space-age_linux_{version}.tar.xz"]),
ReleaseType(
"headless",
hash_filename_format=[
"factorio-headless_linux_{version}.tar.xz",
"factorio_headless_x64_{version}.tar.xz",
],
),
ReleaseType(
"expansion",
needs_auth=True,
hash_filename_format=["factorio-space-age_linux_{version}.tar.xz"],
),
]
RELEASE_CHANNELS = [
@ -75,7 +96,9 @@ def find_versions_json() -> str:
for path in try_paths:
if os.path.exists(path):
return path
raise Exception("Couldn't figure out where to write versions.json; try specifying --out")
raise Exception(
"Couldn't figure out where to write versions.json; try specifying --out"
)
def fetch_versions() -> FactorioVersionsJSON:
@ -86,7 +109,7 @@ def fetch_hashes() -> FactorioHashes:
resp = requests.get(FACTORIO_HASHES)
resp.raise_for_status()
out = {}
for ln in resp.text.split('\n'):
for ln in resp.text.split("\n"):
ln = ln.strip()
if not ln:
continue
@ -96,83 +119,148 @@ def fetch_hashes() -> FactorioHashes:
def generate_our_versions(factorio_versions: FactorioVersionsJSON) -> OurVersionJSON:
rec_dd = lambda: defaultdict(rec_dd)
def rec_dd():
return defaultdict(rec_dd)
output = rec_dd()
# Deal with times where there's no experimental version
for rc in RELEASE_CHANNELS:
if rc.name not in factorio_versions or not factorio_versions[rc.name]:
factorio_versions[rc.name] = factorio_versions['stable']
factorio_versions[rc.name] = factorio_versions["stable"]
for rt in RELEASE_TYPES:
if rt.name not in factorio_versions[rc.name] or not factorio_versions[rc.name][rt.name]:
factorio_versions[rc.name][rt.name] = factorio_versions['stable'][rt.name]
if (
rt.name not in factorio_versions[rc.name]
or not factorio_versions[rc.name][rt.name]
):
factorio_versions[rc.name][rt.name] = factorio_versions["stable"][
rt.name
]
for system in SYSTEMS:
for release_type in RELEASE_TYPES:
for release_channel in RELEASE_CHANNELS:
version = factorio_versions[release_channel.name].get(release_type.name)
if version == None:
if version is None:
continue
this_release = {
"name": f"factorio_{release_type.name}_{system.tar_name}-{version}.tar.xz",
"url": f"https://factorio.com/get-download/{version}/{release_type.name}/{system.url_name}",
"version": version,
"needsAuth": release_type.needs_auth,
"candidateHashFilenames": [fmt.format(version=version) for fmt in release_type.hash_filename_format],
"name": f"factorio_{release_type.name}_{system.tar_name}-{version}.tar.xz",
"url": f"https://factorio.com/get-download/{version}/{release_type.name}/{system.url_name}",
"version": version,
"needsAuth": release_type.needs_auth,
"candidateHashFilenames": [
fmt.format(version=version)
for fmt in release_type.hash_filename_format
],
"tarDirectory": system.tar_name,
}
output[system.nix_name][release_type.name][release_channel.name] = this_release
output[system.nix_name][release_type.name][release_channel.name] = (
this_release
)
return output
def iter_version(versions: OurVersionJSON, it: Callable[[str, str, str, Dict[str, str]], Dict[str, str]]) -> OurVersionJSON:
def iter_version(
versions: OurVersionJSON,
it: Callable[[str, str, str, Dict[str, str]], Dict[str, str]],
) -> OurVersionJSON:
versions = copy.deepcopy(versions)
for system_name, system in versions.items():
for release_type_name, release_type in system.items():
for release_channel_name, release in release_type.items():
release_type[release_channel_name] = it(system_name, release_type_name, release_channel_name, dict(release))
release_type[release_channel_name] = it(
system_name, release_type_name, release_channel_name, dict(release)
)
return versions
def merge_versions(old: OurVersionJSON, new: OurVersionJSON) -> OurVersionJSON:
"""Copies already-known hashes from version.json to avoid having to re-fetch."""
def _merge_version(system_name: str, release_type_name: str, release_channel_name: str, release: Dict[str, str]) -> Dict[str, str]:
def _merge_version(
system_name: str,
release_type_name: str,
release_channel_name: str,
release: Dict[str, str],
) -> Dict[str, str]:
old_system = old.get(system_name, {})
old_release_type = old_system.get(release_type_name, {})
old_release = old_release_type.get(release_channel_name, {})
if FLAGS.release_type and release_type_name not in FLAGS.release_type:
logging.info("%s/%s/%s: not in --release_type, not updating", system_name, release_type_name, release_channel_name)
logging.info(
"%s/%s/%s: not in --release_type, not updating",
system_name,
release_type_name,
release_channel_name,
)
return old_release
if FLAGS.release_channel and release_channel_name not in FLAGS.release_channel:
logging.info("%s/%s/%s: not in --release_channel, not updating", system_name, release_type_name, release_channel_name)
logging.info(
"%s/%s/%s: not in --release_channel, not updating",
system_name,
release_type_name,
release_channel_name,
)
return old_release
if not "sha256" in old_release:
logging.info("%s/%s/%s: not copying sha256 since it's missing", system_name, release_type_name, release_channel_name)
if "sha256" not in old_release:
logging.info(
"%s/%s/%s: not copying sha256 since it's missing",
system_name,
release_type_name,
release_channel_name,
)
return release
if not all(old_release.get(k, None) == release[k] for k in ['name', 'version', 'url']):
logging.info("%s/%s/%s: not copying sha256 due to mismatch", system_name, release_type_name, release_channel_name)
if not all(
old_release.get(k, None) == release[k] for k in ["name", "version", "url"]
):
logging.info(
"%s/%s/%s: not copying sha256 due to mismatch",
system_name,
release_type_name,
release_channel_name,
)
return release
release["sha256"] = old_release["sha256"]
return release
return iter_version(new, _merge_version)
def fill_in_hash(versions: OurVersionJSON, factorio_hashes: FactorioHashes) -> OurVersionJSON:
def fill_in_hash(
versions: OurVersionJSON, factorio_hashes: FactorioHashes
) -> OurVersionJSON:
"""Fill in sha256 hashes for anything missing them."""
urls_to_hash = {}
def _fill_in_hash(system_name: str, release_type_name: str, release_channel_name: str, release: Dict[str, str]) -> Dict[str, str]:
def _fill_in_hash(
system_name: str,
release_type_name: str,
release_channel_name: str,
release: Dict[str, str],
) -> Dict[str, str]:
for candidate_filename in release["candidateHashFilenames"]:
if candidate_filename in factorio_hashes:
release["sha256"] = factorio_hashes[candidate_filename]
break
else:
logging.error("%s/%s/%s: failed to find any of %s in %s", system_name, release_type_name, release_channel_name, release["candidateHashFilenames"], FACTORIO_HASHES)
logging.error(
"%s/%s/%s: failed to find any of %s in %s",
system_name,
release_type_name,
release_channel_name,
release["candidateHashFilenames"],
FACTORIO_HASHES,
)
return release
if "sha256" in release:
logging.info("%s/%s/%s: skipping fetch, sha256 already present", system_name, release_type_name, release_channel_name)
logging.info(
"%s/%s/%s: skipping fetch, sha256 already present",
system_name,
release_type_name,
release_channel_name,
)
return release
return release
return iter_version(versions, _fill_in_hash)
@ -183,18 +271,19 @@ def main(argv):
old_our_versions = None
our_versions_path = find_versions_json()
if our_versions_path:
logging.info('Loading old versions.json from %s', our_versions_path)
with open(our_versions_path, 'r') as f:
logging.info("Loading old versions.json from %s", our_versions_path)
with open(our_versions_path, "r") as f:
old_our_versions = json.load(f)
if old_our_versions:
logging.info('Merging in old hashes')
logging.info("Merging in old hashes")
new_our_versions = merge_versions(old_our_versions, new_our_versions)
logging.info('Updating hashes from Factorio SHA256')
logging.info("Updating hashes from Factorio SHA256")
new_our_versions = fill_in_hash(new_our_versions, factorio_hashes)
with open(our_versions_path, 'w') as f:
logging.info('Writing versions.json to %s', our_versions_path)
with open(our_versions_path, "w") as f:
logging.info("Writing versions.json to %s", our_versions_path)
json.dump(new_our_versions, f, sort_keys=True, indent=2)
f.write("\n")
if __name__ == '__main__':
if __name__ == "__main__":
app.run(main)