""" Pytorch Inception-Resnet-V2 implementation
Sourced from https://github.com/Cadene/tensorflow-model-zoo.torch (MIT License) which is
based upon Google's Tensorflow implementation and pretrained weights (Apache 2.0 License)
"""
from functools import partial
import torch
import torch.nn as nn

from timm.data import IMAGENET_INCEPTION_MEAN, IMAGENET_INCEPTION_STD
from timm.layers import create_classifier, ConvNormAct
from ._builder import build_model_with_cfg
from ._manipulate import flatten_modules
from ._registry import register_model, generate_default_cfgs, register_model_deprecations

__all__ = ['InceptionResnetV2']


class Mixed_5b(nn.Module):
    def __init__(self, conv_block=None):
        super(Mixed_5b, self).__init__()
        conv_block = conv_block or ConvNormAct

        self.branch0 = conv_block(192, 96, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            conv_block(192, 48, kernel_size=1, stride=1),
            conv_block(48, 64, kernel_size=5, stride=1, padding=2)
        )

        self.branch2 = nn.Sequential(
            conv_block(192, 64, kernel_size=1, stride=1),
            conv_block(64, 96, kernel_size=3, stride=1, padding=1),
            conv_block(96, 96, kernel_size=3, stride=1, padding=1)
        )

        self.branch3 = nn.Sequential(
            nn.AvgPool2d(3, stride=1, padding=1, count_include_pad=False),
            conv_block(192, 64, kernel_size=1, stride=1)
        )

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        x3 = self.branch3(x)
        out = torch.cat((x0, x1, x2, x3), 1)
        return out


