基于离散余弦变换DCT实现彩色数字水印嵌入与提取(python)
内容介绍
离散余弦变换DCT
离散余弦变换(Discrete Cosine Transform,DCT)是一种常用于信号处理和图像压缩的数学算法。DCT将信号或图像数据从空间域转换到频率域。
DCT的计算涉及将N个数据点通过以下公式转换:
x[n] 是原始信号,X[k] 是变换后的系数,N是数据点的数量,k是频率变量。
思路概述
水印嵌入过程
-
初始化与参数设置
在DCT_Embed
类的构造函数__init__
中,获取待嵌入水印的图像和水印图像的尺寸信息,进行尺寸合法性校验,确保水印图像尺寸不大于背景图像尺寸按设定块大小划分后的尺寸。同时初始化一些关键参数,如分块大小block_size
(默认值为 8)、嵌入强度系数alpha
,以及用于嵌入时区分水印值(0 或 1)的两个随机向量k1
和k2
。注意根据分块大小block_size的值,水印图片的尺寸为
待嵌入水印的图像的1/block_size。
-
图像分块与 DCT 变换
将输入的背景图像按照设定的块大小(如 8×8)进行分块。对于划分好的每个块,使用cv2.dct
函数进行离散余弦变换,将图像从空间域转换到频域,得到对应块的 DCT 系数矩阵,最终形成一个包含所有分块 DCT 系数的多维数组,用于后续嵌入水印操作。 -
水印嵌入
确保输入的水印图像是经过二值归一化处理的(像素值仅为 0 或 1)。遍历水印图像的每个像素位置(对应到背景图像分块的位置),根据当前水印像素值是 0 还是 1,选择对应的随机向量k1
或k2
,将水印信息嵌入到对应块的 DCT 系数中。具体是修改每个块的最后一列(特定频率分量)的 DCT 系数,通过加上与嵌入强度系数alpha
、所选随机向量相关的值来实现水印嵌入。 -
逆 DCT 变换与图像合成
对嵌入水印后的 DCT 系数矩阵进行逆离散余弦变换(cv2.idct
),将频域数据转换回空间域。依次对每个块进行逆变换后,再将这些块按照原来的位置拼接起来,得到嵌入水印后的单通道图像,对于彩色图像则是分别处理每个通道后再合并通道,最终得到嵌入水印后的彩色图像。
水印提取过程
-
图像分块与 DCT 变换
对嵌入水印后的合成图像,按照与嵌入时相同的块大小,使用dct_blkproc
方法进行分块并进行 DCT 变换,获取各块的 DCT 系数矩阵,以便从中提取水印信息。 -
水印提取
遍历与原始水印图像尺寸对应的各块位置,提取每个块最后一列(嵌入水印时所使用的频率分量)的 DCT 系数组成向量p
。通过计算向量p
与嵌入时使用的两个随机向量k1
和k2
的相关性(利用corr2
函数计算相关系数),根据相关性大小来判断当前块位置对应的水印值是 0 还是 1。相关性更大的一方对应的水印值(1 对应k1
,0 对应k2
)作为提取出的水印像素值,遍历完所有对应位置后,得到完整的提取水印图像,对于彩色图像的各通道提取结果再进行合并等处理。
代码实现
import cv2
import numpy as np
import matplotlib.pyplot as plt
class DCT_Embed(object):
def __init__(self, background, watermark, block_size=8, alpha=30):
b_h, b_w = background.shape[:2]
w_h, w_w = watermark.shape[:2] # Adjust to handle 2D watermark
assert w_h <= b_h / block_size and w_w <= b_w / block_size, \
"\r\n请确保您的的水印图像尺寸 不大于 背景图像尺寸的1/{:}\r\nbackground尺寸{:}\r\nwatermark尺寸{:}".format(
block_size, background.shape, watermark.shape
)
self.block_size = block_size
self.alpha = alpha
self.k1 = np.random.randn(block_size)
self.k2 = np.random.randn(block_size)
def dct_blkproc(self, background):
background_dct_blocks_h = background.shape[0] // self.block_size
background_dct_blocks_w = background.shape[1] // self.block_size
background_dct_blocks = np.zeros(shape=(
(background_dct_blocks_h, background_dct_blocks_w, self.block_size, self.block_size)
))
h_data = np.vsplit(background, background_dct_blocks_h)
for h in range(background_dct_blocks_h):
block_data = np.hsplit(h_data[h], background_dct_blocks_w)
for w in range(background_dct_blocks_w):
a_block = block_data[w]
background_dct_blocks[h, w, ...] = cv2.dct(a_block.astype(np.float64))
return background_dct_blocks
def dct_embed(self, dct_data, watermark):
temp = watermark.flatten()
assert temp.max() == 1 and temp.min() == 0, "为方便处理,请保证输入的watermark是被二值归一化的"
result = dct_data.copy()
for h in range(watermark.shape[0]):
for w in range(watermark.shape[1]):
k = self.k1 if watermark[h, w] == 1 else self.k2
for i in range(self.block_size):
result[h, w, i, self.block_size - 1] = dct_data[h, w, i, self.block_size - 1] + self.alpha * k[i]
return result
def idct_embed(self, dct_data):
row = None
result = None
h, w = dct_data.shape[0], dct_data.shape[1]
for i in range(h):
for j in range(w):
block = cv2.idct(dct_data[i, j, ...])
row = block if j == 0 else np.hstack((row, block))
result = row if i == 0 else np.vstack((result, row))
return result.astype(np.uint8)
def dct_extract(self, synthesis, watermark_size):
w_h, w_w = watermark_size
recover_watermark = np.zeros(shape=watermark_size)
synthesis_dct_blocks = self.dct_blkproc(background=synthesis)
p = np.zeros(8)
for h in range(w_h):
for w in range(w_w):
for k in range(self.block_size):
p[k] = synthesis_dct_blocks[h, w, k, self.block_size - 1]
if corr2(p, self.k1) > corr2(p, self.k2):
recover_watermark[h, w] = 1
else:
recover_watermark[h, w] = 0
return recover_watermark
def mean2(x):
y = np.sum(x) / np.size(x)
return y
def corr2(a, b):
a = a - mean2(a)
b = b - mean2(b)
r = (a * b).sum() / np.sqrt((a * a).sum() * (b * b).sum())
return r
if __name__ == '__main__':
alpha = 10
blocksize = 8
watermark = cv2.imread(r"watermark.bmp")
watermark = cv2.cvtColor(watermark, cv2.COLOR_BGR2RGB)
watermark_bin = np.where(watermark < np.mean(watermark, axis=(0, 1)), 0, 1)
background = cv2.imread(r"image.bmp")
background = cv2.cvtColor(background, cv2.COLOR_BGR2RGB)
background_backup = background.copy()
yuv_background = cv2.cvtColor(background, cv2.COLOR_RGB2YUV)
Y, U, V = yuv_background[..., 0], yuv_background[..., 1], yuv_background[..., 2]
channels = cv2.split(background)
embed_synthesis = []
extract_watermarks = []
for i in range(3):
dct_emb = DCT_Embed(background=channels[i], watermark=watermark_bin[..., i], block_size=blocksize, alpha=alpha)
background_dct_blocks = dct_emb.dct_blkproc(background=channels[i])
embed_watermark_blocks = dct_emb.dct_embed(dct_data=background_dct_blocks, watermark=watermark_bin[..., i])
synthesis = dct_emb.idct_embed(dct_data=embed_watermark_blocks)
embed_synthesis.append(synthesis)
extract_watermarks.append(dct_emb.dct_extract(synthesis=synthesis, watermark_size=watermark_bin[..., i].shape) * 255)
rbg_synthesis = cv2.merge(embed_synthesis)
extract_watermark = cv2.merge([ew.astype(np.uint8) for ew in extract_watermarks])
images = [background_backup, watermark, rbg_synthesis, extract_watermark]
titles = ["image", "watermark", "systhesis", "extract"]
for i in range(4):
plt.subplot(2, 2, i + 1)
if i == 1 or i == 3:
plt.imshow(images[i])
else:
plt.imshow(images[i])
plt.title(titles[i])
plt.axis("off")
plt.show()
实现截图
参考资料
七月的和弦. DCT水印嵌入与提取_(Python Version)[EB/OL]. (2022-05-29)[2024-12-25]. https://blog.csdn.net/qq_44009107/article/details/125036240?ops_request_misc=&request_id=&biz_id=102&utm_term=dct%E5%B5%8C%E5%85%A5%E6%B0%B4%E5%8D%B0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-4-125036240.142^v100^pc_search_result_base2&spm=1018.2226.3001.4187.
文章参考链接:DCT水印嵌入与提取_(Python Version)_dct嵌入韩束-CSDN博客
作者:伊南风