import io
import shutil
import os
import zipfile
import xml.etree.ElementTree as ET
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from functools import partial
import base64
import zlib
# File name given to the metadata XML once it is decrypted and inflated.
OUTPUT_METADATA_XML_FILENAME = 'metadata.xml'
# Suffixes appended to the metadata file name while the intermediate file
# is still deflate-compressed and/or encrypted.
OUTPUT_SUFFIX_COMPRESSED = '.deflate'
OUTPUT_SUFFIX_ENCRYPTED = '.enc'
# Path of the job container (zip archive) to unpack.  Empty here;
# presumably assigned by the caller before unpack_job runs -- TODO confirm.
INPUT_JOB_FILENAME = ''
# File whose text content is the base64-encoded passphrase.
INPUT_PASSPHRASE_FILENAME = 'passphrase_base64.txt'
# Working folder that receives every extracted/intermediate file.
# Raw string: '\o' is not a valid escape sequence and triggers a warning on
# recent Python versions; the value (backslash + "output") is unchanged.
OUTPUT_FOLDER_NAME = r'\output'
def unpack_job(self):
    """Run the complete unpacking sequence for the configured job container.

    The steps are executed strictly in this order: reset the output folder,
    extract and parse Content.xml, handle the BinaryFile entries, decrypt
    the session key, extract/decrypt/inflate the metadata XML and finally
    call move_output_file().  Two closing messages are appended to
    ``self.RichText``.
    """
    create_or_clear_output_folder(OUTPUT_FOLDER_NAME)

    # Content.xml describes the container: file names, keys and IVs.
    content_xml_path = unzip_job_container_content_xml_file(self, INPUT_JOB_FILENAME)
    content_info = read_content_xml_file(self, content_xml_path)
    wrapped_session_key, metadata_iv, metadata_name_in_zip = content_info
    retrieve_BinaryFile_file_info(self, content_xml_path, INPUT_JOB_FILENAME)

    # Unwrap the session key, then use it on the metadata file.
    session_key = decrypt_session_key(self, wrapped_session_key, INPUT_PASSPHRASE_FILENAME)
    encrypted_metadata_path = unzip_job_container_metadata_xml_file(
        self, INPUT_JOB_FILENAME, metadata_name_in_zip
    )
    compressed_metadata_path = decrypt_metadata_xml(
        encrypted_metadata_path, session_key, metadata_iv
    )
    metadata_xml_path = uncompress_metadata_xml(compressed_metadata_path)

    move_output_file()

    self.RichText.append("Done. Metadata xml unpacked at following location:chengl ")
    self.RichText.append(''.join((' "', metadata_xml_path, '"')))
def create_or_clear_output_folder(folder):
    """Leave ``folder`` as an existing, empty directory.

    Anything already present at ``folder`` is removed recursively before
    the directory is created again.
    """
    already_present = os.path.exists(folder)
    if already_present:
        # Drop the old tree so no stale files survive from an earlier run.
        shutil.rmtree(folder)
    os.makedirs(folder)
def unzip_job_container_content_xml_file(self, inputJobFileName):
    """Extract ``Content.xml`` from the job container into the output folder.

    Args:
        self: Object whose ``RichText.append`` receives the progress message.
        inputJobFileName: Path of the job container (zip archive).

    Returns:
        Path of ``Content.xml`` inside ``OUTPUT_FOLDER_NAME``.
    """
    # The context manager closes the archive even when extraction fails,
    # and avoids shadowing the builtin ``zip``.
    with zipfile.ZipFile(inputJobFileName, 'r') as archive:
        archive.extract('Content.xml', OUTPUT_FOLDER_NAME)
    self.RichText.append("Decompression Content.xml in the path :" + OUTPUT_FOLDER_NAME + '\r\n')
    return os.path.join(OUTPUT_FOLDER_NAME, 'Content.xml')
