Commit fcb1e016 authored by Solène

Initial commit

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from tools_stegano import IDCT8_Net, find_lambda
from double_tanh import double_compact_tanh, double_tanh
class BackPack(nn.Module):
def __init__(self, c_coeffs, rho_0, entropy, N, nets, params, ecdf_list):
super(BackPack, self).__init__()
self.net_decompress = IDCT8_Net(params)
self.N = N
self.entropy = torch.tensor(entropy)
self.im_size = params.image_size
self.rho_vec = torch.nn.Parameter(data=torch.tensor(rho_0), requires_grad=True)
self.c_coeffs = torch.tensor(c_coeffs).double()
self.spatial_cover = self.net_decompress.forward(torch.reshape(self.c_coeffs,(1,1,self.im_size, self.im_size)))/255
self.nets = nets
        # Stego-class probability assigned to the cover by each detector,
        # calibrated with that detector's empirical CDF (ECDF)
        proba_cover = [torch.nn.Softmax(dim=1)(net.forward(self.spatial_cover.cuda().float()))[0,1] \
                       for net in nets]
        self.proba_cover = np.array([ecdf(x.cpu().detach().numpy()) for x,ecdf in zip(proba_cover,ecdf_list)])
self.find_lambda_fn = find_lambda.apply
self.attack=params.attack
if(self.attack=='SGE'):
self.mod = torch.reshape(torch.tensor([-1,0,1]), (3,1,1))
self.smoothing_fn = lambda u, p, tau : \
torch.sum(torch.nn.Softmax(dim=1)((-torch.log(-torch.log(u))+torch.log(p+1e-30))/tau)*self.mod,axis=1) if tau>0 \
else torch.argmax(-torch.log(-torch.log(u))+torch.log(p+1e-30),axis=1) - 1
elif(self.attack=='DoCoTanh'):
self.smoothing_fn = double_compact_tanh.apply
elif(self.attack=='DoTanh'):
self.smoothing_fn = double_tanh
self.ecdf_list = ecdf_list
def forward(self, tau):
if(self.attack=='SGE'):
u = torch.rand(size=(self.N, 3, self.im_size, self.im_size))
else:
u = torch.rand(size=(self.N, self.im_size, self.im_size))
# Compute modifications for soft stego
lbd = self.find_lambda_fn(self.entropy, self.rho_vec)
probas = torch.nn.Softmax(dim=0)(-lbd*(self.rho_vec-torch.min(self.rho_vec,dim=0)[0]))
b = self.smoothing_fn(u, probas, tau)
stego_soft = torch.reshape(self.c_coeffs+b, (self.N,1,self.im_size,self.im_size))
spatial_image_soft = self.net_decompress.forward(stego_soft)/255
logits = [torch.reshape(net.forward(spatial_image_soft.cuda().float()),(1,self.N,2)) \
for net in self.nets]
logits = torch.cat(logits)
probas_soft = torch.nn.Softmax(dim=2)(logits)[:,:,-1]
mean_probas_soft = torch.mean(probas_soft,axis=-1)
mean_probas_soft_cpu = mean_probas_soft.clone().cpu().detach().numpy()
mean_probas_soft_cpu = np.array([ecdf(x) for ecdf,x in zip(self.ecdf_list, mean_probas_soft_cpu)])
argmax = np.argmax(mean_probas_soft_cpu)
best_probas_soft = mean_probas_soft[argmax]
# Compute modifications for real/hard stego
with torch.no_grad():
b_hard = self.smoothing_fn(u, probas, 0)
stego_hard = torch.reshape(self.c_coeffs+b_hard, (self.N, 1, self.im_size, self.im_size))
spatial_image_hard = self.net_decompress.forward(stego_hard)/255
logits = [torch.reshape(net.forward(spatial_image_hard.cuda().float()),(1,self.N,2)) \
for net in self.nets]
logits = torch.cat(logits)
probas_hard = torch.nn.Softmax(dim=2)(logits)[:,:,-1]
mean_probas_hard = torch.mean(probas_hard,axis=-1).cpu().detach().numpy()
mean_probas_hard_cpu = np.array([ecdf(x) for ecdf,x in zip(self.ecdf_list, mean_probas_hard)])
return best_probas_soft, mean_probas_soft_cpu, mean_probas_hard_cpu, stego_hard
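# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): a minimal,
# self-contained version of the 'SGE' smoothing defined in BackPack.__init__.
# For tau > 0 it returns a soft Gumbel-softmax relaxation of the ternary
# changes {-1, 0, +1}; as tau -> 0 it matches the hard Gumbel-max draw used
# for the real stego. The helper name and the toy shapes are assumptions made
# for this example only.
def _demo_sge_smoothing(tau=0.5, im_size=8, n_samples=2):
    mod = torch.reshape(torch.tensor([-1., 0., 1.]), (3, 1, 1))
    # p: probabilities of the changes -1, 0, +1 for every DCT coefficient
    p = torch.nn.Softmax(dim=0)(torch.randn(3, im_size, im_size))
    u = torch.rand(n_samples, 3, im_size, im_size)
    gumbel = -torch.log(-torch.log(u)) + torch.log(p + 1e-30)
    soft = torch.sum(torch.nn.Softmax(dim=1)(gumbel / tau) * mod, axis=1)
    hard = torch.argmax(gumbel, axis=1) - 1
    return soft, hard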
import numpy as np
from glob import glob
from sklearn.model_selection import GroupKFold
import cv2
from skimage import io
import torch
from torch import nn
import os
from datetime import datetime
import time
import random
import pandas as pd
import albumentations as A
import matplotlib.pyplot as plt
from albumentations.pytorch.transforms import ToTensorV2
from torch.utils.data import Dataset,DataLoader
from torch.utils.data.sampler import SequentialSampler, RandomSampler
import sklearn
from PIL import Image
from os import path,mkdir,makedirs
from tools_stegano import compute_spatial_from_jpeg, compute_proba, HILL
# TRANSFORMS
def get_train_transforms():
return A.Compose([
A.HorizontalFlip(p=0.5),
A.VerticalFlip(p=0.5),
#A.ToGray(always_apply=True, p=1.0),
#A.Resize(height=512, width=512, p=1.0),
ToTensorV2(p=1.0),
], p=1.0)
def get_valid_transforms():
return A.Compose([
#A.Resize(height=512, width=512, p=1.0),
#A.ToGray(always_apply=True, p=1.0),
ToTensorV2(p=1.0),
], p=1.0)
# DATASET CLASS
def onehot(size, target):
vec = torch.zeros(size, dtype=torch.float32)
vec[target] = 1.
return vec
class DatasetRetriever(Dataset):
def __init__(self, image_names, params, indexs_db, kinds=None, labels=None, transforms=None, pair_training=False, spatial=False):
super().__init__()
self.kinds = kinds
self.image_names = image_names
self.indexs_db = indexs_db
self.labels = labels
self.transforms = transforms
self.c_quant = np.load(params.folder_model + 'c_quant_'+str(params.QF)+'.npy')
self.WET_COST = 10 ** 13
self.emb_rate = params.emb_rate
self.im_size = params.image_size
self.cover_path = params.data_dir_cover
self.cost_dir = params.cost_dir
self.pair_training=pair_training
self.data_dir_prot = params.data_dir_prot
self.spatial=spatial
if self.spatial:
if(params.H1_filter is None):
self.H1_filter = 4 * np.array([[-0.25, 0.5, -0.25],
[0.5, -1, 0.5],
[-0.25, 0.5, -0.25]])
self.L1_filter = (1.0/9.0)*np.ones((3, 3))
self.L2_filter = (1.0/225.0)*np.ones((15, 15))
else:
self.H1_filter = np.load(params.H1_filter).reshape((3,3))
self.L1_filter = np.load(params.L1_filter).reshape((3,3))
self.L2_filter = np.load(params.L2_filter).reshape((15,15))
def __getitem__(self, index: int):
if self.pair_training:
image_name = self.image_names[index]
if(self.spatial):
cover = np.asarray(Image.open(path.join(self.cover_path, image_name[:-3]+'pgm')),dtype=np.float32)
message_length= cover.size*self.emb_rate
else:
cover = np.load(path.join(self.cover_path, image_name[:-3]+'npy')).astype(np.float32)
nz_AC = np.sum(cover!=0)-np.sum(cover[::8,::8]!=0)
message_length = nz_AC*self.emb_rate
index_db = self.indexs_db[index]
if(self.spatial):
rho = HILL(cover, self.H1_filter, self.L1_filter, self.L2_filter)
else:
try:
cost_dir = self.data_dir_prot + 'data_adv_'+ str(index_db) +'/adv_cost/'
rho = np.load(cost_dir+image_name[:-3]+'npy')
except:
cost_dir = self.cost_dir
rho = np.load(cost_dir+image_name[:-3]+'npy')
if(rho.shape==(self.im_size, self.im_size)):
rho = np.reshape(rho,(1,self.im_size, self.im_size))
rho = np.concatenate((np.copy(rho), np.zeros_like(rho), np.copy(rho)),axis=0)
#if(self.spatial):
# rho[0,cover <= 0] = self.WET_COST
# rho[2,cover >= 255] = self.WET_COST
if not self.spatial:
rho[0,cover < -1023] = self.WET_COST
rho[2,cover > 1023] = self.WET_COST
p = compute_proba(rho, message_length)
u = np.random.uniform(0,1,(3, self.im_size, self.im_size))
stego = (cover + np.argmax(-np.log(-np.log(u))+np.log(p+1e-30),axis=0) - 1).astype(np.float32)
if not self.spatial:
cover = compute_spatial_from_jpeg(cover, self.c_quant)/255
stego = compute_spatial_from_jpeg(stego, self.c_quant)/255
if self.transforms:
# To have the same transformation on cover and stego
seed = np.random.randint(2147483647)
random.seed(seed)
cover = self.transforms(image=cover)['image']
random.seed(seed)
stego = self.transforms(image=stego)['image']
else:
cover = torch.tensor(cover.reshape((1, self.im_size, self.im_size)))
stego = torch.tensor(stego.reshape((1, self.im_size, self.im_size)))
cover = cover[0:1,:,:]
stego = stego[0:1,:,:]
target_cover = onehot(2, 0)
target_stego = onehot(2, 1)
return((cover,stego), (target_cover, target_stego))
else:
kind, image_name, label = self.kinds[index], self.image_names[index], self.labels[index]
if(self.spatial):
cover = np.asarray(Image.open(path.join(self.cover_path, image_name[:-3]+'pgm')),dtype=np.float32)
message_length= cover.size*self.emb_rate
else:
cover = np.load(path.join(self.cover_path, image_name[:-3]+'npy')).astype(np.float32)
nz_AC = np.sum(cover!=0)-np.sum(cover[::8,::8]!=0)
message_length = nz_AC*self.emb_rate
if kind =='Cover':
image = cover
elif kind == 'Stego':
index_db = self.indexs_db[index]
if(self.spatial):
rho = HILL(cover, self.H1_filter, self.L1_filter, self.L2_filter)
else:
try:
cost_dir = self.data_dir_prot + 'data_adv_'+ str(index_db) +'/adv_cost/'
rho = np.load(cost_dir+image_name[:-3]+'npy')
except:
cost_dir = self.cost_dir
rho = np.load(cost_dir+image_name[:-3]+'npy')
if(rho.shape==(self.im_size, self.im_size)):
rho = np.reshape(rho,(1,self.im_size, self.im_size))
rho = np.concatenate((np.copy(rho), np.zeros_like(rho), np.copy(rho)),axis=0)
#if(self.spatial):
# rho[0,cover <= 0] = self.WET_COST
# rho[2,cover >= 255] = self.WET_COST
if not self.spatial:
rho[0,cover < -1023] = self.WET_COST
rho[2,cover > 1023] = self.WET_COST
p = compute_proba(rho, message_length)
u = np.random.uniform(0,1,(3, self.im_size, self.im_size))
stego = cover + np.argmax(-np.log(-np.log(u))+np.log(p+1e-30),axis=0) - 1
image = stego
image = image.astype(np.float32)
if not self.spatial:
image = compute_spatial_from_jpeg(image, self.c_quant)/255.
if self.transforms:
sample = {'image': image}
sample = self.transforms(**sample)
image = sample['image']
else:
image = image.reshape((1, self.im_size, self.im_size))
image = image[0:1,:,:]
target = onehot(2, label)
return image, target
def __len__(self) -> int:
return self.image_names.shape[0]
def get_labels(self):
return list(self.labels)
# LABEL SMOOTHING
class LabelSmoothing(nn.Module):
def __init__(self, smoothing = 0.05):
super(LabelSmoothing, self).__init__()
self.confidence = 1.0 - smoothing
self.smoothing = smoothing
def forward(self, x, target):
if self.training:
x = x.float()
target = target.float()
logprobs = torch.nn.functional.log_softmax(x, dim = -1)
nll_loss = -logprobs * target
nll_loss = nll_loss.sum(-1)
smooth_loss = -logprobs.mean(dim=-1)
loss = self.confidence * nll_loss + self.smoothing * smooth_loss
return loss.mean()
else:
return torch.nn.functional.cross_entropy(x, target)
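# Illustrative usage sketch (not part of the original training loop), assuming
# raw (N, 2) logits and the one-hot targets produced by onehot() above; the
# helper name is hypothetical.
def _demo_label_smoothing():
    criterion = LabelSmoothing(smoothing=0.05)
    criterion.train()                       # smoothing is only applied in training mode
    logits = torch.randn(4, 2)              # batch of 4 images, classes (cover, stego)
    targets = torch.stack([onehot(2, i % 2) for i in range(4)])
    return criterion(logits, targets)       # scalar smoothed cross-entropy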
def load_dataset(params, pair_training=False):
dataset = []
im_list = np.load(params.permutation_files)
folds = np.zeros(params.train_size + params.valid_size + params.test_size,dtype=np.int8)
folds[params.train_size:]+=1
folds[params.train_size+params.valid_size:]+=1
if(params.iteration_step>0):
indexs_db = np.load(params.data_dir_prot +'data_train_'+str(params.iteration_step)+'/index.npy')
else:
indexs_db = np.zeros(params.train_size+params.valid_size + params.test_size, dtype=np.int8)
if pair_training:
for im,fold,ind in zip(im_list, folds, indexs_db):
dataset.append({
'image_name': im,
'fold': fold,
'indexs_db': ind
})
else:
for label, kind in enumerate(['Cover', 'Stego']):
for im,fold,ind in zip(im_list, folds,indexs_db):
if(kind=='Cover'):
index = -1
else:
index=ind
dataset.append({
'kind': kind,
'image_name': im,
'label': label,
'fold': fold,
'indexs_db': index
})
dataset = pd.DataFrame(dataset)
return(dataset)
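# Illustrative sketch (not part of the original pipeline): the stego samples in
# DatasetRetriever are drawn with the Gumbel-max trick. Given a (3, H, W) map p
# of probabilities for the changes -1, 0 and +1 (as used with compute_proba
# above), the change at each coefficient is the argmax of the Gumbel-perturbed
# log-probabilities, shifted to {-1, 0, +1}. The helper name is hypothetical.
def _demo_gumbel_max_changes(p):
    u = np.random.uniform(0, 1, p.shape)
    return np.argmax(-np.log(-np.log(u)) + np.log(p + 1e-30), axis=0) - 1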
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from tools_stegano import IDCT8_Net, find_lambda
def logit(y, eps=1e-20):
return -1.0 * torch.log((1.0 - torch.min(y,torch.tensor(1.-eps).double())) / torch.max(y, torch.tensor(eps).double()))
def compact_tanh(u, tau):
return(torch.tanh(logit(u)/tau))
def g_warp(u, l, c, r):
    a = torch.clamp((u-l)/(r-l),0,1)
b = np.log(0.5)/(torch.log((c-l)/(r-l)))
e = b*torch.log(a)
e = torch.exp(e)
return(e)
# Double Tanh
def double_tanh(u, p, tau):
if(tau>0):
b = -0.5*(torch.tanh((p[0]-u)/tau)+torch.tanh((p[0]+p[1]-u)/tau))
else:
cumsum = torch.cumsum(p,axis=0)
        b = torch.full_like(u, -1.)
b[u>cumsum[0,None]]+=1
b[u>cumsum[1,None]]+=1
return(b)
# Double Compact Tanh
class double_compact_tanh(torch.autograd.Function):
"""
We can implement our own custom autograd Functions by subclassing
torch.autograd.Function and implementing the forward and backward passes
which operate on Tensors.
"""
@staticmethod
def forward(ctx, u, p, tau):
if tau>0:
with torch.enable_grad():
gamma = (p[0]+1-p[-1])/2
step1 = torch.ones_like(u).double()
step2 = -torch.ones_like(u).double()
p = (p[:,None]).repeat((1,len(u),1,1))
gamma = (gamma[None]).repeat((len(u),1,1))
# Handle 0 probabilities of -1 or +1
# (probability of modification 0 is assumed to be never equal to 0)
bool1 = (u<gamma)&(p[1]<1.)
bool2 = (u>gamma)&(p[1]<1.)
step1[bool1] = compact_tanh(g_warp(u[bool1], 0, p[0,bool1], gamma[bool1]), tau)
step2[bool2] = compact_tanh(g_warp(u[bool2], gamma[bool2], (p[0]+p[1])[bool2], 1.), tau)
b = 0.5*(step1 + step2)
# Save data
ctx.tau=tau
ctx.gamma=gamma
ctx.u = u
ctx.p = p
else:
cumsum = torch.cumsum(p,axis=0)
            b = torch.full_like(u, -1.)
b[u>cumsum[0,None]]+=1
b[u>cumsum[1,None]]+=1
return(b)
# Compute manually the gradient for "if" cases
@staticmethod
def backward(ctx, grad_output):
        # Same computation as in forward, to recover the gradient
with torch.enable_grad():
gamma = (ctx.p[0]+1-ctx.p[-1])/2
bool1 = (ctx.u<gamma)&(ctx.p[1]<1.)
bool2 = (ctx.u>gamma)&(ctx.p[1]<1.)
p1 = ctx.p[:,bool1]
p2 = ctx.p[:,bool2]
gam1 = ctx.gamma[bool1]
gam2 = ctx.gamma[bool2]
u1 = ctx.u[bool1]
u2 = ctx.u[bool2]
step1 = compact_tanh(g_warp(u1, 0, p1[0], gam1), ctx.tau)
step2 = compact_tanh(g_warp(u2, gam2, p2[0]+p2[1], 1.), ctx.tau)
gr = torch.zeros_like(ctx.p)
gr1 = 0.5*torch.autograd.grad(step1,p1,grad_outputs=grad_output[bool1],retain_graph=True)[0]
gr2 = 0.5*torch.autograd.grad(step2,p2,grad_outputs=grad_output[bool2],retain_graph=True)[0]
gr[:,bool1]=gr1
gr[:,bool2]=gr2
gr = gr[:,0]
# Avoid NaNs
gr[torch.isnan(gr)]=0
return(None, gr, None)
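# Illustrative sketch (not part of the original pipeline): with a small tau the
# double_tanh relaxation above is close to the hard staircase obtained at
# tau = 0, which samples changes in {-1, 0, +1} from the cumulative
# probabilities. The helper name and toy shapes are assumptions for the example.
def _demo_double_tanh(im_size=8):
    p = torch.nn.Softmax(dim=0)(torch.randn(3, im_size, im_size)).double()
    u = torch.rand(1, im_size, im_size).double()
    soft = double_tanh(u, p, 1e-3)
    hard = double_tanh(u, p, 0)
    return soft, hard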
import os
import numpy as np
from scipy.fftpack import dct, idct
import sys
import torch
from functools import partial
import argparse
from data_loader import compute_spatial_from_jpeg
from efficientnet import get_net
def softmax(array):
exp = np.exp(array-np.max(array,axis=1, keepdims=True))
return(exp/np.sum(exp, axis=1, keepdims=True))
class cover_stego_loader(object):
def __init__(self, params, iteration, mode): # mode = stego or cover
self.params = params
n_images = params.train_size + params.valid_size + params.test_size
self.files = np.load(params.folder_model + 'permutation_files.npy')[:n_images]
self.train_counter = 0
self.train_data_size = len(self.files)
self.train_num_batches = int(np.ceil(1.0 * self.train_data_size / params.batch_size_eval))
self.iteration_step = iteration
self.mode = mode
self.c_quant = np.load(params.folder_model + 'c_quant_'+str(params.QF)+'.npy')
def next_batch(self):
borne_sup = min(self.train_counter + self.params.batch_size_eval, len(self.files))
n_images = borne_sup-self.train_counter
next_batch_X = np.zeros((n_images,self.params.image_size,self.params.image_size),dtype=np.float32)
for i,file in enumerate(self.files[self.train_counter:borne_sup]):
if(self.mode=='stego'):
if(self.iteration_step>0):
try:
image = np.load(self.params.data_dir_prot+'data_adv_'+str(self.iteration_step)+'/adv_final/'+file[:-4]+'.npy')
except:
image = np.load(self.params.data_dir_stego_0 + file[:-4]+'.npy')
else:
image = np.load(self.params.data_dir_stego_0 + file[:-4]+'.npy')
elif(self.mode=='cover'):
image = np.load(self.params.data_dir_cover + file[:-4] + '.npy')
spat_image = compute_spatial_from_jpeg(image, self.c_quant)
next_batch_X[i,:,:]=spat_image
next_batch_X = np.reshape(next_batch_X,(next_batch_X.shape[0], 1, next_batch_X.shape[1],next_batch_X.shape[2]))
        batch_files = self.files[self.train_counter:borne_sup]
        self.train_counter = (self.train_counter + self.params.batch_size_eval) % self.train_data_size
        return(next_batch_X, batch_files)
def reset_counter(self):
self.train_counter = 0
def evaluate_step_i(params, iteration_f, iteration_adv): # if iteration_adv == -1 : cover
net = get_net().cuda()
path = params.save_path + "last-checkpoint.bin"
checkpoint = torch.load(path)
net.load_state_dict(checkpoint['model_state_dict'])
# Create directory
if(iteration_adv==-1):
directory = params.data_dir_prot+'cover/eval_f'+str(iteration_f)+'/'
dataloader = cover_stego_loader(params,iteration_adv,'cover')
else:
directory = params.data_dir_prot+'data_adv_'+str(iteration_adv)+'/eval_f'+str(iteration_f)+'/'
        dataloader = cover_stego_loader(params, iteration_adv, 'stego')
    # Make sure the output directory exists before saving the probabilities and logits
    if not os.path.exists(directory):
        os.makedirs(directory)
result_fi = np.empty((0,2))
dataloader.reset_counter()
for batch in range(dataloader.train_num_batches):
batch_x, images_path = dataloader.next_batch()
with torch.no_grad():
l = net.forward(torch.tensor(batch_x).cuda())
result_fi = np.concatenate((result_fi,l.cpu().detach().numpy()))
np.save(directory+'probas',softmax(result_fi)[:,1])
np.save(directory+'logits',result_fi)
return(result_fi, softmax(result_fi)[:,1])
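# Illustrative sketch (not part of the original pipeline): the softmax() helper
# above turns the (n_images, 2) logits saved by evaluate_step_i into per-image
# probabilities of the stego class (column 1). The helper name is hypothetical.
def _demo_stego_probabilities():
    logits = np.array([[2.0, -1.0], [0.5, 1.5]])
    return softmax(logits)[:, 1]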
import numpy as np
import argparse
import sys
from shutil import copy
import os
import glob
from time import time
from statsmodels.distributions.empirical_distribution import ECDF
def generate_train_db(params, iteration_step, strategy):
files = np.load(params.folder_model + 'permutation_files.npy')
if strategy=='minmax':
        # probas[j, k, n]: calibrated probability that classifier k labels image n
        # taken from adversarial database j as stego
        probas = np.zeros((iteration_step+1,iteration_step*len(params.models),len(files))) # nb_databases * nb_classifs * n_images
        # CALIBRATION: one ECDF per (iteration, model) pair, fitted on the cover scores
        ecdf_list=[]
        for i in range(iteration_step):
            for k,model in enumerate(params.models):
                ecdf = ECDF(np.load(params.data_dir_prot+'cover/eval_'+model+'_'+str(i)+'/probas.npy'))
                probas[0,i*len(params.models)+k]=ecdf(np.load(params.data_dir_prot+'data_adv_0/eval_'+model+'_'+str(i)+'/probas.npy'))
                ecdf_list.append(ecdf)
for j in range(1,iteration_step+1):
for i in range(iteration_step):
for k,model in enumerate(params.models):
ecdf = ecdf_list[i*len(params.models)+k]
probas[j,i*len(params.models)+k]=ecdf(np.load(params.data_dir_prot+'data_adv_'+str(j)+'/eval_'+model+'_'+str(i)+'/probas.npy'))
index = np.argmin(np.max(probas,axis=1),axis=0)
elif strategy=='random':
index = np.random.randint(iteration_step+1, size=len(files))
elif strategy=='lastit':
index = np.zeros(len(files),dtype=int)+iteration_step
if not os.path.exists(params.data_dir_prot+'data_train_'+str(iteration_step)+'/'):
os.makedirs(params.data_dir_prot+'data_train_'+str(iteration_step)+'/')
np.save(params.data_dir_prot+'data_train_'+str(iteration_step)+'/index.npy',index)
return(index)
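# Illustrative sketch (not part of the original pipeline): the 'minmax' strategy
# above keeps, for every image, the adversarial database whose worst-case
# (maximum) calibrated detection probability over all classifiers is smallest.
# The helper name and array sizes are assumptions for the example.
def _demo_minmax_selection(n_databases=3, n_classifiers=4, n_images=5):
    probas = np.random.uniform(size=(n_databases, n_classifiers, n_images))
    return np.argmin(np.max(probas, axis=1), axis=0)  # one database index per image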
#!/bin/bash
# Note: the empty --data_dir_cover, --data_dir_stego_0 and --cost_dir values
# below must be set to local paths before launching.
module load pytorch-gpu/py3/1.7.1
python main.py --label='70' \
--begin_step=1 \
--number_steps=1 \
--folder_model='./models/' \
--data_dir_prot='./experiment/' \
--data_dir_cover= \
--data_dir_stego_0= \
--cost_dir= \
--strategy='minmax' \
--image_size=512 \
--QF=75 \
--emb_rate=0.4 \
--model='xunet,srnet,efnet' \
--version_eff='b0' \
--stride=1 \
--batch_size_classif_ef=16 \
--batch_size_eval_ef=20 \
--epoch_num_ef=30 \
--CL_ef='yes' \
--start_emb_rate_ef=1.2 \
--pair_training_ef='no' \
--batch_size_classif_xu=30 \
--batch_size_eval_xu=30 \
--epoch_num_xu=30 \
--CL_xu='yes' \
--start_emb_rate_xu=1.2 \
--pair_training_xu='yes' \
--batch_size_classif_sr=16 \
--batch_size_eval_sr=30 \
--epoch_num_sr=40 \
--CL_sr='yes' \
--start_emb_rate_sr=1.4 \
--pair_training_sr='yes' \
--n_iter_max_backpack=200 \
--tau_0=5. \
--precision=0.01 \
--N_samples=2 \
--attack='SGE' \
--attack_last='no' \
--lr=0.01