OANDA网站提供的REST API可以读取账户信息("https://api-fxtrade.oanda.com/v1/accounts),以及提供流报价(streaming,比如"https://stream-fxtrade.oanda.com/v1/prices?accountId=12345&instruments=AUD_CAD%2CAUD_CHF" )。
目前在XE7版本,使用Delphi提供的RestClient组件可以实现读取账户信息。但是,对于服务端的流数据,由于其传输方式是 chunked transfer encoding,按照账户数据读取方式则会在停在最后Excute无法获得返回的Json字符串。
代码如下:
function GetStreamingPrices:TQJson
var
....
begin
Result := TQJson.Create;
// Get
_RESTRequest.Method := rmGET;
//指定资源位置:'/prices'
_RESTRequest.Resource := TRequestSourceURITemplate.Prices;
//生成query串:accountId=12345&instruments=AUD_CAD%2CAUD_CHF
_RESTRequest.AddParameter(TQueryPairKey.AccountID,accountlist[0],pkGETorPOST);
_RESTRequest.AddParameter(TQueryPairKey.Instruments,'EUR_USD',pkGETorPOST);
//URL
._RestClient.BaseURL := 'https://stream-fxtrade.oanda.com/v1';
//执行
_RestRequest.Execute;
//解析为Json
Result.tryParse(_RESTResponse);
end;
so,请教,如何使用Delphi XE7 的RestClient 获取 chunked transfer encoding 的流数据?请不吝赐教。
多谢一楼 @caozhy 提供的信息,不过没有没有解决此问题。我再描述一下:
1、上面的代码用于正常的request-response过程,是没有问题的。
2、 问题出在,报价流的获取上。报价服务器在工作时段实际是持续传输,一直没有结束,采取chunked transfer encoding模式持续地向外传输当时的即时价格信息。使用上面代码,就会处在一直等待状态下。
因此,需要达到的目标就是:一,不能等到结束标志才去解析收到的字符串。二,即便间歇获取数据,如果中间的缓冲时间过长,也不能忍受。
用TIdeventstream方法效率更高:
ResponseEventStrem:TIdEventStream;
ResponseEventStrem.OnWrite := IdEventStreamWrite;
//这个自定义的过程替代了stream的write函数。
procedure TRatesStreamWorker.IdEventStreamWrite(const ABuffer: TIdBytes; AOffset: Integer; ACount: Integer; var VResult: Integer);
var
AJson : TQJson;
begin
AJson := TQJson.Create;
AJson.TryParse(BytesToString(ABuffer,Aoffset,ACount,enUTF8)); //buffer是收到的http的content
if AJson.Count >0 then Jsons.Add(AJson);
VResult := ACount; //已经写入流的字节数,这里直接返回为count
end;
http://www.cnblogs.com/xalion/p/3370459.html
http://www.ithao123.cn/content-3462426.html
花了一点事件搜索资料分析,解决了这个问题
1、关于chunked encoding transfer 模式:数据包虽然没有contentlength,但是有数据包长度信息的,位置在content的最前面。数据包的格式的详情chunked encoding transfer的包格式
2、在stackoverflow网站,有一篇2013年的贴子,也提到了相似的需求。有的建议是用从socket做起,作者最终采取了修改idhttp的iohandler中的readbytes和readstream两个函数,并使用继承封装自己的类来实现。
3、我跟踪idhttp实现发现,indy在处理chunked encoding 数据包的处理是每个包读取,并放入stream中,这个过程是没有问题的,并不存在readbyte函数要缓冲所有的数据包之后才送出给stream的问题,因此上面修改indy函数是没有必要的。
4、问题出在,对于报价流类型的数据,虽然每个数据包有长度,但是总长度是无限的,而indy在取数据时 ,总是要等待整个数据包全部传输完成,也就是最后的0长度的chunked包作为结束的标志,才会返回。
5、在RESTClient组件上,理论上RestReponse组件应该有一个Tstream类型的变量,用于存储从RestClient得到的网络包。不过我没有找到,因此采取indyhttp 组件来实现对chunked encoding transfer的读取。
6、实现的逻辑是,正常获取idhttp的流数据,通过idhttp的onwork事件触发读取操作,从stream中取出刚刚收到的数据包。stream中的数据读取完成后清空,idhttp继续向其填充数据,因此无需担心无限长度情况下的溢出。
7、这篇文章提到了indhttp接受无限长度流数据的更多细节。解决方法看起来更好: 使用定制write方法的Tstream。或者使用TIdstream,这个类型自带了一个onwrite event可以用于输出。
我的代码如下
//在idhttp的组件event上,添加事件关联。每次数据包进来这个事件就会触发
IdHTTP.OnWork := IdHTTPWork;
rocedure TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
var
s:string;
AJson :TQJson;
begin
AJson := TQJson.Create;//用于解析包内容的对象
ResponseStringStream.Position :=0;
s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;
ResponseStringStream.Clear;
AJson.TryParse(s);
if AJson.Count >0 then jsons.Add(Ajson);
end;
procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
var
AJson :TQJson;
begin
AJson := TQJson.Create;
//这是OTL并行组件提供的函数,用于异步开一个新的Thread,在后台持续读取价格流
Parallel.async(
procedure (const task: IOmniTask)
var
source:string;
begin
source := RatesWorker.RatesURL+'EUR_USD'; //资源请求字符串
RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream); //请求数据
end);
Parallel.Async( //再开一个新的thread用于从stream取数据,避免阻塞主线程。循环方式比较笨,测试先这样了,以后改成消息触发
procedure
begin
while true do
begin
if RatesWorker.Jsons.Count >0 then
begin
AJson:= RatesWorker.Jsons.Extract(RatesWorker.Jsons.First); //取一条记录
Memo1.Lines.Add(AJson.ToString); //显示到memo。子线程访问主线程不妥,以后再改了
end;
end;
end);
end;
.....
`