mirror of
https://github.com/NixOS/nixpkgs.git
synced 2024-11-26 17:03:01 +00:00
nixos/apache-kafka: structured settings
- Use lazyAttrs (for config references) settings for main server.properties. - Drop dangerous default for "log.dirs" - Drop apache-kafka homedir; unused and confusing - Support formatting kraft logdirs
This commit is contained in:
parent
e4ad989506
commit
45f84cdfd5
@ -5,77 +5,119 @@ with lib;
|
||||
let
|
||||
cfg = config.services.apache-kafka;
|
||||
|
||||
serverProperties =
|
||||
if cfg.serverProperties != null then
|
||||
cfg.serverProperties
|
||||
else
|
||||
''
|
||||
# Generated by nixos
|
||||
broker.id=${toString cfg.brokerId}
|
||||
port=${toString cfg.port}
|
||||
host.name=${cfg.hostname}
|
||||
log.dirs=${concatStringsSep "," cfg.logDirs}
|
||||
zookeeper.connect=${cfg.zookeeper}
|
||||
${toString cfg.extraProperties}
|
||||
'';
|
||||
# The `javaProperties` generator takes care of various escaping rules and
|
||||
# generation of the properties file, but we'll handle stringly conversion
|
||||
# ourselves in mkPropertySettings and stringlySettings, since we know more
|
||||
# about the specifically allowed format eg. for lists of this type, and we
|
||||
# don't want to coerce-downsample values to str too early by having the
|
||||
# coercedTypes from javaProperties directly in our NixOS option types.
|
||||
#
|
||||
# Make sure every `freeformType` and any specific option type in `settings` is
|
||||
# supported here.
|
||||
|
||||
serverConfig = pkgs.writeText "server.properties" serverProperties;
|
||||
logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
|
||||
mkPropertyString = let
|
||||
render = {
|
||||
bool = boolToString;
|
||||
int = toString;
|
||||
list = concatMapStringsSep "," mkPropertyString;
|
||||
string = id;
|
||||
};
|
||||
in
|
||||
v: render.${builtins.typeOf v} v;
|
||||
|
||||
stringlySettings = mapAttrs (_: mkPropertyString)
|
||||
(filterAttrs (_: v: v != null) cfg.settings);
|
||||
|
||||
generator = (pkgs.formats.javaProperties {}).generate;
|
||||
in {
|
||||
|
||||
options.services.apache-kafka = {
|
||||
enable = mkOption {
|
||||
description = lib.mdDoc "Whether to enable Apache Kafka.";
|
||||
default = false;
|
||||
type = types.bool;
|
||||
};
|
||||
enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker");
|
||||
|
||||
brokerId = mkOption {
|
||||
description = lib.mdDoc "Broker ID.";
|
||||
default = -1;
|
||||
type = types.int;
|
||||
};
|
||||
|
||||
port = mkOption {
|
||||
description = lib.mdDoc "Port number the broker should listen on.";
|
||||
default = 9092;
|
||||
type = types.port;
|
||||
};
|
||||
|
||||
hostname = mkOption {
|
||||
description = lib.mdDoc "Hostname the broker should bind to.";
|
||||
default = "localhost";
|
||||
type = types.str;
|
||||
};
|
||||
|
||||
logDirs = mkOption {
|
||||
description = lib.mdDoc "Log file directories";
|
||||
default = [ "/tmp/kafka-logs" ];
|
||||
type = types.listOf types.path;
|
||||
};
|
||||
|
||||
zookeeper = mkOption {
|
||||
description = lib.mdDoc "Zookeeper connection string";
|
||||
default = "localhost:2181";
|
||||
type = types.str;
|
||||
};
|
||||
|
||||
extraProperties = mkOption {
|
||||
description = lib.mdDoc "Extra properties for server.properties.";
|
||||
type = types.nullOr types.lines;
|
||||
default = null;
|
||||
};
|
||||
|
||||
serverProperties = mkOption {
|
||||
settings = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Complete server.properties content. Other server.properties config
|
||||
options will be ignored if this option is used.
|
||||
[Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
|
||||
{file}`server.properties`.
|
||||
|
||||
Note that .properties files contain mappings from string to string.
|
||||
Keys with dots are NOT represented by nested attrs in these settings,
|
||||
but instead as quoted strings (ie. `settings."broker.id"`, NOT
|
||||
`settings.broker.id`).
|
||||
'';
|
||||
type = types.submodule {
|
||||
freeformType = with types; let
|
||||
primitive = oneOf [bool int str];
|
||||
in lazyAttrsOf (nullOr (either primitive (listOf primitive)));
|
||||
|
||||
options = {
|
||||
"broker.id" = mkOption {
|
||||
description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
|
||||
default = null;
|
||||
type = with types; nullOr int;
|
||||
};
|
||||
|
||||
"log.dirs" = mkOption {
|
||||
description = lib.mdDoc "Log file directories.";
|
||||
# Deliberaly leave out old default and use the rewrite opportunity
|
||||
# to have users choose a safer value -- /tmp might be volatile and is a
|
||||
# slightly scary default choice.
|
||||
# default = [ "/tmp/apache-kafka" ];
|
||||
type = with types; listOf path;
|
||||
};
|
||||
|
||||
"listeners" = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Kafka Listener List.
|
||||
See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
|
||||
'';
|
||||
type = types.listOf types.str;
|
||||
default = [ "PLAINTEXT://localhost:9092" ];
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
clusterId = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
|
||||
'';
|
||||
type = types.nullOr types.lines;
|
||||
type = with types; nullOr str;
|
||||
default = null;
|
||||
};
|
||||
|
||||
configFiles.serverProperties = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Kafka server.properties configuration file path.
|
||||
Defaults to the rendered `settings`.
|
||||
'';
|
||||
type = types.path;
|
||||
};
|
||||
|
||||
configFiles.log4jProperties = mkOption {
|
||||
description = lib.mdDoc "Kafka log4j property configuration file path";
|
||||
type = types.path;
|
||||
default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
|
||||
defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
|
||||
};
|
||||
|
||||
formatLogDirs = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Whether to format log dirs in KRaft mode if all log dirs are
|
||||
unformatted, ie. they contain no meta.properties.
|
||||
'';
|
||||
type = types.bool;
|
||||
default = false;
|
||||
};
|
||||
|
||||
formatLogDirsIgnoreFormatted = mkOption {
|
||||
description = lib.mdDoc ''
|
||||
Whether to ignore already formatted log dirs when formatting log dirs,
|
||||
instead of failing. Useful when replacing or adding disks.
|
||||
'';
|
||||
type = types.bool;
|
||||
default = false;
|
||||
};
|
||||
|
||||
log4jProperties = mkOption {
|
||||
description = lib.mdDoc "Kafka log4j property configuration.";
|
||||
default = ''
|
||||
@ -112,35 +154,61 @@ in {
|
||||
defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
|
||||
type = types.package;
|
||||
};
|
||||
|
||||
};
|
||||
|
||||
config = mkIf cfg.enable {
|
||||
imports = [
|
||||
(mkRenamedOptionModule
|
||||
[ "services" "apache-kafka" "brokerId" ]
|
||||
[ "services" "apache-kafka" "settings" ''broker.id'' ])
|
||||
(mkRenamedOptionModule
|
||||
[ "services" "apache-kafka" "logDirs" ]
|
||||
[ "services" "apache-kafka" "settings" ''log.dirs'' ])
|
||||
(mkRenamedOptionModule
|
||||
[ "services" "apache-kafka" "zookeeper" ]
|
||||
[ "services" "apache-kafka" "settings" ''zookeeper.connect'' ])
|
||||
|
||||
environment.systemPackages = [cfg.package];
|
||||
(mkRemovedOptionModule [ "services" "apache-kafka" "port" ]
|
||||
"Please see services.apache-kafka.settings.listeners and its documentation instead")
|
||||
(mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ]
|
||||
"Please see services.apache-kafka.settings.listeners and its documentation instead")
|
||||
(mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ]
|
||||
"Please see services.apache-kafka.settings and its documentation instead")
|
||||
(mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ]
|
||||
"Please see services.apache-kafka.settings and its documentation instead")
|
||||
];
|
||||
|
||||
config = mkIf cfg.enable {
|
||||
services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
|
||||
|
||||
users.users.apache-kafka = {
|
||||
isSystemUser = true;
|
||||
group = "apache-kafka";
|
||||
description = "Apache Kafka daemon user";
|
||||
home = head cfg.logDirs;
|
||||
};
|
||||
users.groups.apache-kafka = {};
|
||||
|
||||
systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;
|
||||
systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs";
|
||||
|
||||
systemd.services.apache-kafka = {
|
||||
description = "Apache Kafka Daemon";
|
||||
wantedBy = [ "multi-user.target" ];
|
||||
after = [ "network.target" ];
|
||||
preStart = mkIf cfg.formatLogDirs
|
||||
(if cfg.formatLogDirsIgnoreFormatted then ''
|
||||
${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
|
||||
'' else ''
|
||||
if ${concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
|
||||
${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
|
||||
fi
|
||||
'');
|
||||
serviceConfig = {
|
||||
ExecStart = ''
|
||||
${cfg.jre}/bin/java \
|
||||
-cp "${cfg.package}/libs/*" \
|
||||
-Dlog4j.configuration=file:${logConfig} \
|
||||
-Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
|
||||
${toString cfg.jvmOptions} \
|
||||
kafka.Kafka \
|
||||
${serverConfig}
|
||||
${cfg.configFiles.serverProperties}
|
||||
'';
|
||||
User = "apache-kafka";
|
||||
SuccessExitStatus = "0 143";
|
||||
|
@ -23,12 +23,14 @@ let
|
||||
kafka = { ... }: {
|
||||
services.apache-kafka = {
|
||||
enable = true;
|
||||
extraProperties = ''
|
||||
offsets.topic.replication.factor = 1
|
||||
zookeeper.session.timeout.ms = 600000
|
||||
'';
|
||||
settings = {
|
||||
"offsets.topic.replication.factor" = 1;
|
||||
"zookeeper.session.timeout.ms" = 600000;
|
||||
"zookeeper.connect" = [ "zookeeper1:2181" ];
|
||||
"log.dirs" = [ "/tmp/apache-kafka" ];
|
||||
};
|
||||
|
||||
package = kafkaPackage;
|
||||
zookeeper = "zookeeper1:2181";
|
||||
};
|
||||
|
||||
networking.firewall.allowedTCPPorts = [ 9092 ];
|
||||
|
Loading…
Reference in New Issue
Block a user