def read_content_xml_file(self, contentXmlFilename):
    """Parse Content.xml and gather what is needed to decrypt the metadata.

    Returns:
        A 3-tuple ``(encrypted session key, metadata IV, metadata file name
        inside the zip)`` as produced by the three ``retrieve_*`` helpers.
    """
    namespaces = {'cc': 'http://schemas.materialise.com/AM/MatJob/Content'}
    root = ET.parse(contentXmlFilename).getroot()

    # The strategy reference ties the metadata file to its key and IV.
    metadata_name_in_zip, strategy_ref = retrieve_metadata_file_info(self, root, namespaces)

    # Tuple items are evaluated left to right: session key first, then IV.
    return (
        retrieve_encrypted_session_key_from_content_xml_tree(self, root, namespaces, strategy_ref),
        retrieve_metadata_encryption_iv(self, root, namespaces, strategy_ref),
        metadata_name_in_zip,
    )
def retrieve_BinaryFile_file_info(self, contentXmlFilename, INPUT_JOB_FILENAME):
    """Report every BinaryFile entry of Content.xml and extract embedded ones.

    Each ``ContainerFiles/BinaryFile`` element is listed in
    ``self.RichText``.  Entries whose ``IsOutsideContainer`` attribute is
    exactly ``'false'`` are extracted from the job container into
    ``OUTPUT_FOLDER_NAME``; every other entry is only reported.

    Args:
        self: Object whose ``RichText.append`` receives the messages.
        contentXmlFilename: Path of the extracted Content.xml.
        INPUT_JOB_FILENAME: Path of the job container (zip archive).  The
            parameter deliberately keeps its original name, which shadows
            the module-level constant inside this function.
    """
    ns = {'cc': 'http://schemas.materialise.com/AM/MatJob/Content'}
    contentXmlRoot = ET.parse(contentXmlFilename).getroot()
    # XPath location: ContainerContent/ContainerFiles/BinaryFile
    binary_file_elements = contentXmlRoot.findall('cc:ContainerFiles/cc:BinaryFile', ns)
    for element in binary_file_elements:
        file_name = element.get('FileName')
        is_outside_container = element.get('IsOutsideContainer')
        # format() keeps the listing alive when an attribute is absent
        # (plain string concatenation would fail on None).
        self.RichText.append(
            'BinaryFile:\r\n {} IsOutsideContainer:{}\r\n'.format(file_name, is_outside_container)
        )
        if is_outside_container != 'false':
            continue
        # Close the archive even if the member cannot be extracted.
        with zipfile.ZipFile(INPUT_JOB_FILENAME, 'r') as archive:
            archive.extract(file_name, OUTPUT_FOLDER_NAME)
        # Report the actual folder; the constant's name used to be embedded
        # in the message text by mistake.
        self.RichText.append(
            'Decompression BinaryFile:\r\n ' + file_name
            + ' in the path :' + OUTPUT_FOLDER_NAME + '\r\n'
        )
def retrieve_metadata_file_info(self, contentXmlRoot, ns):
    """Read the metadata file entry from the parsed Content.xml.

    Looks up ``ContainerFiles/MetadataFile`` below ``contentXmlRoot`` and
    logs both attributes to ``self.RichText``.

    Returns:
        ``(FileName, EncryptionStrategyRef)`` of the MetadataFile element.
    """
    # XPath location: ContainerContent/ContainerFiles/MetadataFile
    metadata_node = contentXmlRoot.find('cc:ContainerFiles/cc:MetadataFile', ns)

    # Name of the metadata file as stored inside the job container (zip).
    name_in_zip = metadata_node.get('FileName')
    self.RichText.append(''.join(('metadataXmlFilenameInZip:\r\n "', name_in_zip, '"\r\n')))

    # Id of the encryption strategy that applies to the metadata file.
    strategy_ref = metadata_node.get('EncryptionStrategyRef')
    self.RichText.append(''.join(('encryptionStrategyRef:\r\n "', strategy_ref, '"\r\n')))

    return name_in_zip, strategy_ref
