Source code for aiokafka.errors

import inspect
import sys

from kafka.errors import (
    KafkaError,
    IllegalStateError,
    IllegalArgumentError,
    NoBrokersAvailable,
    NodeNotReadyError,
    KafkaProtocolError,
    CorrelationIdError,
    Cancelled,
    TooManyInFlightRequests,
    StaleMetadata,
    UnrecognizedBrokerVersion,
    CommitFailedError,
    AuthenticationMethodNotSupported,
    AuthenticationFailedError,
    BrokerResponseError,

    # Numbered errors
    NoError,  # 0
    UnknownError,  # -1
    OffsetOutOfRangeError,  # 1
    CorruptRecordException,  # 2
    UnknownTopicOrPartitionError,  # 3
    InvalidFetchRequestError,  # 4
    LeaderNotAvailableError,  # 5
    NotLeaderForPartitionError,  # 6
    RequestTimedOutError,  # 7
    BrokerNotAvailableError,  # 8
    ReplicaNotAvailableError,  # 9
    MessageSizeTooLargeError,  # 10
    StaleControllerEpochError,  # 11
    OffsetMetadataTooLargeError,  # 12
    StaleLeaderEpochCodeError,  # 13
    GroupLoadInProgressError,  # 14
    GroupCoordinatorNotAvailableError,  # 15
    NotCoordinatorForGroupError,  # 16
    InvalidTopicError,  # 17
    RecordListTooLargeError,  # 18
    NotEnoughReplicasError,  # 19
    NotEnoughReplicasAfterAppendError,  # 20
    InvalidRequiredAcksError,  # 21
    IllegalGenerationError,  # 22
    InconsistentGroupProtocolError,  # 23
    InvalidGroupIdError,  # 24
    UnknownMemberIdError,  # 25
    InvalidSessionTimeoutError,  # 26
    RebalanceInProgressError,  # 27
    InvalidCommitOffsetSizeError,  # 28
    TopicAuthorizationFailedError,  # 29
    GroupAuthorizationFailedError,  # 30
    ClusterAuthorizationFailedError,  # 31
    InvalidTimestampError,  # 32
    UnsupportedSaslMechanismError,  # 33
    IllegalSaslStateError,  # 34
    UnsupportedVersionError,  # 35
    TopicAlreadyExistsError,  # 36
    InvalidPartitionsError,  # 37
    InvalidReplicationFactorError,  # 38
    InvalidReplicationAssignmentError,  # 39
    InvalidConfigurationError,  # 40
    NotControllerError,  # 41
    InvalidRequestError,  # 42
    UnsupportedForMessageFormatError,  # 43
    PolicyViolationError,  # 44

    KafkaUnavailableError,
    KafkaTimeoutError,
    KafkaConnectionError,
    UnsupportedCodecError,
)

# Names exported by ``from aiokafka.errors import *``.
__all__ = [
    # Errors defined by aiokafka itself
    "ConsumerStoppedError",
    "NoOffsetForPartitionError",
    "RecordTooLargeError",
    "ProducerClosed",

    # Errors re-exported from kafka-python (kafka.errors)
    "KafkaError",
    "IllegalStateError",
    "IllegalArgumentError",
    "NoBrokersAvailable",
    "NodeNotReadyError",
    "KafkaProtocolError",
    "CorrelationIdError",
    "Cancelled",
    "TooManyInFlightRequests",
    "StaleMetadata",
    "UnrecognizedBrokerVersion",
    "CommitFailedError",
    "AuthenticationMethodNotSupported",
    "AuthenticationFailedError",
    "BrokerResponseError",

    # Broker response errors; the trailing comment is the error code
    "NoError",  # 0
    "UnknownError",  # -1
    "OffsetOutOfRangeError",  # 1
    "CorruptRecordException",  # 2
    "UnknownTopicOrPartitionError",  # 3
    "InvalidFetchRequestError",  # 4
    "LeaderNotAvailableError",  # 5
    "NotLeaderForPartitionError",  # 6
    "RequestTimedOutError",  # 7
    "BrokerNotAvailableError",  # 8
    "ReplicaNotAvailableError",  # 9
    "MessageSizeTooLargeError",  # 10
    "StaleControllerEpochError",  # 11
    "OffsetMetadataTooLargeError",  # 12
    "StaleLeaderEpochCodeError",  # 13
    "GroupLoadInProgressError",  # 14
    "GroupCoordinatorNotAvailableError",  # 15
    "NotCoordinatorForGroupError",  # 16
    "InvalidTopicError",  # 17
    "RecordListTooLargeError",  # 18
    "NotEnoughReplicasError",  # 19
    "NotEnoughReplicasAfterAppendError",  # 20
    "InvalidRequiredAcksError",  # 21
    "IllegalGenerationError",  # 22
    "InconsistentGroupProtocolError",  # 23
    "InvalidGroupIdError",  # 24
    "UnknownMemberIdError",  # 25
    "InvalidSessionTimeoutError",  # 26
    "RebalanceInProgressError",  # 27
    "InvalidCommitOffsetSizeError",  # 28
    "TopicAuthorizationFailedError",  # 29
    "GroupAuthorizationFailedError",  # 30
    "ClusterAuthorizationFailedError",  # 31
    "InvalidTimestampError",  # 32
    "UnsupportedSaslMechanismError",  # 33
    "IllegalSaslStateError",  # 34
    "UnsupportedVersionError",  # 35
    "TopicAlreadyExistsError",  # 36
    "InvalidPartitionsError",  # 37
    "InvalidReplicationFactorError",  # 38
    "InvalidReplicationAssignmentError",  # 39
    "InvalidConfigurationError",  # 40
    "NotControllerError",  # 41
    "InvalidRequestError",  # 42
    "UnsupportedForMessageFormatError",  # 43
    "PolicyViolationError",  # 44

    # Remaining kafka-python errors that carry no error code comment
    "KafkaUnavailableError",
    "KafkaTimeoutError",
    "KafkaConnectionError",
    "UnsupportedCodecError",
]


