Skip to content

Commit

Permalink
WARC writer support HTTP/2
Browse files Browse the repository at this point in the history
- HTTP headers: replace HTTP/2 and alike by HTTP/1.1 to
  ensure backward-compatibility for WARC readers, see
   iipc/warc-specifications#15
- store protocol versions and cipher suites in WARC headers
  WARC-Protocol and WARC-Cipher-Suite, see
   iipc/warc-specifications#42
   iipc/warc-specifications#86
- allow multiple WARC headers of the same name (WARC-Protocol
  may occur twice to hold the HTTP and TLS version)
  • Loading branch information
sebastian-nagel committed Jul 18, 2024
1 parent 39d731a commit 5f43692
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 44 deletions.
11 changes: 6 additions & 5 deletions src/java/org/commoncrawl/util/WarcCdxWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ public WarcCdxWriter(OutputStream warcOut, OutputStream cdxOut,
public URI writeWarcRevisitRecord(final URI targetUri, final String ip,
final int httpStatusCode, final Date date, final URI warcinfoId,
final URI relatedId, final String warcProfile, final Date refersToDate,
final String payloadDigest, final String blockDigest, byte[] block,
final String payloadDigest, final String blockDigest,
String[] protocolVersions, String[] cipherSuites, byte[] block,
Content content) throws IOException {
long offset = countingOut.getByteCount();
URI recordId = super.writeWarcRevisitRecord(targetUri, ip, httpStatusCode,
date, warcinfoId, relatedId, warcProfile, refersToDate, payloadDigest,
blockDigest, block, content);
blockDigest, protocolVersions, cipherSuites, block, content);
long length = (countingOut.getByteCount() - offset);
writeCdxLine(targetUri, date, offset, length, payloadDigest, content, true,
null, null);
Expand All @@ -114,12 +115,12 @@ public URI writeWarcRevisitRecord(final URI targetUri, final String ip,
public URI writeWarcResponseRecord(final URI targetUri, final String ip,
final int httpStatusCode, final Date date, final URI warcinfoId,
final URI relatedId, final String payloadDigest, final String blockDigest,
final String truncated, final byte[] block, Content content)
throws IOException {
final String truncated, String[] protocolVersions, String[] cipherSuites,
final byte[] block, Content content) throws IOException {
long offset = countingOut.getByteCount();
URI recordId = super.writeWarcResponseRecord(targetUri, ip, httpStatusCode,
date, warcinfoId, relatedId, payloadDigest, blockDigest, truncated,
block, content);
protocolVersions, cipherSuites, block, content);
long length = (countingOut.getByteCount() - offset);
String redirectLocation = null;
if (isRedirect(httpStatusCode)) {
Expand Down
109 changes: 94 additions & 15 deletions src/java/org/commoncrawl/util/WarcRecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class WarcRecordWriter extends RecordWriter<Text, WarcCapture> {
protected static final String X_HIDE_HEADER = "X-Crawler-";
public static final String WARC_WRITER_COUNTER_GROUP = "WARC-Writer";

protected static final Pattern STATUS_LINE_PATTERN = Pattern
.compile("^HTTP/1\\.[01] [0-9]{3}(?: .*)?$");
protected static final Pattern WS_PATTERN = Pattern.compile("\\s+");
protected static final Pattern HTTP_VERSION_PATTERN = Pattern
.compile("^HTTP/1\\.[01]$");
protected static final Pattern HTTP_STATUS_CODE_PATTERN = Pattern
.compile("^[0-9]{3}$");
protected static final String HTTP_VERSION_FALLBACK = "HTTP/1.1";

private TaskAttemptContext context;
private DataOutputStream warcOut;
private WarcWriter warcWriter;
Expand Down Expand Up @@ -291,20 +300,54 @@ public static String formatHttpHeaders(String statusLine, List<String> headers)
}

/**
* Fix the HTTP version in the status line - replace <code>HTTP/2</code>
* by <code>HTTP/1.1</code> ({@link this#HTTP_VERSION_FALLBACK}}.
*
* See also {@link #fixHttpHeaders(String, int)}
*
* @param headers
* HTTP 1.1 or 1.0 request header string, CR-LF-separated lines,
* first line is the status line
* @return safe HTTP request header
*/
public static String fixHttpRequestHeaders(String headers) {
String http2version = " HTTP/2\r\n";
int pos = headers.indexOf(http2version);
if (pos >= 0) {
StringBuilder replacement = new StringBuilder();
String statusLinePrefix = headers.substring(0, pos);
if (statusLinePrefix.indexOf(CRLF) > 0) {
// match in subsequent header lines (should not or rarely happen)
return headers;
}
replacement.append(statusLinePrefix);
replacement.append(' ');
replacement.append(HTTP_VERSION_FALLBACK);
replacement.append(CRLF);
replacement.append(headers.substring(pos + http2version.length()));
return replacement.toString();
}
return headers;
}

/**
* Modify verbatim HTTP response headers: fix, remove or replace headers
* <code>Content-Length</code>, <code>Content-Encoding</code> and
* <code>Transfer-Encoding</code> which may confuse WARC readers. Ensure that
* returned header end with a single empty line (<code>\r\n\r\n</code>).
*
* If the HTTP version in the status line is <code>HTTP/2</code>, replace it
* by <code>HTTP/1.1</code> ({@link this#HTTP_VERSION_FALLBACK}}.
*
* @param headers
* HTTP 1.1 or 1.0 response header string, CR-LF-separated lines,
* first line is status line
* first line is the status line
* @return safe HTTP response header
*/
public static String fixHttpHeaders(String headers, int contentLength) {
int start = 0, lineEnd = 0, last = 0, trailingCrLf= 0;
boolean hasContentLength = false;
StringBuilder replace = new StringBuilder();
StringBuilder replacement = new StringBuilder();
while (start < headers.length()) {
lineEnd = headers.indexOf(CRLF, start);
trailingCrLf = 1;
Expand All @@ -323,7 +366,32 @@ public static String fixHttpHeaders(String headers, int contentLength) {
boolean valid = true;
if (start == 0) {
// status line (without colon)
// TODO: http/2
final String statusLine = headers.substring(0, lineEnd);
if (!STATUS_LINE_PATTERN.matcher(statusLine).matches()) {
final String[] parts = WS_PATTERN
.split(headers.substring(0, lineEnd), 3);
if (parts.length < 2
|| !HTTP_STATUS_CODE_PATTERN.matcher(parts[1]).matches()) {
// nothing we can do here, leave status line as is
LOG.warn(
"WARC parsers may fail on non-standard HTTP 1.0 / 1.1 response status line: {}",
statusLine);
} else {
if (HTTP_VERSION_PATTERN.matcher(parts[0]).matches()) {
replacement.append(parts[0]);
} else {
replacement.append(HTTP_VERSION_FALLBACK);
}
replacement.append(' ');
replacement.append(parts[1]); // status code
replacement.append(' ');
if (parts.length == 3) {
replacement.append(parts[2]); // message
}
replacement.append(CRLF);
last = lineEnd + 2 * trailingCrLf;
}
}
} else if ((lineEnd + 4) == headers.length()
&& headers.endsWith(CRLF + CRLF)) {
// ok, trailing empty line
Expand All @@ -339,7 +407,7 @@ public static String fixHttpHeaders(String headers, int contentLength) {
}
if (!valid) {
if (last < start) {
replace.append(headers.substring(last, start));
replacement.append(headers.substring(last, start));
}
last = lineEnd + 2 * trailingCrLf;
}
Expand Down Expand Up @@ -367,18 +435,18 @@ public static String fixHttpHeaders(String headers, int contentLength) {
}
if (needsFix) {
if (last < start) {
replace.append(headers.substring(last, start));
replacement.append(headers.substring(last, start));
}
last = lineEnd + 2 * trailingCrLf;
replace.append(X_HIDE_HEADER)
replacement.append(X_HIDE_HEADER)
.append(headers.substring(start, lineEnd + 2 * trailingCrLf));
if (trailingCrLf == 0) {
replace.append(CRLF);
replacement.append(CRLF);
trailingCrLf = 1;
}
if (name.equalsIgnoreCase("content-length")) {
// add effective uncompressed and unchunked length of content
replace.append("Content-Length").append(COLONSP)
replacement.append("Content-Length").append(COLONSP)
.append(contentLength).append(CRLF);
}
}
Expand All @@ -388,17 +456,17 @@ public static String fixHttpHeaders(String headers, int contentLength) {
if (last > 0 || trailingCrLf != 2 || !hasContentLength) {
if (last < headers.length()) {
// append trailing headers
replace.append(headers.substring(last));
replacement.append(headers.substring(last));
}
if (!hasContentLength) {
replace.append("Content-Length").append(COLONSP).append(contentLength)
replacement.append("Content-Length").append(COLONSP).append(contentLength)
.append(CRLF);
}
while (trailingCrLf < 2) {
replace.append(CRLF);
replacement.append(CRLF);
trailingCrLf++;
}
return replace.toString();
return replacement.toString();
}
return headers;
}
Expand Down Expand Up @@ -558,6 +626,8 @@ public synchronized void write(Text key, WarcCapture value)
int httpStatusCode = 200;
String fetchDuration = null;
String truncatedReason = null;
String[] protocolVersions = null;
String[] cipherSuites = null;

if (value.datum != null) {
date = new Date(value.datum.getFetchTime());
Expand Down Expand Up @@ -666,6 +736,12 @@ public synchronized void write(Text key, WarcCapture value)
case Response.TRUNCATED_CONTENT_REASON:
truncatedReason = val;
break;
case Response.PROTOCOL_VERSIONS:
protocolVersions = val.split(",");
break;
case Response.CIPHER_SUITES:
cipherSuites = val.split(",");
break;
case Nutch.SEGMENT_NAME_KEY:
case Nutch.FETCH_STATUS_KEY:
case Nutch.SCORE_KEY:
Expand Down Expand Up @@ -739,7 +815,9 @@ public synchronized void write(Text key, WarcCapture value)
URI requestId = null;
if (verbatimRequestHeaders != null) {
requestId = writer.writeWarcRequestRecord(targetUri, ip, date, infoId,
verbatimRequestHeaders.getBytes(StandardCharsets.UTF_8));
protocolVersions, cipherSuites,
fixHttpRequestHeaders(verbatimRequestHeaders)
.getBytes(StandardCharsets.UTF_8));
}

if (generateCdx) {
Expand Down Expand Up @@ -804,7 +882,8 @@ public synchronized void write(Text key, WarcCapture value)
String payloadDigest = null;
writer.writeWarcRevisitRecord(targetUri, ip, httpStatusCode, date, infoId,
requestId, WarcWriter.PROFILE_REVISIT_NOT_MODIFIED, lastModifiedDate,
payloadDigest, blockDigest, responseHeaderBytes, value.content);
payloadDigest, blockDigest, protocolVersions, cipherSuites,
responseHeaderBytes, value.content);
} else {
StringBuilder responsesb = new StringBuilder(4096);
responsesb.append(responseHeaders);
Expand All @@ -822,7 +901,7 @@ public synchronized void write(Text key, WarcCapture value)
String blockDigest = getSha1DigestWithAlg(responseBytes);
URI responseId = writer.writeWarcResponseRecord(targetUri, ip,
httpStatusCode, date, infoId, requestId, payloadDigest, blockDigest,
truncatedReason, responseBytes, value.content);
truncatedReason, protocolVersions, cipherSuites, responseBytes, value.content);

// Write metadata record
StringBuilder metadatasb = new StringBuilder(4096);
Expand Down
Loading

0 comments on commit 5f43692

Please sign in to comment.