def retrieve_encrypted_session_key_from_content_xml_tree(self, contentXmlRoot, ns, encryptionStrategyRef):
    """Read the passphrase-protected session key of one encryption strategy.

    The strategy is the first
    ``EncryptionStrategies/EncryptionStrategy`` element whose ``Id``
    attribute equals ``encryptionStrategyRef``.  Inside it, the
    ``SessionKey/EncryptedContent`` element with
    ``EncryptionScheme="PreSharedPassPhrase"`` supplies the values.

    Args:
        self: Object whose ``RichText.append`` receives the log messages.
        contentXmlRoot: Root element of the parsed Content.xml.
        ns: Namespace mapping providing the ``cc`` prefix.
        encryptionStrategyRef: Id of the strategy to look up.

    Returns:
        ``{"IV": <base64 text>, "CiphertextAndTag": <base64 text>}``.

    Raises:
        ValueError: The strategy, the pre-shared-passphrase content or one
            of its two values is missing.
        Exception: The strategy's ``Method`` is not ``AES-256_SessionKey``.
    """
    # Compare the Id attribute directly instead of splicing the id into an
    # XPath predicate: a quote character in the id would break the path.
    strategy = None
    if encryptionStrategyRef is not None:
        candidates = contentXmlRoot.findall('cc:EncryptionStrategies/cc:EncryptionStrategy', ns)
        strategy = next(
            (node for node in candidates if node.get('Id') == encryptionStrategyRef),
            None,
        )
    if strategy is None:
        raise ValueError(
            "No EncryptionStrategy with Id {!r} found in Content.xml".format(encryptionStrategyRef)
        )

    if strategy.get('Method') != "AES-256_SessionKey":
        raise Exception("This script will only handle files where the metadata.xml is encrypted with method 'AES-256_SessionKey'")

    wrapped_key = strategy.find(
        'cc:SessionKey/cc:EncryptedContent[@EncryptionScheme="PreSharedPassPhrase"]', ns
    )
    if wrapped_key is None:
        raise ValueError(
            "EncryptionStrategy {!r} has no session key with EncryptionScheme "
            "'PreSharedPassPhrase'".format(encryptionStrategyRef)
        )

    # Initialization vector and cipher text (with trailing tag) of the
    # session key, both still base64 text at this point.
    iv_element = wrapped_key.find('cc:InitializationVector', ns)
    ciphertext_element = wrapped_key.find('cc:CipherText', ns)
    encryptedSessionKeyIV = None if iv_element is None else iv_element.text
    encryptedSessionKeyCiphertextAndTag = (
        None if ciphertext_element is None else ciphertext_element.text
    )
    if not encryptedSessionKeyIV or not encryptedSessionKeyCiphertextAndTag:
        raise ValueError(
            "Session key of EncryptionStrategy {!r} lacks InitializationVector "
            "or CipherText".format(encryptionStrategyRef)
        )

    self.RichText.append('encryptedSessionKeyIV (base64):\r\n "' + encryptedSessionKeyIV + '"\r\n')
    self.RichText.append('encryptedSessionKeyCiphertextAndTag (base64):\r\n "' + encryptedSessionKeyCiphertextAndTag + '"\r\n')
    return {"IV": encryptedSessionKeyIV, "CiphertextAndTag": encryptedSessionKeyCiphertextAndTag}
def retrieve_metadata_encryption_iv(self, contentXmlRoot, ns, encryptionStrategyRef):
    """Return the base64 IV stored directly under the referenced strategy.

    The value is the text of the ``InitializationVector`` child of the
    ``EncryptionStrategy`` whose ``Id`` equals ``encryptionStrategyRef``;
    it is also logged to ``self.RichText``.
    """
    iv_xpath = ''.join((
        'cc:EncryptionStrategies/cc:EncryptionStrategy[@Id=\'',
        encryptionStrategyRef,
        '\']/cc:InitializationVector',
    ))
    iv_node = contentXmlRoot.find(iv_xpath, ns)
    iv_base64 = iv_node.text
    self.RichText.append(''.join(('encryptionIV (base64):\r\n "', iv_base64, '"\r\n')))
    return iv_base64