class CoordinatorNotAvailableError(GroupCoordinatorNotAvailableError):
    """GroupCoordinatorNotAvailableError with a different ``message``.

    Only the ``message`` class attribute is overridden; everything else is
    inherited from the imported base class.
    """

    message = "COORDINATOR_NOT_AVAILABLE"
class NotCoordinatorError(NotCoordinatorForGroupError):
    """NotCoordinatorForGroupError with a different ``message``.

    Only the ``message`` class attribute is overridden; everything else is
    inherited from the imported base class.
    """

    message = "NOT_COORDINATOR"
class CoordinatorLoadInProgressError(GroupLoadInProgressError):
    """GroupLoadInProgressError with a different ``message``.

    Only the ``message`` class attribute is overridden; everything else is
    inherited from the imported base class.
    """

    message = "COORDINATOR_LOAD_IN_PROGRESS"
# Additional module-level name for the imported CorruptRecordException.
InvalidMessageError = CorruptRecordException

# Rebind the names imported from kafka.errors to the subclasses defined in
# this module, so that looking the old names up here yields the new classes.
GroupCoordinatorNotAvailableError = CoordinatorNotAvailableError
NotCoordinatorForGroupError = NotCoordinatorError
GroupLoadInProgressError = CoordinatorLoadInProgressError
class ConsumerStoppedError(Exception):
    """Signal that the Consumer was cancelled.

    Raised by the Consumer's `get*` methods once it has been cancelled;
    calls that were already pending at that moment receive it as well.
    """
class IllegalOperation(Exception):
    """Signal an operation the current configuration does not allow.

    Raised when an operation is attempted that is unavailable with the
    configuration in use, e.g. trying to commit when no group_id was given.
    """
class NoOffsetForPartitionError(KafkaError):
    """aiokafka-specific KafkaError subclass that adds no behaviour.

    NOTE(review): presumably raised when no offset can be determined for a
    partition -- confirm against the code that raises it.
    """
class RecordTooLargeError(KafkaError):
    """aiokafka-specific KafkaError subclass that adds no behaviour.

    NOTE(review): presumably raised when a record exceeds a size limit --
    confirm against the code that raises it.
    """
class ProducerClosed(KafkaError):
    """aiokafka-specific KafkaError subclass that adds no behaviour.

    NOTE(review): presumably raised when a closed producer is used --
    confirm against the code that raises it.
    """
class ProducerFenced(KafkaError):
    """Another producer with the same transactional ID went online.

    NOTE: As it seems this will be raised by Broker if transaction timeout
    occurred also.

    Args:
        msg: Text passed on to ``KafkaError``. The default explains both
            known causes (a newer producer, or a transaction timeout).
    """

    def __init__(
        self,
        # Adjacent string literals are concatenated verbatim, so every
        # fragment except the last must end with a space. The first one
        # previously did not, producing "...transactional_id ortransaction".
        msg="There is a newer producer using the same transactional_id or "
            "transaction timeout occurred (check that processing time is "
            "below transaction_timeout_ms)"
    ):
        super().__init__(msg)
class OutOfOrderSequenceNumber(BrokerResponseError):
    """Broker response error 45 (OUT_OF_ORDER_SEQUENCE_NUMBER)."""

    errno = 45
    message = "OUT_OF_ORDER_SEQUENCE_NUMBER"
    description = "The broker received an out of order sequence number"
