#!/usr/bin/env bash
#
# Copyright 2016-2024 Confluent Inc.
#

set -e
if [ "${DEBUG_ENTRYPOINT}" -ne 0 ]; then
  set -xv
fi

AGENT_DIR="/opt/confluent/libs"

check_agent_jar() {
  if [ ! -f "$1" ]; then
    _caas_err "Can't find $1! Was it packaged correctly?";
  fi
}

echo "===> Adding disk usage agent to the java command ... "
DISK_USAGE_AGENT_VERSION="7.6.0-809"
DISK_USAGE_AGENT_JAR="${AGENT_DIR}/disk-usage-agent-${DISK_USAGE_AGENT_VERSION}.jar"
check_agent_jar "${DISK_USAGE_AGENT_JAR}"
EXTRA_ARGS="${EXTRA_ARGS} -javaagent:${DISK_USAGE_AGENT_JAR}=/opt/caas/config/kafka/disk-usage-agent.properties"


echo "===> Adding jolokia agent to the java command ... "
JOLOKIA_AGENT_PORT=${JOLOKIA_AGENT_PORT:-7777}
JOLOKIA_AGENT_HOST=${JOLOKIA_AGENT_HOST:-"0.0.0.0"}
JOLOKIA_AGENT_PROTOCOL=${JOLOKIA_AGENT_PROTOCOL:-"http"}
JOLOKIA_AGENT_VERSION="2.0.3"
JOLOKIA_AGENT_JAR="${AGENT_DIR}/jolokia-agent-jvm-${JOLOKIA_AGENT_VERSION}-javaagent.jar"
check_agent_jar "${JOLOKIA_AGENT_JAR}"
# It's important to keep jolokia-access.xml OUT of the classpath, as this makes Jolokia deny
# by default if it can't be loaded. If it's in the classpath, then Jolokia is allow-by-default
check_agent_jar "/opt/caas/etc/jolokia-access.xml"

# We use pem files for caCert, serverCert and the private key instead of specifying a keystore, so Jolokia invocation can be
# independent of the type of keystore in use.
# As the fullchain PEM file contains the full chain: server and CA certs, it can be used for both certs.
JOLOKIA_FULLCHAIN="${SSLCERTS_DIR}"/$(jq -r '.ssl_pem_fullchain_filename' "${SSLCERTS_SPEC_FILE}")
JOLOKIA_SERVERKEY="${SSLCERTS_DIR}"/$(jq -r '.ssl_pem_privkey_filename'   "${SSLCERTS_SPEC_FILE}")

if [ -f "${JOLOKIA_FULLCHAIN}" ] && [ -f "${JOLOKIA_SERVERKEY}" ]; then
  echo "Jolokia config: protocol=$JOLOKIA_AGENT_PROTOCOL, caCert&serverCert=$JOLOKIA_FULLCHAIN, serverKey=$JOLOKIA_SERVERKEY"
  JOLOKIA_CERTS="caCert=${JOLOKIA_FULLCHAIN},serverCert=${JOLOKIA_FULLCHAIN},serverKey=${JOLOKIA_SERVERKEY}"
elif [ "$FIPS_ENABLED" = "true" ]; then
    _caas_err "Can't find ${JOLOKIA_FULLCHAIN} or ${JOLOKIA_SERVERKEY} to be used by Jolokia"
else
  echo "WARNING: ${JOLOKIA_FULLCHAIN} or ${JOLOKIA_SERVERKEY} not found, defaulting to using keystore"
  JOLOKIA_CERTS="${KEYSTORE_LOCATION},keyStoreType=${KEYSTORE_TYPE},keystorePassword=mystorepassword"
fi

echo "Jolokia config: protocol=$JOLOKIA_AGENT_PROTOCOL, caCert&serverCert=$JOLOKIA_FULLCHAIN, serverKey=$JOLOKIA_SERVERKEY"
EXTRA_ARGS="${EXTRA_ARGS} -javaagent:${JOLOKIA_AGENT_JAR}=port=${JOLOKIA_AGENT_PORT},host=${JOLOKIA_AGENT_HOST},protocol=${JOLOKIA_AGENT_PROTOCOL},${JOLOKIA_CERTS},policyLocation=file:///opt/caas/etc/jolokia-access.xml"


echo "===> Adding jmx exporter to the java command ... "
export JMX_EXPORTER_AGENT_PORT=${JMX_EXPORTER_AGENT_PORT:-7778}
export JMX_EXPORTER_AGENT_HOST=${JMX_EXPORTER_AGENT_HOST:-"0.0.0.0"}

export EXTRA_ARGS="${EXTRA_ARGS} -javaagent:/usr/share/java/jmx_prometheus_javaagent/jmx_prometheus_javaagent.jar=${JMX_EXPORTER_AGENT_PORT}:/opt/caas/config/jmx-exporter.yaml"

export ENABLE_OTEL_JAVAAGENT=$(grep "^otel.javaagent.enabled=" ${KAFKA_CONFIG_DIR}/kafka.properties | cut -d '=' -f 2)
export ENABLE_OTEL_TRACE=$(grep "^otel.trace.enabled=" ${KAFKA_CONFIG_DIR}/kafka.properties | cut -d '=' -f 2)
export OTEL_OTLP_TRACES_CUSTOM_ENDPOINT=$(grep "^otel.exporter.otlp.traces.custom.endpoint=" ${KAFKA_CONFIG_DIR}/kafka.properties | cut -d '=' -f 2)

