#!/usr/bin/env python
#"""
# A script to make sure all CFM Files have the CFQueryParam validation tags
#
# Copyright (C) 2008-09 Pranav Prakash pranav@myblive.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see http://www.gnu.org/licenses/
#
#"""
__author__ = 'Pranav Prakash'
__author_email__ = 'pranny@gmail.com'
import sgmllib, re, string
import sys
#"""
# The CFMLParser class, is used to parse the CFM/ CFC files so as to extract all the CFQuery tags.
# These tags contain the SQL queries which are vulnurable to be attacked.
#"""
class CFMLParser(sgmllib.SGMLParser):
def __init__(self):
sgmllib.SGMLParser.__init__(self)
self.inside_cfquery = False
self.inside_comment = False
self.inside_logic = False
self.SQL_queries = []
self.query_names = []
self.newSQL_queries = []
self.temp = ''
def start_cfquery(self, attributes):
self.inside_cfquery = True
for k,v in attributes:
if k == 'name':
self.query_names.append(v)
if k == 'sql':
self.query_names.pop()
self.end_cfquery()
def end_cfquery(self):
if self.temp != '':
self.SQL_queries.append(self.temp)
self.temp = ''
self.inside_cfquery = False
def start_cfqueryparam(self, attributes):
self.temp += self.get_starttag_text()
#print self.temp
def end_cfqueryparam(self):
pass
def start_cfif(self, attributes):
if self.inside_cfquery:
self.temp += self.get_starttag_text()
self.inside_logic = True
else:
self.end_cfif()
def end_cfif(self):
if self.inside_cfquery:
self.temp += '</cfif>'
self.inside_logic = False
else:
pass
def start_cfelse(self, attributes):
if self.inside_logic:
self.temp += self.get_starttag_text()
def start_cfelseif(self, attributes):
if self.inside_logic:
self.temp += self.get_starttag_text()
def handle_data(self, data):
if self.inside_logic or self.inside_cfquery and len(data.lstrip()) > 0:
self.temp += data
def handle_comment(self, comment):
if self.inside_cfquery:
self.temp += '<!--'+comment+'-->'
def report_ubalanced(self, tag):
if tag == 'cfqueryparam':
end_cfqueryparam()
if tag == 'cfelse':
end_cfelse()
def get_queries(self):
return self.SQL_queries
def get_oldqueries(self):
return self.SQL_queries
def get_newqueries(self):
return self.newSQL_queries
def ScanQueries(self):
for query in self.SQL_queries:
self.ScanSingleQuery(query)
def ScanSingleQuery(self, query):
TrainMan = SQLValidator(query)
TrainMan.Validate_SQL()
self.newSQL_queries.append(TrainMan.get_ValidatedSQL())
class SQLValidator():
def __init__(self, sql):
self.raw_sql = sql
self.refinedSQL = ''
# Mentioned below are the four regular expressions that cover 'most' of the DML syntax parameters
# Most because i am not very strong with SQL :-)
# Before using CFUnvalidatedGroupingRE, we need to make sure that a IN or VALUES has been used, else
# it goes on for stored procedures too, and that is pretty evil
self.variableRE = "'?#\w+\.?\w+\(?\w*\)?#'?(?![_|\w])"
self.CFUnvalidatedAssignmentRE = "[\s*|\,]\w+\.?\w+\s=\s'?#\w+\.?\w+\(?\w*\)?#(?![_|\w])'?"
self.CFUnvalidatedGroupingRE = "[\(|\,]\s*'?#\w+\.?\w+\(?\w*\)?#'?(?![_|\w])"
self.CFUnvalidatedInGroupingRE = re.compile("\sIN\s+\(\s*'?#\w+\.?\w+\(?\w*\)?#'?(?![_|\w])", re.IGNORECASE)
self.TypeMap = dict({'vch': 'CF_SQL_VARCHAR',
'bit': 'CF_SQL_BIT',
'time': 'CF_SQL_TIME',
'int': 'CF_SQL_INTEGER',
'now': 'CF_SQL_TIMESTAMP',
'dat': 'CF_SQL_TIMESTAMP',
})
def Validate_SQL(self):
'picks up raw SQL and creates refined SQL'
self.refinedSQL = re.sub(self.CFUnvalidatedAssignmentRE, self.handleUnvalidatedAssignment, self.raw_sql)
self.refinedSQL = re.sub(self.CFUnvalidatedInGroupingRE, self.handleCFUnvalidatedIn, self.refinedSQL)
self.refinedSQL = re.sub(self.CFUnvalidatedGroupingRE, self.handleUnvalidatedGrouping, self.refinedSQL)
def handleUnvalidatedAssignment(self, s):
tag = s.group(0)
tokenPattern = re.compile("#\w+\.?\w+\(?\w*\)?#")
LHSpattern = re.compile("[\s*|\,]\w+\.?\w+\s=\s")
varcharPattern = re.compile("'#\w+\.?\w+\(?\w*\)?#'")
token = re.findall(tokenPattern, tag)[0]
predecessor = re.findall(LHSpattern, tag)[0]
if len(re.findall(varcharPattern, tag)) != 0:
newToken = self.handleIndividualTokens(token, predecessor, 'CF_SQL_VARCHAR')
else:
newToken = self.handleIndividualTokens(token, predecessor)
finalValue = predecessor + newToken
return finalValue
def handleCFUnvalidatedIn(self, s):
tag = s.group(0)
tokenPattern = re.compile("#\w+\.?\w+\(?\w*\)?#")
lhsPattern = re.compile("\sIN\s+\(\s*", re.IGNORECASE)
varcharPattern = re.compile("'#\w+\.?\w+\(?\w*\)?#'")
token = re.findall(tokenPattern, tag)[0]
predecessor = re.findall(lhsPattern, tag)[0]
if len(re.findall(varcharPattern, tag)) != 0:
newToken = '<cfqueryparam value = "'+ token +'" cfsqltype="CF_SQL_VARCHAR" list="YES" separator="," />'
else:
newToken = '<cfqueryparam value = "'+ token +'" cfsqltype="'+self.findDataType(token, predecessor)+'" list="YES" separator="," />'
finalValue = predecessor + newToken
return finalValue
def handleUnvalidatedGrouping(self, s):
tag = s.group(0)
tokenPattern = re.compile("#\w+\.?\w+\(?\w*\)?#")
lhsPattern = re.compile("[\(|\,]\s*")
varcharPattern = re.compile("'#\w+\.?\w+\(?\w*\)?#'")
token = re.findall(tokenPattern, tag)[0]
predecessor = re.findall(lhsPattern, tag)[0]
if re.findall('IN|in|VALUES|values', self.refinedSQL) == []:
return tag
else:
if re.findall(varcharPattern, tag) != []:
newToken = self.handleIndividualTokens(token, predecessor, 'CF_SQL_VARCHAR')
else:
newToken = self.handleIndividualTokens(token, predecessor)
finalValue = predecessor + newToken
return finalValue
def get_ValidatedSQL(self):
return self.refinedSQL
def handleIndividualTokens(self, token, predecessor, hint=''):
if hint == '':
return '<cfqueryparam value = "'+ token +'" cfsqltype="'+self.findDataType(token, predecessor)+'" />'
else:
return '<cfqueryparam value = "'+ token +'" cfsqltype="'+ hint +'" />'
def findDataType(self, rvalue, lvalue='', hint=''):
lvalue = rvalue.split('.')[0]
try:
rvalue = rvalue.split('.')[1]
except:
rvalue = None
if string.find(lvalue.lower(), '#now()#') != -1:
return 'CF_SQL_TIMESTAMP'
if rvalue is not None:
for k in self.TypeMap.keys():
if rvalue.lower().startswith(k):
return self.TypeMap[k]
else:
for k in self.TypeMap.keys():
if lvalue.lower().startswith(k):
return self.TypeMap[k]
return 'CF_SQL_INTEGER'
def ScanAndReplace(o, n, fullText):
"""
This function replaces all the old sql queries in a document with the new sql
o = list of old queries
n = list of new queries
fullText = full text of the file, where in the operaration is supposed to be performed
"""
#fullText = fullText.decode('string_escape')
if len(o) == len(n):
for i in range(0,len(o)):
fullText = fullText.replace( o[i], n[i])
#if fullText.find(o[i]) == -1:
# print o[i]
# print '----------------'
# print n[i]
return fullText
def check_a_file(infilename):
f = open(infilename)
text = f.read()
f.close()
myCFMLParser = CFMLParser()
myCFMLParser.feed(text)
myCFMLParser.close()
myCFMLParser.ScanQueries()
o = myCFMLParser.get_oldqueries()
n = myCFMLParser.get_newqueries()
text = ScanAndReplace(o, n, text)
f = open('o.cfm', 'w+')
for oi in o:
f.write(str(oi))
f.close()
f = open('n.cfm', 'w+')
f.write(text)
f.close()
if __name__ == '__main__':
check_a_file('LMSCoursecreator.cfm')
Hope you all find it useful.
No comments:
Post a Comment