"""httpcache
A caching http interface that supports ETags and gzip
to conserve bandwidth. All of the headers and the content
at that URL are cached for quick retrieval later.
ETags are supported for doing a conditional GET, which
only retrieves the content when it has really changed.
Directions for use:
cache = HTTPCache('http://127.0.0.1/testContent.xml')
content = cache.content()
The 'content' is the content retrieved from the URL.
info = cache.info()
The 'info' returned is an rfc822 Message object for
reading rfc822 headers. Note that this object
is useful for reading headers only. If you want to add
new headers and have them updated in the cache you
will need to call 'add_headers()'.
cache.add_headers({"md5" : "03030eeeef33, 2034023feef, 23493208903"})
Requires Python 2.2 or later
"""
__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2004, Joe Gregorio"
__contributors__ = ["Kendall Clark", "Beat Bolli"]
__version__ = "1.0.2 $Rev: 33 $"
__license__ = "MIT"
__history__ = """
"""
import gzip
import md5
import os
import rfc822
import StringIO
import sys
import urllib2
# Directory (relative to the current working directory) that holds one cache
# file per URL.
cacheSubDir__ = ".cache"
# Create the directory at import time. Attempting the mkdir and tolerating
# "already exists" closes the window between an exists() check and the mkdir
# in which another process could create the directory first.
try:
    os.mkdir(cacheSubDir__)
except OSError:
    # Only swallow the error when the path is really there; anything else
    # (permissions, missing parent, ...) is still reported to the importer.
    if not os.path.exists(cacheSubDir__):
        raise
class HTTPCache:
    """Represents a single cached URL.

    Constructing an instance performs the HTTP request. If a cache file for
    the URL already exists and carries an 'ETag' header, the request is made
    conditional with 'If-None-Match'; a 304 reply means the cached copy is
    served and fresh() returns True. Any other successful reply replaces the
    cached content and merges the new response headers into the cached ones.

    The cache file layout is: the header block, one blank line, then the
    (already gunzipped) content.
    """

    def __init__(self, url, headers=None):
        """Fetch 'url', using and updating the on-disk cache.

        url     -- the URL to retrieve.
        headers -- optional dictionary of extra request headers; they are
                   applied on top of the default 'Accept-Encoding: gzip'.

        Raises urllib2.HTTPError (or any other error from urllib2.urlopen)
        when the request fails with anything other than a 304 reply to a
        conditional GET.
        """
        self.info_ = None
        self.content_ = None
        self.fresh_ = False
        self.url_ = url

        # Create a non-clashing name for each url in the cache.
        cacheFileName = md5.new(url).hexdigest()
        self.cacheFullPath_ = os.path.join(cacheSubDir__, cacheFileName)

        self.headers_ = {"Accept-Encoding": "gzip"}
        if headers:
            self.headers_.update(headers)

        request = urllib2.Request(url, None, self.headers_)
        if self._loadCached():
            # Use the cached version's 'ETag' header value, if it exists,
            # to turn this into a conditional GET.
            if self.info_.has_key('ETag'):
                request.add_header("If-None-Match", self.info_['ETag'])
            try:
                response = urllib2.urlopen(request)
            except urllib2.HTTPError:
                # The active exception is fetched through sys.exc_info()
                # instead of being bound in the except clause:
                # 'except X, e' is not valid syntax on Python 3 and
                # 'except X as e' needs Python 2.6, while this module
                # targets 2.2 and later.
                if sys.exc_info()[1].code == 304:
                    self.fresh_ = True
                else:
                    # Bare raise keeps the original traceback.
                    raise
            else:
                try:
                    # Merge the new response headers over the cached ones so
                    # headers added through add_headers() survive a refresh.
                    info = response.info()
                    for key in info.keys():
                        self.info_[key] = info[key]
                    self.content_ = self._writeContent(self.info_, response)
                finally:
                    response.close()
        else:
            # There isn't a usable cached version of this URL yet.
            response = urllib2.urlopen(request)
            try:
                response.info()['Url'] = url
                self.content_ = self._writeContent(response.info(), response)
                self.info_ = response.info()
            finally:
                response.close()

    def content(self):
        """Get the content as a single string."""
        return self.content_

    def filename(self):
        """Get the full path file name of the cached file."""
        return self.cacheFullPath_

    def fresh(self):
        """Get the state of the cache; if true, the cached copy is
        fresh; if false, it's stale."""
        return self.fresh_

    def info(self):
        """Return the header object (rfc822.Message interface) for this URL.

        Note that you can use this to read headers but not
        to add or change headers. Use 'add_headers()' for
        adding/changing header values permanently in the cache."""
        return self.info_

    def add_headers(self, headers):
        """Add/change header values in the cache.

        headers -- dictionary mapping header names to values.

        The cache file is rewritten with the updated headers and the
        current content.

        Note that if the key/value pair you change is used
        by HTTP then you risk the possibility that the value
        will be over-written the next time content is retrieved
        from that URL.
        """
        for key in headers.keys():
            self.info_[key] = headers[key]
        self._store(self.info_, self.content_)

    def _loadCached(self):
        """Load headers and content from the cache file, if there is one.

        Returns True when info_ and content_ were populated from the file.
        Returns False when the file is missing or has no blank line
        separating headers from content (e.g. it is empty or truncated), so
        the caller can fall back to an unconditional fetch instead of
        failing on that URL forever.
        """
        if not os.path.exists(self.cacheFullPath_):
            return False
        f = open(self.cacheFullPath_, "r")
        try:
            info = rfc822.Message(f)
            # Message() consumed the header lines; rewind and split the raw
            # text at the first blank line to get the content.
            f.seek(0)
            parts = f.read().split('\n\n', 1)
        finally:
            f.close()
        if len(parts) != 2:
            return False
        self.info_ = info
        self.content_ = parts[1]
        return True

    def _store(self, info, content):
        """Write 'info' (headers), a blank line and 'content' to the cache file."""
        f = open(self.cacheFullPath_, "w")
        try:
            f.write(str(info))
            f.write("\n")
            f.write(content)
        finally:
            f.close()

    def _writeContent(self, info, response):
        """Read the response body, cache it under 'info', and return it.

        info     -- header object written at the top of the cache file.
        response -- the object returned by urllib2.urlopen().

        A body sent with 'content-encoding: gzip' is decompressed before it
        is stored and returned.
        """
        # Read (and decode) everything before touching the cache file, so a
        # failed download or a bad gzip stream cannot leave a header-only
        # cache entry behind.
        content = response.read()
        if response.info().get('content-encoding', None) == 'gzip':
            content = gzip.GzipFile(fileobj=StringIO.StringIO(content)).read()
        self._store(info, content)
        return content