class DuplicateSequenceNumber(BrokerResponseError):
    """Broker response error 46 (DUPLICATE_SEQUENCE_NUMBER)."""

    errno = 46
    message = "DUPLICATE_SEQUENCE_NUMBER"
    description = "The broker received a duplicate sequence number"
class InvalidProducerEpoch(BrokerResponseError):
    """Broker response error 47 (INVALID_PRODUCER_EPOCH)."""

    errno = 47
    message = "INVALID_PRODUCER_EPOCH"
    description = (
        "Producer attempted an operation with an old epoch. Either there "
        "is a newer producer with the same transactionalId, or the "
        "producer's transaction has been expired by the broker."
    )
class InvalidTxnState(BrokerResponseError):
    """Broker response error 48 (INVALID_TXN_STATE)."""

    errno = 48
    message = "INVALID_TXN_STATE"
    description = (
        "The producer attempted a transactional operation in an "
        "invalid state"
    )
class InvalidProducerIdMapping(BrokerResponseError):
    """Broker response error 49 (INVALID_PRODUCER_ID_MAPPING)."""

    errno = 49
    message = "INVALID_PRODUCER_ID_MAPPING"
    description = (
        "The producer attempted to use a producer id which is not "
        "currently assigned to its transactional id"
    )
class InvalidTransactionTimeout(BrokerResponseError):
    """Broker response error 50 (INVALID_TRANSACTION_TIMEOUT)."""

    errno = 50
    message = "INVALID_TRANSACTION_TIMEOUT"
    description = (
        "The transaction timeout is larger than the maximum value allowed "
        "by the broker (as configured by transaction.max.timeout.ms)."
    )
class ConcurrentTransactions(BrokerResponseError):
    """Broker response error 51 (CONCURRENT_TRANSACTIONS)."""

    errno = 51
    message = "CONCURRENT_TRANSACTIONS"
    description = (
        "The producer attempted to update a transaction while "
        "another concurrent operation on the same transaction was ongoing"
    )
class TransactionCoordinatorFenced(BrokerResponseError):
    """Broker response error 52 (TRANSACTION_COORDINATOR_FENCED)."""

    errno = 52
    message = "TRANSACTION_COORDINATOR_FENCED"
    description = (
        "Indicates that the transaction coordinator sending a "
        "WriteTxnMarker is no longer the current coordinator for a given "
        "producer"
    )
class TransactionalIdAuthorizationFailed(BrokerResponseError):
    """Broker response error 53 (TRANSACTIONAL_ID_AUTHORIZATION_FAILED)."""

    errno = 53
    message = "TRANSACTIONAL_ID_AUTHORIZATION_FAILED"
    description = "Transactional Id authorization failed"
class SecurityDisabled(BrokerResponseError):
    """Broker response error 54 (SECURITY_DISABLED)."""

    errno = 54
    message = "SECURITY_DISABLED"
    description = "Security features are disabled"
class OperationNotAttempted(BrokerResponseError):
    """Broker response error 55 (OPERATION_NOT_ATTEMPTED)."""

    errno = 55
    message = "OPERATION_NOT_ATTEMPTED"
    description = (
        "The broker did not attempt to execute this operation. This may "
        "happen for batched RPCs where some operations in the batch "
        "failed, causing the broker to respond without trying the rest."
    )
class KafkaStorageError(BrokerResponseError):
    """Broker response error 56 (KAFKA_STORAGE_ERROR)."""

    errno = 56
    message = "KAFKA_STORAGE_ERROR"
    # The previous text was a copy of LOG_DIR_NOT_FOUND's (error 57)
    # description; this is the wording the Kafka protocol gives error 56.
    description = "Disk error when trying to access log file on the disk."
class LogDirNotFound(BrokerResponseError):
    """Broker response error 57 (LOG_DIR_NOT_FOUND)."""

    errno = 57
    message = "LOG_DIR_NOT_FOUND"
    description = (
        "The user-specified log directory is not found in the broker "
        "config."
    )
class SaslAuthenticationFailed(BrokerResponseError):
    """Broker response error 58 (SASL_AUTHENTICATION_FAILED)."""

    errno = 58
    message = "SASL_AUTHENTICATION_FAILED"
    description = "SASL Authentication failed."
class UnknownProducerId(BrokerResponseError):
    """Broker response error 59 (UNKNOWN_PRODUCER_ID)."""

    errno = 59
    message = "UNKNOWN_PRODUCER_ID"
    description = (
        "This exception is raised by the broker if it could not locate "
        "the producer metadata associated with the producerId in "
        "question. This could happen if, for instance, the producer's "
        "records were deleted because their retention time had elapsed. "
        "Once the last records of the producerId are removed, the "
        "producer's metadata is removed from the broker, and future "
        "appends by the producer will return this exception."
    )
