# nixpkgs/nixos/modules/services/misc/apache-kafka.nix
# (166 lines, 4.2 KiB, Nix)

# NixOS module declaring and implementing the `services.apache-kafka` options.
# Module arguments used below: `config`, `lib` and `pkgs`.
{ config, lib, pkgs, ... }:
# Bring every attribute of `lib` (mkOption, types, mkIf, ...) into scope.
with lib;
let
# Shorthand for the values of this module's own options.
cfg = config.services.apache-kafka;
# Contents of server.properties.
# A user-supplied `cfg.serverProperties` is used verbatim; when it is null the
# file is generated from the individual broker options instead.
serverProperties =
  let
    # All lines of the string share one indent, which Nix strips, so the
    # rendered text starts at column 0.
    generatedProperties = ''
      # Generated by nixos
      broker.id=${toString cfg.brokerId}
      port=${toString cfg.port}
      host.name=${cfg.hostname}
      log.dirs=${concatStringsSep "," cfg.logDirs}
      zookeeper.connect=${cfg.zookeeper}
      ${toString cfg.extraProperties}
    '';
  in
    if cfg.serverProperties == null
    then generatedProperties
    else cfg.serverProperties;
# Directory holding the two generated configuration files
# (server.properties and log4j.properties), merged with buildEnv.
configDir =
  let
    serverPropertiesFile =
      pkgs.writeTextDir "server.properties" serverProperties;
    log4jPropertiesFile =
      pkgs.writeTextDir "log4j.properties" cfg.log4jProperties;
  in
    pkgs.buildEnv {
      name = "apache-kafka-conf";
      paths = [ serverPropertiesFile log4jPropertiesFile ];
    };
in {
# Option declarations for the Apache Kafka broker service.
#
# String-valued options use `types.str` instead of the deprecated
# `types.string`: `types.string` merges several definitions of the same option
# by concatenating them, which would silently produce a bogus hostname or
# connection string; `types.str` reports conflicting definitions instead.
options.services.apache-kafka = {
  enable = mkOption {
    description = "Whether to enable Apache Kafka.";
    default = false;
    type = types.uniq types.bool;
  };

  # Written to server.properties as `broker.id`.
  brokerId = mkOption {
    description = "Broker ID.";
    default = 0;
    type = types.int;
  };

  # Written to server.properties as `port`.
  port = mkOption {
    description = "Port number the broker should listen on.";
    default = 9092;
    type = types.int;
  };

  # Written to server.properties as `host.name`.
  hostname = mkOption {
    description = "Hostname the broker should bind to.";
    default = "localhost";
    type = types.str;
  };

  # Joined with "," into `log.dirs`; the first entry also becomes the home
  # directory of the service user, and every entry is created in preStart.
  logDirs = mkOption {
    description = "Log file directories";
    default = [ "/tmp/kafka-logs" ];
    type = types.listOf types.path;
  };

  # Written to server.properties as `zookeeper.connect`.
  zookeeper = mkOption {
    description = "Zookeeper connection string";
    default = "localhost:2181";
    type = types.str;
  };

  # Appended after the generated properties; null adds nothing.
  extraProperties = mkOption {
    description = "Extra properties for server.properties.";
    type = types.nullOr types.lines;
    default = null;
  };

  # When non-null this replaces the generated server.properties entirely.
  serverProperties = mkOption {
    description = ''
      Complete server.properties content. Other server.properties config
      options will be ignored if this option is used.
    '';
    type = types.nullOr types.lines;
    default = null;
  };

  # Written to log4j.properties in the generated configuration directory.
  log4jProperties = mkOption {
    description = "Kafka log4j property configuration.";
    default = ''
      log4j.rootLogger=INFO, stdout
      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    '';
    type = types.lines;
  };

  # Space-joined and placed on the java command line before the main class.
  jvmOptions = mkOption {
    description = "Extra command line options for the JVM running Kafka.";
    default = [
      "-server"
      "-Xmx1G"
      "-Xms1G"
      "-XX:+UseCompressedOops"
      "-XX:+UseParNewGC"
      "-XX:+UseConcMarkSweepGC"
      "-XX:+CMSClassUnloadingEnabled"
      "-XX:+CMSScavengeBeforeRemark"
      "-XX:+DisableExplicitGC"
      "-Djava.awt.headless=true"
      "-Djava.net.preferIPv4Stack=true"
    ];
    type = types.listOf types.str;
    example = [
      "-Djava.net.preferIPv4Stack=true"
      "-Dcom.sun.management.jmxremote"
      "-Dcom.sun.management.jmxremote.local.only=true"
    ];
  };

  # Provides the `libs/*` jars on the classpath and is added to
  # environment.systemPackages.
  package = mkOption {
    description = "The kafka package to use";
    default = pkgs.apacheKafka;
    type = types.package;
  };
};
# System configuration applied only when `services.apache-kafka.enable` is set:
# installs the package, declares the service user and defines the systemd unit.
config = mkIf cfg.enable {
  environment.systemPackages = [ cfg.package ];

  users.extraUsers = singleton {
    name = "apache-kafka";
    uid = config.ids.uids.apache-kafka;
    description = "Apache Kafka daemon user";
    # The first log directory doubles as the user's home directory.
    home = head cfg.logDirs;
  };

  systemd.services.apache-kafka = {
    description = "Apache Kafka Daemon";
    wantedBy = [ "multi-user.target" ];
    after = [ "network-interfaces.target" ];

    serviceConfig = {
      # Every command line shares one indent so the rendered string is the
      # command followed by backslash-continued argument lines.
      ExecStart = ''
        ${pkgs.jre}/bin/java \
        -cp "${cfg.package}/libs/*:${configDir}" \
        ${toString cfg.jvmOptions} \
        kafka.Kafka \
        ${configDir}/server.properties
      '';
      User = "apache-kafka";
      # With this set, systemd applies `User` to ExecStart only, so the
      # preStart script below can create and chown the log directories.
      PermissionsStartOnly = true;
      # NOTE(review): 143 is presumably the JVM exit code after SIGTERM
      # (128 + 15), treated as a clean stop -- confirm.
      SuccessExitStatus = "0 143";
    };

    preStart =
      let
        # Shell-escape each directory so paths containing whitespace or shell
        # metacharacters are passed to mkdir/chown as single arguments.
        logDirArgs = concatStringsSep " " (map escapeShellArg cfg.logDirs);
      in ''
        mkdir -m 0700 -p ${logDirArgs}
        if [ "$(id -u)" = 0 ]; then
        chown apache-kafka ${logDirArgs};
        fi
      '';
  };
};
}