if __name__ == '__main__':
    # To run these unit tests you should have a web server installed on the local machine.
    # Alter rootPath to point to the location of the root directory for said server.
    import unittest

    rootPath = 'c:/Apache/Apache2/htdocs'
    testUrl = 'http://127.0.0.1/testContent.xml'

    def clearCache():
        """Delete every file in the cache directory."""
        for name in os.listdir(cacheSubDir__):
            os.unlink(os.path.join(cacheSubDir__, name))

    def writeTargetFile():
        """Write the initial test document into the web server root."""
        f = open(os.path.join(rootPath, 'testContent.xml'), "w")
        try:
            f.write(""" """)
        finally:
            f.close()

    def writeTargetFileAlternate():
        """Overwrite the test document with the alternate content."""
        f = open(os.path.join(rootPath, 'testContent.xml'), "w")
        try:
            f.write("""""")
        finally:
            f.close()

    class BasicTest(unittest.TestCase):
        """End-to-end tests against a web server on 127.0.0.1."""

        def testCreation(self):
            """A first fetch creates exactly one cache file holding the 'Url' header."""
            clearCache()
            writeTargetFile()
            cache = HTTPCache('http://127.0.0.1/testContent.xml')
            info = cache.info()
            self.assertEqual(info['Url'], 'http://127.0.0.1/testContent.xml')
            # Build the expected path with os.path.join so the check does
            # not depend on the platform's path separator.
            self.assertEqual(cache.filename(),
                             os.path.join(cacheSubDir__, "827456fdc98606d568b6f03e09168738"))
            fileNames = os.listdir(cacheSubDir__)
            self.assertEqual(len(fileNames), 1)
            f = open(os.path.join(cacheSubDir__, fileNames[0]), "r")
            try:
                orig_content = f.read()
            finally:
                f.close()
            self.assertNotEqual(orig_content.find('Url: http://127.0.0.1/testContent.xml'), -1)

        def testCachedGet(self):
            """A second fetch of unchanged content is reported as fresh."""
            clearCache()
            writeTargetFile()
            cache0 = HTTPCache('http://127.0.0.1/testContent.xml')
            cache = HTTPCache('http://127.0.0.1/testContent.xml')
            self.assertEqual(cache0.fresh(), False)
            self.assertEqual(cache.fresh(), True)
            info = cache.info()
            self.assertEqual(info['Url'], 'http://127.0.0.1/testContent.xml')

        def testChangedGet(self):
            """A fetch after the document changed returns the new content."""
            clearCache()
            writeTargetFile()
            cache = HTTPCache('http://127.0.0.1/testContent.xml')
            content = cache.content()
            info = cache.info()
            self.assertEqual(info['Url'], 'http://127.0.0.1/testContent.xml')
            writeTargetFileAlternate()
            cache2 = HTTPCache('http://127.0.0.1/testContent.xml')
            content2 = cache2.content()
            self.assertNotEqual(content, content2)
            self.assertEqual("""""", content2)

        def testPreserveChangedHeader(self):
            """Add a new header to a cached file, then do a pull after the
            content has been updated; the new cached file should still
            contain the previously added header."""
            clearCache()
            writeTargetFile()
            cache = HTTPCache('http://127.0.0.1/testContent.xml')
            content = cache.content()
            info = cache.info()
            self.assertEqual(info['Url'], 'http://127.0.0.1/testContent.xml')
            cache.add_headers({"md5" : "03030eeeef33, 2034023feef, 23493208903"})
            writeTargetFileAlternate()
            cache2 = HTTPCache('http://127.0.0.1/testContent.xml')
            content2 = cache2.content()
            info2 = cache2.info()
            self.assertNotEqual(content, content2)
            self.assertEqual("""""", content2)
            self.assertEqual(info2['md5'], "03030eeeef33, 2034023feef, 23493208903")

        def testVeryLastCachedGetZip(self):
            """A gzip-encoded response keeps its 'content-encoding' header."""
            clearCache()
            writeTargetFile()
            cache = HTTPCache('http://diveintomark.org/')
            info = cache.info()
            self.assertEqual(info['Url'], 'http://diveintomark.org/')
            self.assertEqual(info['content-encoding'], 'gzip')

    unittest.main()