def decrypt_session_key(self, encryptedSessionKey, passphraseFilename):
    """Decrypt the session key using the passphrase stored in a file.

    Args:
        self: Object whose ``RichText.append`` receives the log message.
        encryptedSessionKey: Dict with base64 text under ``"IV"`` and
            ``"CiphertextAndTag"``.
        passphraseFilename: File whose text is the base64 passphrase.

    Returns:
        The decrypted session key as returned by ``aes_decrypt``.
    """
    tag_length = 16
    wrapped = base64.b64decode(encryptedSessionKey["CiphertextAndTag"])
    # The final 16 bytes are the tag; everything before is cipher text.
    ciphertext, tag = wrapped[:-tag_length], wrapped[-tag_length:]
    iv = base64.b64decode(encryptedSessionKey["IV"])
    passphrase = base64.b64decode(read_file_as_text(passphraseFilename))

    session_key = aes_decrypt(ciphertext, passphrase, iv, tag)

    # NOTE(review): this writes the clear session key to the UI log.
    session_key_base64 = base64.b64encode(session_key).decode()
    self.RichText.append(''.join(('decryptedSessionKey (base64):\r\n "', session_key_base64, '"\r\n')))
    return session_key
def unzip_job_container_metadata_xml_file(self, inputJobFilename, metadataXmlFilenameInZip):
    """Extract the encrypted metadata file and give it its working name.

    The member is extracted into ``OUTPUT_FOLDER_NAME`` and then renamed to
    ``OUTPUT_METADATA_XML_FILENAME`` plus the compressed and encrypted
    suffixes.

    Args:
        self: Object whose ``RichText.append`` receives the log messages.
        inputJobFilename: Path of the job container (zip archive).
        metadataXmlFilenameInZip: Member name of the metadata file.

    Returns:
        Path of the renamed, still encrypted metadata file.
    """
    # The context manager closes the archive even if extraction fails.
    # extract() returns the path it actually created, which is more
    # reliable than re-joining the folder and the member name by hand.
    with zipfile.ZipFile(inputJobFilename, 'r') as archive:
        extractedTemporaryFileName = archive.extract(metadataXmlFilenameInZip, OUTPUT_FOLDER_NAME)
    encryptedMetadataXmlFilename = os.path.join(
        OUTPUT_FOLDER_NAME,
        OUTPUT_METADATA_XML_FILENAME + OUTPUT_SUFFIX_COMPRESSED + OUTPUT_SUFFIX_ENCRYPTED,
    )
    os.rename(extractedTemporaryFileName, encryptedMetadataXmlFilename)
    self.RichText.append('encryptedMetadataXmlFilename:\r\n "' + encryptedMetadataXmlFilename + '"\r\n')
    self.RichText.append("Decompression metadataXml in the path :" + OUTPUT_FOLDER_NAME + '\r\n')
    return encryptedMetadataXmlFilename
def decrypt_metadata_xml(encryptedMetadataXmlFilename, sessionKey, ivBase64):
    """Decrypt the metadata file into a still-compressed file.

    The trailing tag is first removed from the encrypted file (the file is
    modified in place), then the remaining content is decrypted into
    ``OUTPUT_METADATA_XML_FILENAME`` + ``OUTPUT_SUFFIX_COMPRESSED`` inside
    the output folder.

    Returns:
        Path of the decrypted, still compressed file.
    """
    # Decode the IV before touching the file, so a malformed IV fails early.
    iv = base64.b64decode(ivBase64)
    tag = get_and_remove_tag_from_file(encryptedMetadataXmlFilename)

    target_path = os.path.join(
        OUTPUT_FOLDER_NAME, OUTPUT_METADATA_XML_FILENAME + OUTPUT_SUFFIX_COMPRESSED
    )
    with io.open(encryptedMetadataXmlFilename, 'rb') as encrypted_in:
        with io.open(target_path, 'w+b') as decrypted_out:
            aes_decrypt_stream(encrypted_in, decrypted_out, sessionKey, iv, tag)
    return target_path
def aes_decrypt(ciphertext, key, iv, tag):
    """Decrypt an in-memory AES-GCM cipher text and return the clear text.

    Args:
        ciphertext: Encrypted bytes without the tag.
        key: AES key bytes.
        iv: Initialization vector bytes.
        tag: Tag bytes handed to the GCM mode.
    """
    backend = default_backend()
    gcm_cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
    decryptor = gcm_cipher.decryptor()
    cleartext = decryptor.update(ciphertext)
    # NOTE(review): finalize() is expected to be the point where a tag
    # mismatch is reported -- confirm against the cryptography docs.
    cleartext += decryptor.finalize()
    return cleartext