class Block35(nn.Module):
    def __init__(self, scale=1.0, conv_block=None):
        super(Block35, self).__init__()
        self.scale = scale
        conv_block = conv_block or ConvNormAct

        self.branch0 = conv_block(320, 32, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            conv_block(320, 32, kernel_size=1, stride=1),
            conv_block(32, 32, kernel_size=3, stride=1, padding=1)
        )

        self.branch2 = nn.Sequential(
            conv_block(320, 32, kernel_size=1, stride=1),
            conv_block(32, 48, kernel_size=3, stride=1, padding=1),
            conv_block(48, 64, kernel_size=3, stride=1, padding=1)
        )

        self.conv2d = nn.Conv2d(128, 320, kernel_size=1, stride=1)
        self.act = nn.ReLU()

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        out = torch.cat((x0, x1, x2), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        out = self.act(out)
        return out


class Mixed_6a(nn.Module):
    def __init__(self, conv_block=None):
        super(Mixed_6a, self).__init__()
        conv_block = conv_block or ConvNormAct

        self.branch0 = conv_block(320, 384, kernel_size=3, stride=2)

        self.branch1 = nn.Sequential(
            conv_block(320, 256, kernel_size=1, stride=1),
            conv_block(256, 256, kernel_size=3, stride=1, padding=1),
            conv_block(256, 384, kernel_size=3, stride=2)
        )

        self.branch2 = nn.MaxPool2d(3, stride=2)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        out = torch.cat((x0, x1, x2), 1)
        return out


class Block17(nn.Module):
    def __init__(self, scale=1.0, conv_block=None):
        super(Block17, self).__init__()
        self.scale = scale
        conv_block = conv_block or ConvNormAct

        self.branch0 = conv_block(1088, 192, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            conv_block(1088, 128, kernel_size=1, stride=1),
            conv_block(128, 160, kernel_size=(1, 7), stride=1, padding=(0, 3)),
            conv_block(160, 192, kernel_size=(7, 1), stride=1, padding=(3, 0))
        )

        self.conv2d = nn.Conv2d(384, 1088, kernel_size=1, stride=1)
        self.act = nn.ReLU()

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        out = torch.cat((x0, x1), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        out = self.act(out)
        return out


class Mixed_7a(nn.Module):
    def __init__(self, conv_block=None):
        super(Mixed_7a, self).__init__()
        conv_block = conv_block or ConvNormAct

        self.branch0 = nn.Sequential(
            conv_block(1088, 256, kernel_size=1, stride=1),
            conv_block(256, 384, kernel_size=3, stride=2)
        )

        self.branch1 = nn.Sequential(
            conv_block(1088, 256, kernel_size=1, stride=1),
            conv_block(256, 288, kernel_size=3, stride=2)
        )

        self.branch2 = nn.Sequential(
            conv_block(1088, 256, kernel_size=1, stride=1),
            conv_block(256, 288, kernel_size=3, stride=1, padding=1),
            conv_block(288, 320, kernel_size=3, stride=2)
        )

        self.branch3 = nn.MaxPool2d(3, stride=2)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        x3 = self.branch3(x)
        out = torch.cat((x0, x1, x2, x3), 1)
        return out


class Block8(nn.Module):

    def __init__(self, scale=1.0, no_relu=False, conv_block=None):
        super(Block8, self).__init__()
        self.scale = scale
        conv_block = conv_block or ConvNormAct

        self.branch0 = conv_block(2080, 192, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            conv_block(2080, 192, kernel_size=1, stride=1),
            conv_block(192, 224, kernel_size=(1, 3), stride=1, padding=(0, 1)),
            conv_block(224, 256, kernel_size=(3, 1), stride=1, padding=(1, 0))
        )

        self.conv2d = nn.Conv2d(448, 2080, kernel_size=1, stride=1)
        self.relu = None if no_relu else nn.ReLU()

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        out = torch.cat((x0, x1), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        if self.relu is not None:
            out = self.relu(out)
        return out


class InceptionResnetV2(nn.Module):
    def __init__(
            self,
            num_classes=1000,
            in_chans=3,
            drop_rate=0.,
            output_stride=32,
            global_pool='avg',
            norm_layer='batchnorm2d',
            norm_eps=1e-3,
            act_layer='relu',
    ):
        super(InceptionResnetV2, self).__init__()
        self.num_classes = num_classes
        self.num_features = self.head_hidden_size = 1536
        assert output_stride == 32
        conv_block = partial(
            ConvNormAct,
            padding=0,
            norm_layer=norm_layer,
            act_layer=act_layer,
            norm_kwargs=dict(eps=norm_eps),
            act_kwargs=dict(inplace=True),
        )

        self.conv2d_1a = conv_block(in_chans, 32, kernel_size=3, stride=2)
        self.conv2d_2a = conv_block(32, 32, kernel_size=3, stride=1)
        self.conv2d_2b = conv_block(32, 64, kernel_size=3, stride=1, padding=1)
        self.feature_info = [dict(num_chs=64, reduction=2, module='conv2d_2b')]

        self.maxpool_3a = nn.MaxPool2d(3, stride=2)
        self.conv2d_3b = conv_block(64, 80, kernel_size=1, stride=1)
        self.conv2d_4a = conv_block(80, 192, kernel_size=3, stride=1)
        self.feature_info += [dict(num_chs=192, reduction=4, module='conv2d_4a')]

        self.maxpool_5a = nn.MaxPool2d(3, stride=2)
        self.mixed_5b = Mixed_5b(conv_block=conv_block)
        self.repeat = nn.Sequential(*[Block35(scale=0.17, conv_block=conv_block) for _ in range(10)])
        self.feature_info += [dict(num_chs=320, reduction=8, module='repeat')]

        self.mixed_6a = Mixed_6a(conv_block=conv_block)
        self.repeat_1 = nn.Sequential(*[Block17(scale=0.10, conv_block=conv_block) for _ in range(20)])
        self.feature_info += [dict(num_chs=1088, reduction=16, module='repeat_1')]

        self.mixed_7a = Mixed_7a(conv_block=conv_block)
        self.repeat_2 = nn.Sequential(*[Block8(scale=0.20, conv_block=conv_block) for _ in range(9)])

        self.block8 = Block8(no_relu=True, conv_block=conv_block)
        self.conv2d_7b = conv_block(2080, self.num_features, kernel_size=1, stride=1)
        self.feature_info += [dict(num_chs=self.num_features, reduction=32, module='conv2d_7b')]

        self.global_pool, self.head_drop, self.classif = create_classifier(
            self.num_features, self.num_classes, pool_type=global_pool, drop_rate=drop_rate)

    @torch.jit.ignore
    def group_matcher(self, coarse=False):
        module_map = {k: i for i, (k, _) in enumerate(flatten_modules(self.named_children(), prefix=()))}
        module_map.pop(('classif',))

        def _matcher(name):
            if any([name.startswith(n) for n in ('conv2d_1', 'conv2d_2')]):
                return 0
            elif any([name.startswith(n) for n in ('conv2d_3', 'conv2d_4')]):
                return 1
            elif any([name.startswith(n) for n in ('block8', 'conv2d_7')]):
                return len(module_map) + 1
            else:
                for k in module_map.keys():
                    if k == tuple(name.split('.')[:len(k)]):
                        return module_map[k]
                return float('inf')
        return _matcher

    @torch.jit.ignore
    def set_grad_checkpointing(self, enable=True):
        assert not enable, "checkpointing not supported"

    @torch.jit.ignore
    def get_classifier(self) -> nn.Module:
        return self.classif

    def reset_classifier(self, num_classes: int, global_pool: str = 'avg'):
        self.num_classes = num_classes
        self.global_pool, self.classif = create_classifier(self.num_features, self.num_classes, pool_type=global_pool)

    def forward_features(self, x):
        x = self.conv2d_1a(x)
        x = self.conv2d_2a(x)
        x = self.conv2d_2b(x)
        x = self.maxpool_3a(x)
        x = self.conv2d_3b(x)
        x = self.conv2d_4a(x)
        x = self.maxpool_5a(x)
        x = self.mixed_5b(x)
        x = self.repeat(x)
        x = self.mixed_6a(x)
        x = self.repeat_1(x)
        x = self.mixed_7a(x)
        x = self.repeat_2(x)
        x = self.block8(x)
        x = self.conv2d_7b(x)
        return x

    def forward_head(self, x, pre_logits: bool = False):
        x = self.global_pool(x)
        x = self.head_drop(x)
        return x if pre_logits else self.classif(x)

    def forward(self, x):
        x = self.forward_features(x)
        x = self.forward_head(x)
        return x


def _create_inception_resnet_v2(variant, pretrained=False, **kwargs):
    return build_model_with_cfg(InceptionResnetV2, variant, pretrained, **kwargs)


default_cfgs = generate_default_cfgs({
    # ported from http://download.tensorflow.org/models/inception_resnet_v2_2016_08_30.tar.gz
    'inception_resnet_v2.tf_in1k': {
        'hf_hub_id': 'timm/',
        'num_classes': 1000, 'input_size': (3, 299, 299), 'pool_size': (8, 8),
        'crop_pct': 0.8975, 'interpolation': 'bicubic',
        'mean': IMAGENET_INCEPTION_MEAN, 'std': IMAGENET_INCEPTION_STD,
        'first_conv': 'conv2d_1a.conv', 'classifier': 'classif',
    },
    # As per https://arxiv.org/abs/1705.07204 and
    # ported from http://download.tensorflow.org/models/ens_adv_inception_resnet_v2_2017_08_18.tar.gz
    'inception_resnet_v2.tf_ens_adv_in1k': {
        'hf_hub_id': 'timm/',
        'num_classes': 1000, 'input_size': (3, 299, 299), 'pool_size': (8, 8),
        'crop_pct': 0.8975, 'interpolation': 'bicubic',
        'mean': IMAGENET_INCEPTION_MEAN, 'std': IMAGENET_INCEPTION_STD,
        'first_conv': 'conv2d_1a.conv', 'classifier': 'classif',
    }
})


@register_model
def inception_resnet_v2(pretrained=False, **kwargs) -> InceptionResnetV2:
    return _create_inception_resnet_v2('inception_resnet_v2', pretrained=pretrained, **kwargs)


register_model_deprecations(__name__, {
    'ens_adv_inception_resnet_v2': 'inception_resnet_v2.tf_ens_adv_in1k',
})