if [[ "${ENABLE_OTEL_JAVAAGENT^^}" == "TRUE" ]]; then
  echo "====> Adding OpenTelemetry Java agent to the java command ..."
  export OTEL_EXPORTER_OTLP_ENDPOINT=http://${HOST_IP}:14317
  if [[ "${OTEL_OTLP_TRACES_CUSTOM_ENDPOINT^^}" != "DEFAULT" ]]; then
    export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_OTLP_TRACES_CUSTOM_ENDPOINT}
  fi
  export OTEL_SERVICE_NAME=${POD_NAMESPACE}
  export OTEL_AGENT_HOME=/usr/share/java/otel-agent
  source /usr/share/java/otel-agent/otel-agent-env.sh
  export JDK_JAVA_OPTIONS="${JDK_JAVA_OPTIONS} -Dotel.javaagent.configuration-file=/opt/caas/config/otel-javaagent.properties";
  if [[ "${ENABLE_OTEL_TRACE^^}" == "TRUE" ]]; then
    export JDK_JAVA_OPTIONS="${JDK_JAVA_OPTIONS} -Dotel.instrumentation.opentelemetry-api.enabled=true -Dotel.traces.exporter=otlp";
  fi
fi

echo "===> Adding JAAS security property to the java command ... "
SERVER_JAAS_FILE="${KAFKA_CONFIG_DIR}/shared/server_jaas.conf"
if [ -f "$SERVER_JAAS_FILE" ]; then
  echo "${SERVER_JAAS_FILE} exists"
  export EXTRA_ARGS="${EXTRA_ARGS} -Djava.security.auth.login.config=${KAFKA_CONFIG_DIR}/shared/server_jaas.conf"
else
  echo "No server_jaas.conf file present, using JAAS config from the server.properties file"
fi

echo "===> Adding kafka broker plugins to CLASSPATH ... "
export CLASSPATH="/opt/confluent/libs/*:$CLASSPATH"

echo "===> Adding kafka log4j config ... "
cat /opt/caas/config/kafka/log4j.properties
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/caas/config/kafka/log4j.properties"

echo "===> Adding JVM config to the java command ... "
cat /opt/caas/config/kafka/jvm.config
export EXTRA_ARGS="$(cat /opt/caas/config/kafka/jvm.config | xargs) ${EXTRA_ARGS}"
# These ensure that the "if" sections for heap sizing, GC tuning, and JMX opts in kafka launch script do not trigger.
export KAFKA_HEAP_OPTS=' '
export KAFKA_JVM_PERFORMANCE_OPTS=' '
export JMX_PORT=' '

echo "===> Adding AWS_METADATA_SERVICE_TIMEOUT env var ... "
# This increases the timeout for connecting to IMDS service when getting token from STS.
# Ref: https://docs.aws.amazon.com/sdkref/latest/guide/feature-ec2-instance-metadata.html
export AWS_METADATA_SERVICE_TIMEOUT=3

# The following block of checks will prevent this script
# from starting up a Kafka server if confluent.repair.mode
# has been enabled (set to true) in the K8s configMap.
# This mode is targteted to allow the operator to execute certain
# runbooks on a Kafka Server when it is hosted within Kubernetes.
# This is an artifact of how Kafka is hosted in CCloud and
# some Kubernetes architectural requirements.

export logfile=${KAFKA_LOG4J_DIR}/main.log
trap_with_arg() {
    func="$1" ; shift
    for sig ; do
        trap "$func $sig" "$sig"
    done
}
export -f trap_with_arg
handle_signal() {
    echo "Caught Signal $1. Terminating Loop!" | tee -a $logfile
    exit
}
export -f handle_signal
kafka_repair_idle_loop() {
    # Signal handler registration
    trap_with_arg handle_signal SIGHUP SIGINT SIGTERM
    while true; do
        echo "$(date) Running repair mode and staying alive forever." | tee -a $logfile
        echo "$(date) Refer to runbook - https://github.com/confluentinc/cc-documentation/blob/master/Operations/RunBook/KafkaCore/Confluent%20Repair%20Mode" | tee -a $logfile
        echo "-" | tee -a $logfile
        sleep 10;
    done
}
export -f kafka_repair_idle_loop
export CONFLUENT_REPAIR_MODE=$(grep "^#confluent.repair.mode=" ${KAFKA_CONFIG_DIR}/kafka.properties | cut -d '=' -f 2)

SKIP_KAFKA_STARTUP="${CONFLUENT_REPAIR_MODE:-false}"

if [[ $SKIP_KAFKA_STARTUP == "true" || $SKIP_KAFKA_STARTUP == "TRUE" || $SKIP_KAFKA_STARTUP == "True" ]]; then
    # Run a dummy loop to keep PID 1 alive and emit some logs
    echo "===> CONFLUENT_REPAIR_MODE is ${CONFLUENT_REPAIR_MODE}. ${COMPONENT} will not be started in this mode"
    echo "===> Launching Repair Mode for ${COMPONENT} ... "
    exec /bin/bash -c 'kafka_repair_idle_loop' {} \;
fi

echo "===> Launching ${COMPONENT} ... "
exec /opt/confluent/bin/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka ${KAFKA_CONFIG_DIR}/kafka.properties
