Source code for greatx.nn.layers.gcn_conv

from typing import Optional, Tuple

import torch
from torch import Tensor, nn
from torch_geometric.nn.conv.gcn_conv import gcn_norm
from torch_geometric.nn.dense.linear import Linear
from torch_geometric.nn.inits import zeros
from torch_geometric.typing import Adj, OptTensor
from torch_geometric.utils import add_self_loops
from torch_sparse import SparseTensor, fill_diag

from greatx.functional import spmm


def dense_gcn_norm(adj: Tensor, improved: bool = False,
                   add_self_loops: bool = True, rate: float = -0.5) -> Tensor:
    r"""Perform GCN-normalization :math:`\mathbf{\hat{D}}^{r}
    \mathbf{\hat{A}} \mathbf{\hat{D}}^{r}` for a dense adjacency
    matrix, with :math:`r` denoting :obj:`rate` (-0.5 by default)."""
    fill_value = 2. if improved else 1.
    if add_self_loops:
        adj = dense_add_self_loops(adj, fill_value)
    deg = adj.sum(dim=1)
    deg_inv_sqrt = deg.pow_(rate)
    # Guard against division by zero for isolated (zero-degree) nodes
    deg_inv_sqrt.masked_fill_(deg_inv_sqrt == float('inf'), 0.)
    norm_src = deg_inv_sqrt.view(1, -1)
    norm_dst = deg_inv_sqrt.view(-1, 1)
    adj = norm_src * adj * norm_dst
    return adj


def dense_add_self_loops(adj: Tensor, fill_value: float = 1.0) -> Tensor:
    """Add self-loop edges with weight :obj:`fill_value` to a dense
    adjacency matrix."""
    diag = torch.diag(adj.new_full((adj.size(0), ), fill_value))
    return adj + diag
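
# A minimal usage sketch for the dense helpers (illustrative only; the
# 2-node toy graph below is assumed, not part of the original module):
#
#   >>> adj = torch.tensor([[0., 1.], [1., 0.]])
#   >>> dense_add_self_loops(adj)       # A + I
#   tensor([[1., 1.],
#           [1., 1.]])
#   >>> dense_gcn_norm(adj)             # D^{-1/2} (A + I) D^{-1/2}
#   tensor([[0.5000, 0.5000],
#           [0.5000, 0.5000]])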


def make_self_loops(
    edge_index: Adj,
    edge_weight: OptTensor = None,
    num_nodes: Optional[int] = None,
    fill_value: float = 1.0,
    improved: bool = False,
) -> Tuple[Adj, OptTensor]:
    r"""Add self-loop edges for input graph.

    Parameters
    ----------
    edge_index : Adj
        the input graph denoted by :obj:`edge_index`, which could be
        a dense :obj:`torch.FloatTensor` adjacency matrix,
        a :obj:`torch_sparse.SparseTensor`,
        or a :obj:`torch.LongTensor` of edge indices.
    edge_weight : OptTensor, optional
        edge weights for the input edge_index, by default None
    num_nodes : Optional[int], optional
        number of nodes, by default None
    fill_value : float, optional
        fill value for the added self-loop edges, by default 1.0;
        overridden by 2.0 if :obj:`improved=True`
    improved : bool, optional
        whether the layer computes
        :math:`\mathbf{\hat{A}}` as :math:`\mathbf{A} + 2\mathbf{I}`,
        by default False

    Returns
    -------
    Tuple[Adj, OptTensor]
        output edge indices and edge weights with
        added self-loop edges.
    """

    # Only override the user-provided fill value in the "improved" case;
    # otherwise the `fill_value` argument would be silently ignored.
    if improved:
        fill_value = 2.
    if isinstance(edge_index, Tensor) and edge_index.dtype == torch.long:
        # Sparse edge_index with shape [2, M]
        edge_index, edge_weight = add_self_loops(edge_index, edge_weight,
                                                 fill_value=fill_value,
                                                 num_nodes=num_nodes)
    elif isinstance(edge_index, Tensor) and edge_index.dtype == torch.float:
        # N by N dense adjacency matrix
        edge_index = dense_add_self_loops(edge_index, fill_value)
    elif isinstance(edge_index, SparseTensor):
        edge_index = fill_diag(edge_index, fill_value)
    else:
        raise ValueError(f"Type {type(edge_index)} is not supported.")

    return edge_index, edge_weight
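
# A minimal usage sketch for `make_self_loops` (illustrative only; the toy
# graph below is assumed, not part of the original module):
#
#   >>> edge_index = torch.tensor([[0, 1], [1, 0]])   # 2 nodes, 2 edges
#   >>> edge_index, edge_weight = make_self_loops(edge_index, num_nodes=2)
#   >>> edge_index                    # one self-loop appended per node
#   tensor([[0, 1, 0, 1],
#           [1, 0, 0, 1]])
#
# `edge_weight` stays None here since no weights were passed in.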


def make_gcn_norm(
    edge_index: Adj,
    edge_weight: OptTensor = None,
    num_nodes: Optional[int] = None,
    add_self_loops: bool = True,
    dtype: Optional[torch.dtype] = None,
) -> Tuple[Adj, OptTensor]:
    r"""Perform GCN-normalization :math:`\mathbf{\hat{D}}^{-1/2}
    \mathbf{\hat{A}} \mathbf{\hat{D}}^{-1/2}` for the input graph.

    Parameters
    ----------
    edge_index : Adj
        the input graph denoted by :obj:`edge_index`, which could be
        a dense :obj:`torch.FloatTensor` adjacency matrix,
        a :obj:`torch_sparse.SparseTensor`,
        or a :obj:`torch.LongTensor` of edge indices.
    edge_weight : OptTensor, optional
        edge weights for the input edge_index, by default None
    num_nodes : Optional[int], optional
        number of nodes, by default None
    add_self_loops : bool, optional
        whether to add self-loop edges, by default True
    dtype : Optional[torch.dtype], optional
        data type of the edge weights for the added self-loop
        edges if :obj:`add_self_loops=True`, by default None

    Returns
    -------
    Tuple[Adj, OptTensor]
        output normalized graph denoted as
        :obj:`edge_index` and :obj:`edge_weight`.
    """
    if isinstance(edge_index, Tensor) and edge_index.dtype == torch.long:
        # Sparse edge_index with shape [2, M]
        edge_index, edge_weight = gcn_norm(edge_index, edge_weight,
                                           num_nodes=num_nodes, improved=False,
                                           add_self_loops=add_self_loops,
                                           dtype=dtype)
    elif isinstance(edge_index, Tensor) and edge_index.dtype == torch.float:
        # N by N dense adjacency matrix
        edge_index = dense_gcn_norm(edge_index, improved=False,
                                    add_self_loops=add_self_loops)
    elif isinstance(edge_index, SparseTensor):
        edge_index = gcn_norm(edge_index, num_nodes=num_nodes, improved=False,
                              add_self_loops=add_self_loops, dtype=dtype)
    else:
        raise ValueError(f"Type {type(edge_index)} is not supported.")

    return edge_index, edge_weight
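
# A minimal usage sketch for `make_gcn_norm` (illustrative only; toy inputs
# assumed, not part of the original module):
#
#   >>> edge_index = torch.tensor([[0, 1], [1, 0]])   # 2-node cycle
#   >>> edge_index, edge_weight = make_gcn_norm(edge_index, num_nodes=2)
#   >>> edge_weight        # \hat{D}^{-1/2} \hat{A} \hat{D}^{-1/2} weights
#   tensor([0.5000, 0.5000, 0.5000, 0.5000])
#
# Passing the equivalent dense FloatTensor adjacency instead returns the
# normalized dense matrix in place of `edge_index`.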


class GCNConv(nn.Module):
    r"""The graph convolutional operator from the
    `"Semi-supervised Classification with Graph Convolutional Networks"
    <https://arxiv.org/abs/1609.02907>`_ paper (ICLR'17)

    Parameters
    ----------
    in_channels : int
        dimensions of input samples
    out_channels : int
        dimensions of output samples
    improved : bool, optional
        whether the layer computes
        :math:`\mathbf{\hat{A}}` as :math:`\mathbf{A} + 2\mathbf{I}`,
        by default False
    cached : bool, optional
        (*UNUSED*) whether the layer will cache the computation of
        :math:`\mathbf{\hat{D}}^{-1/2} \mathbf{\hat{A}}
        \mathbf{\hat{D}}^{-1/2}` on first execution, and will use the
        cached version for further executions, by default False
    add_self_loops : bool, optional
        whether to add self-loops to the input graph, by default True
    normalize : bool, optional
        whether to compute symmetric normalization coefficients
        on the fly, by default True
    bias : bool, optional
        whether to use bias in the layers, by default True

    Note
    ----
    Different from the implementation in :class:`torch_geometric`, for
    the input :obj:`edge_index` ours supports :obj:`torch.FloatTensor`,
    :obj:`torch.LongTensor` and :obj:`torch_sparse.SparseTensor`.

    In addition, the argument :obj:`cached` is unused. We add this
    argument to be compatible with :class:`torch_geometric`.

    See also
    --------
    :class:`greatx.nn.models.supervised.GCN`
    """
    def __init__(self, in_channels: int, out_channels: int,
                 improved: bool = False, cached: bool = False,
                 add_self_loops: bool = True, normalize: bool = True,
                 bias: bool = True):
        super().__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.improved = improved
        self.cached = cached  # NOTE: unused now
        self.add_self_loops = add_self_loops
        self.normalize = normalize

        self.lin = Linear(in_channels, out_channels, bias=False,
                          weight_initializer='glorot')

        if bias:
            self.bias = nn.Parameter(torch.Tensor(out_channels))
        else:
            self.register_parameter('bias', None)

        self.reset_parameters()

    def reset_parameters(self):
        self.lin.reset_parameters()
        zeros(self.bias)

    def forward(self, x: Tensor, edge_index: Adj,
                edge_weight: OptTensor = None) -> Tensor:
        """"""
        x = self.lin(x)

        if self.add_self_loops:
            edge_index, edge_weight = make_self_loops(
                edge_index, edge_weight, num_nodes=x.size(0),
                improved=self.improved)

        if self.normalize:
            edge_index, edge_weight = make_gcn_norm(
                edge_index, edge_weight, num_nodes=x.size(0),
                dtype=x.dtype, add_self_loops=False)

        out = spmm(x, edge_index, edge_weight)

        if self.bias is not None:
            out += self.bias

        return out

    def __repr__(self) -> str:
        return (f'{self.__class__.__name__}({self.in_channels}, '
                f'{self.out_channels})')
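
# A minimal end-to-end sketch (illustrative only; the random features and
# toy graph below are assumed, not part of the original module):
#
#   >>> conv = GCNConv(16, 32)
#   >>> x = torch.randn(4, 16)                  # 4 nodes, 16-dim features
#   >>> edge_index = torch.tensor([[0, 1, 2, 3],
#   ...                            [1, 2, 3, 0]])
#   >>> conv(x, edge_index).shape
#   torch.Size([4, 32])
#
# A dense [N, N] FloatTensor adjacency matrix or a
# torch_sparse.SparseTensor works in place of `edge_index` as well.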