Source code for greatx.attack.targeted.gf_attack

from typing import Optional

import numpy as np
import scipy.sparse as sp
import torch
from scipy import linalg
from torch import Tensor
from torch_geometric.data import Data
from tqdm.auto import tqdm

from greatx.attack.targeted.targeted_attacker import TargetedAttacker
from greatx.utils import singleton_filter


[docs]class GFAttack(TargetedAttacker): r"""Implementation of `GFA` attack from the: `"A Restricted Black - box Adversarial Framework Towards Attacking Graph Embedding Models" <https://arxiv.org/abs/1908.01297>`_ paper (AAAI'20) Parameters ---------- data : Data PyG-like data denoting the input graph K : int, optional the order of graph filter, by default 2 T : int, optional top-T largest eigen-values/vectors selected, by default 128 device : str, optional the device of the attack running on, by default "cpu" seed : Optional[int], optional the random seed of reproduce the attack, by default None name : Optional[str], optional name of the attacker, if None, it would be `__class__.__name__`, by default None Raises ------ TypeError unexpected keyword argument in :obj:`kwargs` Example ------- .. code-block:: python from greatx.dataset import GraphDataset import torch_geometric.transforms as T import os.path as osp dataset = GraphDataset(root='.', name='Cora', transform=T.LargestConnectedComponents()) data = dataset[0] from greatx.attack.targeted import IGAttack attacker = IGAttack(data) # attacking target node `1` with default budget set as node degree attacker.attack(target=1) # attacking target node `1` with budget set as 1 attacker.attack(target=1, num_budgets=1) attacker.data() # get attacked graph attacker.edge_flips() # get edge flips after attack attacker.added_edges() # get added edges after attack attacker.removed_edges() # get removed edges after attack Note ---- * In the paper, the authors mainly consider the single edge perturbations, i.e., :obj:`num_budgets=1`. # noqa * Please remember to call :meth:`reset` before each attack. * T=128 for citeseer and pubmed, T=num_nodes//2 for cora to reproduce results in paper. # noqa """ def __init__(self, data: Data, K: int = 2, T: int = 128, device: str = "cpu", seed: Optional[int] = None, name: Optional[str] = None, **kwargs): super().__init__(data=data, device=device, seed=seed, name=name, **kwargs) adj = self.adjacency_matrix adj = adj + sp.eye(adj.shape[0], format='csr') deg = np.diag(adj.sum(1).A1) eig_vals, eig_vec = linalg.eigh(adj.A, deg) self.eig_vals = torch.as_tensor(eig_vals, device=self.device, dtype=torch.float32) self.eig_vec = torch.as_tensor(eig_vec, device=self.device, dtype=torch.float32) feat = self.feat # the author named this as `x_mean`, I don't understand why not `x_sum` self.x_mean = feat.sum(1) self.K = K self.T = T
[docs] def get_candidate_edges(self): target = self.target N = self.num_nodes nodes_set = set(range(N)) - set([target]) if self.direct_attack: influencers = [target] row = np.repeat(influencers, N - 1) col = list(nodes_set) else: influencers = self.adjacency_matrix[target].indices row = np.repeat(influencers, N - 2) col = np.hstack( [list(nodes_set - set([infl])) for infl in influencers]) candidate_edges = np.stack([row, col], axis=1) if not self._allow_singleton: candidate_edges = singleton_filter(candidate_edges, self.adjacency_matrix) return candidate_edges
[docs] def attack(self, target, *, num_budgets=None, direct_attack=True, structure_attack=True, feature_attack=False, ll_constraint=False, ll_cutoff=0.004, disable=False): super().attack(target, target_label=None, num_budgets=num_budgets, direct_attack=direct_attack, structure_attack=structure_attack, feature_attack=feature_attack) candidate_edges = self.get_candidate_edges() score = self.structure_score(self.adjacency_matrix, self.x_mean, self.eig_vals, self.eig_vec, candidate_edges, K=self.K, T=self.T, method="nosum") topk = torch.topk(score, k=self.num_budgets).indices.cpu() edges = candidate_edges[topk].reshape(-1, 2) edge_weights = self.adjacency_matrix[edges[:, 0], edges[:, 1]].A1 for it, edge_weight in tqdm(enumerate(edge_weights), desc='Peturbing graph...', disable=disable): u, v = edges[it] if edge_weight > 0: self.remove_edge(u, v, it) else: self.add_edge(u, v, it) return self
[docs] @staticmethod def structure_score(A: sp.csr_matrix, x_mean: Tensor, eig_vals: Tensor, eig_vec: Tensor, candidate_edges: np.ndarray, K: int, T: int, method: str = "nosum"): """Calculate the score of potential edges as formulated in paper. Parameters ---------- A : sp.csr_matrix the graph adjacency matrix x_mean : torch.Tensor eig_vals : torch.Tensor the eigen value eig_vec : torch.Tensor the eigen vector candidate_edges : np.ndarray the candidate_edges to be selected K : int The order of graph filter K. T : int Selecting the Top-T largest eigen-values/vectors. method : str, optional "sum" or "nosum" Indicates the score are calculated from which loss as in Equation (8) or Equation (12). "nosum" denotes Equation (8), where the loss is derived from Graph Convolutional Networks, "sum" denotes Equation (12), where the loss is derived from Sampling-based Graph Embedding Methods, by default "nosum" Returns ------- Tensor Scores for potential edges. """ assert method in ['sum', 'nosum'] D_min = A.sum(1).A1.min() + 1 # `+1` for the added selfloop score = [] for (u, v) in candidate_edges: eig_vals_res = (1 - 2 * A[ (u, v)]) * (2 * eig_vec[u] * eig_vec[v] - eig_vals * (eig_vec[u].square() + eig_vec[v].square())) eig_vals_res = eig_vals + eig_vals_res if method == "sum": if K == 1: eig_vals_res = (eig_vals_res / K).abs().mul(1. / D_min) else: for itr in range(1, K): eig_vals_res = eig_vals_res + eig_vals_res.pow(itr + 1) eig_vals_res = (eig_vals_res / K).abs().mul(1. / D_min) else: eig_vals_res = (eig_vals_res + 1.).square().pow(K) # from small to large least_t = torch.topk(eig_vals_res, k=T, largest=False).indices eig_vals_k_sum = eig_vals_res[least_t].sum() u_k = eig_vec[:, least_t] u_x_mean = u_k.t() @ x_mean score_u_v = eig_vals_k_sum * \ torch.square(torch.linalg.norm(u_x_mean)) score.append(score_u_v.item()) return torch.as_tensor(score)