def aes_decrypt_stream(ciphertextStream, cleartextStream, key, iv, tag):
    """Decrypt an AES-GCM stream chunk by chunk into another stream.

    Reads ``ciphertextStream`` in 1024-byte pieces until an empty bytes
    object is returned and writes each decrypted piece to
    ``cleartextStream``.  Note that decrypted chunks are written before
    ``finalize()`` is called at the end.
    """
    chunk_size = 1024
    backend = default_backend()
    gcm_cipher = Cipher(algorithms.AES(key), modes.GCM(iv, tag), backend=backend)
    decryptor = gcm_cipher.decryptor()

    while True:
        chunk = ciphertextStream.read(chunk_size)
        if chunk == b'':
            break
        cleartextStream.write(decryptor.update(chunk))

    cleartextStream.write(decryptor.finalize())
def get_and_remove_tag_from_file(filename):
    """Cut the trailing 16-byte tag off a file and return it.

    The file is modified in place: after the call it is 16 bytes shorter.

    Args:
        filename: Path of the file whose last 16 bytes are the tag.

    Returns:
        The 16 tag bytes.

    Raises:
        ValueError: The file holds fewer than 16 bytes; it is left untouched.
    """
    tag_length = 16
    with open(filename, 'r+b') as fileStream:
        file_size = fileStream.seek(0, os.SEEK_END)
        if file_size < tag_length:
            raise ValueError(
                "{!r} is too short ({} bytes) to end with a {}-byte tag".format(
                    filename, file_size, tag_length
                )
            )
        tag_offset = file_size - tag_length
        fileStream.seek(tag_offset)
        # read() is used instead of peek(): peek() may return fewer or more
        # bytes than requested.
        tag = fileStream.read(tag_length)
        # Truncate at an explicit offset rather than relying on the current
        # stream position.
        fileStream.truncate(tag_offset)
    return tag
def uncompress_metadata_xml(compressedMetadataXmlFile):
    """Inflate the raw-deflate metadata file into the final metadata XML.

    The input is read in 1024-byte pieces and the inflated data is written
    to ``OUTPUT_METADATA_XML_FILENAME`` inside the output folder.

    Returns:
        Path of the written metadata XML file.
    """
    chunk_size = 1024
    target_path = os.path.join(OUTPUT_FOLDER_NAME, OUTPUT_METADATA_XML_FILENAME)
    with io.open(compressedMetadataXmlFile, 'rb') as compressed_in:
        with io.open(target_path, 'w+b') as xml_out:
            # Negative window bits: the data is decoded as a raw deflate stream.
            inflater = zlib.decompressobj(-zlib.MAX_WBITS)
            while True:
                chunk = compressed_in.read(chunk_size)
                if chunk == b'':
                    break
                xml_out.write(inflater.decompress(chunk))
            # Emit whatever the decompressor still holds back.
            xml_out.write(inflater.flush())
    return target_path
def read_file_as_text(filename, encoding=None):
    """Return the whole content of a text file as one string.

    Args:
        filename: Path of the file to read.
        encoding: Text encoding passed to ``open``.  ``None`` (the default)
            keeps the previous behaviour of using the platform default.

    Returns:
        The file content.
    """
    with open(filename, 'r', encoding=encoding) as content_file:
        return content_file.read()
def move_output_file():
    """Copy the unpacked metadata XML next to the input job file.

    The source is ``OUTPUT_METADATA_XML_FILENAME`` inside
    ``OUTPUT_FOLDER_NAME``; the destination has the same file name in the
    directory of ``INPUT_JOB_FILENAME``.  An existing destination file is
    replaced.
    """
    destination = os.path.join(os.path.dirname(INPUT_JOB_FILENAME), OUTPUT_METADATA_XML_FILENAME)
    source = os.path.join(OUTPUT_FOLDER_NAME, OUTPUT_METADATA_XML_FILENAME)
    # Always copy.  Previously an existing destination was deleted and the
    # copy was skipped, so every second run left no output file at all.
    # copyfile() itself replaces a destination that already exists.
    shutil.copyfile(source, destination)
# NOTE: This could also be done by calling a C# package from Python;
# the C# BouncyCastle.Crypto.dll library is able to implement it.