class ReassignmentInProgress(BrokerResponseError):
    """Broker response error 60 (REASSIGNMENT_IN_PROGRESS)."""

    errno = 60
    message = "REASSIGNMENT_IN_PROGRESS"
    description = "A partition reassignment is in progress"
class DelegationTokenAuthDisabled(BrokerResponseError):
    """Broker response error 61 (DELEGATION_TOKEN_AUTH_DISABLED)."""

    errno = 61
    message = "DELEGATION_TOKEN_AUTH_DISABLED"
    description = "Delegation Token feature is not enabled"
class DelegationTokenNotFound(BrokerResponseError):
    """Broker response error 62 (DELEGATION_TOKEN_NOT_FOUND)."""

    errno = 62
    message = "DELEGATION_TOKEN_NOT_FOUND"
    description = "Delegation Token is not found on server."
class DelegationTokenOwnerMismatch(BrokerResponseError):
    """Broker response error 63 (DELEGATION_TOKEN_OWNER_MISMATCH)."""

    errno = 63
    message = "DELEGATION_TOKEN_OWNER_MISMATCH"
    description = "Specified Principal is not valid Owner/Renewer."
class DelegationTokenRequestNotAllowed(BrokerResponseError):
    """Broker response error 64 (DELEGATION_TOKEN_REQUEST_NOT_ALLOWED)."""

    errno = 64
    message = "DELEGATION_TOKEN_REQUEST_NOT_ALLOWED"
    description = (
        "Delegation Token requests are not allowed on PLAINTEXT/1-way "
        "SSL channels and on delegation token authenticated channels."
    )
class DelegationTokenAuthorizationFailed(BrokerResponseError):
    """Broker response error 65 (DELEGATION_TOKEN_AUTHORIZATION_FAILED)."""

    errno = 65
    message = "DELEGATION_TOKEN_AUTHORIZATION_FAILED"
    description = "Delegation Token authorization failed."
class DelegationTokenExpired(BrokerResponseError):
    """Broker response error 66 (DELEGATION_TOKEN_EXPIRED)."""

    errno = 66
    message = "DELEGATION_TOKEN_EXPIRED"
    description = "Delegation Token is expired."
class InvalidPrincipalType(BrokerResponseError):
    """Broker response error 67 (INVALID_PRINCIPAL_TYPE)."""

    errno = 67
    message = "INVALID_PRINCIPAL_TYPE"
    description = "Supplied principalType is not supported"
class NonEmptyGroup(BrokerResponseError):
    """Broker response error 68 (NON_EMPTY_GROUP)."""

    errno = 68
    message = "NON_EMPTY_GROUP"
    description = "The group is not empty"
class GroupIdNotFound(BrokerResponseError):
    """Broker response error 69 (GROUP_ID_NOT_FOUND)."""

    errno = 69
    message = "GROUP_ID_NOT_FOUND"
    description = "The group id does not exist"
class FetchSessionIdNotFound(BrokerResponseError):
    """Broker response error 70 (FETCH_SESSION_ID_NOT_FOUND)."""

    errno = 70
    message = "FETCH_SESSION_ID_NOT_FOUND"
    description = "The fetch session ID was not found"
class InvalidFetchSessionEpoch(BrokerResponseError):
    """Broker response error 71 (INVALID_FETCH_SESSION_EPOCH)."""

    errno = 71
    message = "INVALID_FETCH_SESSION_EPOCH"
    description = "The fetch session epoch is invalid"
class ListenerNotFound(BrokerResponseError):
    """Broker response error 72 (LISTENER_NOT_FOUND)."""

    errno = 72
    message = "LISTENER_NOT_FOUND"
    description = (
        "There is no listener on the leader broker that matches "
        "the listener on which metadata request was processed"
    )
def _iter_broker_errors():
    """Yield each proper subclass of BrokerResponseError bound in this module.

    Every class reachable as an attribute of this module is considered, so
    both the classes defined here and the imported ones are included.
    BrokerResponseError itself is skipped. A class bound under several names
    is yielded once per name.
    """
    this_module = sys.modules[__name__]
    for _name, candidate in inspect.getmembers(this_module, inspect.isclass):
        if candidate is BrokerResponseError:
            continue
        if issubclass(candidate, BrokerResponseError):
            yield candidate


# Lookup table from broker error code (``errno``) to its exception class.
# If several classes share an errno, the one yielded last is kept.
kafka_errors = {
    error_cls.errno: error_cls for error_cls in _iter_broker_errors()
}


def for_code(error_code):
    """Return the exception class registered for ``error_code``.

    Args:
        error_code: Broker error code to look up in ``kafka_errors``.

    Returns:
        The matching BrokerResponseError subclass, or ``UnknownError`` when
        the code is not in the table.
    """
    error_cls = kafka_errors.get(error_code, UnknownError)
    return error_cls